Source code for GRSdriver.driver_landsat_col2

import glob
import os
import xml.etree.ElementTree as ET

import numpy as np
from numba import jit

import pandas as pd
import geopandas as gpd
import xarray as xr
import rioxarray as xrio
import xmltodict

from rasterio.features import rasterize
import scipy.odr as odr
from affine import Affine
from osgeo import gdal, ogr
# import cartopy.crs as ccrs
from pyproj import CRS

from importlib_resources import files

import eoreader as eo
from eoreader.reader import Reader
from eoreader.bands import RAW_CLOUDS
from eoreader.keywords import TO_REFLECTANCE

from . import __package__

opj = os.path.join

BAND_NAMES = np.array(['B01', 'B02', 'B03', 'B08', 'B04', 'B05', 'B09', 'B06', 'B07','B08','B09'])
BAND_NAMES_EOREADER = np.array(['CA', 'BLUE', 'GREEN', 'PAN', 'RED', 'NIR',
                                'SWIR_CIRRUS', 'SWIR_1', 'SWIR_2','TIR_1','TIR_2'])

BAND_ID = [b.replace('B', '') for b in BAND_NAMES]
NATIVE_RESOLUTION = [30, 30, 30, 15, 30, 30, 30, 30, 30,100,100]
WAVELENGTH = np.array([443, 490, 560, 590, 665, 865, 1370, 1610, 2190,11000,12000])
BAND_WIDTH = [25, 60, 60, 173, 33, 28, 21, 95, 287,590,1010]

INFO = pd.DataFrame({'bandId': range(len(BAND_NAMES)),
                     'ESA': BAND_NAMES,
                     'EOREADER': BAND_NAMES_EOREADER,
                     'Wavelength (nm)': WAVELENGTH,
                     'Band width (nm)': BAND_WIDTH,
                     'Resolution (m)': NATIVE_RESOLUTION}).set_index('bandId').T


