Source code for landlab.components.overland_flow.kinematic_wave_rengers

#!/usr/bin/env python

import numpy as np

from landlab import Component
from landlab import RasterModelGrid


[docs] class KinematicWaveRengers(Component): """ This code is based on an overland flow model by Francis Rengers and colleagues, after Julien et al., 1995. It uses an explicit face-centered solution to a depth-varying Manning's equation, broadly following, e.g., Mugler et al., 2011. It was implemented in Landlab by DEJH, March '16. Please cite Rengers et al., 2016, Model Predictions of Water Runoff in Steep Catchments after Wildfire, WRR. Note: You will not have a good day if you have pits present in your topo before routing flow with this component. Fill pits before instantiating the component (or call :func:`update_topographic_params` once you have filled after instantiation). Note this module assumes that the topography DOES NOT change during the run. If it does, call :func:`update_topographic_params` to update the component to the new topo. Boundary condition control can be... interesting with this component. Be sure to close boundaries you do not wish water to leave - or enter! - through. To allow free water discharge from the grid edge it is recommended to use fixed gradient boundary conditions at the open edges. The component will then set the fixed gradient as equal to the underlying topographic gradient throughout the run. It is also possible to fix the water depth at the open edge, but this is not really recommended. Construction:: KinematicWaveRengers(grid, mannings_n=0.03, critical_flow_depth=0.003, mannings_epsilon=0.33333333, dt_max=0.3, max_courant=0.2, min_surface_water_depth=1.e-8) Parameters ---------- grid : RasterModelGrid A grid. mannings_n : float A value to use for Manning's n in the Manning discharge equation. critical_flow_depth : float (m) An index flow depth for the depth-varying Manning's equation, controlling the depth at which the effective Manning's n begins to increase. mannings_epsilon : float An exponent for the depth-varying Manning's equation, controlling the rate of increase of effective Manning's n at small flow depths. dt_max : float or None (s) The largest permitted internal timestep for the component. If the Courant criterion produces a more restrictive condition, that will be used instead. max_courant : float The maximum permitted Courant number for the courant stability criterion. min_surface_water_depth : float (m) A water depth below which surface water thickness may never fall, to ensure model stabilty. Examples -------- >>> from landlab import RasterModelGrid >>> from landlab.components import KinematicWaveRengers >>> mg = RasterModelGrid((5, 10), 10.0) >>> mg.status_at_node[mg.nodes_at_left_edge] = mg.BC_NODE_IS_FIXED_GRADIENT >>> mg.status_at_node[mg.nodes_at_top_edge] = mg.BC_NODE_IS_CLOSED >>> mg.status_at_node[mg.nodes_at_bottom_edge] = mg.BC_NODE_IS_CLOSED >>> mg.status_at_node[mg.nodes_at_right_edge] = mg.BC_NODE_IS_CLOSED >>> _ = mg.add_field("node", "topographic__elevation", 0.05 * mg.node_x) >>> _ = mg.add_empty("node", "surface_water__depth") >>> mg.at_node["surface_water__depth"].fill(1.0e-8) >>> dt = 60.0 # 1 min intervals >>> rain_intensities = (1.0e-5, 1.0e-5, 1.0e-5, 1.0e-5, 1.0e-5) >>> kw = KinematicWaveRengers(mg) >>> for i in rain_intensities: ... kw.run_one_step(dt, rainfall_intensity=i) ... >>> mg.at_node["surface_water__depth"] array([ 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 2.95578314e-03, 2.95578314e-03, 2.90945761e-03, 2.82912876e-03, 2.70127141e-03, 2.51202011e-03, 2.24617193e-03, 1.88032853e-03, 1.35451064e-03, 1.00000000e-08, 2.95578314e-03, 2.95578314e-03, 2.90945761e-03, 2.82912876e-03, 2.70127141e-03, 2.51202011e-03, 2.24617193e-03, 1.88032853e-03, 1.35451064e-03, 1.00000000e-08, 2.95578314e-03, 2.95578314e-03, 2.90945761e-03, 2.82912876e-03, 2.70127141e-03, 2.51202011e-03, 2.24617193e-03, 1.88032853e-03, 1.35451064e-03, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08, 1.00000000e-08]) """ _name = "KinematicWaveRengers" _unit_agnostic = False _info = { "surface_water__depth": { "dtype": float, "intent": "inout", "optional": False, "units": "m", "mapping": "node", "doc": "Depth of water on the surface", }, "topographic__elevation": { "dtype": float, "intent": "in", "optional": False, "units": "m", "mapping": "node", "doc": "Land surface topographic elevation", }, "surface_water__discharge": { "dtype": float, "intent": "out", "optional": False, "units": "m**3/s", "mapping": "node", "doc": "Volumetric discharge of surface water", }, "surface_water__velocity": { "dtype": float, "intent": "out", "optional": False, "units": "m/s", "mapping": "node", "doc": "Speed of water flow above the surface", }, }
[docs] def __init__( self, grid, mannings_n=0.03, critical_flow_depth=0.003, mannings_epsilon=0.33333333, dt_max=0.3, max_courant=0.2, min_surface_water_depth=1.0e-8, ): """Initialize the kinematic wave approximation overland flow component.""" super().__init__(grid) if not isinstance(self.grid, RasterModelGrid): ValueError("KinematicWaveRengers: grid must be regular") if np.isclose(dt_max, 0.0): raise ValueError("KinematicWaveRengers: dt must be > 0.0") active = np.nonzero(self.grid.status_at_node != self.grid.BC_NODE_IS_CLOSED) self._h = self.grid.at_node["surface_water__depth"] self._active = active self._hc = critical_flow_depth self._n = mannings_n self._negepsilon = -mannings_epsilon self._dt_max = dt_max self._min_surface_water_depth = min_surface_water_depth self._active_depths = self.grid.at_node["surface_water__depth"][active] all_grads = self.grid.calc_grad_at_link("topographic__elevation") hoz_grads = self.grid.map_mean_of_horizontal_active_links_to_node(all_grads) vert_grads = self.grid.map_mean_of_vertical_active_links_to_node(all_grads) self._hozslopept5 = np.fabs(hoz_grads[active]) ** 0.5 self._vertslopept5 = np.fabs(vert_grads[active]) ** 0.5 self._velx = self.grid.zeros("node", dtype=float) self._vely = self.grid.zeros("node", dtype=float) self._qy = np.zeros(grid.number_of_nodes + 1, dtype=float) self._qx = np.zeros(grid.number_of_nodes + 1, dtype=float) self._poshozgrads = hoz_grads > 0.0 self._posvertgrads = vert_grads > 0.0 if np.isclose(self.grid.dx, self.grid.dy): self._equaldims = True self._courant_prefactor = max_courant * self.grid.dx else: self._equaldims = False self._courant_prefactor = max_courant * self.grid.dx * self.grid.dy self._neighbors = self.grid.adjacent_nodes_at_node.copy() self._neighbors[self._neighbors == self.grid.BAD_INDEX] = -1 self._water_balance = [] self._actives_BCs = ( self.grid.status_at_node[active] == self.grid.BC_NODE_IS_FIXED_VALUE ) self._actives_BCs_water_depth = self._h[active][self._actives_BCs] fixed_grad_nodes = self.grid.fixed_gradient_boundary_nodes.copy() fixed_grad_anchors = self.grid.fixed_gradient_boundary_node_anchor_node # ^add this value to the anchor nodes to update the BCs # these also need to be mapped to active_IDs: blank_nodes = self.grid.zeros("node", dtype=bool) blank_nodes[fixed_grad_nodes] = True self._fixed_grad_nodes_active = np.where(blank_nodes[active])[0] blank_nodes.fill(False) blank_nodes[fixed_grad_anchors] = True self._fixed_grad_anchors_active = np.where(blank_nodes[active])[0] # create outputs self.initialize_output_fields()
[docs] def run_one_step( self, dt, rainfall_intensity=0.00001, update_topography=False, track_min_depth=False, ): """Update fields with current hydrologic conditions. Parameters ---------- rain_intensity : float or array (m/s) The rainfall intensity across the grid (water input rate at each node). update_topography : bool Set to true if the topography of the grid evolves during the run. track_min_depth : bool At *very* low rainfall inputs, there is a possibility this component could allow creation of small amounts of water mass. Set to true to track this mass, and use the :func:`water_balance` property to investigate its evolution through time. """ elapsed_time_in_dt = 0.0 # this is only since the start of the timestep active = self._active self._hnew = self._h[active] _hnew = self._hnew if update_topography: self.update_topographic_params() while elapsed_time_in_dt < dt: internal_dt = self.calc_grads_and_timesteps( update_topography, track_min_depth ) remaining_dt = dt - elapsed_time_in_dt # now reduce timestep is needed if limited by total tstep length internal_dt = min(internal_dt, remaining_dt).clip(0.0) # this section uses our final-array-val-is-zero trick _qx_left = self._qx[self._neighbors[:, 2]].clip(min=0.0) _qx_right = self._qx[self._neighbors[:, 0]].clip(max=0.0) _qy_top = self._qy[self._neighbors[:, 1]].clip(min=0.0) _qy_bottom = self._qy[self._neighbors[:, 3]].clip(max=0.0) # FR's rainfall handling was here. We're going to assume that the # component is being driven by a "LL style" rainfall record, where # the provided rainfall_intensity is constant across the provide # dt. If it's not, it needs to be handled outside the component. # now add the rainfall input if type(rainfall_intensity) is not np.ndarray: _hnew += internal_dt * rainfall_intensity else: _hnew += internal_dt * rainfall_intensity[active] # set the BCs _hnew[self._actives_BCs] = self._actives_BCs_water_depth # flux it round _hnew -= internal_dt / self.grid.dx * np.fabs(self._qy[active]) _hnew -= internal_dt / self.grid.dy * np.fabs(self._qx[active]) _hnew += internal_dt / self.grid.dx * (_qy_top - _qy_bottom)[active] _hnew += internal_dt / self.grid.dy * (_qx_left - _qx_right)[active] _hnew[self._fixed_grad_nodes_active] = _hnew[ self._fixed_grad_anchors_active ] # update the internal clock elapsed_time_in_dt += internal_dt # update the actual fields self._h[active] = _hnew self.grid.at_node["surface_water__velocity"][:] = np.sqrt( np.square(self._velx) + np.square(self._vely) ) self.grid.at_node["surface_water__discharge"][:] = np.sqrt( np.square(self._qx[:-1]) + np.square(self._qy[:-1]) )
[docs] def calc_grads_and_timesteps(self, update_topography, track_min_depth): """ Perform the first part of the calculation for the main run, mainly velocities and fluxes. The main objective of this part of the calculation is to derive the stable timestep for the run. Parameters ---------- update_topography : bool If False, the underlying surface topography is assumed unchanged since the last run. track_min_depth : bool If True, the internal list _water_balance will be appended with the volumetric fractional change in mass balance during the run. Returns ------- internal_dt : float The internally stable timestep that will be used on this loop. """ active = self._active _hnew = self._hnew if update_topography: self.update_topographic_params() # assert the minimum water depth - this could introduce an element of # mass gain, but should remain minor _hnew.clip(self._min_surface_water_depth, out=_hnew) if track_min_depth: self._water_balance.append( (_hnew - self._h[active]).sum() / self._h[active].sum() ) n = self._n * (_hnew / self._hc) ** self._negepsilon twothirds_hnewbyn = _hnew**0.66666666 / n self._vely[active] = twothirds_hnewbyn * self._vertslopept5 self._velx[active] = twothirds_hnewbyn * self._hozslopept5 self._vely[self._posvertgrads] *= -1.0 self._velx[self._poshozgrads] *= -1.0 self._qy[active] = self._vely[active] * _hnew # m**2/s self._qx[active] = self._velx[active] * _hnew # m**2/s max_vely = np.fabs(self._vely).max() max_velx = np.fabs(self._velx).max() if self._equaldims: courant_dt = self._courant_prefactor / (max_velx + max_vely) else: # note prefactor is NOT THE SAME as above in this case courant_dt = self._courant_prefactor / ( self.grid.dy * max_velx + self.grid.dx * max_vely ) if self._dt_max is not None: internal_dt = np.min((self._dt_max, courant_dt)) else: internal_dt = courant_dt self._internal_dt = internal_dt return internal_dt
[docs] def update_topographic_params(self): """ If the topo changes during the run, change the held params used by :func:`run_one_step`. """ active = np.where(self.grid.status_at_node != self.grid.BC_NODE_IS_CLOSED)[0] all_grads = self.grid.calculate_gradients_at_links("topographic__elevation") hoz_grads = self.grid.map_mean_of_horizontal_active_links_to_node(all_grads) vert_grads = self.grid.map_mean_of_vertical_active_links_to_node(all_grads) self._hozslopept5 = np.fabs(hoz_grads[active]) ** 0.5 self._vertslopept5 = np.fabs(vert_grads[active]) ** 0.5 self._poshozgrads = hoz_grads > 0.0 self._posvertgrads = vert_grads > 0.0 fixed_grad_nodes = self.grid.fixed_gradient_boundary_nodes fixed_grad_anchors = self.grid.fixed_gradient_boundary_node_anchor_node # ^add this value to the anchor nodes to update the BCs # these also need to be mapped to active_IDs: blank_nodes = self.grid.zeros("node", dtype=bool) blank_nodes[fixed_grad_nodes] = True self._fixed_grad_nodes_active = np.where(blank_nodes[active])[0] blank_nodes.fill(False) blank_nodes[fixed_grad_anchors] = True self._fixed_grad_anchors_active = np.where(blank_nodes[active])[0] # check is the grid topology has changed... if not np.all(np.equal(self._active, active)): self._active = active self._velx.fill(0.0) self._vely.fill(0.0) self._qy.fill(0.0) self._qx.fill(0.0) self._neighbors = self.grid.adjacent_nodes_at_node.copy() self._neighbors[self._neighbors == self.grid.BAD_INDEX] = -1 self._actives_BCs = ( self.grid.status_at_node[active] == self.grid.BC_NODE_IS_FIXED_VALUE ) self._actives_BCs_water_depth = self._h[self._actives_BCs]
@property def water_balance(self): """ Return a list of the fractional gain/loss of water mass during the run, if it was tracked using the track_min_depth flag. """ if self._water_balance == []: raise ValueError("No record of water balance was found!") else: return self._water_balance @property def internal_timestep(self): """ Return the internal timestep last used by the kinematic wave component. """ try: return self._internal_dt except AttributeError: # the component hasn't started running yet _ = self.calc_grads_and_timesteps(False, False) return self._internal_dt