# Source code for landlab.components.drainage_density.drainage_density

"""Landlab component to calculate drainage density."""
from warnings import warn

import numpy as np

from landlab import Component

[docs]class DrainageDensity(Component):
r"""
Calculate drainage density over a DEM.

Landlab component that implements the distance to channel algorithm of
Tucker et al., 2001.

This component requires EITHER a channel__mask array with 1's
where channels exist and 0's elsewhere, OR a set of coefficients
and exponents for a slope-area relationship and a
channelization threshold to compare against that relationship.

If an array is provided it MUST be of type np.uint8. See the example
below for how to make such an array.

The channel__mask array will be assigned to an at-node field with the
name channel__mask. If the channel__mask was originaly created from a
passed array, a user can update this array to change the mask.

If the channel__mask is created using an area coefficent,
slope coefficient, area exponent, slope exponent, and channelization
threshold, the location of the mask will be re-update when
calculate_drainage_density is called.

If an area coefficient, :math:C_A, a slope coefficent, :math:C_S, an
area exponent, :math:m_r, a slope exponent, :math:n_r, and
channelization threshold :math:T_C are provided, nodes that meet the
criteria

.. math::

C_A A^{m_r} C_s S^{n_r} > T_c

where :math:A is the drainage density and :math:S is the local slope,
will be marked as channel nodes.

The calculate_drainage_density function returns drainage density for the
model domain. This function calculates the distance from every node to the
nearest channel node :math:L along the flow line of steepest descent
(assuming D8 routing).

This component stores this distance a field, called:
surface_to_channel__minimum_distance. The drainage density is then
calculated (after Tucker et al., 2001):

.. math::

D_d = \frac{1}{2\overline{L}}

where :math:\overline{L} is the mean L for the model domain.

Examples
--------
>>> import numpy as np
>>> from landlab import RasterModelGrid
>>> from landlab.components import FlowAccumulator, FastscapeEroder
>>> mg = RasterModelGrid((10, 10))
>>> np.random.seed(50)
>>> noise = np.random.rand(100)
>>> mg.at_node['topographic__elevation'] += noise
>>> mg.at_node['topographic__elevation'] # doctest: +NORMALIZE_WHITESPACE
array([ 0.49460165,  0.2280831 ,  0.25547392,  0.39632991,  0.3773151 ,
0.99657423,  0.4081972 ,  0.77189399,  0.76053669,  0.31000935,
0.3465412 ,  0.35176482,  0.14546686,  0.97266468,  0.90917844,
0.5599571 ,  0.31359075,  0.88820004,  0.67457307,  0.39108745,
0.50718412,  0.5241035 ,  0.92800093,  0.57137307,  0.66833757,
0.05225869,  0.3270573 ,  0.05640164,  0.17982769,  0.92593317,
0.93801522,  0.71409271,  0.73268761,  0.46174768,  0.93132927,
0.40642024,  0.68320577,  0.64991587,  0.59876518,  0.22203939,
0.68235717,  0.8780563 ,  0.79671726,  0.43200225,  0.91787822,
0.78183368,  0.72575028,  0.12485469,  0.91630845,  0.38771099,
0.29492955,  0.61673141,  0.46784623,  0.25533891,  0.83899589,
0.1786192 ,  0.22711417,  0.65987645,  0.47911625,  0.07344734,
0.13896007,  0.11230718,  0.47778497,  0.54029623,  0.95807105,
0.58379231,  0.52666409,  0.92226269,  0.91925702,  0.25200886,
0.68263261,  0.96427612,  0.22696165,  0.7160172 ,  0.79776011,
0.9367512 ,  0.8537225 ,  0.42154581,  0.00543987,  0.03486533,
0.01390537,  0.58890993,  0.3829931 ,  0.11481895,  0.86445401,
0.82165703,  0.73749168,  0.84034417,  0.4015291 ,  0.74862   ,
0.55962945,  0.61323757,  0.29810165,  0.60237917,  0.42567684,
0.53854438,  0.48672986,  0.49989164,  0.91745948,  0.26287702])
>>> fr = FlowAccumulator(mg, flow_director='D8')
>>> fsc = FastscapeEroder(mg, K_sp=.01, m_sp=.5, n_sp=1)
>>> for x in range(100):
...     fr.run_one_step()
...     fsc.run_one_step(dt = 10.0)
...     mg.at_node['topographic__elevation'][mg.core_nodes] += .01
>>> channels = np.array(mg.at_node['drainage_area'] > 5, dtype=np.uint8)
>>> mean_drainage_density = dd.calculate_drainage_density()
>>> np.isclose(mean_drainage_density, 0.3831100571)
True

Alternatively you can pass a set of coefficients to identify the channel
mask. Next shows the same example as above, but with these coefficients
provided.

>>> mg = RasterModelGrid((10, 10))
>>> np.random.seed(50)
>>> noise = np.random.rand(100)
>>> mg.at_node['topographic__elevation'] += noise
>>> fr = FlowAccumulator(mg, flow_director='D8')
>>> fsc = FastscapeEroder(mg, K_sp=.01, m_sp=.5, n_sp=1)
>>> for x in range(100):
...     fr.run_one_step()
...     fsc.run_one_step(dt = 10.0)
...     mg.at_node['topographic__elevation'][mg.core_nodes] += .01
>>> channels = np.array(mg.at_node['drainage_area'] > 5, dtype=np.uint8)
>>> dd = DrainageDensity(mg,
...                      area_coefficient=1.0,
...                      slope_coefficient=1.0,
...                      area_exponent=1.0,
...                      slope_exponent=0.0,
...                      channelization_threshold=5)
>>> mean_drainage_density = dd.calculate_drainage_density()
>>> np.isclose(mean_drainage_density, 0.3831100571)
True

References
----------
**Required Software Citation(s) Specific to this Component**

None Listed

Tucker, G., Catani, F., Rinaldo, A., Bras, R. (2001). Statistical analysis
of drainage density from digital terrain data. Geomorphology 36(3-4),
187-202. https://dx.doi.org/10.1016/s0169-555x(00)00056-8

"""