[docs] class LandsatDriver(): def __init__(self, image_path, band_idx=[0, 1, 2, 3, 4, 5, 6, 7, 8], band_tbp_idx=[0, 1, 2, 3, 4, 5, 7, 8], subset=None, resolution=30, verbose=False, **kwargs): self.abspath = os.path.abspath(image_path) dirroot, basename = os.path.split(self.abspath) self.verbose = verbose self.band_idx = band_idx self.band_tbp_idx = band_tbp_idx self.subset = subset self.resolution = resolution self.INFO = INFO[band_idx] # ----------------------------------------------------- # define prod and geom where data will be loaded # ----------------------------------------------------- self.prod = xr.Dataset() self.geom = xr.Dataset() self.solar_irradiance = None # Open instance of eoreader reader = Reader() # Open the product reader = reader.open(image_path, remove_tmp=True, **kwargs) self.reader = reader self.satellite = reader.constellation.value self.datetime = reader.datetime self.acquisition_date = reader.datetime self.tile = reader.tile_name # save geographic data self.extent = reader.extent() self.bounds = self.extent.bounds minx, miny, maxx, maxy = self.bounds.values[0] self.crs = self.reader.crs() self.epsg = self.extent.crs.to_epsg() self.transform = Affine(resolution, 0., minx, 0., -resolution, maxy) # -------------------------------- # Spectral Response Functions # -------------------------------- if '8' in self.satellite: srf_file = files(__package__+'.rsr.data').joinpath('rsr_landsat_8_oli_tirs.nc') elif '9' in self.satellite: srf_file = files(__package__+'.rsr.data').joinpath('rsr_landsat_9_oli_tirs.nc') else: print('Problem to fetch spectral response functions for ', self.satellite) self.SRFs = xr.open_dataset(srf_file) def load_product(self, add_time=True, **kwargs): self.load_bands(subset=self.subset,add_time=add_time, **kwargs) self.load_geom() self.load_mask() self.prod = xr.merge([self.prod, self.geom.interp(x=self.prod.x,y=self.prod.y), self.mask.interp(x=self.prod.x,y=self.prod.y)],compat='override') del self.geom,self.mask # ----------------------------------------------------------- # convert band from normalized radiance into reflectance # ----------------------------------------------------------- mus = np.cos(np.radians(self.prod['sza'])) for wl, band in self.prod.bands.groupby('wl'): # convert into reflectance vis-NIR-SWIR bands (upper limit 3000nm) if float(wl) <= 3000: self.prod['bands'].loc[{'wl':[wl]}]= band / mus self.prod.attrs = self.prod.attrs self.prod.attrs['satellite'] = self.satellite self.prod.attrs['solar_irradiance'] = 'NA' # self.solar_irradiance #[:, 1] self.prod.attrs['solar_irradiance_unit'] = 'W/m²/µm' self.prod.attrs['acquisition_date'] = str(self.acquisition_date) def load_mask(self): self.mask = self.reader.load([RAW_CLOUDS], pixel_size=self.resolution ).squeeze().rename( {RAW_CLOUDS: "flags_l1"}).astype(np.uint32).drop_vars('band') def load_bands(self, subset=None, add_time=True, **kwargs): # ---------------------------------- # getting bands # ---------------------------------- window = None if subset is not None: if subset.dtype == 'geometry': window = subset.to_crs(self.crs).bounds.values[0] else: window = subset[0] bands = self.reader.stack(list(BAND_NAMES_EOREADER[self.band_idx]), pixel_size=self.resolution, window=window, **kwargs) # fix for naming in differnt EOreader versions if 'z' in bands.coords: bands = bands.rename({'z': 'bands'}) # ---------------------------------- # setting up coordinates and dimensions # ---------------------------------- self.prod = bands.assign_coords(wl=('bands', self.INFO.loc['Wavelength (nm)'])). \ swap_dims({'bands': 'wl'}).drop({'band', 'bands', 'variable'}) self.prod['wl'] = self.prod.wl.astype(float) self.prod = self.prod.assign_coords(bandID=('wl', self.INFO.loc['ESA'].values)) self.prod = self.prod.to_dataset(name='bands', promote_attrs=True) self.prod.attrs['wl_to_process'] = WAVELENGTH[self.band_tbp_idx] # add spectral response function self.prod = xr.merge([self.prod, self.SRFs.sel(wl=self.prod.wl.values,method="nearest")], compat='override').drop_vars('bandID') # compute central wavelengths wl_true = [] for wl_, srf in self.prod.SRF.groupby('wl'): srf = srf.dropna('wl_hr') wl_true.append((srf.wl_hr * srf).integrate('wl_hr') / srf.integrate('wl_hr')) wl_true = xr.concat(wl_true, dim='wl') wl_true.name = 'wl_true' self.prod = xr.merge([self.prod, wl_true],compat='override') # add time if add_time: self.prod = self.prod.assign_coords(time=self.datetime) #.expand_dims('time') # self.prod.clear() def load_geom(self, scale_factor=0.01, nodata=0, geoms=(['sza', 'SZA'], ['vza', 'VZA'], ['vaa', 'VAA'], ['saa', 'SAA'])): # basic angle loading from geotiff (mean value of bands, needs to be improved for each band separately) geom = [] for geom_ in geoms: ang_file = glob.glob(opj(self.abspath, '*' + geom_[1] + '.TIF'))[0] raster = xrio.open_rasterio(ang_file).squeeze() raster.rio.write_nodata(nodata, inplace=True) raster = raster.where(raster != raster.rio.nodata) * scale_factor raster.name = geom_[0] geom.append(raster) geom = xr.merge(geom,compat='override') geom['raa'] = (geom['saa'] - geom['vaa']) # %360 self.geom = geom.drop_vars(['vaa', 'saa','band'])
[docs] @staticmethod def scat_angle(sza, vza, azi): ''' self.azi: azimuth in rad for convention azi=180 when sun-sensor in opposition :return: scattering angle in deg ''' sza = np.radians(sza) vza = np.radians(vza) azi = np.radians(azi) ang = -np.cos(sza) * np.cos(vza) - np.sin(sza) * np.sin(vza) * np.cos(azi) ang = np.arccos(ang) return np.degrees(ang)
# ------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------ # ------------------------------------------------------------------------------------------