Source code for landlab.components.flow_director.flow_director_d8

#! /usr/env/python

"""
flow_director_d8.py: provides the component FlowDirectorsD8.

This components finds the steepest single-path steepest descent flow
directions and considers diagonal links between nodes on a raster grid. It is
not implemented for irregular grids. For a method that works for irregular
grids and does not consider diagonal links for rasters, use
FlowDirectorSteepest instead.
"""

import numpy

from landlab import LinkStatus
from landlab.components.flow_director import flow_direction_DN
from landlab.components.flow_director.flow_director_to_one import _FlowDirectorToOne


[docs] class FlowDirectorD8(_FlowDirectorToOne): """Single-path (steepest direction) flow direction with diagonals on rasters. Single-path (steepest direction) flow direction finding on raster grids by the D8 method. This method considers flow on all eight links such that flow is possible on orthogonal and on diagonal links. The method that considers only orthogonal links (D4 method) for raster grids is FlowDirectorSteepest. This method is not implemented for Voroni grids, use FlowDirectorSteepest instead. Stores as ModelGrid fields: - Node array of receivers (nodes that receive flow), or ITS OWN ID if there is no receiver: *'flow__receiver_node'* - Node array of steepest downhill slopes: *'topographic__steepest_slope'* - Node array containing ID of link that leads from each node to its receiver, or BAD_INDEX_VALUE if no link: *'flow__link_to_receiver_node'* - Boolean node array of all local lows: *'flow__sink_flag'* The primary method of this class is :func:`run_one_step`. Examples -------- >>> import numpy as np >>> from landlab import RasterModelGrid >>> from landlab.components import FlowDirectorD8 >>> mg = RasterModelGrid((3, 3), xy_spacing=(1, 1)) >>> mg.set_closed_boundaries_at_grid_edges(True, True, True, False) >>> _ = mg.add_field( ... "topographic__elevation", ... mg.node_x + mg.node_y, ... at="node", ... ) >>> fd = FlowDirectorD8(mg, "topographic__elevation") >>> fd.surface_values array([ 0., 1., 2., 1., 2., 3., 2., 3., 4.]) >>> fd.run_one_step() >>> mg.at_node["flow__receiver_node"] array([0, 1, 2, 3, 0, 5, 6, 7, 8]) >>> mg.at_node["topographic__steepest_slope"] array([ 0. , 0. , 0. , 0. , 1.41421356, 0. , 0. , 0. , 0. ]) >>> mg.at_node["flow__link_to_receiver_node"] array([-1, -1, -1, -1, 12, -1, -1, -1, -1]) >>> mg.at_node["flow__sink_flag"].astype(int) array([1, 1, 1, 1, 0, 1, 1, 1, 1]) >>> mg_2 = RasterModelGrid((5, 4), xy_spacing=(1, 1)) >>> topographic__elevation = [ ... [0.0, 0.0, 0.0, 0.0], ... [0.0, 21.0, 10.0, 0.0], ... [0.0, 31.0, 20.0, 0.0], ... [0.0, 32.0, 30.0, 0.0], ... [0.0, 0.0, 0.0, 0.0], ... ] >>> _ = mg_2.add_field( ... "topographic__elevation", ... topographic__elevation, ... at="node", ... ) >>> mg_2.set_closed_boundaries_at_grid_edges(True, True, True, False) >>> fd_2 = FlowDirectorD8(mg_2) >>> fd_2.run_one_step() >>> mg_2.at_node["flow__receiver_node"].reshape(mg_2.shape) array([[ 0, 1, 2, 3], [ 4, 1, 2, 7], [ 8, 6, 6, 11], [12, 10, 10, 15], [16, 17, 18, 19]]) The flow directors also have the ability to return the flow receiver nodes >>> receiver = fd.direct_flow() >>> receiver array([0, 1, 2, 3, 0, 5, 6, 7, 8]) References ---------- **Required Software Citation(s) Specific to this Component** None Listed **Additional References** O'Callaghan, J., Mark, D. (1984). The extraction of drainage networks from digital elevation data. Computer Vision, Graphics, and Image Processing 28(3), 323 - 344. https://dx.doi.org/10.1016/s0734-189x(84)80011-0 """ _name = "FlowDirectorD8" _info = { "flow__link_to_receiver_node": { "dtype": int, "intent": "out", "optional": False, "units": "-", "mapping": "node", "doc": "ID of link downstream of each node, which carries the discharge", }, "flow__receiver_node": { "dtype": int, "intent": "out", "optional": False, "units": "-", "mapping": "node", "doc": "Node array of receivers (node that receives flow from current node)", }, "flow__sink_flag": { "dtype": bool, "intent": "out", "optional": False, "units": "-", "mapping": "node", "doc": "Boolean array, True at local lows", }, "topographic__elevation": { "dtype": float, "intent": "in", "optional": True, "units": "m", "mapping": "node", "doc": "Land surface topographic elevation", }, "topographic__steepest_slope": { "dtype": float, "intent": "out", "optional": False, "units": "-", "mapping": "node", "doc": "The steepest *downhill* slope", }, }
[docs] def __init__(self, grid, surface="topographic__elevation"): """ Parameters ---------- grid : ModelGrid A grid of type RasterModelGrid. surface : field name at node or array of length node, optional The surface to direct flow across, default is field at node: topographic__elevation,. """ self._method = "D8" super().__init__(grid, surface) try: self._grid.nodes_at_d8 except AttributeError: self._is_Voroni = True else: self._is_Voroni = False if self._is_Voroni: raise NotImplementedError( "FlowDirectorD8 not implemented for" "irregular grids, use" "FlowDirectorSteepest" ) self.updated_boundary_conditions()
[docs] def updated_boundary_conditions(self): """Method to update FlowDirectorD8 when boundary conditions change. Call this if boundary conditions on the grid are updated after the component is instantiated. """ self._active_links = numpy.arange(self._grid.number_of_d8) nodes_at_d8 = self._grid.nodes_at_d8[self._active_links] self._activelink_tail = nodes_at_d8[:, 0] self._activelink_head = nodes_at_d8[:, 1]
[docs] def run_one_step(self): """Find flow directions and save to the model grid. run_one_step() checks for updated boundary conditions, calculates slopes on links, finds baselevel nodes based on the status at node, calculates flow directions, and saves results to the grid. an alternative to direct_flow() is direct_flow() which does the same things but also returns the receiver nodes not return values. """ self.direct_flow()
[docs] def direct_flow(self): """Find flow directions, save to the model grid, and return receivers. direct_flow() checks for updated boundary conditions, calculates slopes on links, finds baselevel nodes based on the status at node, calculates flow directions, saves results to the grid, and returns a at-node array of receiver nodes. This array is stored in the grid at: grid['node']['flow__receiver_node'] an alternative to direct_flow() is run_one_step() which does the same things but also returns a at-node array of receiver nodes. This array is stored in the grid at: grid['node']['flow__receiver_node'] """ self._check_updated_bc() # update the surface, if it was provided as a model grid field. self._changed_surface() # step 1. Calculate link slopes. link_slope = -self._grid.calc_grad_at_d8(self._surface_values) link_slope[self._grid.status_at_d8 != LinkStatus.ACTIVE] = 0 # Step 2. Find and save base level nodes. (baselevel_nodes,) = numpy.where( numpy.logical_or( self._grid.status_at_node == self._grid.BC_NODE_IS_FIXED_VALUE, self._grid.status_at_node == self._grid.BC_NODE_IS_FIXED_GRADIENT, ) ) # Calculate flow directions by D8 method receiver, steepest_slope, sink, recvr_link = flow_direction_DN.flow_directions( self._surface_values, self._active_links, self._activelink_tail, self._activelink_head, link_slope, grid=self._grid, baselevel_nodes=baselevel_nodes, ) # Save the four ouputs of this component. self._grid["node"]["flow__receiver_node"][:] = receiver self._grid["node"]["topographic__steepest_slope"][:] = steepest_slope self._grid["node"]["flow__link_to_receiver_node"][:] = recvr_link self._grid["node"]["flow__sink_flag"][:] = numpy.zeros_like( receiver, dtype=bool ) self._grid["node"]["flow__sink_flag"][sink] = True return receiver
if __name__ == "__main__": # pragma: no cover import doctest doctest.testmod()