_name = "DrainageDensity"

_unit_agnostic = True

_info = {
"area_coefficient": {
"dtype": float,
"intent": "in",
"optional": True,
"units": "-",
"mapping": "node",
"doc": "Area coefficient to define channels.",
},
"area_exponent": {
"dtype": float,
"intent": "in",
"optional": True,
"units": "-",
"mapping": "node",
"doc": "Area exponent to define channels.",
},
"dtype": np.uint8,
"intent": "in",
"optional": True,
"units": "-",
"mapping": "node",
"doc": "Logical map of at which grid nodes channels are present",
},
"channelization_threshold": {
"dtype": float,
"intent": "in",
"optional": True,
"units": "-",
"mapping": "node",
"doc": (
"Channelization threshold for use with area and slope "
"coefficients and exponents."
),
},
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "ID of link downstream of each node, which carries the discharge",
},
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "Node array of receivers (node that receives flow from current node)",
},
"flow__upstream_node_order": {
"dtype": int,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "Node array containing downstream-to-upstream ordered list of node IDs",
},
"slope_coefficient": {
"dtype": float,
"intent": "in",
"optional": True,
"units": "-",
"mapping": "node",
"doc": "Slope coefficient to define channels.",
},
"slope_exponent": {
"dtype": float,
"intent": "in",
"optional": True,
"units": "-",
"mapping": "node",
"doc": "Slope exponent to define channels.",
},
"surface_to_channel__minimum_distance": {
"dtype": float,
"intent": "out",
"optional": False,
"units": "m",
"mapping": "node",
"doc": "Distance from each node to the nearest channel",
},
"topographic__steepest_slope": {
"dtype": float,
"intent": "in",
"optional": False,
"units": "-",
"mapping": "node",
"doc": "The steepest *downhill* slope",
},
}

