Source code for landlab.components.bedrock_landslider.bedrock_landslider

"""Grid-based simulation of bedrock landslides.

Benjamin Campforts
"""

import numpy as np

from landlab import Component
from landlab.grid.nodestatus import NodeStatus

from ..depression_finder.lake_mapper import _FLOODED
from .cfuncs import _landslide_runout

MAX_HEIGHT_SLOPE = 100  # in m


[docs] class BedrockLandslider(Component): """Calculate the location and magnitude of episodic bedrock landsliding. Landlab component that calculates the location and magnitude of episodic bedrock landsliding following the Cullman criterion. See the publication: Campforts B., Shobe C.M., Steer P., Vanmaercke M., Lague D., Braun J. (2020) HyLands 1.0: a hybrid landscape evolution model to simulate the impact of landslides and landslide-derived sediment on landscape evolution. Geosci Model Dev: 13(9):3863–86. `https://dx.doi.org/10.5194/esurf-6-1-2018 <https://dx.doi.org/10.5194/esurf-6-1-2018>`_ Campforts, B., Shobe, C. M., Overeem, I., & Tucker, G. E. (2022). The Art of Landslides: How Stochastic Mass Wasting Shapes Topography and Influences Landscape Dynamics. Journal of Geophysical Research: Earth Surface, 127(8), 1–16. https://doi.org/10.1029/2022JF006745 Examples -------- >>> import numpy as np >>> from numpy import testing >>> from landlab import RasterModelGrid >>> from landlab.components import PriorityFloodFlowRouter, BedrockLandslider Make a ``RasterModelGrid`` and create a plateau. * 5x5 grid * Initial topography is set to plateau value of 10 >>> mg = RasterModelGrid((5, 5), xy_spacing=1.0) >>> z = mg.add_zeros("topographic__elevation", at="node") >>> s = mg.add_zeros("soil__depth", at="node") >>> b = mg.add_zeros("bedrock__elevation", at="node") Make plateau at 10 m >>> b += 10 Lower boundary cell to 0 >>> b[2] = 0 >>> z[:] = b + s Instantiate the :class:`~.priority_flood_flow_router.PriorityFloodFlowRouter` for flow accumulation and the ``BedrockLandslider`` >>> fd = PriorityFloodFlowRouter( ... mg, ... separate_hill_flow=True, ... suppress_out=True, ... ) >>> hy = BedrockLandslider(mg, landslides_return_time=1) Run the flow director and ``BedrockLandslider`` for one timestep >>> fd.run_one_step() >>> vol_suspended_sediment_yield, volume_leaving = hy.run_one_step(dt=1) After one timestep, we can predict exactly where the landslide will occur. The return time is set to 1 year so that probability for sliding is 100%. The angle of internal friction is 1 m/m, the topographical gradient is 10 m/m. At cardinal cells, the sliding plane will be at *(1 + 10) / 2 = 5.5* m/m. With a *dx* of 1, the cardinal cell next to the critical sliding node must be 5.5 m and the diagonal one at *5.5 * sqrt(2) = 7.8* m >>> testing.assert_almost_equal( ... [5.5 * np.sqrt(2), 5.5, 5.5 * np.sqrt(2)], z[6:9], decimal=5 ... ) References ---------- **Required Software Citation(s) Specific to this Component** Campforts B., Shobe C.M., Steer P., Vanmaercke M., Lague D., Braun J. (2020) BedrockLandslider 1.0: a hybrid landscape evolution model to simulate the impact of landslides and landslide-derived sediment on landscape evolution. Geosci Model Dev: 13(9):3863–86. `https://dx.doi.org/10.5194/esurf-6-1-2018 <https://dx.doi.org/10.5194/esurf-6-1-2018>`_ **Additional References** None Listed """ _name = "BedrockLandslider" _unit_agnostic = True _info = { "topographic__elevation": { "dtype": float, "intent": "inout", "optional": False, "units": "m", "mapping": "node", "doc": "Land surface topographic elevation", }, "topographic__steepest_slope": { "dtype": float, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "The steepest *downhill* slope", }, "soil__depth": { "dtype": float, "intent": "inout", "optional": False, "units": "m", "mapping": "node", "doc": "Depth of soil or weathered bedrock", }, "flow__receiver_node": { "dtype": int, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "Node array of receivers (node that receives flow from current node)", }, "flow__upstream_node_order": { "dtype": int, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "Node array containing downstream-to-upstream ordered list of node IDs", }, # Note that this field has to be provided in addition to the \ # flow__receiver_node and will be used to route sediments over the hillslope "hill_flow__receiver_node": { "dtype": int, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "Node array of receivers (node that receives flow from current node)", }, # Note that this field has to be provided in addition to the \ # flow__receiver_proportions and will be used to route sediments # over the hillslope "hill_flow__receiver_proportions": { "dtype": float, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "Node array of proportion of flow sent to each receiver.", }, "hill_topographic__steepest_slope": { "dtype": float, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "The steepest *downhill* slope", }, "LS_sediment__flux": { "dtype": float, "intent": "out", "optional": False, "units": "m3/s", "mapping": "node", "doc": "Sediment flux originating from landslides \ (volume per unit time of sediment entering each node)", }, "landslide__erosion": { "dtype": float, "intent": "out", "optional": False, "units": "m", "mapping": "node", "doc": "Total erosion caused by landsliding ", }, "landslide__deposition": { "dtype": float, "intent": "out", "optional": False, "units": "m", "mapping": "node", "doc": "Total deposition of derived sediment", }, "landslide_sediment_point_source": { "dtype": float, "intent": "out", "optional": False, "units": "m3", "mapping": "node", "doc": "Landslide derived sediment, as point sources on all the \ critical nodes where landslides initiate, \ before landslide runout is calculated ", }, } _cite_as = """ @Article{gmd-13-3863-2020, AUTHOR = {Campforts B., Shobe C.M., Steer P., Vanmaercke M., Lague D., Braun J.}, TITLE = {BedrockLandslider 1.0: a hybrid landscape evolution model to simulate the impact of landslides and landslide-derived sediment on landscape evolution.}, JOURNAL = {Geoscientific Model Development}, VOLUME = {13}, YEAR = {2020}, NUMBER = {9}, PAGES = {3863--3886}, URL = {https://doi.org/10.5194/gmd-13-3863-2020}, DOI = {10.5194/gmd-13-3863-2020} }"""
[docs] def __init__( self, grid, angle_int_frict=1.0, threshold_slope=None, cohesion_eff=1e4, landslides_return_time=1e5, rho_r=2700, grav=9.81, fraction_fines_LS=0, phi=0, max_pixelsize_landslide=1e9, seed=2021, verbose_landslides=False, landslides_on_boundary_nodes=True, critical_sliding_nodes=None, min_deposition_slope=0, ): """Initialize the BedrockLandslider model. Parameters ---------- grid : ModelGrid Landlab ModelGrid object angle_int_frict: float, optional Materials angle of internal friction in [m/m] threshold_slope: float, optional Threshold slope used in non-linear deposition scheme [m/m] Default value is set to angle_int_frict if not specified cohesion_eff : float, optional Effective cohesion of material [m L^-1 T^-2]. landslides_return_time : float, optional Return time for stochastic landslide events to occur rho_r : float, optional Bulk density rock [m L^-3]. fraction_fines_LS : float Fraction of permanently suspendable fines in bedrock Value must be between 0 and 1 [-]. phi : float, optional Sediment porosity, value must be between 0 and 1 [-]. max_pixelsize_landslide : int , optional Maximum size for landslides in number of pixels verbose_landslides : bool , optional Print output as number of simulated landslides per timestep seed : float , optional Provide seed to set stochastic model. If not provided, seed is set to 2021. Provide None to keep current seed. landslides_on_boundary_nodes : bool, optional Allow landslides to initiate (critical node) and extend over boundary nodes. critical_sliding_nodes : list, optional Provide list with critical nodes where landslides have to initiate This cancels the stochastic part of the algorithm and allows the user to form landslides at the provided critical nodes. """ super().__init__(grid) topo = self.grid.at_node["topographic__elevation"] soil = self.grid.at_node["soil__depth"] if "bedrock__elevation" not in grid.at_node: grid.add_field("bedrock__elevation", topo - soil, at="node", dtype=float) # Check consistency of bedrock, soil and topographic elevation fields if not np.allclose( grid.at_node["bedrock__elevation"] + grid.at_node["soil__depth"], grid.at_node["topographic__elevation"], ): raise RuntimeError( "The sum of bedrock elevation and topographic elevation should be equal" ) self.initialize_output_fields() # Store grid and parameters self._angle_int_frict = angle_int_frict if threshold_slope is None: self._threshold_slope = angle_int_frict else: self._threshold_slope = threshold_slope self._cohesion_eff = cohesion_eff self._rho_r = rho_r self._grav = grav self._fraction_fines_LS = fraction_fines_LS self._phi = phi self._landslides_return_time = landslides_return_time self._max_pixelsize_landslide = max_pixelsize_landslide self._verbose_landslides = verbose_landslides self._landslides_on_boundary_nodes = landslides_on_boundary_nodes self._critical_sliding_nodes = critical_sliding_nodes self._min_deposition_slope = min_deposition_slope # Data structures to store properties of simulated landslides. self._landslides_size = [] self._landslides_volume = [] self._landslides_volume_sed = [] self._landslides_volume_bed = [] # Check input values if phi >= 1.0 or phi < 0.0: raise ValueError(f"Porosity must be between 0 and 1 ({phi})") if fraction_fines_LS > 1.0 or fraction_fines_LS < 0.0: raise ValueError( f"Fraction of fines must be between 0 and 1 ({fraction_fines_LS})" ) # Set seed if seed is not None: np.random.seed(seed)
# Getters for properties @property def fraction_fines(self): """ Fraction of permanently suspendable fines in bedrock. Value must be between 0 and 1 [-]. """ return self._fraction_fines_LS @property def phi(self): """ Sediment porosity, value must be between 0 and 1 [-]. """ return self._phi @property def landslides_size(self): """ List with the size of simulated landslides. The list is reset every time the _landslide_erosion function is called """ return self._landslides_size @property def landslides_volume(self): """ List with the volume of simulated landslides. The list is reset every time the _landslide_erosion function is called """ return self._landslides_volume @property def landslides_volume_sed(self): """ List with the volume of sediment eroded by landslides. The list is reset every time the _landslide_erosion function is called """ return self._landslides_volume_sed @property def landslides_volume_bed(self): """ List with the volume of bedrock eroded by landslides. The list is reset every time the _landslide_erosion function is called """ return self._landslides_volume_bed def _landslide_erosion(self, dt): """ Calculate bedrock landsliding for a time period 'dt'. Parameters ---------- dt: float The imposed timestep. Returns ------- suspended_sed : float Volume of suspended sediment. """ # Pointers topo = self.grid.at_node["topographic__elevation"] bed = self.grid.at_node["bedrock__elevation"] steepest_slope = self.grid.at_node["topographic__steepest_slope"] soil_d = self.grid.at_node["soil__depth"] landslide_sed_in = self.grid.at_node["landslide_sediment_point_source"] landslide__ero = self.grid.at_node["landslide__erosion"] # Reset LS Plains landslide__ero.fill(0.0) # Reset landslide sediment point source field landslide_sed_in.fill(0.0) # Reset data structures to store properties of simulated landslides. self._landslides_size = [] self._landslides_volume = [] self._landslides_volume_sed = [] self._landslides_volume_bed = [] # Identify flooded nodes flood_status = self.grid.at_node["flood_status_code"] flooded_nodes = np.nonzero(flood_status == _FLOODED)[0] # In the following section the location of critical nodes where # landsldies are initatated is calcualted, unless these critical nodes # are provided as critical_sliding_nodes if self._critical_sliding_nodes is None: # Calculate gradients height_cell = topo - topo[self.grid.at_node["flow__receiver_node"]] height_cell[flooded_nodes] = 0 height_cell[height_cell > MAX_HEIGHT_SLOPE] = MAX_HEIGHT_SLOPE angle_int_frict_radians = np.arctan(self._angle_int_frict) height_critical = np.divide( (4 * self._cohesion_eff / (self._grav * self._rho_r)) * (np.sin(np.arctan(steepest_slope)) * np.cos(angle_int_frict_radians)), 1 - np.cos(np.arctan(steepest_slope) - angle_int_frict_radians), where=(1 - np.cos(np.arctan(steepest_slope) - angle_int_frict_radians)) > 0, out=np.zeros_like(steepest_slope), ) spatial_prob = np.divide( height_cell, height_critical, where=height_critical > 0, out=np.zeros_like(height_critical), ) spatial_prob[np.arctan(steepest_slope) <= angle_int_frict_radians] = 0 spatial_prob[spatial_prob > 1] = 1 # Temporal probability temporal_prob = 1 - np.exp(-dt / self._landslides_return_time) # Combined probability combined_prob = temporal_prob * spatial_prob sliding = np.random.rand(combined_prob.size) < combined_prob # Now, find the critical node, which is the receiver of critical_landslide_nodes # Critical nodes must be unique (a given node can have more receivers...) critical_landslide_nodes = np.unique( self.grid.at_node["flow__receiver_node"][np.where(sliding)] ) # Remove boundary nodes if not self._landslides_on_boundary_nodes: critical_landslide_nodes = critical_landslide_nodes[ ~self.grid.node_is_boundary(critical_landslide_nodes) ] else: critical_landslide_nodes = np.array(self._critical_sliding_nodes) # output variables suspended_sed = 0.0 if self._verbose_landslides: print(f"nbSlides = {len(critical_landslide_nodes)}") store_cumul_volume = 0.0 while critical_landslide_nodes.size > 0: crit_node = critical_landslide_nodes[0] # start at first critical node crit_node_el = topo[crit_node] # get 8 neighbors and only keep those to active nodes which are upstream neighbors = np.concatenate( ( self.grid.active_adjacent_nodes_at_node[crit_node], self.grid.diagonal_adjacent_nodes_at_node[crit_node], ) ) neighbors = neighbors[neighbors != -1] neighbors_up = neighbors[topo[neighbors] > crit_node_el] x_crit_node = self.grid.node_x[crit_node] y_crit_node = self.grid.node_y[crit_node] dist_to_initial_node = np.sqrt( np.add( np.square(x_crit_node - self.grid.node_x[neighbors_up]), np.square(y_crit_node - self.grid.node_y[neighbors_up]), ) ) slope_neighbors_to_crit_node = ( topo[neighbors_up] - crit_node_el ) / dist_to_initial_node neighbors_up = neighbors_up[ slope_neighbors_to_crit_node > self._angle_int_frict ] slope_neighbors_to_crit_node = slope_neighbors_to_crit_node[ slope_neighbors_to_crit_node > self._angle_int_frict ] if slope_neighbors_to_crit_node.size > 0: slope_slide = max(slope_neighbors_to_crit_node) store_volume_bed = 0.0 store_volume_sed = 0.0 upstream_count = 0 upstream_neighbors = neighbors_up if not self._landslides_on_boundary_nodes: upstream_neighbors = upstream_neighbors[ ~self.grid.node_is_boundary(upstream_neighbors) ] # Fix sliding angle of particular LS sliding_angle = (self._angle_int_frict + slope_slide) / 2.0 nb_landslide_cells = 0 # If landslides become unrealistically big, exit algorithm while upstream_neighbors.size > 0 and ( upstream_count <= self._max_pixelsize_landslide and nb_landslide_cells < 1e5 ): distance_to_crit_node = np.sqrt( np.add( np.square( x_crit_node - self.grid.node_x[upstream_neighbors[0]] ), np.square( y_crit_node - self.grid.node_y[upstream_neighbors[0]] ), ) ) new_el = crit_node_el + distance_to_crit_node * sliding_angle nb_landslide_cells += 1 if new_el < topo[upstream_neighbors[0]]: # Do actual slide upstream_count += 1 sed_landslide_ero = np.clip( min( soil_d[upstream_neighbors[0]], topo[upstream_neighbors[0]] - new_el, ), a_min=0.0, a_max=None, ) soil_d[upstream_neighbors[0]] -= sed_landslide_ero topo[upstream_neighbors[0]] = new_el bed_landslide_ero = np.clip( bed[upstream_neighbors[0]] - (new_el - soil_d[upstream_neighbors[0]]), a_min=0.0, a_max=None, ) bed[upstream_neighbors[0]] -= bed_landslide_ero topo[upstream_neighbors[0]] = new_el vol_sed = ( sed_landslide_ero * (1 - self._phi) * (self.grid.dx**2) ) vol_bed = bed_landslide_ero * (self.grid.dx**2) store_volume_sed = store_volume_sed + vol_sed store_volume_bed = store_volume_bed + vol_bed neighbors = np.concatenate( ( self.grid.active_adjacent_nodes_at_node[ upstream_neighbors[0] ], self.grid.diagonal_adjacent_nodes_at_node[ upstream_neighbors[0] ], ) ) neighbors = neighbors[neighbors != -1] neighbors_up = neighbors[topo[neighbors] > crit_node_el] upstream_neighbors = [*upstream_neighbors, *neighbors_up] temp, idx = np.unique(upstream_neighbors, return_index=True) upstream_neighbors = np.array(upstream_neighbors) upstream_neighbors = upstream_neighbors[np.sort(idx)] if not self._landslides_on_boundary_nodes: upstream_neighbors = upstream_neighbors[ ~self.grid.node_is_boundary(upstream_neighbors) ] # if one of the LS pixels also appears in critical_landslide_nodes list, # remove it there so that no new landslide is initialized critical_landslide_nodes = critical_landslide_nodes[ np.where(critical_landslide_nodes != upstream_neighbors[0]) ] landslide__ero[upstream_neighbors[0]] = ( sed_landslide_ero + bed_landslide_ero ) upstream_neighbors = np.delete(upstream_neighbors, 0, 0) store_volume = store_volume_sed + store_volume_bed store_cumul_volume += store_volume if upstream_count > 0: landslide_sed_in[crit_node] += (store_volume / dt) * ( 1.0 - self._fraction_fines_LS ) suspended_sed += (store_volume / dt) * self._fraction_fines_LS self._landslides_size.append(upstream_count) self._landslides_volume.append(store_volume) self._landslides_volume_sed.append(store_volume_sed) self._landslides_volume_bed.append(store_volume_bed) if critical_landslide_nodes.size > 0: critical_landslide_nodes = np.delete(critical_landslide_nodes, 0, 0) return suspended_sed def _landslide_runout(self, dt): """ Calculate landslide runout using a non-local deposition algorithm based on: * Carretier S., Martinod P., Reich M., Godderis Y. (2016) Modelling sediment clasts transport during landscape evolution. Earth Surf Dyn: 4(1):237–51. * Campforts B., Shobe C.M., Steer P., Vanmaercke M., Lague D., Braun J. (2020) HyLands 1.0: a hybrid landscape evolution model to simulate the impact of landslides and landslide-derived sediment on landscape evolution. Geosci Model Dev: 13(9):3863–86. Parameters ---------- dt : float Timestep. Returns ------- dh_hill : float Hillslope erosion over the simulated domain. volume_leaving : float Total volume of sediment leaving the simulated domain. flux_core_nodes : float Sediment flux over the simulated domain. """ topo = self.grid.at_node["topographic__elevation"] bed = self.grid.at_node["bedrock__elevation"] soil_d = self.grid.at_node["soil__depth"] sed_flux = self.grid.at_node["LS_sediment__flux"] stack_rev = np.flip(self.grid.at_node["flow__upstream_node_order"]) landslide_depo = self.grid.at_node["landslide__deposition"] landslide_sed_in = self.grid.at_node["landslide_sediment_point_source"] node_status = self.grid.status_at_node # Only process core nodes stack_rev_sel = stack_rev[node_status[stack_rev] == NodeStatus.CORE] receivers = self.grid.at_node["hill_flow__receiver_node"] fract_receivers = self.grid.at_node["hill_flow__receiver_proportions"] # keep only steepest slope slope = np.max(self.grid.at_node["hill_topographic__steepest_slope"], axis=1) slope[slope < 0] = 0.0 flux_in = landslide_sed_in * dt # flux_in, in m3 per timestep # L following carretier 2016 transport_length_hill = np.where( slope < self._threshold_slope, self.grid.dx / (1 - (slope / self._threshold_slope) ** 2), 1e6, ) flux_out = np.zeros(topo.shape) dh_hill = np.zeros(topo.shape) topo_copy = np.array(topo) max_depo = np.zeros(topo.shape) length_adjacent_cells = np.array( [ self.grid.dx, self.grid.dx, self.grid.dx, self.grid.dx, self.grid.dx * np.sqrt(2), self.grid.dx * np.sqrt(2), self.grid.dx * np.sqrt(2), self.grid.dx * np.sqrt(2), ] ) _landslide_runout( self.grid.dx, self._phi, self._min_deposition_slope, stack_rev_sel, receivers, fract_receivers, flux_in, transport_length_hill, flux_out, dh_hill, topo_copy, max_depo, length_adjacent_cells, ) sed_flux[:] = flux_out flux_core_nodes = np.sum(flux_in[self.grid.status_at_node == 0]) volume_leaving = np.sum(flux_in) # Qs_leaving # in m3 per timestep # Change sediment layer soil_d[:] += dh_hill topo[:] = bed + soil_d # Reset Qs landslide_sed_in.fill(0.0) # Update deposition field landslide_depo[:] = dh_hill return dh_hill, volume_leaving, flux_core_nodes
[docs] def run_one_step(self, dt): """Advance BedrockLandslider component by one time step of size dt. Parameters ---------- dt: float The imposed timestep. Returns ------- vol_suspended_sediment_yield : float volume of sediment evacuated as syspended sediment. volume_leaving : float Volume of sediment leaving the domain. """ dt = float(dt) if self.current_time is None: self.current_time = dt else: self.current_time += dt # Landslides vol_suspended_sediment_yield = self._landslide_erosion(dt) dh_hill, volume_leaving, flux_core_nodes = self._landslide_runout(dt) return vol_suspended_sediment_yield, volume_leaving