Skip to content
Extraits de code Groupes Projets
postpro.py 3,52 ko
Newer Older
  • Learn to ignore specific revisions
  • #!/usr/bin/env python3
    #  -*- coding: utf-8 -*-
    
    """
    Some classes to process OpenDrift's outputs
    
    Author: Thomas Dobbelaere, Earth and Life Institute, UCLouvain, Belgium
    Last modified: 19 July 2022
    """
    
    import numpy as np
    from typing import List, Tuple, Dict
    from netCDF4 import Dataset
    
    def _extract_lonlat(ncfile: str) -> Tuple[np.ndarray, np.ndarray]:
        with Dataset(ncfile) as da:
            lon = np.ma.filled(da['lon'][:], np.nan)
            lat = np.ma.filled(da['lat'][:], np.nan)
        return lon, lat
    
    class DataMap:
        """
        Base class to produce maps with given resolution over a given bounding box based on particle trajtories computed by OpenDrift
        """
    
        def __init__(self, minlon: float, maxlon: float, minlat: float, maxlat: float, resolution: int) -> None:
            self.lon = np.arange(minlon, maxlon+resolution, resolution)
            nx = len(self.lon)
            self.lat = np.arange(minlat, maxlat+resolution, resolution)
            ny = len(self.lat)
            self.data = np.zeros((ny-1)*(nx-1))
            self.shape = (ny-1,nx-1)
            self.dx = resolution
        
        def _contains(self, lon: np.ndarray, lat: np.ndarray) -> np.ndarray:
            isok = ~np.isnan(lon)
            isok[isok] = (lon[isok] >= self.lon.min()) & (lon[isok] <= self.lon.max()) & \
                            (lat[isok] >= self.lat.min()) & (lat[isok] <= self.lat.max())
            return isok
    
        def get_cell_indices(self, lon: np.ndarray, lat: np.ndarray) -> np.ndarray:
            isok = self._contains(lon, lat)
            indices = np.full(lon.shape, -1, dtype=np.int32)
            ix = ((lon[isok]-self.lon[0]) / self.dx).astype(np.int32)
            iy = ((lat[isok]-self.lat[0]) / self.dx).astype(np.int32)
            indices[isok] = iy*(self.lon.size-1)+ix
    
            return indices        
    
    class ArrivalTimeMap(DataMap):
        """
        Compute map of minimum arrival time based on backtracking simulation outputs from OpenOil.
        This is performed by computing the 5% percentile q such that 95% of particles crossing a given pixel needed at least q days to reach their source point
        """
    
        arrival_times_per_pixel: Dict[int,List[int]] = {}
    
        def __init__(self, minlon: float, maxlon: float, minlat: float, maxlat: float, resolution: int) -> None:
            super().__init__(minlon, maxlon, minlat, maxlat, resolution)
            self.touched = np.full(self.data.size, False)
    
        def add_arrival_times_from_file(self, ncfile: str) -> None:
            with Dataset(ncfile,"r") as da:
                time = da["time"][:]
            time[:] = np.abs(time-time[0])
            indices = self.get_cell_indices(*_extract_lonlat(ncfile))
            unique_ids = np.unique(indices)[1:]
            for px in unique_ids:
                # for each pixel, find all trajectories that crossed it
                itraj = np.where(np.sum(indices == px, axis=1) > 0)[0]
                # then find minimum arrival time from the pixel for all found trajectories
                itime = np.argmax(indices[itraj] == px, axis=1)
                min_time = time[itime].tolist()
                if self.touched[px]:
                    self.arrival_times_per_pixel[px].extend(min_time)
                else:
                    self.arrival_times_per_pixel[px] = min_time
            self.touched[unique_ids] = True
        
        def get_arrival_time_days(self) -> np.ndarray:
            for px, arrival_times in self.arrival_times_per_pixel.items():
                self.data[px] = np.quantile(arrival_times,0.05) / 86400.0
            arrival_days = np.where(self.touched, self.data, np.nan)
            return arrival_days.reshape(self.shape)