"""This module contains the FlowObject class.
"""
import numpy as np
# pylint: disable=no-name-in-module
from . import _grid # type: ignore
from . import _flow # type: ignore
from .grid_object import GridObject
__all__ = ['FlowObject']
[docs]
class FlowObject():
"""A class containing containing (water-) flow information about a given
digital elevation model (DEM).
"""
[docs]
def __init__(self, grid: GridObject,
bc: np.ndarray | GridObject | None = None,
hybrid : bool = True):
"""The constructor for the FlowObject. Takes a GridObject as input,
computes flow direction information and saves them as an FlowObject.
Parameters
----------
grid : GridObject
The GridObject that will be the basis of the computation.
bc : ndarray or GridObject, optional
Boundary conditions for sink filling. `bc` should be an array
of np.uint8 that matches the shape of the DEM. Values of 1
indicate pixels that should be fixed to their values in the
original DEM and values of 0 indicate pixels that should be
filled.
hybrid: bool, optional
Should hybrid reconstruction algorithm be used to fill
sinks? Defaults to True. Hybrid reconstruction is faster
but requires additional memory be allocated for a queue.
Notes
-----
Large intermediate arrays are created during the initialization
process, which could lead to issues when using very large DEMs.
"""
dims = grid.shape
dem = grid.z
filled_dem = np.zeros_like(dem, dtype=np.float32, order='F')
restore_nans = False
if bc is None:
bc = np.ones_like(dem, dtype=np.uint8)
bc[1:-1, 1:-1] = 0 # Set interior pixels to 0
nans = np.isnan(dem)
dem[nans] = -np.inf
bc[nans] = 1
restore_nans = True
if bc.shape != dims:
err = ("The shape of the provided boundary conditions does not "
f"match the shape of the DEM. {dims}")
raise ValueError(err)from None
if isinstance(bc, GridObject):
bc = bc.z
queue = np.zeros_like(dem, dtype=np.int64, order='F')
if hybrid:
_grid.fillsinks_hybrid(filled_dem, queue, dem, bc, dims)
else:
_grid.fillsinks(filled_dem, dem, bc, dims)
if restore_nans:
dem[nans] = np.nan
filled_dem[nans] = np.nan
flats = np.zeros_like(dem, dtype=np.int32, order='F')
_grid.identifyflats(flats, filled_dem, dims)
costs = np.zeros_like(dem, dtype=np.float32, order='F')
conncomps = np.zeros_like(dem, dtype=np.int64, order='F')
_grid.gwdt_computecosts(costs, conncomps, flats, dem, filled_dem, dims)
dist = np.zeros_like(flats, dtype=np.float32, order='F')
prev = conncomps # prev: dtype=np.int64
heap = queue # heap: dtype=np.int64
back = np.zeros_like(flats, dtype=np.int64, order='F')
_grid.gwdt(dist, prev, costs, flats, heap, back, dims)
source = heap # source: dtype=np.int64
direction = np.zeros_like(dem, dtype=np.uint8, order='F')
_grid.flow_routing_d8_carve(
source, direction, filled_dem, dist, flats, dims)
target = back # target: dtype=int64
_grid.flow_routing_targets(target, source, direction, dims)
self.path = grid.path
self.name = grid.name
# raster metadata
self.target = target # dtype=np.int64
self.source = source # dtype=np.int64
self.direction = direction # dtype=np.unit8
self.shape = grid.shape
self.cellsize = grid.cellsize
self.strides = tuple(s // grid.z.itemsize for s in grid.z.strides)
# georeference
self.bounds = grid.bounds
self.transform = grid.transform
self.crs = grid.crs
[docs]
def flow_accumulation(self, weights: np.ndarray | float = 1.0):
"""Computes the flow accumulation for a given flow network using
optional weights. The flow accumulation represents the amount of flow
each cell receives from its upstream neighbors.
Parameters
----------
weights : np.ndarray | float, optional
An array of the same shape as the flow grid representing weights
for each cell, or a constant float value used as the weight for all
cells. If `weights=1.0` (default), the flow accumulation is
unweighted. If an ndarray is provided, it must match the shape of
the flow grid., by default 1.0
Returns
-------
GridObject
A new GridObject containing the flow accumulation grid.
Raises
------
ValueError
If the shape of the `weights` array does not match the shape of the
flow network grid.
"""
acc = np.zeros_like(self.source, dtype=np.float32, order='F')
if weights == 1.0:
weights = np.ones_like(self.source, dtype=np.float32, order='F')
elif isinstance(weights, np.ndarray):
if weights.shape != acc.shape:
err = ("The shape of the provided weights ndarray does not "
f"match the shape of the FlowObject. {self.shape}")
raise ValueError(err)from None
else:
weights = np.full(self.shape, weights, dtype=np.float32, order='F')
_flow.flow_accumulation(
acc, self.source, self.direction, weights, self.shape)
result = GridObject()
result.path = self.path
result.name = self.name
result.z = acc
result.cellsize = self.cellsize
result.bounds = self.bounds
result.transform = self.transform
result.crs = self.crs
return result
[docs]
def drainagebasins(self):
"""Delineate drainage basins from a flow network.
Returns
-------
GridObject
An integer-valued GridObject with a unique label for each drainage
basin.
"""
basins = np.zeros(self.shape, dtype=np.int64, order='F')
_flow.drainagebasins(basins, self.source, self.target, self.shape)
result = GridObject()
result.path = self.path
result.name = self.name
result.z = basins
result.cellsize = self.cellsize
result.bounds = self.bounds
result.transform = self.transform
result.crs = self.crs
return result
# 'Magic' functions:
# ------------------------------------------------------------------------
def __len__(self):
return len(self.target)
def __iter__(self):
return iter(self.target)
def __getitem__(self, index):
if isinstance(index, tuple):
array_name, idx = index
if array_name == 'target':
return self.target[idx]
if array_name == 'source':
return self.source[idx]
if array_name == 'direction':
return self.direction[idx]
raise ValueError(
"Invalid raster name ('target', 'source', or 'direction').")
raise ValueError(
"Index must be a tuple with (raster_name, index).")
def __setitem__(self, index, value):
# Check if the index is a tuple
if isinstance(index, tuple):
array_name, idx = index
if array_name == 'target':
self.target[idx] = value
elif array_name == 'source':
self.source[idx] = value
elif array_name == 'direction':
self.direction[idx] = value
else:
raise ValueError(
"Invalid raster name.('target', 'source', or 'direction')")
else:
raise ValueError(
"Index must be a tuple with (raster_name, index).")
def __array__(self):
# Concatenate the arrays along their first axis.
# Not practical to compute with, but if there is a need to manually
# plot a FlowObject it'll show the logic nicely.
return np.concatenate((self.target, self.source, self.direction))