Source code for landlab.components.steepness_index.channel_steepness

"""Created on Mon Oct 19.

@author: dejh
"""

import numpy as np

from landlab import Component


[docs] class SteepnessFinder(Component): """This component calculates steepness indices, sensu Wobus et al. 2006, for a Landlab landscape. Follows broadly the approach used in GeomorphTools, geomorphtools.org. Examples -------- >>> import numpy as np >>> from landlab import RasterModelGrid >>> from landlab.components import FlowAccumulator, FastscapeEroder >>> from landlab.components import SteepnessFinder >>> mg = RasterModelGrid((3, 10), xy_spacing=100.0) >>> for nodes in ( ... mg.nodes_at_right_edge, ... mg.nodes_at_bottom_edge, ... mg.nodes_at_top_edge, ... ): ... mg.status_at_node[nodes] = mg.BC_NODE_IS_CLOSED >>> _ = mg.add_zeros("topographic__elevation", at="node") >>> mg.at_node["topographic__elevation"][mg.core_nodes] = ( ... mg.node_x[mg.core_nodes] / 1000.0 ... ) >>> fr = FlowAccumulator(mg, flow_director="D8") >>> sp = FastscapeEroder(mg, K_sp=0.01) >>> sf = SteepnessFinder(mg, min_drainage_area=10000.0) >>> for i in range(10): ... mg.at_node["topographic__elevation"][mg.core_nodes] += 10.0 ... _ = fr.run_one_step() ... sp.run_one_step(1000.0) ... >>> sf.calculate_steepnesses() >>> mg.at_node["channel__steepness_index"].reshape((3, 10))[1, :] array([ 0. , 29.28427125, 1. , 1. , 1. , 1. , 1. , 1. , 0.99999997, 0. ]) >>> sf.hillslope_mask array([ True, True, True, True, True, True, True, True, True, True, False, False, False, False, False, False, False, False, False, True, True, True, True, True, True, True, True, True, True, True], dtype=bool) >>> sf = SteepnessFinder(mg, min_drainage_area=10000.0, discretization_length=350.0) >>> sf.calculate_steepnesses() >>> mg.at_node["channel__steepness_index"].reshape((3, 10))[1, :] array([ 0. , 3.08232295, 3.08232295, 3.08232295, 1. , 1. , 1. , 1. , 0. , 0. ]) >>> sf = SteepnessFinder(mg, min_drainage_area=10000.0, elev_step=1.5) >>> sf.calculate_steepnesses() >>> mg.at_node["channel__steepness_index"].reshape((3, 10))[1, :] array([ 0. , 1.22673541, 1.2593727 , 1.27781936, 1.25659369, 1.12393156, 0.97335328, 0.79473963, 0.56196578, 0. ]) References ---------- **Required Software Citation(s) Specific to this Component** None Listed **Additional References** Wobus, C. W., Whipple, K. X., Kirby, E., Snyder, N. P., Johnson, J., Spyropolou, K., Crosby, B. T., and Sheenan, D.: Tectonics from topography: Procedures, promise, and pitfalls, in: Tectonics, Climate, and Landscape Evolution, edited by: Willett, S. D., Hovius, N., Brandon, M. T., and Fisher, D., Geological Society of America Special Paper 398, Geological Society of America, Boulder, CO, USA, 55–74, 2006. """ _name = "SteepnessFinder" _unit_agnostic = True _info = { "channel__steepness_index": { "dtype": float, "intent": "out", "optional": False, "units": "variable", "mapping": "node", "doc": "the local steepness index", }, "drainage_area": { "dtype": float, "intent": "in", "optional": False, "units": "m**2", "mapping": "node", "doc": "Upstream accumulated surface area contributing to the node's discharge", }, "flow__link_to_receiver_node": { "dtype": int, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "ID of link downstream of each node, which carries the discharge", }, "flow__receiver_node": { "dtype": int, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "Node array of receivers (node that receives flow from current node)", }, "flow__upstream_node_order": { "dtype": int, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "Node array containing downstream-to-upstream ordered list of node IDs", }, "topographic__elevation": { "dtype": float, "intent": "in", "optional": False, "units": "m", "mapping": "node", "doc": "Land surface topographic elevation", }, "topographic__steepest_slope": { "dtype": float, "intent": "in", "optional": False, "units": "-", "mapping": "node", "doc": "The steepest *downhill* slope", }, }
[docs] def __init__( self, grid, reference_concavity=0.5, min_drainage_area=1.0e6, elev_step=0.0, discretization_length=0.0, ): """ Parameters ---------- grid : RasterModelGrid A landlab RasterModelGrid. reference_concavity : float (default 0.5) The reference concavity to use in the calculation. min_drainage_area : float (m**2; default 1.e6) The minimum drainage area above which steepness indices are calculated. Defaults to 1.e6 m**2, per Wobus et al. 2006. elev_step : float (m; default 0.) If >0., becomes a vertical elevation change step to use to discretize the data (per Wobus). If 0., all nodes are used and no discretization happens. discretization_length : float (m; default 0.) If >0., becomes the lengthscale over which to segment the profiles - i.e., one different steepness index value is calculated every discretization_length. If only one (or no) points are present in a segment, it will be lumped together with the next segment. If zero, one value is assigned to each channel node. """ super().__init__(grid) if grid.at_node["flow__receiver_node"].size != grid.size("node"): raise NotImplementedError( "A route-to-multiple flow director has been " "run on this grid. The landlab development team has not " "verified that SteepnessFinder is compatible with " "route-to-multiple methods. Please open a GitHub Issue " "to start this process." ) self._reftheta = reference_concavity self._min_drainage = min_drainage_area assert elev_step >= 0.0, "elev_step must be >= 0!" self._elev_step = elev_step self._discretization = discretization_length self._ksn = self._grid.add_zeros( "channel__steepness_index", at="node", clobber=True ) self._mask = self._grid.ones("node", dtype=bool) # this one needs modifying if smooth_elev self._elev = self._grid.at_node["topographic__elevation"]
[docs] def calculate_steepnesses(self): """This is the main method. Call it to calculate local steepness indices at all points with drainage areas greater than *min_drainage_area*. This "run" method can optionally take the same parameter set as provided at instantiation. If they are provided, they will override the existing values from instantiation. Normalized steepness of any node without a defined value is reported as 0. These nodes are also identified in the mask retrieved with :func:`hillslope_mask`. """ self._mask.fill(True) self._ksn.fill(0.0) reftheta = self._reftheta min_drainage = self._min_drainage elev_step = self._elev_step discretization_length = self._discretization upstr_order = self._grid.at_node["flow__upstream_node_order"] # get an array of only nodes with A above threshold: valid_dstr_order = ( upstr_order[ self._grid.at_node["drainage_area"][upstr_order] >= min_drainage ] )[::-1] # note elevs are guaranteed to be in order, UNLESS a fill # algorithm has been used. nodes_incorporated = self._grid.zeros("node", dtype=bool) # now do each poss channel in turn # get the head of the first (longest!) channel: for dstr_order_index in range(valid_dstr_order.size): this_ch_top_node = valid_dstr_order[dstr_order_index] # top node if not nodes_incorporated[this_ch_top_node]: nodes_incorporated[this_ch_top_node] = True nodes_in_channel = [this_ch_top_node] penultimate_node = this_ch_top_node current_node_incorporated = False while not current_node_incorporated: next_node = self._grid.at_node["flow__receiver_node"][ penultimate_node ] if next_node == penultimate_node: # end of flow path break nodes_in_channel.append(next_node) current_node_incorporated = nodes_incorporated[next_node] # ^ this is a COPY op, so we're free to update the array nodes_incorporated[next_node] = True penultimate_node = next_node # by here, we have a full, unique reach in nodes_in_channel # it incorporates a single, duplicate node at the lower end # Now, if this segment long enough? if elev_step: top_elev = self._elev[nodes_in_channel[0]] base_elev = self._elev[nodes_in_channel[-1]] # work up the channel from the base to make new interp pts interp_pt_elevs = np.arange(base_elev, top_elev, elev_step) if interp_pt_elevs.size <= 1: # <1 step; bail on this whole segment break # now we can fairly closely follow the Geomorphtools # algorithm: ch_nodes = np.array(nodes_in_channel) # ^ this is top-to-bottom ch_A = self._grid.at_node["drainage_area"][ch_nodes] ch_dists = self.channel_distances_downstream(ch_nodes) ch_S = self.interpolate_slopes_with_step( ch_nodes, ch_dists, interp_pt_elevs ) else: # all the nodes; much easier as links work ch_nodes = np.array(nodes_in_channel) ch_dists = self.channel_distances_downstream(ch_nodes) ch_A = self._grid.at_node["drainage_area"][ch_nodes] ch_S = self._grid.at_node["topographic__steepest_slope"][ch_nodes] assert np.all(ch_S >= 0.0) # if we're doing spatial discretization, do it here: if discretization_length: ch_ksn = self.calc_ksn_discretized( ch_dists, ch_A, ch_S, reftheta, discretization_length ) else: # not discretized # also chopping off the final node, as above log_A = np.log10(ch_A[:-1]) log_S = np.log10(ch_S[:-1]) # we're potentially propagating nans here if S<=0 log_ksn = log_S + reftheta * log_A ch_ksn = 10.0**log_ksn # save the answers into the main arrays: assert np.all(self._mask[ch_nodes[:-1]]) # Final node gets trimmed off... self._ksn[ch_nodes[:-1]] = ch_ksn self._mask[ch_nodes] = False # now a final sweep to remove any undefined ksn values: self._mask[self._ksn == -1.0] = True self._ksn[self._ksn == -1.0] = 0.0
[docs] def channel_distances_downstream(self, ch_nodes): """Calculates distances downstream from top node of a defined flowpath. Parameters ---------- ch_nodes : array of ints The nodes along a single defined flow path, starting upstream. Returns ------- ch_dists : array of floats Distances downstream from top node of ch_nodes. Examples -------- >>> import numpy as np >>> from landlab import RasterModelGrid >>> from landlab.components import FlowAccumulator >>> mg = RasterModelGrid((4, 5), xy_spacing=(10.0, 5.0)) >>> for nodes in ( ... mg.nodes_at_right_edge, ... mg.nodes_at_bottom_edge, ... mg.nodes_at_top_edge, ... ): ... mg.status_at_node[nodes] = mg.BC_NODE_IS_CLOSED >>> mg.status_at_node[[6, 12, 13, 14]] = mg.BC_NODE_IS_CLOSED >>> _ = mg.add_field("topographic__elevation", mg.node_x, at="node") >>> fr = FlowAccumulator(mg, flow_director="D8") >>> sf = SteepnessFinder(mg) >>> _ = fr.run_one_step() >>> ch_nodes = np.array([8, 7, 11, 10]) >>> sf.channel_distances_downstream(ch_nodes) array([ 0. , 10. , 21.18033989, 31.18033989]) """ ch_links = self._grid.at_node["flow__link_to_receiver_node"][ch_nodes] ch_dists = np.empty_like(ch_nodes, dtype=float) # dists from ch head, NOT drainage divide ch_dists[0] = 0.0 np.cumsum(self._grid.length_of_d8[ch_links[:-1]], out=ch_dists[1:]) return ch_dists
[docs] def interpolate_slopes_with_step(self, ch_nodes, ch_dists, interp_pt_elevs): """Maps slopes to nodes, interpolating withing defined vertical intervals. This follows Geomorphtools' discretization methods. It is essentially a downwind map of the slopes. Parameters ---------- ch_nodes : array of ints The nodes along a single defined flow path, starting upstream. ch_dists : array of floats Distances downstream from top node of ch_nodes. interp_pt_elevs : array of floats Elevations at the discretizing points along the profile, in order of increasing elevation. Returns ------- ch_S : array of floats Interpolated slopes at each node in the flowpath (always positive). Examples -------- >>> import numpy as np >>> from landlab import RasterModelGrid >>> from landlab.components import FlowAccumulator >>> mg = RasterModelGrid((3, 10), xy_spacing=(10.0, 5.0)) >>> for nodes in ( ... mg.nodes_at_right_edge, ... mg.nodes_at_bottom_edge, ... mg.nodes_at_top_edge, ... ): ... mg.status_at_node[nodes] = mg.BC_NODE_IS_CLOSED >>> _ = mg.add_field("topographic__elevation", mg.node_x**1.1, at="node") >>> fr = FlowAccumulator(mg, flow_director="D8") >>> sf = SteepnessFinder(mg) >>> _ = fr.run_one_step() >>> ch_nodes = np.arange(18, 9, -1) >>> ch_dists = sf.channel_distances_downstream(ch_nodes) >>> interp_pt_elevs = np.array([0.0, 30.0, 60.0, 90.0, 120.0]) >>> sf.interpolate_slopes_with_step(ch_nodes, ch_dists, interp_pt_elevs) array([ 1.67970205, 1.67970205, 1.67970205, 1.65129294, 1.62115336, 1.5811951 , 1.53157521, 1.44240187, 1.36442227]) >>> mg.at_node["topographic__steepest_slope"][ch_nodes] array([ 1.69383001, 1.66972677, 1.64200694, 1.60928598, 1.56915472, 1.51678178, 1.43964028, 1.25892541, 0. ]) >>> mg.at_node["topographic__elevation"][:] = mg.node_x >>> interp_pt_elevs = np.array([0.0, 25.0, 50.0, 75.0, 80.0]) >>> sf.interpolate_slopes_with_step(ch_nodes, ch_dists, interp_pt_elevs) array([ 1., 1., 1., 1., 1., 1., 1., 1., 1.]) """ ch_z = self._grid.at_node["topographic__elevation"][ch_nodes] assert ( ch_z[0] >= interp_pt_elevs[-1] ), "Highest interp_pt_elev must be below top channel node" interp_pt_x = np.interp(interp_pt_elevs, ch_z[::-1], ch_dists[::-1]) interp_pt_S = np.empty_like(interp_pt_elevs) # now a downwind map of the slopes onto the nodes # slopes are defined positive z_diff = interp_pt_elevs[:-1] - interp_pt_elevs[1:] x_diff = interp_pt_x[1:] - interp_pt_x[:-1] np.divide(z_diff, x_diff, out=interp_pt_S[:-1]) interp_pt_S[-1] = interp_pt_S[-2] # Map S back onto nodes ch_S = np.interp(ch_z, interp_pt_elevs, interp_pt_S) return ch_S
[docs] def calc_ksn_discretized( self, ch_dists, ch_A, ch_S, ref_theta, discretization_length ): """Calculate normalized steepness index on defined channel segments. Every segment must have at least 2 nodes along it. If not, segments will be automatically merged to achieve this. The channel will be segmented starting at the *downstream* end. NB: The final node in the channel does not receive an index, as it either belongs to a longer, existing flow path, or it is a boundary node with S = 0. Neither works. Parameters ---------- ch_dists : array of floats Distances downstream from top node of a single stream path. ch_A : array of floats Drainage areas at each node in the flowpath. ch_S : array of floats Slope at each node in the flowpath (defined as positive). ref_theta : float The reference concavity; must be positive. discretization_length : float (m) The streamwise length of each segment. Returns ------- ch_ksn : array of floats The normalized steepness index at each node in the flowpath, EXCEPT THE LAST. (i.e., length is (ch_dists.size - 1)). Values will be the same within each defined segment. Examples -------- >>> import numpy as np >>> from landlab import RasterModelGrid >>> from landlab.components import FlowAccumulator >>> from landlab.components import SteepnessFinder >>> mg = RasterModelGrid((3, 10), xy_spacing=(10.0, 5.0)) >>> for nodes in ( ... mg.nodes_at_right_edge, ... mg.nodes_at_bottom_edge, ... mg.nodes_at_top_edge, ... ): ... mg.status_at_node[nodes] = mg.BC_NODE_IS_CLOSED >>> _ = mg.add_field("topographic__elevation", mg.node_x, at="node") >>> fr = FlowAccumulator(mg, flow_director="D8") >>> sf = SteepnessFinder(mg) >>> _ = fr.run_one_step() >>> ch_nodes = np.arange(18, 9, -1) >>> ch_dists = sf.channel_distances_downstream(ch_nodes) >>> ch_A = mg.at_node["drainage_area"][ch_nodes] >>> ch_S = mg.at_node["topographic__steepest_slope"][ch_nodes] >>> ksn_25 = sf.calc_ksn_discretized(ch_dists, ch_A, ch_S, 0.5, 25.0) >>> ksn_25.size == ch_dists.size - 1 True >>> ksn_25 array([ -1. , 11.0668192 , 11.0668192 , 15.70417802, 15.70417802, 15.70417802, 19.3433642 , 19.3433642 ]) >>> ksn_10 = sf.calc_ksn_discretized(ch_dists, ch_A, ch_S, 0.5, 10.0) >>> ksn_10 array([ 8.40896415, 8.40896415, 13.16074013, 13.16074013, 16.5487546 , 16.5487546 , 19.3433642 , 19.3433642 ]) >>> ch_ksn_overdiscretized = sf.calc_ksn_discretized( ... ch_dists, ch_A, ch_S, 0.5, 10.0 ... ) >>> np.allclose(ch_ksn_overdiscretized, ksn_10) True """ ch_ksn = np.empty_like(ch_A) # need to remove the influence of the final node in the seg, # as it reflects either the edge of the grid (S=0) or a point # after a confluence - hence the 0.000001 seg_ends = np.arange(ch_dists[-1] - 0.000001, 0.0, -discretization_length)[::-1] # ^ counts up from 0, but terminates at the far end cleanly pts_in_each_seg = np.searchsorted(seg_ends, ch_dists) num_segs = pts_in_each_seg[-1] i = num_segs - 1 # the final pt is no longer included while i >= 0: old_i = i pts_in_seg = pts_in_each_seg == i num_pts_in_seg = int(pts_in_seg.sum()) # if i == num_segs: # true_pts_in_seg = pts_in_each_seg.copy() # pts_in_each_seg[-1] = False # else: # true_pts_in_seg = pts_in_each_seg # make sure there's always 2 pts in the seg... while num_pts_in_seg < 2: i -= 1 pts_in_seg = np.logical_and( pts_in_each_seg <= old_i, pts_in_each_seg >= i ) num_pts_in_seg = int(pts_in_seg.sum()) if i < 0: break if num_pts_in_seg < 2: # must be at the end of the seg... # nodes in invalid segs at the end get ksn = -1. ch_ksn[pts_in_seg] = -1.0 break seg_A = ch_A[pts_in_seg] seg_S = ch_S[pts_in_seg] logseg_A = np.log10(seg_A) logseg_S = np.log10(seg_S) meanlogseg_A = np.mean(logseg_A) meanlogseg_S = np.mean(logseg_S) logseg_ksn = meanlogseg_S + ref_theta * meanlogseg_A ch_ksn[pts_in_seg] = 10.0**logseg_ksn i -= 1 return ch_ksn[:-1]
@property def steepness_indices(self): """Return the array of channel steepness indices. Nodes not in the channel receive zeros. """ return self._ksn @property def hillslope_mask(self): """Return a boolean array, False where steepness indices exist.""" return self._mask @property def masked_steepness_indices(self): """Returns a masked array version of the 'channel__steepness_index' field. This enables easier plotting of the values with. :func:`landlab.imshow_grid_at_node` or similar. Examples -------- Make a topographic map with an overlay of steepness values: >>> from landlab import imshow_grid_at_node >>> from landlab import RasterModelGrid >>> from landlab.components import FlowAccumulator, FastscapeEroder >>> from landlab.components import SteepnessFinder >>> mg = RasterModelGrid((5, 5), xy_spacing=100.0) >>> for nodes in ( ... mg.nodes_at_right_edge, ... mg.nodes_at_bottom_edge, ... mg.nodes_at_top_edge, ... ): ... mg.status_at_node[nodes] = mg.BC_NODE_IS_CLOSED >>> _ = mg.add_zeros("topographic__elevation", at="node") >>> mg.at_node["topographic__elevation"][mg.core_nodes] = ( ... mg.node_x[mg.core_nodes] / 1000.0 ... ) >>> np.random.seed(0) >>> mg.at_node["topographic__elevation"][mg.core_nodes] += np.random.rand( ... mg.number_of_core_nodes ... ) >>> fr = FlowAccumulator(mg, flow_director="D8") >>> sp = FastscapeEroder(mg, K_sp=0.01) >>> cf = SteepnessFinder(mg, min_drainage_area=20000.0) >>> for i in range(10): ... mg.at_node["topographic__elevation"][mg.core_nodes] += 10.0 ... _ = fr.run_one_step() ... sp.run_one_step(1000.0) ... >>> _ = fr.run_one_step() >>> cf.calculate_steepnesses() >>> imshow_grid_at_node(mg, "topographic__elevation", allow_colorbar=False) >>> imshow_grid_at_node( ... mg, cf.masked_steepness_indices, color_for_closed=None, cmap="winter" ... ) """ return np.ma.array(self.steepness_indices, mask=self.hillslope_mask)