Source code for sigmaepsilon.solid.fem.linsolve

# -*- coding: utf-8 -*-
import numpy as np
from numpy import ndarray
import numpy as np
from scipy.sparse import coo_matrix as coo, spmatrix, \
    isspmatrix as isspmatrix_np
from scipy.sparse.linalg import spsolve, spsolve_triangular, splu
from typing import Union
from time import time
#from concurrent.futures import ThreadPoolExecutor

from linkeddeepdict import LinkedDeepDict
from neumann.array import matrixform

from .imap import unbox_lhs
from .preproc import fem_coeff_matrix_coo, box_fem_data_sparse
from ..config import __haspardiso__

if __haspardiso__:
    import pypardiso as ppd
    from pypardiso import PyPardisoSolver
    #from pypardiso.scipy_aliases import pypardiso_core

arraylike = Union[ndarray, spmatrix]


[docs]def solve_standard_form(K: coo, f: np.ndarray, *args, use_umfpack:bool=True, summary:bool=False, permc_spec:str='COLAMD', solver:str=None, mtype:int=11, assume_regular:bool=False, **kwargs): """ Solves the discrete equilibrium equations :math:`\mathbf{K} \mathbf{u} = \mathbf{f}` for :math:`\mathbf{u}`. Parameters ---------- K : scipy.sparse.spmatrix The stiffness matrix in coo format. f : numpy.ndarray The vector of nodal loads. use_umfpack : bool, Optional Only if solver is 'scipy'. See the documentation :func:`scipy.sparse.linalg.spsolve` for further meaning. Default is True. permc_spec : str, Optional Only if solver is 'scipy'. See the documentation :func:`scipy.sparse.linalg.spsolve` for further meaning. solver : str, Optional The solver to use. Currently supported options are 'scipy' and 'pardiso'. If nothing is specified, we prefer 'pardiso' if it is around, otherwise the solver falls back to SciPy. mtype : int, Optional Matrix type indicator for the pypardiso solver. The Default value is 11, which denassumes a real and nonsymmetric coefficient matrix. assume_regular : bool, Optional If True, it is assumed that the input is according to the standard form, otherwise some preprocessing on the inputs is performed. Returns ------- numpy.ndarray The solution as a numpy array. The returned array has the same shape as `f`. dict, Optional A summary including the most important characteristics of the solution process. Only if 'summary' is True. See also -------- :func:`scipy.sparse.linalg.spsolve` :func:`scipy.sparse.linalg.splu` """ solver = 'pardiso' if __haspardiso__ else 'scipy' if solver is None else solver if solver == 'pardiso' and not __haspardiso__: raise ImportError( "You need to install 'pypardiso' for solver type <{}>".format(solver)) if not assume_regular: K.eliminate_zeros() K.sum_duplicates() f = matrixform(f) if solver == 'pardiso': if mtype == 11: t0 = time() u = ppd.spsolve(K, f) dt = time() - t0 else: pds = PyPardisoSolver(mtype=mtype) t0 = time() u = pds.solve(K, f) dt = time() - t0 elif solver == 'scipy': if len(args) > 0: if 'lower' in args: solver = 'scipy.sparse.linalg.spsolve_triangular (lower)' t0 = time() u = spsolve_triangular(K, f, lower=True) dt = time() - t0 elif 'upper' in args: solver = 'scipy.sparse.linalg.spsolve_triangular (upper)' t0 = time() u = spsolve_triangular(K, f, lower=False) dt = time() - t0 elif 'SLU' in args: solver = 'scipy.sparse.linalg.SuperLU' K = K.tocsc() t0 = time() LU = splu(K) u = LU.solve(f) dt = time() - t0 else: if use_umfpack: use_umfpack = True if f.shape[1] == 1 else False solver = 'scipy.sparse.linalg.spsolve' t0 = time() u = spsolve(K, f, permc_spec=permc_spec, use_umfpack=use_umfpack) dt = time() - t0 else: raise NotImplementedError( "Selected solver '{}' is not supported!".format(solver)) if isspmatrix_np(u): u = u.todense() u = u.reshape(f.shape) # residual if summary: d = LinkedDeepDict(**{ 'time': dt, 'N': u.shape[0], 'solver': solver, 'options': {} }) if solver == 'scipy': d['options']['use_umfpack'] = use_umfpack d['options']['permc_spec'] = permc_spec elif solver == 'pardiso': d['options']['mtype'] = mtype return u, d else: return u
def linsolve_sparse(K_coo: coo, Kp_coo: coo, f: np.ndarray, *args, use_umfpack=True, sparsify=False, summary=True, **kwargs): """ Notes: ------ If the load vector 'f' is dense, it must contain values for all nodes, even the passive ones. """ # boxing N = f.shape[0] K, f, loc_to_glob = box_fem_data_sparse(K_coo, Kp_coo, f) # solution u, d = solve_standard_form(K, f, *args, sparsify=sparsify, use_umfpack=use_umfpack, summary=True, **kwargs) # unboxing u = unbox_lhs(u, loc_to_glob, N) if summary: d['regular'] = loc_to_glob is not None return u, d return u def linsolve_bulk(K_bulk: np.ndarray, Kp_coo: coo, f: np.ndarray, gnum: np.ndarray, *args, **kwargs): K_coo = fem_coeff_matrix_coo(K_bulk, inds=gnum, N=f.shape[0]) return linsolve_sparse(K_coo, Kp_coo, f, *args, **kwargs) def linsolve(K: arraylike, *args, gnum=None, **kwargs): if isspmatrix_np(K): return linsolve_sparse(K, *args, **kwargs) elif isinstance(K, np.ndarray) and gnum is not None: return linsolve_bulk(K, *args, gnum, **kwargs) """def linsolve_bulk_pop(K_bulk: np.ndarray, Kp_coo: coo, f: np.ndarray, gnum: np.ndarray, fcores: np.ndarray, *args, sparsify=True, use_umfpack=True, summary=False, parallel=True, max_workers=4, permc_spec='COLAMD', **kwargs): f = matrixform(f) N = f.shape[0] loc_to_glob, glob_to_loc = \ index_mappers(gnum, N=N, return_inverse=True) nPop, nE = fcores.shape gnum = box_dof_numbering(gnum, glob_to_loc) M = gnum.max() + 1 dtype = K_bulk.dtype shape = (M, M) # rhs, lhs u = np.zeros((nPop,) + f.shape, dtype=dtype) f = box_rhs(f.astype(dtype), loc_to_glob) Kp_coo = box_spmatrix(Kp_coo, glob_to_loc) # stiffness utils rows, cols = list(map(lambda x: x.flatten(), irows_icols_bulk(gnum))) def K_bulk_w(i): return weighted_stiffness_bulk( K_bulk, fcores[i]).flatten() def K_coo(i): return coo((K_bulk_w(i), (rows, cols)), shape=shape, dtype=dtype) def K(i): return K_coo(i) + Kp_coo def solve_and_unbox(i): ui = solve_standard_form(K(i), f, *args, sparsify=sparsify, use_umfpack=use_umfpack, permc_spec=permc_spec, **kwargs) u[i, :, :] = unbox_lhs(ui, loc_to_glob, N) # serial and parallel solutions if not parallel: t0 = time() list(map(solve_and_unbox, range(nPop))) dt = time() - t0 else: max_workers = min(max_workers, nPop) t0 = time() with ThreadPoolExecutor(max_workers=max_workers) as executor: executor.map(solve_and_unbox, range(nPop)) dt = time() - t0 if summary: d = { 'time [ms]': dt * 1000, 'avg. time [ms]': dt * 1000 / nPop, 'N': M, 'use_umfpack': use_umfpack, 'permc_spec': permc_spec, 'sparsify': sparsify, 'core': 'scipy.sparse.linalg.spsolve', 'max workers': max_workers if parallel else 1, 'population size': nPop } return u, d return u """