[docs]    def __init__(
self,
grid,
area_coefficient=None,
slope_coefficient=None,
area_exponent=None,
slope_exponent=None,
channelization_threshold=None,
):
"""Initialize the DrainageDensity component.

Parameters
----------
grid : ModelGrid
channel__mask : Array that holds 1's where
channels exist and 0's elsewhere
area_coefficient : coefficient to multiply drainage area by,
for calculating channelization threshold
slope_coefficient : coefficient to multiply slope by,
for calculating channelization threshold
area_exponent : exponent to raise drainage area to,
for calculating channelization threshold
slope_exponent : exponent to raise slope to,
for calculating channelization threshold
channelization_threshold : threshold value above
which channels exist
"""
super().__init__(grid)

raise NotImplementedError(
"A route-to-multiple flow director has been "
"run on this grid. The landlab development team has not "
"verified that DrainageDensity is compatible with "
"route-to-multiple methods. Please open a GitHub Issue "
"to start this process."
)

if area_coefficient is not None:
warn(
"coefficient supplied. Defaulting "
"to channel mask, ignoring area "
"coefficient."
)
if slope_coefficient is not None:
warn(
"coefficient supplied. Defaulting "
"to channel mask, ignoring slope "
"coefficient."
)
if area_exponent is not None:
warn(
"exponent supplied. Defaulting "
"to channel mask, ignoring area "
"exponent."
)
if slope_exponent is not None:
warn(
"exponent supplied. Defaulting "
"to channel mask, ignoring slope "
"exponent."
)
if channelization_threshold is not None:
warn(
"threshold supplied. Defaulting "
"threshold."
)
raise ValueError(
"Length of channel mask is not equal to " "number of grid nodes"
)

warn("Existing channel__mask grid field was overwritten.")

if area_coefficient is None:
raise ValueError(
"No channel mask and no area "
"coefficient supplied. Either "
"a channel mask or all 5 threshold "
"parameters are needed."
)
if slope_coefficient is None:
raise ValueError(
"No channel mask and no slope "
"coefficient supplied. Either "
"a channel mask or all 5 threshold "
"parameters are needed."
)
if area_exponent is None:
raise ValueError(
"No channel mask and no area "
"exponent supplied. Either "
"a channel mask or all 5 threshold "
"parameters are needed."
)
if slope_exponent is None:
raise ValueError(
"No channel mask and no slope "
"exponent supplied. Either "
"a channel mask or all 5 threshold "
"parameters are needed."
)
if channelization_threshold is None:
raise ValueError(
"No channel mask and no channelization "
"threshold supplied. Either "
"a channel mask or all 5 threshold "
"parameters are needed."
)

self._area_coefficient = area_coefficient
self._slope_coefficient = slope_coefficient
self._area_exponent = area_exponent
self._slope_exponent = slope_exponent
self._channelization_threshold = channelization_threshold

# for this component to work with Cython acceleration,
# the channel_network must be uint8, not bool...

# Upstream node order
self._upstream_order = grid.at_node["flow__upstream_node_order"]

# Distance to channel
if "surface_to_channel__minimum_distance" not in grid.at_node:
"surface_to_channel__minimum_distance", at="node", dtype=float
)
self._distance_to_channel = grid.at_node["surface_to_channel__minimum_distance"]

raise NotImplementedError(
"If you provided a channel mask to "
"DrainageDensity, update it by updating the "
)

self._area_coefficient
* np.power(self._grid.at_node["drainage_area"], self._area_exponent)
* self._slope_coefficient
* np.power(
self._grid.at_node["topographic__steepest_slope"], self._slope_exponent
)
) > self._channelization_threshold

[docs]    def calculate_drainage_density(self):
"""Calculate drainage density.

If the channel mask is defined based on slope and area coefficients,
it will be update based on the current drainage area and slope fields.

Returns
-------
landscape_drainage_density : float (1/m)
Drainage density over the model domain.
"""
from .cfuncs import _calc_dists_to_channel

_calc_dists_to_channel(
self._channel_network,
self._upstream_order,
self._grid.length_of_d8,
self._distance_to_channel,
self._grid.number_of_nodes,
)
landscape_drainage_density = 1.0 / (
2.0
* np.mean(
self._grid.at_node["surface_to_channel__minimum_distance"][
self._grid.core_nodes
]
)
)
# this is THE drainage density
return landscape_drainage_density