Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Some classes to process OpenDrift's outputs
Author: Thomas Dobbelaere, Earth and Life Institute, UCLouvain, Belgium
Last modified: 19 July 2022
"""
import numpy as np
from typing import List, Tuple, Dict
from netCDF4 import Dataset
def _extract_lonlat(ncfile: str) -> Tuple[np.ndarray, np.ndarray]:
with Dataset(ncfile) as da:
lon = np.ma.filled(da['lon'][:], np.nan)
lat = np.ma.filled(da['lat'][:], np.nan)
return lon, lat
class DataMap:
"""
Base class to produce maps with given resolution over a given bounding box based on particle trajtories computed by OpenDrift
"""
def __init__(self, minlon: float, maxlon: float, minlat: float, maxlat: float, resolution: int) -> None:
self.lon = np.arange(minlon, maxlon+resolution, resolution)
nx = len(self.lon)
self.lat = np.arange(minlat, maxlat+resolution, resolution)
ny = len(self.lat)
self.data = np.zeros((ny-1)*(nx-1))
self.shape = (ny-1,nx-1)
self.dx = resolution
def _contains(self, lon: np.ndarray, lat: np.ndarray) -> np.ndarray:
isok = ~np.isnan(lon)
isok[isok] = (lon[isok] >= self.lon.min()) & (lon[isok] <= self.lon.max()) & \
(lat[isok] >= self.lat.min()) & (lat[isok] <= self.lat.max())
return isok
def get_cell_indices(self, lon: np.ndarray, lat: np.ndarray) -> np.ndarray:
isok = self._contains(lon, lat)
indices = np.full(lon.shape, -1, dtype=np.int32)
ix = ((lon[isok]-self.lon[0]) / self.dx).astype(np.int32)
iy = ((lat[isok]-self.lat[0]) / self.dx).astype(np.int32)
indices[isok] = iy*(self.lon.size-1)+ix
return indices
class ArrivalTimeMap(DataMap):
"""
Compute map of minimum arrival time based on backtracking simulation outputs from OpenOil.
This is performed by computing the 5% percentile q such that 95% of particles crossing a given pixel needed at least q days to reach their source point
"""
arrival_times_per_pixel: Dict[int,List[int]] = {}
def __init__(self, minlon: float, maxlon: float, minlat: float, maxlat: float, resolution: int) -> None:
super().__init__(minlon, maxlon, minlat, maxlat, resolution)
self.touched = np.full(self.data.size, False)
def add_arrival_times_from_file(self, ncfile: str) -> None:
with Dataset(ncfile,"r") as da:
time = da["time"][:]
time[:] = np.abs(time-time[0])
indices = self.get_cell_indices(*_extract_lonlat(ncfile))
unique_ids = np.unique(indices)[1:]
for px in unique_ids:
# for each pixel, find all trajectories that crossed it
itraj = np.where(np.sum(indices == px, axis=1) > 0)[0]
# then find minimum arrival time from the pixel for all found trajectories
itime = np.argmax(indices[itraj] == px, axis=1)
min_time = time[itime].tolist()
if self.touched[px]:
self.arrival_times_per_pixel[px].extend(min_time)
else:
self.arrival_times_per_pixel[px] = min_time
self.touched[unique_ids] = True
def get_arrival_time_days(self) -> np.ndarray:
for px, arrival_times in self.arrival_times_per_pixel.items():
self.data[px] = np.quantile(arrival_times,0.05) / 86400.0
arrival_days = np.where(self.touched, self.data, np.nan)
return arrival_days.reshape(self.shape)