Source code for akantu_geomechanical_solver.akantu_geomechanical_solver

#!/usr/bin/env python
"""
Implementation of the Akantu Geomechanical simulator.

"""

__author__ = "Emil Gallyamov"
__credits__ = [
    "Emil Gallyamov <emil.gallyamov@epfl.ch>",
]
__copyright__ = (
    "Copyright (©) 2016-2021 EPFL (Ecole Polytechnique Fédérale"
    " de Lausanne) Laboratory (LSMS - Laboratoire de Simulation"
    " en Mécanique des Solides)"
)
__license__ = "LGPLv3"


import numpy as np
import os

try:
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    prank = comm.Get_rank()
    psize = comm.Get_size()
except ImportError:
    prank = 0

import akantu as aka


[docs] class IterativeOptions: """ The class storing iterative parameters and keeping track of Newton and PCG iterations Args: pcg_maxiterations (int): Prescribes the maximum number of iterations for the partitioned conjugate gradient (PCG) solver. newton_maxiterations (int): The maximum number of iterations of the Newton-Raphson (NR) solver. f_rtol (float): Relative tolerance that will multiply the initial residuals both of the solid and flow subproblems. f_rock_tol (float): Absolute tolerance for the residual of the solid subproblem checked for NR convergence. f_flow_tol (float): Absolute tolerance for the residual of the flow subproblem checked both in NR and PCG solvers. f0_rock_norm (float): Initial norm of the residual for the solid subproblem. f0_flow_norm (float): Initial norm of the residual for the flow subproblem. Attributes: pcg_maxiterations (int): Stores pcg_maxiterations newton_maxiterations (int): Stores newton_maxiterations. f_rtol (float): Stores f_rtol. f_rock_tol (float): Stores f_rock_tol. f_flow_tol (float): Stores f_flow_tol. f0_rock_norm (float): Stores f0_rock_norm. f0_flow_norm (float): Stores f0_flow_norm. pcg_iteration (int): Stores current iteration number of the PCG solver. nr_iteration (int): Stores current iteration number of the NR solver. """ def __init__( self, pcg_maxiterations: int = 10, newton_maxiterations: int = 10, f_rtol: float = 1.0e-4, f_rock_tol: float = 1.0e-8, f_flow_tol: float = 1.0e-8, f0_rock_norm: float = 1.0, f0_flow_norm: float = 1.0, ): self.pcg_maxiterations = pcg_maxiterations self.newton_maxiterations = newton_maxiterations self.f_rtol = f_rtol # value for the check |F|<f_rtol |F_0| self.f_rock_tol = f_rock_tol # value for the check |F|<f_tol self.f_flow_tol = f_flow_tol # value for the check |F|<f_tol self.f0_rock_norm = f0_rock_norm self.f0_flow_norm = f0_flow_norm self.pcg_iteration = 0 self.nr_iteration = 0
[docs] def check_pcg_cvgence(self, f_norm: float): """Checks convergence of the PCG solver by comparing the residual with maximum of two tolerances: relative-tolerance based and absolute one. Args: f_norm (float): Norm of the residual of the flow subproblem. Returns: check (bool): True if converged, False if not. """ check = self.isless( f_norm, max(self.f_flow_tol, self.f_rtol * self.f0_flow_norm) ) return check
[docs] def check_newton_cvgence(self, res_rock: float, res_flow: float): """Checks convergence of the NR solver by comparing residuals of the solid and fluid subproblems with absolute tolerances. Args: f_norm (float): Norm of the residual of the flow subproblem. Returns: check (bool): True if converged, False if not. """ cvged = self.isless( res_rock, max(self.f_rock_tol, self.f_rtol * self.f0_rock_norm) ) and self.isless(res_flow, self.f_flow_tol) return cvged
[docs] def isless(self, a: float, b: float, abs_tol=1e-09): """Compares two floats with user-provided absolute tolerance. Args: a (float): First float to compare. b (float): Second float to compare. abs_tol (float): User-provided absolute tolerance. Returns: check (bool): True if a is less or equal to b + tolerance, False otherwise. """ return a <= b + abs_tol
[docs] class AkantuGeomechanicalSolver: """ The class containing the geomechanical solver based on HeatTransferModel and SolidMechanicsModel of Akantu FEM library. Currently possible to have faults and fractures with the same hydro-mechanical properties only. Args: rock_model (SolidMechanicsModel or SolidMechanicsModelCohesive): Reference to the solid subproblem. flow_model (HeatTransferModel or HeatTransferModelCohesive): Reference to the flow subproblem. alpha (float): Biot coefficient. dump_iterations (bool): Boolean indicating dump of every newton iteration. update_stiffness (bool): Stiffness of the solid subproblem is updated while solving with Newton-Raphson. Otherwise, initial elastic stiffness is employed. Attributes: rock_model (aka.SolidMechanicsModel or aka.SolidMechanicsModelCohesive): Stores rock_model. flow_model (aka.HeatTransferModel or aka.HeatTransferInterfaceModel): Stores flow_model. alpha (float): Stores alpha. dump_iterations (bool): Stores dump_iterations. update_stiffness (bool): Stores update_stiffness. heat_interface_model (bool): A boolean indicating if the flow model is of type HeatTransferInterfaceModel. ghost_types (list): Stores possible ghost types. rock_dof_manager (aka.DOFManager): Reference to the DOFManager of rock_model flow_dof_manager (aka.DOFManager): Reference to the DOFManager of flow_model rhs_rock (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing residual for the rock_model rhs_flow (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing residual for the flow_model r2 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing r2 vector of PCG x2k (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing x2k vector of PCG b1a (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing b1a vector of PCG A12x2 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing the product [A12]{x2} x1k (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing x1k vector of PCG A21x1 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing the product [A21]{x1} A22x2 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing the product [A22]{x2} A11x1 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing the product [A11]{x1} dx2 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing dx2 vector of PCG z2 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing z2 vector of PCG p2 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing p2 vector of PCG p1 (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing p1 vector of PCG q (aka.SolverVectorDefault or aka.SolverVectorDistributed): Reference to the solver vector containing q vector of PCG """ def __init__( self, rock_model, flow_model, alpha: float, dump_iterations=False, update_stiffness=True, ): self.rock_model = rock_model self.rock_model.getNewSolver( "linear_static", aka.TimeStepSolverType.static, aka.NonLinearSolverType.linear, ) self.rock_model.setIntegrationScheme( "linear_static", "displacement", aka.IntegrationSchemeType.pseudo_time ) self.rock_model.setDisplacementRelease( self.rock_model.getDisplacementRelease() + 1 ) self.rock_model.updateCurrentPosition() self.rock_model.assembleStiffnessMatrix(True) self.flow_model = flow_model self.flow_model.getNewSolver( "linear_dynamic", aka.TimeStepSolverType.dynamic, aka.NonLinearSolverType.linear, ) self.flow_model.setIntegrationScheme( "linear_dynamic", "temperature", aka.IntegrationSchemeType.backward_euler, aka.IntegrationScheme._temperature, ) assert self.flow_model.getTimeStep() != 0.0, "Zero time step in flow model" self.flow_model.setTimeStep( self.flow_model.getTimeStep(), "linear_dynamic") if isinstance(self.flow_model, aka.HeatTransferInterfaceModel): self.heat_interface_model = True else: self.heat_interface_model = False self.update_stiffness = update_stiffness self.alpha = alpha self.ghost_types = [aka._not_ghost, aka._ghost] self.rock_dof_manager = self.rock_model.getDOFManager() self.flow_dof_manager = self.flow_model.getDOFManager() self.rhs_rock = self.rock_dof_manager.getNewGlobalVector("rhs_rock") self.rhs_flow = self.flow_dof_manager.getNewGlobalVector("rhs_flow") self.r2 = self.flow_dof_manager.getNewGlobalVector("r2") self.x2k = self.flow_dof_manager.getNewGlobalVector("x2k") self.b1a = self.rock_dof_manager.getNewGlobalVector("b1a") self.A12x2 = self.rock_dof_manager.getNewGlobalVector("A12x2") self.x1k = self.rock_dof_manager.getNewGlobalVector("x1k") self.A21x1 = self.flow_dof_manager.getNewGlobalVector("A21x1") self.A22x2 = self.flow_dof_manager.getNewGlobalVector("A22x2") self.A11x1 = self.rock_dof_manager.getNewGlobalVector("A11x1") self.dx2 = self.flow_dof_manager.getNewGlobalVector("dx2") self.z2 = self.flow_dof_manager.getNewGlobalVector("z2") self.p2 = self.flow_dof_manager.getNewGlobalVector("p2") self.p1 = self.rock_dof_manager.getNewGlobalVector("p1") self.q = self.flow_dof_manager.getNewGlobalVector("q") self.dump_iterations = dump_iterations
[docs] def update_time_step_A22_solve(self, new_time_step: float): """Updates the time step in the flow subpoblem. Necessary for the A_{22} operator in PCG. Args: new_time_step (float): Value of the new time step. """ self.flow_model.setTimeStep(new_time_step, "linear_dynamic")
[docs] def solve_step(self, delta_u_0, delta_p_0, it_options: IterativeOptions) -> bool: """Solves a single time step by the Newton Raphson solver. Args: delta_u_0 (ndarray[float]): An array containing the first guess for the displacement. delta_p_0 (ndarray[float]): An array containing the first guess for the pressure. it_options (IterativeOptions): Contains iterative options to be used within NR and PCG Returns: cvged (bool): True if a is the time step converged, False otherwise. """ # reset the newton tolerance and iteration counter it_options.f0_rock_norm = it_options.f_rock_tol / it_options.f_rtol it_options.nr_iteration = 0 u = self.rock_model.getDisplacement() p = self.flow_model.getTemperature() p_rate = self.flow_model.getTemperatureRate() # set pressure rate to zero at the beginning of each step p_rate.fill(0) du_accum = np.zeros(u.shape) # test initial solution self.compute_residual_rock(u + delta_u_0, p + delta_p_0, self.rhs_rock) self.compute_residual_flow(delta_u_0, self.rhs_flow) # residuals without blocked DOFs are used for convergence check res_rock_norm_0 = self.getNormWithoutBlockedDOFs( self.rock_model, self.rhs_rock) res_flow_norm_0 = self.getNormWithoutBlockedDOFs( self.flow_model, self.rhs_flow) # full residual is used to adjust the convergence criterium for solid part res_rock_norm_full = self.getNorm(self.rock_model, self.rhs_rock) # check convergence of NR solver cvged = it_options.check_newton_cvgence( res_rock_norm_0, res_flow_norm_0) if prank == 0: print( f"\nNewton 0 iteration: p res {res_flow_norm_0} vs {it_options.f_flow_tol}; u res {res_rock_norm_0} vs {max(it_options.f_rock_tol, it_options.f_rtol * it_options.f0_rock_norm)}; u res full {res_rock_norm_full}", flush=True, ) if cvged: u += delta_u_0 p += delta_p_0 p_rate += delta_p_0 / self.flow_model.getTimeStep() return True else: # adjust the convergence criterium for the solid subproblem it_options.f0_rock_norm = res_rock_norm_full # assembling rhs for the solid subproblem self.compute_residual_rock(u, p, self.rhs_rock) # assembling rhs for the flow problem (no pressure increase) residual_flow = self.flow_model.getExternalHeatRate().copy() self.flow_model.assembleInternalHeatRate() # internal heat rates are not multiplied by -1 like internal forces residual_flow -= self.flow_model.getInternalHeatRate() self.rhs_flow.zero() self.flow_model.getDOFManager().assembleToGlobalArray( "temperature", residual_flow, self.rhs_flow ) while it_options.nr_iteration < it_options.newton_maxiterations: it_options.nr_iteration += 1 # solving a single Newton iteration (linearized problem) by the PCG solver delta_u, delta_p = self.pcg_solve( delta_u_0, delta_p_0, self.rhs_rock, self.rhs_flow, it_options ) delta_u_0.fill(0) delta_p_0.fill(0) # update displacement & pressure fields u += delta_u p += delta_p p_rate += delta_p / self.flow_model.getTimeStep() du_accum += delta_u # update displacement release and current position, so that the stiffness matrix will be reassembled for the next NR iteration self.rock_model.setDisplacementRelease( self.rock_model.getDisplacementRelease() + 1 ) self.rock_model.updateCurrentPosition() # update residuals self.compute_residual_rock(u, p, self.rhs_rock) res_rock_norm = self.getNormWithoutBlockedDOFs( self.rock_model, self.rhs_rock ) self.compute_residual_flow(du_accum, self.rhs_flow) res_flow_norm = self.getNormWithoutBlockedDOFs( self.flow_model, self.rhs_flow ) # assembling stiffness based on the new displacement state if self.update_stiffness: self.rock_model.assembleStiffnessMatrix(True) # output data on a single NR iteration delta_u_norm = self.getNorm(self.rock_model, delta_u) delta_p_norm = self.getNorm(self.flow_model, delta_p) if prank == 0: print( f"\nNewton {it_options.nr_iteration} iteration: p res {res_flow_norm} vs {it_options.f_flow_tol}; u res {res_rock_norm} vs {max(it_options.f_rock_tol, it_options.f_rtol * it_options.f0_rock_norm)}; delta u norm {delta_u_norm}; delta p norm {delta_p_norm}", flush=True, ) # check NR convergence cvged = it_options.check_newton_cvgence( res_rock_norm, res_flow_norm) if self.dump_iterations: self.rock_model.dump("solid_mechanics_model") self.flow_model.dump("heat_transfer") if self.heat_interface_model: self.rock_model.dump("cohesive elements") self.flow_model.dump("heat_interfaces") if cvged: return True return False
[docs] def pcg_solve(self, x1_0, x2_0, b1, b2, it_options: IterativeOptions): """Solves a linear two-fields problem by partitioned conjugate gradient methods (see Prevost 1997). Args: x1_0 (ndarray[float]): An array containing the first guess for the displacement increment. x2_0 (ndarray[float]): An array containing the first guess for the pressure increment. b1 (aka.SolverVector): A global vector containing the rhs for the solid subproblem. b2 (aka.SolverVector): A global vector containing the rhs for the flow subproblem. it_options (IterativeOptions): Contains iterative options to be used within NR and PCG Returns: x1 (ndarray[float]): An array containing the resulting displacement increment. x2 (ndarray[float]): An array containing the resulting pressure increment. Raises: Exception: PCG solver did not converge. """ # assemble / synchronize NORMALS on cohesives before fixing them by assembling internal forces self.rock_model.assembleInternalForces() # fix normals and openings during pcg solve nb_materials = self.rock_model.getNbMaterials() for material_id in range(nb_materials): material = self.rock_model.getMaterial(material_id) if isinstance(material, aka.MaterialCohesive): material.fixCohesiveState() # get dof managers solid_dof_manager = self.rock_model.getDOFManager() flow_dof_manager = self.flow_model.getDOFManager() self.r2.zero() if isinstance(b2, aka.SolverVector): self.r2.copy(b2) else: flow_dof_manager.assembleToGlobalArray("temperature", b2, self.r2) # solve for x1 self.x2k.zero() flow_dof_manager.assembleToGlobalArray("temperature", x2_0, self.x2k) self.b1a.zero() if isinstance(b2, aka.SolverVector): self.b1a.copy(b1) else: solid_dof_manager.assembleToGlobalArray( "displacement", b1, self.b1a) self.A12x2.zero() self.A12Mult(self.x2k, self.A12x2, False) self.b1a -= self.A12x2 self.x1k.zero() self.A11SolveLinear(self.b1a, self.x1k) # update x1k with the fixed dofs from x1_0 blocked_dofs_u = self.rock_model.getBlockedDOFs() x1_0[np.nonzero(~blocked_dofs_u)] = 0 solid_dof_manager.assembleToGlobalArray("displacement", x1_0, self.x1k) # solve 22 self.A21x1.zero() self.A21Mult(self.x1k, self.A21x1, True) self.A22x2.zero() self.A22Mult(self.x2k, self.A22x2) self.r2 -= self.A21x1 self.r2 -= self.A22x2 # take ALL dofs not only free ones bn = self.getNorm(self.flow_model, self.r2) # set defaults it_options.f0_flow_norm = bn self.dx2.zero() flow_dof_manager.assembleToGlobalArray("temperature", x2_0, self.dx2) self.z2.zero() self.A22SolveLinear(self.r2, self.z2) self.p2.zero() self.p2.copy(self.z2) rho1 = self.r2.dot(self.z2) self.p1.zero() self.q.zero() cvged = False for k in range(it_options.pcg_maxiterations): # update current pcg iteration it_options.pcg_iteration = k self.A12Mult(self.p2, self.A12x2, True) self.b1a.copy(self.A12x2) self.A11SolveLinear(self.b1a, self.p1) self.p1 *= -1 self.A21Mult(self.p1, self.A21x1, True) self.A22Mult(self.p2, self.A22x2) self.q.copy(self.A21x1) self.q += self.A22x2 nominator = self.r2.dot(self.z2) denominator = self.p2.dot(self.q) Alpha = 1.0 if nominator == 0: Alpha = 0 elif not (nominator == denominator): Alpha = nominator / denominator # update x1k self.x1k.add(self.p1, Alpha) self.dx2.copy(self.p2) self.dx2 *= Alpha self.x2k.add(self.dx2, 1) self.r2.add(self.q, -Alpha) r2_norm = self.getNormWithoutBlockedDOFs(self.flow_model, self.r2) dx2_norm = self.getNormWithoutBlockedDOFs( self.flow_model, self.dx2) if prank == 0: print( f"\niteration {it_options.pcg_iteration} {os.linesep}r2 {r2_norm} vs. max[r0 {it_options.f0_flow_norm} * {it_options.f_rtol}, rtol {it_options.f_flow_tol}] {os.linesep}dx2 {dx2_norm}" ) cvged = it_options.check_pcg_cvgence(r2_norm) if cvged: break self.A22SolveLinear(self.r2, self.z2) r2z2 = self.r2.dot(self.z2) Beta = 0.0 if rho1 != 0: Beta = r2z2 / rho1 self.p2 *= Beta self.p2 += self.z2 rho1 = r2z2 # end of loop if not cvged: raise Exception( "PCG did not converge after {it_options.pcg_iteration}") x1 = np.zeros(x1_0.shape) solid_dof_manager.getArrayPerDOFs("displacement", self.x1k, x1) x2 = np.zeros(x2_0.shape) flow_dof_manager.getArrayPerDOFs("temperature", self.x2k, x2) # unfix state of cohesive elements nb_materials = self.rock_model.getNbMaterials() for material_id in range(nb_materials): material = self.rock_model.getMaterial(material_id) if isinstance(material, aka.MaterialCohesive): material.unfixCohesiveState() return x1, x2
[docs] def compute_residual_flow(self, du, residual_flow): """Updates the residual to the flow subproblem according to the following equation: :math:'\{f^p\}_{n+1} - [C] \{p\}_{n+1} - [S] \{\dot{p}\}_{n+1} - [A_{\textrm{dp}}] \{\dot{u}\}_{n+1}' Args: du (ndarray[float]): An array with displacement increment. residual_flow (aka.SolverVector or ndarray): A global vector containing the residual for the flow subproblem. """ du_copy = du.copy() p = self.flow_model.getTemperature() p_rate = self.flow_model.getTemperatureRate() nb_nodes = self.rock_model.getMesh().getNbNodes() p_backup = p.copy() p_rate_backup = p_rate.copy() residual_array = np.zeros([nb_nodes, 1]) self.flow_model.getDOFManager().getTimeStepSolver("implicit").assembleResidual() self.flow_model.getDOFManager().getArrayPerDOFs( "temperature", self.flow_model.getDOFManager().getResidual(), residual_array ) A21du = np.zeros([nb_nodes, 1]) # A21du is divided by the timestep inside A21Mult function self.A21Mult(du_copy, A21du, True) residual_array -= A21du if isinstance(residual_flow, aka.SolverVector): residual_flow.zero() self.flow_model.getDOFManager().assembleToGlobalArray( "temperature", residual_array, residual_flow ) else: np.copyto(residual_flow, residual_array) # rollback nodal values np.copyto(p, p_backup) np.copyto(p_rate, p_rate_backup)
[docs] def compute_residual_rock(self, u_in, p_in, residual_rock): """Updates the residual to the solid subproblem according to the following equation: :math:'\{f^d\}_{n+1} - [K]\{d\}_{n+1} + [A_{\textrm{pd}}] \{p\}_{n+1}' Args: u_in (ndarray[float]): An array with the displacement. p_in (ndarray[float]): An array with the pressure. residual_rock (aka.SolverVector or ndarra): A global vector containing the residual for the solid subproblem. """ nb_nodes = self.rock_model.getMesh().getNbNodes() dim = self.rock_model.getSpatialDimension() residual_array = self.rock_model.getExternalForce().copy() u_copy = u_in.copy() p_copy = p_in.copy() u = self.rock_model.getDisplacement() u_backup = u.copy() np.copyto(u, u_copy) self.rock_model.assembleInternalForces() # internal forces are multiplied by -1 in their assembly in Akantu residual_array += self.rock_model.getInternalForce() # subtract A12p. "-" sign is already included in A12p assembly A12p = np.zeros([nb_nodes, dim]) self.A12Mult(p_copy, A12p, False) residual_array -= A12p if isinstance(residual_rock, aka.SolverVector): residual_rock.zero() self.rock_model.getDOFManager().assembleToGlobalArray( "displacement", residual_array, residual_rock ) else: np.copyto(residual_rock, residual_array) # rollback nodal values np.copyto(u, u_backup)
[docs] def A12Mult(self, p, A12x2, ignore_blocked_dofs): """Wrapper over A12MultArray. If delta_p is a SolverVector, transforms it into an array and passes to A12MultArray. If A12x2 is a SolverVector, transforms the resulting array into the SolverVector. Args: p (ndarray[float] or SolverVector): An array with the pressure. A12x2 (ndarray or SolverVector): An array with the final product. ignore_blocked_dofs (bool): Turns off contribution of blocked pressure dofs to A12p product """ nb_nodes = self.rock_model.getMesh().getNbNodes() dim = self.rock_model.getSpatialDimension() p_array = np.zeros([nb_nodes, 1]) A12x2_array = np.zeros([nb_nodes, dim]) if isinstance(p, aka.SolverVector): self.flow_model.getDOFManager().getArrayPerDOFs("temperature", p, p_array) else: np.copyto(p_array, p) self.A12MultArray(p_array, A12x2_array, ignore_blocked_dofs) if isinstance(A12x2, aka.SolverVector): A12x2.zero() self.rock_model.getDOFManager().assembleToGlobalArray( "displacement", A12x2_array, A12x2 ) else: np.copyto(A12x2, A12x2_array)
[docs] def A12MultArray(self, p_array, A12x2_array, ignore_blocked_dofs): """Quadpoint-wise multiplication of Apd with pressure. Args: p_array (ndarray): An array with the pressure. A12x2 (ndarray): An array with the final product. ignore_blocked_dofs (bool): Turns off contribution of blocked pressure dofs to A12p product """ mesh = self.rock_model.getMesh() nb_nodes = mesh.getNbNodes() assert nb_nodes == p_array.shape[0], "Wrong dimension of p_array" # put fixed pressure values to zero p_copy = p_array.copy() if ignore_blocked_dofs: self.zeroAtBlockedDOFs(self.flow_model, p_copy) # interpolate pressures at both solid and cohesive elements # p on qpoints is synced in computeTempOnQPoints in model self.interpolate_pressure_on_qpoints(p_copy) A12x2_array.fill(0.0) dim = self.rock_model.getSpatialDimension() for ghost_type in self.ghost_types: for el_type in mesh.elementTypes(dim, ghost_type, aka._ek_not_defined): nb_elements = mesh.getNbElement(el_type, ghost_type) if nb_elements == 0: continue if mesh.getKind(el_type) == aka._ek_regular: fem = self.rock_model.getFEEngine() elif mesh.getKind(el_type) == aka._ek_cohesive: fem = self.rock_model.getFEEngine("CohesiveFEEngine") shapes_derivatives = fem.getShapesDerivatives( el_type, ghost_type ).copy() size_of_shapes_derivatives = shapes_derivatives.shape[1] nb_quad_points = fem.getNbIntegrationPoints(el_type) nb_nodes_per_element = mesh.getNbNodesPerElement(el_type) int_f_dn_dx = np.zeros( (nb_elements, nb_nodes_per_element * dim)) if mesh.getKind(el_type) == aka._ek_regular: # accessing interpolated values # as there is a single material, mesh numbering and material numbering coincide pressure_at_quads = self.flow_model.getTemperatureOnQpoints( el_type, ghost_type ) field_tensor_at_quads = np.zeros( (nb_elements * nb_quad_points, dim * dim) ) # fill along diagonal with pressure values for i in range(dim): field_tensor_at_quads[:, i + i * dim] = ( pressure_at_quads.reshape( nb_elements * nb_quad_points) ) # multiplication by shape function derivatives f_dn_dx = np.zeros( (nb_elements * nb_quad_points, size_of_shapes_derivatives) ) fem.computeBtD(field_tensor_at_quads, f_dn_dx, el_type, ghost_type) # scaling with alpha f_dn_dx *= self.alpha fem.integrate( f_dn_dx, int_f_dn_dx, size_of_shapes_derivatives, el_type, ghost_type, ) elif mesh.getKind(el_type) == aka._ek_cohesive: # accessing interpolated values pressure_at_quads = self.flow_model.getTemperatureOnQpointsCoh( el_type, ghost_type ) shape_functions = fem.getShapeFunctions() # get normals to cohesives from solid subproblem # this algorithm currently supports only a single cohesive / interface material nb_materials = self.rock_model.getNbMaterials() for material_id in range(nb_materials): material = self.rock_model.getMaterial(material_id) if isinstance(material, aka.MaterialCohesive): normals = material.getNormalsAtQuads( el_type, ghost_type) break # important to transit from material local numbering to mesh global numbering of cohesive elements el_local_numbering = self.rock_model.getMaterialLocalNumbering( el_type, ghost_type ).squeeze() quad_local_numbering = np.zeros( nb_elements * nb_quad_points, dtype=int ) for i in range(dim): quad_local_numbering[i::dim] = ( el_local_numbering * nb_quad_points + i ) normals_global_numbering = normals[quad_local_numbering] # compute pressure force on crack face p_vector = normals_global_numbering.copy() p_vector = np.multiply(p_vector, pressure_at_quads) # compute product of pressure force with shape functions shapes = fem.getShapes(el_type, ghost_type) shape_size = shape_functions.getShapeSize(el_type) p_vec_N = np.zeros( (nb_elements * nb_quad_points, dim * shape_size)) for _p_vec, _p_vec_N, _shapes in zip(p_vector, p_vec_N, shapes): p_vec_N_view = _p_vec_N.view().reshape(dim, shape_size) np.copyto(p_vec_N_view, np.outer( _p_vec, _shapes).transpose()) # partially integrate pressure forces partial_int_p_vec_N = np.zeros( (nb_elements, dim * shape_size)) fem.integrate( p_vec_N, partial_int_p_vec_N, dim * shape_size, el_type, ghost_type, ) # assemble partial in full integral vector for _part_int_p_vec_N, _int_f_dn_dx in zip( partial_int_p_vec_N, int_f_dn_dx ): _int_f_dn_dx[: dim * shape_size] = _part_int_p_vec_N _int_f_dn_dx[dim * shape_size:] = -_part_int_p_vec_N # Apd has -1 as a prefactor in the weak form int_f_dn_dx *= -1 # assemble into resulting vector self.rock_model.getDOFManager().assembleElementalArrayLocalArray( int_f_dn_dx, A12x2_array, el_type, ghost_type )
[docs] def A21Mult(self, du, A21x1, ignore_blocked_dofs): """Wrapper over A21MultArray. If delta_p is a SolverVector, transforms it into an array and passes to A21MultArray. If A21x1 is a SolverVector, transforms the resulting array into the SolverVector. Args: delta_u (ndarray[float] or SolverVector): An array with displacement increment. A21x1 (ndarray or SolverVector): An array with the final product. ignore_blocked_dofs (bool): Turns off contribution of the blocked displacement dofs to A21du product """ nb_nodes = self.rock_model.getMesh().getNbNodes() dim = self.rock_model.getSpatialDimension() du_array = np.zeros([nb_nodes, dim]) A21x1_array = np.zeros([nb_nodes, 1]) # Pass displacement into an array if isinstance(du, aka.SolverVector): self.rock_model.getDOFManager().getArrayPerDOFs( "displacement", du, du_array ) else: np.copyto(du_array, du) # call the function operating on arrays self.A21MultArray(du_array, A21x1_array, ignore_blocked_dofs) # transform final product into a solver vector if needed if isinstance(A21x1, aka.SolverVector): A21x1.zero() self.flow_model.getDOFManager().assembleToGlobalArray( "temperature", A21x1_array, A21x1 ) else: np.copyto(A21x1, A21x1_array)
[docs] def A21MultArray(self, du_array, A21x1_array, ignore_blocked_dofs): """Quadpoint-wise multiplication of Adp with displacement increment. Args: du_array (ndarray): An array with displacement increment. A21x1_array (ndarray): An array with the final product. ignore_blocked_dofs (bool): Turns off contribution of blocked displacement dofs to the A21du product """ mesh = self.rock_model.getMesh() nb_nodes = mesh.getNbNodes() assert nb_nodes == du_array.shape[0], "Wrong dimension of du_array" du_copy = du_array.copy() # set fixed displacement dofs to zero if ignore_blocked_dofs: self.zeroAtBlockedDOFs(self.rock_model, du_copy) dim = self.flow_model.getSpatialDimension() A21x1_array.fill(0.0) # assemble and synchronize quadrature fields for el_type in mesh.elementTypes(dim, aka._not_ghost, aka._ek_not_defined): nb_elements = mesh.getNbElement(el_type, aka._not_ghost) if mesh.getKind(el_type) == aka._ek_regular: # gradu is synced self.compute_gradu_on_qpoints(du_copy, el_type) elif mesh.getKind(el_type) == aka._ek_cohesive: # compute openings of cohesive elements and then synchronize them # this algorithm currently supports only a single cohesive / interface material nb_materials = self.rock_model.getNbMaterials() for material_id in range(nb_materials): material = self.rock_model.getMaterial(material_id) if isinstance(material, aka.MaterialCohesive): delta_opening = material.getOpening( el_type, aka._not_ghost) normals = material.getNormalsAtQuads( el_type, aka._not_ghost) material.computeOpening( du_copy, delta_opening, el_type, aka._not_ghost ) break # synchronize cohesive openings self.rock_model.synchronizeField(aka.smmc_opening) # collect computed values from above both on ghosts and not ghosts for ghost_type in self.ghost_types: for el_type in mesh.elementTypes(dim, ghost_type, aka._ek_not_defined): nb_elements = mesh.getNbElement(el_type, ghost_type) if nb_elements == 0: continue if mesh.getKind(el_type) == aka._ek_regular: fem = self.flow_model.getFEEngine() elif mesh.getKind(el_type) == aka._ek_cohesive: fem = self.flow_model.getFEEngine("InterfacesFEEngine") nb_quad_points = fem.getNbIntegrationPoints(el_type) nb_nodes_per_element = mesh.getNbNodesPerElement(el_type) eps_vol = np.zeros((nb_elements * nb_quad_points, 1)) if mesh.getKind(el_type) == aka._ek_regular: grad_u = np.zeros( (nb_elements * nb_quad_points, dim * dim)) # function below transits from mat local numbering to mesh global els numbering self.assemble_gradu_global(grad_u, el_type, ghost_type) # extracting the volumetric strain for i in range(dim): eps_vol[:, 0] += grad_u[:, i + i * dim] # scale with Biot's coefficient for bulk elements eps_vol *= self.alpha elif mesh.getKind(el_type) == aka._ek_cohesive: # compute jump in displacement increment across cohesive elements nb_materials = self.rock_model.getNbMaterials() for material_id in range(nb_materials): material = self.rock_model.getMaterial(material_id) if isinstance(material, aka.MaterialCohesive): delta_opening = material.getOpening( el_type, ghost_type) normals = material.getNormalsAtQuads( el_type, ghost_type) delta_opening = material.getOpening( el_type, ghost_type) normals = material.getNormalsAtQuads( el_type, ghost_type) break delta_normal_opening = self.computeNormalOpening( delta_opening, normals ) # important to transit from material local numbering to mesh global numbering of cohesive elements el_local_numbering = self.rock_model.getMaterialLocalNumbering( el_type, ghost_type ).squeeze() quad_local_numbering = np.zeros( nb_elements * nb_quad_points, dtype=int ) for i in range(dim): quad_local_numbering[i::dim] = ( el_local_numbering * nb_quad_points + i ) delta_normal_opening_global = delta_normal_opening[ quad_local_numbering ] eps_vol = delta_normal_opening_global # multiplying volumetric strain with shape functions eps_vol_by_shapes = np.zeros( (nb_elements * nb_quad_points, nb_nodes_per_element) ) fem.computeNtb(eps_vol, eps_vol_by_shapes, el_type, ghost_type) # integration part int_Nt_eps_vol = np.zeros((nb_elements, nb_nodes_per_element)) fem.integrate( eps_vol_by_shapes, int_Nt_eps_vol, nb_nodes_per_element, el_type, ghost_type, ) # divide by the time step int_Nt_eps_vol /= self.flow_model.getTimeStep() # assemble into resulting vector self.flow_model.getDOFManager().assembleElementalArrayLocalArray( int_Nt_eps_vol, A21x1_array, el_type, ghost_type )
[docs] def computeNormalOpening(self, opening, normal): """Dot product between opening vector and normals to cohesives. Args: opening (ndarray): An array with opening vectors. normal (ndarray): An array with normal vectors. Returns: normal_opening (ndarray): An array with normal openings. """ normal_opening = np.einsum("ij,ij->i", opening, normal) return normal_opening
[docs] def A22Mult(self, p, A22x2): """Explicit multiplication of A22 matrix with pressure vector. Args: p (ndarray or SolverVector): An array with pressures. A22x2 (ndarray or SolverVector): An array with final product. """ nb_nodes = self.rock_model.getMesh().getNbNodes() dp = np.zeros(nb_nodes) p_rate = np.zeros(nb_nodes) if isinstance(p, aka.SolverVector): self.flow_model.getDOFManager().getArrayPerDOFs("temperature", p, dp) else: np.copyto(dp, p) np.copyto(p_rate, dp / self.flow_model.getTimeStep()) if not self.flow_model.getDOFManager().hasMatrix("J"): self.flow_model.getTimeStepSolver().assembleMatrix("J") if isinstance(A22x2, aka.SolverVector): A22x2.zero() self.flow_model.getDOFManager().assembleMatMulVectToGlobalArray( "temperature", "K", dp, A22x2 ) self.flow_model.getDOFManager().assembleMatMulVectToGlobalArray( "temperature", "M", p_rate, A22x2 ) else: assert ( psize == 1 ), "A22Mult with numpy-array type A22x2 works only in sequential" K_ = self.flow_model.getDOFManager().getMatrix("K") K = aka.AkantuSparseMatrix(K_) M_ = self.flow_model.getDOFManager().getMatrix("M") M = aka.AkantuSparseMatrix(M_) np.copyto(A22x2, K.dot(dp)) A22x2 += M.dot(p_rate)
[docs] def A11SolveLinear(self, rhs_solid, solution): """Solution of the linear system A11x=b by the Akantu direct solver. Literally x = A11^{inv} b. Args: rhs_solid (ndarray or SolverVector): Right hand side of the equation. solution (ndarray or SolverVector): An array with solution of linear system. """ ext_force = self.rock_model.getExternalForce() ext_force_backup = ext_force.copy() u = self.rock_model.getDisplacement() u_backup = u.copy() nb_nodes = self.rock_model.getMesh().getNbNodes() dim = self.rock_model.getSpatialDimension() # parallelization is taken into account rhs_local = np.zeros([nb_nodes, dim]) if isinstance(rhs_solid, aka.SolverVector): self.rock_model.getDOFManager().getArrayPerDOFs( "displacement", rhs_solid, rhs_local ) else: np.copyto(rhs_local, rhs_solid) self.zeroAtBlockedDOFs(self.rock_model, rhs_local) np.copyto(ext_force, rhs_local) u.fill(0) self.rock_model.solveStep("linear_static") # roll back the displacement release so that the stiffness is not updated self.rock_model.setDisplacementRelease( self.rock_model.getDisplacementRelease() - 1 ) if isinstance(solution, aka.SolverVector): solution.zero() self.rock_model.getDOFManager().assembleToGlobalArray( "displacement", u, solution ) else: np.copyto(solution, u) # put back the original values np.copyto(u, u_backup) np.copyto(ext_force, ext_force_backup)
[docs] def A22SolveLinear(self, rhs_flow, solution): """Solution of the linear system A22x=b by the Akantu direct solver. Literally x = A22^{inv} b. Args: rhs_flow (ndarray or SolverVector): Right hand side of the equation. solution (ndarray or SolverVector): An array with solution of the linear system. """ ext_rate = self.flow_model.getExternalHeatRate() ext_rate_backup = ext_rate.copy() press = self.flow_model.getTemperature() press_rate = self.flow_model.getTemperatureRate() press_backup = press.copy() press_rate_backup = press_rate.copy() rhs_local = np.zeros(ext_rate.shape) if isinstance(rhs_flow, aka.SolverVector): self.flow_model.getDOFManager().getArrayPerDOFs( "temperature", rhs_flow, rhs_local ) else: np.copyto(rhs_local, rhs_flow) self.zeroAtBlockedDOFs(self.flow_model, rhs_local) np.copyto(ext_rate, rhs_local) press.fill(0) press_rate.fill(0) self.flow_model.solveStep("linear_dynamic") if isinstance(solution, aka.SolverVector): solution.zero() self.flow_model.getDOFManager().assembleToGlobalArray( "temperature", press, solution ) else: np.copyto(solution, press) # rollback the original values np.copyto(press, press_backup) np.copyto(press_rate, press_rate_backup) np.copyto(ext_rate, ext_rate_backup)
[docs] def getNormWithoutBlockedDOFs(self, model, array): """Computes norm of a vector ignoring blocked and ghost dofs. Args: model (SolidMechanicsModel or HeatTransferModel): Reference to the model. array (ndarray or SolverVector): An array whose norm the function is computing. """ if isinstance(array, aka.SolverVector): norm = array.normFreeDOFs() return norm else: array_copy = array.copy() self.zeroAtBlockedDOFs(model, array_copy) self.zeroAtGhosts(model, array_copy) array_copy = array_copy.flatten() dot = np.dot(array_copy, array_copy) dot_reduced = comm.allreduce(dot, op=MPI.SUM) norm = np.sqrt(dot_reduced) return norm
[docs] def getNorm(self, model, array): """Computes norm of a vector including blocked dofs but excluding ghosts. Args: model (SolidMechanicsModel or HeatTransferModel): Reference to the model. array (ndarray or SolverVector): An array whose norm the function is computing. """ if isinstance(array, aka.SolverVector): # norm on solver vector includes only local and masters norm = array.norm() return norm else: array_copy = array.copy() self.zeroAtGhosts(model, array_copy) array_copy = array_copy.flatten() dot = np.dot(array_copy, array_copy) dot_reduced = comm.allreduce(dot, op=MPI.SUM) norm = np.sqrt(dot_reduced) return norm
[docs] def zeroAtBlockedDOFs(self, model, array): """Set blocked DOFs to zero. Args: model (SolidMechanicsModel or HeatTransferModel): Reference to the model. array (ndarray or SolverVector): An array who is being modified. """ if isinstance(model, aka.SolidMechanicsModel): # bring blocked dofs to zero blocked_dofs_u = self.rock_model.getBlockedDOFs() array[blocked_dofs_u.nonzero()] = 0 else: # put to zero at blocked dof positions blocked_dofs_p = self.flow_model.getBlockedDOFs() array[np.where(blocked_dofs_p)] = 0
[docs] def zeroAtGhosts(self, model, array): """Set ghost DOFs to zero. Args: model (SolidMechanicsModel or HeatTransferModel): Reference to the model. array (ndarray or SolverVector): An array who is being modified. """ if isinstance(model, aka.SolidMechanicsModel): # put to zero at pure ghost nodes node_flags = self.rock_model.getMesh().getNodesFlags() pure_ghost_pos = np.asarray( node_flags == np.uint8(aka._pure_ghost) ).nonzero()[0] array[pure_ghost_pos, :] = 0 # same for slaves slave_pos = np.asarray( node_flags == np.uint8(aka._slave)).nonzero()[0] array[slave_pos, :] = 0 else: # put to zero at pure ghost nodes node_flags = self.flow_model.getMesh().getNodesFlags() pure_ghost_pos = np.asarray( node_flags == np.uint8(aka._pure_ghost) ).nonzero()[0] array[pure_ghost_pos] = 0 # same for slaves slave_pos = np.asarray( node_flags == np.uint8(aka._slave)).nonzero()[0] array[slave_pos] = 0
[docs] def interpolate_pressure_on_qpoints(self, p_in): """Interpolates pressure field p_in on quadrature points and stores it in the temperature_on_qpoints array. Args: p_in (ndarray): Nodal pressure field to be interpolated. """ p = self.flow_model.getTemperature() p_backup = p.copy() np.copyto(p, p_in) self.flow_model.computeTempOnQpoints(aka._not_ghost) # roll back the initial values np.copyto(p, p_backup)
[docs] def compute_gradu_on_qpoints(self, u, el_type): """Computes gradient of u, stores it in gradU internal field, and synchronizes it. Args: u (ndarray): Displacement field. el_type (aka.ElementType): Type of elements on which gradU is computed. """ fem = self.rock_model.getFEEngine() # compute gradu within each material for specified el_type for mat_id in range(self.rock_model.getNbMaterials()): mat = self.rock_model.getMaterial(mat_id) elem_filter = mat.getElementFilter(el_type, aka._not_ghost) if len(elem_filter) == 0: continue grad_u = mat.getGradU(el_type, aka._not_ghost) fem.gradientOnIntegrationPoints( u, grad_u, self.rock_model.getSpatialDimension(), el_type, aka._not_ghost, elem_filter, ) # synchronize grad_u field between partitions self.rock_model.synchronizeField(aka.smm_gradu)
[docs] def assemble_gradu_global(self, grad_u_global, el_type, ghost_type): """Assembles grad_u of each material into a global grad_u. Args: grad_u_global (ndarray): Global gradU array. el_type (aka.ElementType): Type of elements on which gradU is computed. ghost_type (aka.GhostType): Ghost type of elements. """ dim = self.rock_model.getSpatialDimension() nb_elements = self.rock_model.getMesh().getNbElement(el_type, ghost_type) if nb_elements == 0: return fem = self.rock_model.getFEEngine() nb_quad_points = fem.getNbIntegrationPoints(el_type) assert grad_u_global.shape == ( nb_elements * nb_quad_points, dim * dim, ), "Provided grad_u has a wrong shape" # assemble grad_u of each material into a global grad_u for mat_id in range(self.rock_model.getNbMaterials()): mat = self.rock_model.getMaterial(mat_id) elem_filter = np.squeeze(mat.getElementFilter(el_type, ghost_type)) if len(elem_filter) == 0: continue grad_u = mat.getGradU(el_type, ghost_type) global_ips = np.zeros(len(elem_filter) * nb_quad_points, dtype=int) for i in range(nb_quad_points): global_ips[i::nb_quad_points] = elem_filter * \ nb_quad_points + i mask = np.full(grad_u_global.shape, False) mask[global_ips] = True np.copyto(grad_u_global, grad_u, where=mask)