Source code for landlab.components.normal_fault.normal_fault

#!/usr/bin/env python
"""Rock uplift along a normal fault.

Landlab component that implements rock uplift by a normal fault. Note
that this component does not make any attempt to advect topography
laterally.
"""

import numpy as np

from landlab import Component
from landlab import FieldError

TWO_PI = 2.0 * np.pi


[docs] class NormalFault(Component): """NormalFault implements relative rock motion due to a normal fault. The fault can have an arbitrary trace given by two points (x1, y1) and (x2, y2) in the `fault_trace` input parameter. These value of these points is in model-space coordinates and is not based on node id values or number of rows and columns. This NormalFault component permits two primary methods for enacting fault motion. 1. **run_one_step**: The throw rate is provided through the ``fault_throw_rate_through_time`` parameter. This rate can be constant or arbitrary. See the NormalFault tutorial in the landlab tutorials repository for an extensive example. In this case, the NormalFault component will keep track of the cumulative amount of model-run-time and set the rate based on interpolating the provided rate-time history. *NOTE: this means that the model run timesteps must align with the time-rate relationship provided to NormalFault*. Improving this is on the developers todo list but is of low priority. 2. **run_one_earthquake**: A single uplift event of size dz can be specified by this method. If NormalFault is used in this way, any specifications provided in the ``fault_throw_rate_through_time`` keyword argument will be ignored. Note that the NormalFault component does not prevent a user from combining the **run_one_step** and **run_one_earthquake** methods. It is encumbent upon the user, however, to ensure that these two methods are used in combination correctly for their specific use case. References ---------- **Required Software Citation(s) Specific to this Component** None Listed **Additional References** None Listed """ _name = "NormalFault" _unit_agnostic = True _info = { "topographic__elevation": { "dtype": float, "intent": "inout", "optional": True, "units": "m", "mapping": "node", "doc": "Land surface topographic elevation", } }
[docs] def __init__( self, grid, faulted_surface="topographic__elevation", fault_throw_rate_through_time=(("time", [0]), ("rate", [0.001])), fault_dip_angle=90.0, fault_trace=(("x1", 0), ("y1", 0), ("x2", 1), ("y2", 1)), include_boundaries=False, ): """Instantiation of a NormalFault. Parameters ---------- grid : ModelGrid faulted_surface : str or list of str Surface that is modified by the NormalFault component. Must be a field name or a list of field names if the fault should uplift more than one field. Default value is `topographic__elevation`. If the faulted surface does not yet exist, it will be ingored. The ``run_one_step`` method will check to see an ignored field has been added and if it has been, it will modify it. fault_throw_rate_through_time : dict, optional Dictionary that specifies the time varying throw rate on the fault. Expected format is: `fault_throw_rate_through_time = {'time': array, 'rate': array}` Default value is a constant rate of 0.001 (units not specified). This is acomplished by providing the dictionary `{'time': [0], 'rate': [0.001]}`. NormalFault uses numpy.interp to interpolate the time and rate pattern to the current model time. This function uses the first value for all values less than the first, and the last value for all values greater than the last, and thus providing only one number results in all times getting a rate of that value. fault_dip_angle : float, optional Dip angle of the fault in degrees. Default value is 90 degrees. fault_trace : dictionary, optional Dictionary that specifies the coordinates of two locations on the fault trace. Expected format is .. code-block:: python fault_trace = {"x1": float, "y1": float, "x2": float, "y2": float} where the vector from `(x1, y1)` to `(x2, y2)` defines the strike of the fault trace. The orientation of the fault dip relative to the strike follows the right hand rule. Default is for the fault to strike NE. include_boundaries : boolean, optional Flag to indicate if model grid boundaries should be uplifted. If set to `True` uplifted model grid boundaries will be set to the average value of their upstream nodes. Default value is False. Examples -------- Create a grid on which we will run the NormalFault component. >>> from landlab import RasterModelGrid >>> from landlab.components import NormalFault >>> grid = RasterModelGrid((6, 6), xy_spacing=10) Add an elevation field. >>> z = grid.add_zeros("topographic__elevation", at="node") Set the parameter values for the NormalFault component. >>> param_dict = { ... "faulted_surface": "topographic__elevation", ... "fault_dip_angle": 90.0, ... "fault_throw_rate_through_time": { ... "time": [0, 9, 10], ... "rate": [0, 0, 0.05], ... }, ... "fault_trace": {"y1": 0, "x1": 0, "y2": 30, "x2": 60}, ... "include_boundaries": False, ... } Instantiate a NormalFault component. >>> nf = NormalFault(grid, **param_dict) >>> nf.faulted_nodes.reshape(grid.shape) array([[False, False, False, False, False, False], [False, True, False, False, False, False], [False, True, True, True, False, False], [False, True, True, True, True, False], [False, True, True, True, True, False], [False, False, False, False, False, False]], dtype=bool) As we can see, only a subset of the nodes have been identified as *faulted nodes*. Because we have set include_boundaries' to False none of the boundary nodes are faulted nodes. Next we will run the NormalFault for 30 1-year timesteps. >>> dt = 1.0 >>> for i in range(30): ... nf.run_one_step(dt) ... >>> z.reshape(grid.shape) array([[ 0., 0., 0., 0., 0., 0.], [ 0., 1., 0., 0., 0., 0.], [ 0., 1., 1., 1., 0., 0.], [ 0., 1., 1., 1., 1., 0.], [ 0., 1., 1., 1., 1., 0.], [ 0., 0., 0., 0., 0., 0.]]) This results in uplift of the faulted nodes, as we would expect. If the user knows how much uplift (dz) they want to occur in an event, they can use the **run_one_earthquake** function with a specified dz. In this case fault_throw_rate_through_time will be ignored. >>> nf.run_one_earthquake(dz=100) >>> z.reshape(grid.shape) array([[ 0., 0., 0., 0., 0., 0.], [ 0., 101., 0., 0., 0., 0.], [ 0., 101., 101., 101., 0., 0.], [ 0., 101., 101., 101., 101., 0.], [ 0., 101., 101., 101., 101., 0.], [ 0., 0., 0., 0., 0., 0.]]) Next, we make a very simple landscape model. We need a few components and we will set include_boundaries to True. >>> from landlab.components import FastscapeEroder, FlowAccumulator >>> grid = RasterModelGrid((6, 6), xy_spacing=10) >>> z = grid.add_zeros("topographic__elevation", at="node") >>> param_dict = { ... "faulted_surface": "topographic__elevation", ... "fault_dip_angle": 90.0, ... "fault_throw_rate_through_time": { ... "time": [0, 900, 1000], ... "rate": [0, 0, 0.05], ... }, ... "fault_trace": {"y1": 0, "x1": 0, "y2": 30, "x2": 60}, ... "include_boundaries": True, ... } >>> nf = NormalFault(grid, **param_dict) >>> fr = FlowAccumulator(grid) >>> fs = FastscapeEroder(grid, K_sp=0.01) Run this model for 300 100-year timesteps. >>> dt = 100.0 >>> for i in range(300): ... nf.run_one_step(dt) ... fr.run_one_step() ... fs.run_one_step(dt) ... >>> z.reshape(grid.shape).round(decimals=2) array([[ 0. , 0. , 0. , 0. , 0. , 0. ], [ 5. , 5. , 0. , 0. , 0. , 0. ], [ 7.39, 7.38, 2.38, 2.89, 0. , 0. ], [ 9.36, 11.43, 5.51, 6.42, 3.54, 3.54], [ 15.06, 15.75, 10.6 , 11.42, 8.54, 8.54], [ 15.06, 15.06, 10.7 , 11.42, 8.54, 8.54]]) The faulted nodes have been uplifted and eroded! Note that here the boundary nodes are also uplifted. NormalFault keeps track of internal time. For example, if a user wanted to only run NormalFault every tenth timestep (or some more seismogenically reasonable set of times). >>> grid = RasterModelGrid((6, 6), xy_spacing=10) >>> z = grid.add_zeros("topographic__elevation", at="node") >>> nf = NormalFault(grid, **param_dict) >>> fr = FlowAccumulator(grid) >>> fs = FastscapeEroder(grid, K_sp=0.01) >>> model_time = 0.0 >>> dt = 100.0 >>> for i in range(300): ... if i % 10 == 0: ... nf.run_one_step(dt * 10) ... fr.run_one_step() ... fs.run_one_step(dt) ... model_time += dt ... >>> model_time 30000.0 >>> nf.current_time 30000.0 """ fault_throw_rate_through_time = dict(fault_throw_rate_through_time) fault_trace = dict(fault_trace) super().__init__(grid) # save a reference to the grid # get the surface to be faulted self._surfaces = {} self._not_yet_instantiated = [] if isinstance(faulted_surface, list): # if faulted surface is a list, then itterate through multiple # surfaces and save for surf in faulted_surface: try: self._surfaces[surf] = grid.at_node[surf] except FieldError: self._not_yet_instantiated.append(surf) else: self._surfaces[faulted_surface] = grid.at_node[faulted_surface] if fault_dip_angle > 90.0: raise ValueError( "NormaFault fault_dip_angle must be less than 90 " "degrees." ) # get the fault throw parameter values from the parameter dictionary self._throw_time = np.array(fault_throw_rate_through_time["time"]) self._throw_rate = np.array(fault_throw_rate_through_time["rate"]) self._fault_dip = np.deg2rad(fault_dip_angle) self._uplift = self._throw_rate * np.sin(self._fault_dip) # Identify in current boundaries will be included self._include_boundaries = include_boundaries # Instantiate record of current time. self._current_time = 0.0 # get the fault trace dictionary and use to to calculate where the # faulted nodes are located. self._fault_trace = fault_trace dx = self._fault_trace["x2"] - self._fault_trace["x1"] dy = self._fault_trace["y2"] - self._fault_trace["y1"] self._fault_azimuth = np.mod(np.arctan2(dy, dx), TWO_PI) self._fault_anti_azimuth = self._fault_azimuth + np.pi # deal with the edge case in which dx == 0 if dx == 0: self._dy_over_dx = 0.0 self._fault_trace_y_intercept = 0.0 self._fault_trace_x_intercept = self._fault_trace["x2"] else: self._dy_over_dx = dy / dx self._fault_trace_y_intercept = self._fault_trace["y1"] - ( self._dy_over_dx * self._fault_trace["x1"] ) self._fault_trace_x_intercept = 0.0 # set the considered nodes based on whether the boundaries will be # included in the faulted terrain. if self._include_boundaries: potential_nodes = np.arange(self._grid.size("node")) else: potential_nodes = self._grid.core_nodes # select those nodes that are on the correct side of the fault dx_pn = self._grid.x_of_node[potential_nodes] - self._fault_trace_x_intercept dy_pn = self._grid.y_of_node[potential_nodes] - self._fault_trace_y_intercept potential_angles = np.mod(np.arctan2(dy_pn, dx_pn), TWO_PI) if self._fault_anti_azimuth <= TWO_PI: faulted_node_ids = potential_nodes[ ( (potential_angles > self._fault_azimuth) & (potential_angles <= (self._fault_anti_azimuth)) ) ] else: faulted_node_ids = potential_nodes[ ( (potential_angles > self._fault_azimuth) | (potential_angles <= np.mod(self._fault_anti_azimuth, TWO_PI)) ) ] # save a n-node array of boolean identifing faulted nodes. self._faulted_nodes = np.zeros(self._grid.size("node"), dtype=bool) self._faulted_nodes[faulted_node_ids] = True
@property def faulted_nodes(self): """At node array indicating which nodes are on the upthrown block.""" return self._faulted_nodes def _check_surfaces(self): if len(self._not_yet_instantiated) > 0: still_not_instantiated = [] for surf in self._not_yet_instantiated: if surf in self._grid.at_node: self._surfaces[surf] = self._grid.at_node[surf] else: still_not_instantiated.append(surf) self._not_yet_instantiated = still_not_instantiated
[docs] def run_one_earthquake(self, dz): """Run one earthquake with uplift of magnitude ``dz``.""" self._check_surfaces() # save z before uplift only if using include boundaries. if self._include_boundaries: surfs_before_uplift = {} for surf_name in self._surfaces: surfs_before_uplift[surf_name] = self._surfaces[surf_name].copy() # uplift the faulted_nodes for surf_name in self._surfaces: self._surfaces[surf_name][self._faulted_nodes] += dz # if faulted nodes includes boundaries we must do some extra work because # landlab components will typically not erode these boundaries. This means # they will be uplifted but not eroded. if self._include_boundaries: # here our goal is to set faulted boundaries to average of open # node faulted neighbors # create boolean of the faulted boundary nodes faulted_boundaries = self._faulted_nodes.copy() faulted_boundaries[self._grid.core_nodes] = False core_nodes = np.zeros(self._grid.size("node"), dtype=bool) core_nodes[self._grid.core_nodes] = True neighbor_is_core = core_nodes[self._grid.adjacent_nodes_at_node] neighbor_is_faulted = self._faulted_nodes[self._grid.adjacent_nodes_at_node] neighbor_for_averaging = neighbor_is_faulted & neighbor_is_core # Identify nodes that have at least one adjacent node that is both # faulted and not a boundary node. # average the pre-uplift topography on those adjacent nodes and assign # to the boundary node. # here we use the pre-uplift elevation because other steps in the model # may diminish this topography. averaged = neighbor_for_averaging[faulted_boundaries].sum(axis=1) == 1 if any(averaged): averaged_nodes = np.where(faulted_boundaries)[0][np.where(averaged)[0]] for surf_name in self._surfaces: elevations_to_average = surfs_before_uplift[surf_name][ self._grid.adjacent_nodes_at_node ] elevations_to_average[self._grid.adjacent_nodes_at_node == -1] = ( np.nan ) elevations_to_average[~neighbor_for_averaging] = np.nan self._surfaces[surf_name][averaged_nodes] = np.nanmean( elevations_to_average[averaged_nodes], axis=1 ) # identify any boundary nodes that are not being averaged. This will # happen at the corners on RasterModelGrids. Average over adjacent # nodes that are faulted. These nodes will be boundary nodes. # here we use the current topography as we will have just updated the # adjacent nodes in the prior block. if any(~averaged): un_averaged_nodes = np.where(faulted_boundaries)[0][ np.where(~averaged)[0] ] for surf_name in self._surfaces: elevations_to_average = self._surfaces[surf_name][ self._grid.adjacent_nodes_at_node ] elevations_to_average[self._grid.adjacent_nodes_at_node == -1] = ( np.nan ) elevations_to_average[~neighbor_is_faulted] = np.nan self._surfaces[surf_name][un_averaged_nodes] = np.nanmean( elevations_to_average[un_averaged_nodes], axis=1 )
[docs] def run_one_step(self, dt): """Run_one_step method for NormalFault. Parameters ---------- dt : float Time increment used to advance the NormalFault component. current_time : float, optional If NormalFault is not being advanced by dt every timestep with all components, its internal time may be incorrect, this can be remedied by providing a value for current time. Default value is None which results in the internal timekeeping not being changed. """ # calculate the current uplift rate current_uplift_rate = np.interp( self._current_time, self._throw_time, self._throw_rate ) # run one earthquake of size current_uplift_rate * dt self.run_one_earthquake(current_uplift_rate * dt) # increment time self._current_time += dt