Source code for GRSdriver.utils



import numpy as np
import pandas as pd
import xarray as xr

import pyproj
from rasterio import features
from affine import Affine
import geopandas as gpd

[docs] class SpatioTemp():
[docs] def wktbox(self, center_lon, center_lat, width=100, height=100, ellps='WGS84'): ''' :param center_lon: decimal longitude :param center_lat: decimal latitude :param width: width of the box in m :param height: height of the box in m :return: wkt of the box centered on provided coordinates ''' #width,height = width/2 , height/2 geod = pyproj.Geod(ellps=ellps) rect_diag = np.sqrt(width ** 2 + height ** 2) azimuth1 = np.arctan(width / height) azimuth2 = np.arctan(-width / height) azimuth3 = np.arctan(width / height) + np.pi # first point + 180 degrees azimuth4 = np.arctan(-width / height) + np.pi # second point + 180 degrees pt1_lon, pt1_lat, _ = geod.fwd(center_lon, center_lat, azimuth1 * 180 / np.pi, rect_diag) pt2_lon, pt2_lat, _ = geod.fwd(center_lon, center_lat, azimuth2 * 180 / np.pi, rect_diag) pt3_lon, pt3_lat, _ = geod.fwd(center_lon, center_lat, azimuth3 * 180 / np.pi, rect_diag) pt4_lon, pt4_lat, _ = geod.fwd(center_lon, center_lat, azimuth4 * 180 / np.pi, rect_diag) wkt_point = 'POINT (%.6f %.6f)' % (center_lon, center_lat) wkt_poly = 'POLYGON (( %.6f %.6f, %.6f %.6f, %.6f %.6f, %.6f %.6f, %.6f %.6f ))' % ( pt1_lon, pt1_lat, pt2_lon, pt2_lat, pt3_lon, pt3_lat, pt4_lon, pt4_lat, pt1_lon, pt1_lat) return wkt_poly
def transform_from_latlon(self, lat, lon): lat = np.asarray(lat) lon = np.asarray(lon) trans = Affine.translation(lon[0], lat[0]) scale = Affine.scale(lon[1] - lon[0], lat[1] - lat[0]) return trans * scale
[docs] def rasterize(self, shapes, coords, latitude='lat', longitude='lon', fill=np.nan, **kwargs): """Rasterize a list of (geometry, fill_value) tuples onto the given xray coordinates. This only works for 1d latitude and longitude arrays. usage: ----- 1. read shapefile to geopandas.GeoDataFrame `states = gpd.read_file(shp_dir)` 2. encode the different shapefiles that capture those lat-lons as different numbers i.e. 0.0, 1.0 ... and otherwise np.nan `shapes = (zip(states.geometry, range(len(states))))` 3. Assign this to a new coord in your original xarray.DataArray `ds['states'] = rasterize(shapes, ds.coords, longitude='X', latitude='Y')` arguments: --------- : **kwargs (dict): passed to `rasterio.rasterize` function attrs: ----- :transform (affine.Affine): how to translate from latlon to ...? :raster (numpy.ndarray): use rasterio.features.rasterize fill the values outside the .shp file with np.nan :spatial_coords (dict): dictionary of {"X":xr.DataArray, "Y":xr.DataArray()} with "X", "Y" as keys, and xr.DataArray as values returns: ------- :(xr.DataArray): DataArray with `values` of nan for points outside shapefile and coords `Y` = latitude, 'X' = longitude. """ # transform = transform_from_latlon(coords[latitude], coords[longitude]) out_shape = (len(coords[latitude]), len(coords[longitude])) raster = features.rasterize(shapes, out_shape=out_shape, fill=fill, # transform=transform, dtype=float, **kwargs) spatial_coords = {latitude: coords[latitude], longitude: coords[longitude]} return xr.DataArray(raster, coords=spatial_coords, dims=(latitude, longitude))
[docs] @staticmethod def clip_raster(raster, lat, lon, extent_m): ''' :param raster as rioxarray object with documented coordinate system :param lat: latitude (float) of central point of buffer :param lon: longitude (float) of central point of buffer :param extent_m: extent of square centered on lat/lon :param crs: crs for reprojection, use lat/lon epsg4326 otherwise. :return: clipped raster ''' # distance is d/2 of the square buffer around the point, # from center to corner; # find buffer width in meters buffer_width_m = extent_m / np.sqrt(2) # EPSG:4326 sets Coordinate Reference System to WGS84 to match input wgs84_pt_gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy([lon], [lat], crs='4326')) # find suitable projected coordinate system for distance utm_crs = wgs84_pt_gdf.estimate_utm_crs() # reproject to UTM -> create square buffer (cap_style = 3) around point -> reproject back to WGS84 buffer = wgs84_pt_gdf.to_crs(utm_crs).buffer(buffer_width_m, cap_style=3) # get buffer in the raster coordinate system buffer = buffer.to_crs(raster.rio.crs) # clipping return raster.rio.clip(buffer)