diff --git a/computing/api.py b/computing/api.py index 33dbbbe2..450b16ed 100644 --- a/computing/api.py +++ b/computing/api.py @@ -90,6 +90,7 @@ from .mws.mws_centroid import generate_mws_centroid_data from .misc.facilities_proximity import generate_facilities_proximity_task from .misc.antyodaya import generate_antyodaya_layer_task +from .misc.livestocks import generate_livestocks_layer_task from .misc.digital_elevation_model import generate_dem_layer from .misc.canal_layer import canal_vector from .STAC_specs.stac_collection import generate_stac_collection_task @@ -1611,6 +1612,28 @@ def generate_antyodaya(request): return Response({"Exception": e}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) +@api_view(["POST"]) +@schema(None) +def generate_livestocks(request): + print("Inside generate_livestocks API.") + try: + state = request.data.get("state").lower() + district = request.data.get("district").lower() + block = request.data.get("block").lower() + sync_to_geoserver = request.data.get("sync_to_geoserver", True) + overwrite = request.data.get("overwrite", False) + generate_livestocks_layer_task.apply_async( + args=[state, district, block, sync_to_geoserver, overwrite], + queue="nrm", + ) + return Response( + {"Success": "Successfully initiated"}, status=status.HTTP_200_OK + ) + except Exception as e: + print("Exception in generate_livestocks api :: ", e) + return Response({"Exception": e}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + @api_view(["POST"]) @schema(None) def generate_stac_collection(request): diff --git a/computing/misc/livestocks.py b/computing/misc/livestocks.py new file mode 100644 index 00000000..56be28b9 --- /dev/null +++ b/computing/misc/livestocks.py @@ -0,0 +1,302 @@ +"""Clip livestock census data by tehsil and optionally publish it. + +Names may use spaces or snake_case. The local GPKG is always written first; +successful GeoServer layers are registered in the database by their WFS URL. +""" + +from __future__ import annotations + +import re +import sqlite3 +import time +from functools import lru_cache +from pathlib import Path +from typing import Any + +import pyogrio +from django.conf import settings + +from computing.models import Dataset, LayerType +from computing.utils import ( + fix_invalid_geometry_in_gdf, + push_shape_to_geoserver, + save_layer_info_to_db, + update_layer_sync_status, +) +from nrm_app.celery import app +from utilities.constants import LIVESTOCKS +from utilities.geoserver_utils import Geoserver, GeoserverException + + +SOURCE_LAYER = "livestock" +LOCATION_INDEX = "idx_livestock_location" +OUTPUT_DIR = "data/livestock/output/tehsil_data" +GEOSERVER_WORKSPACE = "livestocks" +LAYER_PREFIX = "livestocks" +DATASET_NAME = "Livestock Census" +ALGORITHM = "local-pan-india-tehsil-clip" +ALGORITHM_VERSION = "1.0" +INTEGER_COLUMNS = ( + "pc11_village_id", + "pc11_state_id", + "pc11_district_id", + "pc11_subdistrict_id", + "cattle_male", + "cattle_female", + "cattle_total", + "buffalo_male", + "buffalo_female", + "buffalo_total", + "sheep_male", + "sheep_female", + "sheep_total", + "goat_male", + "goat_female", + "goat_total", + "pig_male", + "pig_female", + "pig_total", +) + + +def _repo_path(path: str | Path) -> Path: + path = Path(path) + return path if path.is_absolute() else Path(settings.BASE_DIR) / path + + +def _slug(value: Any) -> str: + return re.sub(r"[^a-z0-9]+", "_", str(value or "").lower()).strip("_") + + +def _match_key(value: Any) -> str: + return re.sub(r"[^a-z0-9]+", "", str(value or "").lower()) + + +def _canonical_asset_name(value: Any) -> str: + return re.sub(r"[^A-Za-z0-9]+", " ", str(value or "")).strip().upper() + + +def _quote_sql(value: str) -> str: + return str(value).replace("'", "''") + + +def _bool(value: Any) -> bool: + if isinstance(value, bool): + return value + return str(value or "").strip().lower() in {"1", "true", "yes", "y"} + + +def _layer_name(district: str, block: str) -> str: + return f"{LAYER_PREFIX}_{_slug(district)}_{_slug(block)}" + + +def _output_dir(state: str, district: str, block: str) -> Path: + return _repo_path(OUTPUT_DIR) / _slug(state) / _slug(district) / _slug(block) + + +def _ensure_source_index(source_path: Path) -> None: + """Create the location index once; future clips then use indexed reads.""" + with sqlite3.connect(source_path) as connection: + connection.execute( + f""" + CREATE INDEX IF NOT EXISTS {LOCATION_INDEX} + ON {SOURCE_LAYER} (state_name, district_name, TEHSIL) + """ + ) + connection.commit() + + +@lru_cache(maxsize=1) +def _location_rows() -> tuple[tuple[str, str, str], ...]: + source_path = _repo_path(LIVESTOCKS) + _ensure_source_index(source_path) + rows = pyogrio.read_dataframe( + source_path, + sql=( + "SELECT DISTINCT state_name, district_name, TEHSIL " + f"FROM {SOURCE_LAYER}" + ), + read_geometry=False, + ) + return tuple( + (row.state_name, row.district_name, row.TEHSIL) + for row in rows.itertuples(index=False) + ) + + +def _resolve_location(state: str, district: str, block: str) -> tuple[str, str, str]: + state_key, district_key, block_key = map(_match_key, (state, district, block)) + state_matches = [row for row in _location_rows() if _match_key(row[0]) == state_key] + if not state_matches: + raise ValueError(f"State not found in livestock asset: {state}") + + district_matches = [ + row for row in state_matches if _match_key(row[1]) == district_key + ] + if not district_matches: + available = sorted({row[1] for row in state_matches})[:20] + raise ValueError( + f"District not found in livestock asset: {district}. " + f"Available examples: {available}" + ) + + block_matches = [row for row in district_matches if _match_key(row[2]) == block_key] + if not block_matches: + available = sorted({row[2] for row in district_matches})[:30] + raise ValueError( + f"TEHSIL/block not found in livestock asset: {block}. " + f"Available examples: {available}" + ) + return block_matches[0] + + +def _read_clip(state_name: str, district_name: str, tehsil_name: str): + source_path = _repo_path(LIVESTOCKS) + _ensure_source_index(source_path) + where = ( + f"state_name = '{_quote_sql(state_name)}' AND " + f"district_name = '{_quote_sql(district_name)}' AND " + f"TEHSIL = '{_quote_sql(tehsil_name)}'" + ) + gdf = pyogrio.read_dataframe(source_path, layer=SOURCE_LAYER, where=where) + if gdf.empty: + raise ValueError(f"No livestock rows found for {state_name}/{district_name}/{tehsil_name}") + if gdf.crs is None: + gdf = gdf.set_crs("EPSG:4326") + elif gdf.crs.to_epsg() != 4326: + gdf = gdf.to_crs("EPSG:4326") + return _coerce_nullable_integer_columns(fix_invalid_geometry_in_gdf(gdf)) + + +def _coerce_nullable_integer_columns(gdf): + for column in INTEGER_COLUMNS: + if column in gdf.columns: + gdf[column] = gdf[column].astype("Int64") + return gdf + + +def _write_clip(gdf, output_dir: Path, layer_name: str) -> Path: + output_dir.mkdir(parents=True, exist_ok=True) + gpkg_path = output_dir / f"{layer_name}.gpkg" + zip_path = output_dir / f"{layer_name}.zip" + for path in (gpkg_path, zip_path): + if path.exists(): + path.unlink() + pyogrio.write_dataframe(gdf, gpkg_path, layer=layer_name, driver="GPKG") + return gpkg_path + + +def _publish_to_geoserver(gpkg_path: Path, layer_name: str, overwrite: bool) -> dict[str, Any]: + try: + geoserver = Geoserver() + try: + geoserver.get_workspace(GEOSERVER_WORKSPACE) + except GeoserverException as exc: + if exc.status != 404: + raise + geoserver.create_workspace(GEOSERVER_WORKSPACE) + + response = push_shape_to_geoserver( + str(gpkg_path.with_suffix("")), + store_name=layer_name, + workspace=GEOSERVER_WORKSPACE, + layer_name=layer_name if overwrite else None, + file_type="gpkg", + ) + return {"ok": True, "response": response} + except Exception as exc: + return { + "ok": False, + "error_type": exc.__class__.__name__, + "error": str(exc)[:500], + } + + +@app.task(bind=True) +def generate_livestocks_layer_task( + self, + state: str, + district: str, + block: str, + sync_to_geoserver: bool = True, + overwrite: bool = False, +) -> dict[str, Any]: + """Clip one state/district/TEHSIL from the local pan-India livestock GPKG.""" + started = time.perf_counter() + layer_name = _layer_name(district, block) + output_dir = _output_dir(state, district, block) + + resolved_state = _canonical_asset_name(state) + resolved_district = _canonical_asset_name(district) + resolved_block = _canonical_asset_name(block) + try: + gdf = _read_clip(resolved_state, resolved_district, resolved_block) + except ValueError: + resolved_state, resolved_district, resolved_block = _resolve_location( + state, district, block + ) + gdf = _read_clip(resolved_state, resolved_district, resolved_block) + gpkg_path = _write_clip(gdf, output_dir, layer_name) + + geoserver = None + layer_id = None + geoserver_url = None + if _bool(sync_to_geoserver): + geoserver = _publish_to_geoserver(gpkg_path, layer_name, _bool(overwrite)) + if geoserver.get("ok"): + geoserver_url = ( + f"{settings.GEOSERVER_URL.rstrip('/')}/{GEOSERVER_WORKSPACE}/ows" + "?service=WFS&version=1.0.0&request=GetFeature" + f"&typeName={GEOSERVER_WORKSPACE}:{layer_name}" + "&outputFormat=application/json" + ) + Dataset.objects.get_or_create( + name=DATASET_NAME, + defaults={ + "layer_type": LayerType.VECTOR, + "workspace": GEOSERVER_WORKSPACE, + }, + ) + layer_id = save_layer_info_to_db( + state=resolved_state, + district=resolved_district, + block=resolved_block, + layer_name=layer_name, + asset_id=geoserver_url, + dataset_name=DATASET_NAME, + algorithm=ALGORITHM, + algorithm_version=ALGORITHM_VERSION, + misc={ + "is_generated_locally": True, + "source": _repo_path(LIVESTOCKS).as_posix(), + "gpkg_path": gpkg_path.as_posix(), + "output_dir": output_dir.as_posix(), + "geoserver_workspace": GEOSERVER_WORKSPACE, + "geoserver_layer_name": layer_name, + "geoserver_url": geoserver_url, + }, + is_override=_bool(overwrite), + is_gee_asset=False, + ) + if layer_id: + update_layer_sync_status( + layer_id=layer_id, + sync_to_geoserver=True, + ) + + return { + "status": "success", + "layer_name": layer_name, + "rows": int(len(gdf)), + "source": _repo_path(LIVESTOCKS).as_posix(), + "gpkg_path": gpkg_path.as_posix(), + "output_dir": output_dir.as_posix(), + "state_name": resolved_state, + "district_name": resolved_district, + "tehsil": resolved_block, + "sync_to_geoserver": _bool(sync_to_geoserver), + "geoserver": geoserver, + "geoserver_url": geoserver_url, + "layer_id": layer_id, + "elapsed_seconds": round(time.perf_counter() - started, 3), + } diff --git a/computing/urls.py b/computing/urls.py index 5f9881a2..203ed0aa 100644 --- a/computing/urls.py +++ b/computing/urls.py @@ -186,6 +186,11 @@ api.generate_antyodaya, name="generate_antyodaya", ), + path( + "generate_livestocks/", + api.generate_livestocks, + name="generate_livestocks", + ), path( "generate_stac_collection/", api.generate_stac_collection, diff --git a/computing/utils.py b/computing/utils.py index cc3fb85d..76b888f8 100644 --- a/computing/utils.py +++ b/computing/utils.py @@ -1,1191 +1,1033 @@ -import copy -import json -import logging -import os -import shutil -import zipfile -from datetime import datetime, timedelta - -import ee -import fiona -import geopandas as gpd -import requests -from django.conf import settings -from shapely.geometry import shape -from shapely.validation import explain_validity - -from computing.models import Dataset, Layer -from geoadmin.models import ( - DistrictSOI, - State_Disritct_Block_Properties, - StateSOI, - TehsilSOI, -) -from projects.models import Project -from utilities.constants import ( - ADMIN_BOUNDARY_OUTPUT_DIR, - GEE_ASSET_PATH, - GEE_HELPER_PATH, - GEE_PATHS, - SHAPEFILE_DIR, -) -from utilities.gee_utils import ( - check_task_status, - ee_initialize, - get_gee_asset_path, - get_gee_dir_path, - get_geojson_from_gcs, - is_asset_public, - is_gee_asset_exists, - sync_vector_to_gcs, - valid_gee_text, -) -from utilities.geoserver_utils import Geoserver -from django.core.mail import EmailMessage, get_connection -import time - -logger = logging.getLogger(__name__) - - -def generate_shape_files(path): - gdf = gpd.read_file(path + ".json") - if os.path.exists(path): - # Only replace the target shapefile directory. Removing the parent - # state/workspace directory here corrupts sibling outputs on reruns. - shutil.rmtree(path) - - os.makedirs(os.path.dirname(path), exist_ok=True) - gdf.to_file( - path, - driver="ESRI Shapefile", - ) - return path - - -def convert_to_zip(dir_name, file_type): - if file_type == "gpkg": - with zipfile.ZipFile(dir_name + ".zip", "w", zipfile.ZIP_DEFLATED) as zipf: - zipf.write(dir_name + ".gpkg", arcname=os.path.basename(dir_name + ".gpkg")) - return dir_name + ".zip" - else: - return shutil.make_archive(dir_name, "zip", dir_name + "/") - - -def push_shape_to_geoserver( - path, store_name=None, workspace=None, layer_name=None, file_type="shp" -): - geo = Geoserver() - - print(f"layer_name: {layer_name}") - if layer_name: - try: - print(f"Attempting to delete store: {layer_name}") - geo.delete_vector_store(workspace=workspace, store=layer_name) - print(f"Successfully deleted store: {layer_name}") - except Exception as e: - print(f"Store does not exist or error deleting: {str(e)}") - - zip_path = convert_to_zip(path, file_type) - print(f"Zip path: {zip_path}") - print(f"Store name: {store_name}") - print(f"Workspace: {workspace}") - - response = geo.create_shp_datastore( - path=zip_path, - store_name=store_name, - workspace=workspace, - file_extension=file_type, - ) - print(f"Response: {response}") - return response - - -def kml_to_geojson(state_name, district_name, block_name, kml_path): - fiona.drvsupport.supported_drivers["kml"] = ( - "rw" # enable KML support which is disabled by default - ) - fiona.drvsupport.supported_drivers["KML"] = ( - "rw" # enable KML support which is disabled by default - ) - gdf = gpd.read_file(kml_path) - geometry_types = gdf.geometry.geometry.type.unique() - state_dir = os.path.join(ADMIN_BOUNDARY_OUTPUT_DIR, state_name) - - for gtype in geometry_types: - df = gdf.loc[gdf.geometry.geometry.type == gtype] - path = os.path.join(state_dir, f"{district_name}_{block_name}_{gtype}") - df.to_file(path + ".json", driver="GeoJSON") - generate_shape_files(path) - push_shape_to_geoserver(path, workspace="test_workspace") - - -def convert_kml_to_shapefile(kml_path, output_dir, shapefile_name): - if not os.path.exists(output_dir + "/" + shapefile_name): - os.makedirs(output_dir + "/" + shapefile_name) - - shapefile_path = os.path.join( - output_dir + "/" + shapefile_name, shapefile_name + ".shp" - ) - print("path path", shapefile_path) - cmd = f"ogr2ogr -f 'ESRI Shapefile' {shapefile_path} {kml_path}" # output.shp input.kml - os.system(command=cmd) - - return output_dir + "/" + shapefile_name - - -def kml_to_shp(state_name, district_name, block_name, kml_path): - shapefile_name = f"{district_name}_{block_name}" - shapefile_layer_path = convert_kml_to_shapefile( - kml_path, SHAPEFILE_DIR, shapefile_name - ) - - push_shape_to_geoserver(shapefile_layer_path, workspace="customkml") - - # os.remove(kml_path) - # shutil.rmtree(shapefile_layer_path) - os.remove(shapefile_layer_path + ".zip") - - -def sync_layer_to_geoserver(state_name, fc, layer_name, workspace): - state_dir = os.path.join("data/fc_to_shape", state_name) - if not os.path.exists(state_dir): - os.mkdir(state_dir) - path = os.path.join(state_dir, f"{layer_name}") - # Write the feature collection into json file - with open(path + ".json", "w") as f: - try: - f.write(f"{json.dumps(fc)}") - except Exception as e: - print(e) - - path = generate_shape_files(path) - return push_shape_to_geoserver(path, workspace=workspace, layer_name=layer_name) - - -def sync_fc_to_geoserver(fc, shp_folder, layer_name, workspace, style_name=None): - try: - geojson_fc = fc.getInfo() - except Exception as e: - print("Exception in getInfo()", e) - task_id = sync_vector_to_gcs(fc, layer_name, "GeoJSON") - check_task_status([task_id]) - - geojson_fc = get_geojson_from_gcs(layer_name) - geo = Geoserver() - if len(geojson_fc["features"]) > 0: - state_dir = os.path.join("data/fc_to_shape", shp_folder) - if not os.path.exists(state_dir): - os.mkdir(state_dir) - path = os.path.join(state_dir, f"{layer_name}") - - # Convert to GeoDataFrame - gdf = gpd.GeoDataFrame.from_features(geojson_fc["features"]) - - # Set CRS (Earth Engine uses EPSG:4326 by default) - gdf.crs = "EPSG:4326" - - gdf = fix_invalid_geometry_in_gdf(gdf) - - # Save as GeoPackage - gdf.to_file(path + ".gpkg", driver="GPKG") - res = push_shape_to_geoserver(path, workspace=workspace, file_type="gpkg") - if style_name: - style_res = geo.publish_style( - layer_name=layer_name, style_name=style_name, workspace=workspace - ) - print("Style response:", style_res) - return res - else: - return "No features in FeatureCollection" - - -def sync_project_fc_to_geoserver(fc, project_name, layer_name, workspace): - print("inside") - print(layer_name) - try: - geojson_fc = fc.getInfo() - except Exception as e: - print("Exception in getInfo()", e) - task_id = sync_vector_to_gcs(fc, layer_name, "GeoJSON") - check_task_status([task_id]) - - geojson_fc = get_geojson_from_gcs(layer_name) - print(len(geojson_fc["features"])) - if len(geojson_fc["features"]) > 0: - state_dir = os.path.join("data/fc_to_shape", project_name) - if not os.path.exists(state_dir): - os.mkdir(state_dir) - path = os.path.join(state_dir, f"{layer_name}") - - # Convert to GeoDataFrame - gdf = gpd.GeoDataFrame.from_features(geojson_fc["features"]) - - # Set CRS (Earth Engine uses EPSG:4326 by default) - gdf.crs = "EPSG:4326" - - gdf = fix_invalid_geometry_in_gdf(gdf) - - # Save as GeoPackage - gdf.to_file(path + ".gpkg", driver="GPKG") - print("pushed to geoserver") - return push_shape_to_geoserver( - path, workspace=workspace, layer_name=layer_name, file_type="gpkg" - ) - else: - print("no features found") - return - - -def to_camelcase(text): - words = text.split() - camelcase = words[0].lower() - for word in words[1:]: - camelcase += word.capitalize() - return camelcase - - -def create_chunk(aoi, description, chunk_size): - size = aoi.size().getInfo() - parts = size // chunk_size - # task_ids = [] - rois = [] - descs = [] - for part in range(parts + 1): - start = part * chunk_size - end = start + chunk_size - block_name_for_parts = description + "_" + str(start) + "-" + str(end) - roi = ee.FeatureCollection(aoi.toList(aoi.size()).slice(start, end)) - if roi.size().getInfo() > 0: - descs.append(block_name_for_parts) - rois.append(roi) - - return rois, descs - - -def merge_chunks( - aoi, - folder_list, - description, - chunk_size, - chunk_asset_path=GEE_HELPER_PATH, - merge_asset_path=GEE_ASSET_PATH, - merge_asset_id=None, -): - print("Merge Chunk task initiated") - ee_initialize() - size = aoi.size().getInfo() - parts = size // chunk_size - assets = [] - for part in range(parts + 1): - start = part * chunk_size - end = start + chunk_size - block_name_for_parts = description + "_" + str(start) + "-" + str(end) - src_asset_id = ( - get_gee_dir_path(folder_list, chunk_asset_path) + block_name_for_parts - ) - if is_gee_asset_exists(src_asset_id): - assets.append(ee.FeatureCollection(src_asset_id)) - - asset = ee.FeatureCollection(assets).flatten() - - asset_id = merge_asset_id or ( - get_gee_dir_path(folder_list, merge_asset_path) + description - ) - try: - # Export an ee.FeatureCollection as an Earth Engine asset. - task = ee.batch.Export.table.toAsset( - **{ - "collection": asset, - "description": description, - "assetId": asset_id, - } - ) - - task.start() - print("Successfully started the merge chunk", task.status()) - return task.status()["id"] - except Exception as e: - print(f"Error occurred in running merge task: {e}") - return None - - -def fix_invalid_geometry_in_gdf(gdf): - invalid = gdf[~gdf.is_valid] - if not invalid.empty: - print("Invalid geometries found:") - for idx, geom in invalid.geometry.items(): - print(f"Index {idx}: {explain_validity(geom)}") - gdf.loc[idx, "geometry"] = gdf.loc[idx, "geometry"].buffer(0) - - return gdf - - -def get_season_key(date): - """Return season key like 'rabi_2017-2018' based on Indian cropping seasons.""" - month = date.month - year = date.year - next_year = year + 1 - - if month in [1, 2]: - return f"rabi_{year - 1}-{year}" # Jan–Feb → Rabi of previous year - elif month in [11, 12]: - return f"rabi_{year}-{next_year}" # Nov–Dec → Rabi starting this year - elif month in [3, 4, 5, 6]: - return f"zaid_{year}-{next_year}" - elif month in [7, 8, 9, 10]: - return f"kharif_{year}-{next_year}" - else: - return None - - -def get_agri_year_key(season_key): - """Convert a season key to agricultural year key (e.g., rabi_2017-2018 → 2017-2018).""" - season, years = season_key.split("_") - start_year, end_year = map(int, years.split("-")) - - if season in ["kharif", "rabi"]: - return f"{start_year}-{end_year}" - elif season == "zaid": - return f"{start_year - 1}-{start_year}" # Zaid 2018-2019 → Agri year 2017-2018 - else: - return None - - -def calculate_precipitation_season( - geojson_filepath, draught_asset_id, start_year=2017, end_year=2024 -): - - # Load the GeoJSON file - with open(geojson_filepath, "r") as f: - feature_collection = json.load(f) - - features_ee = [] - - for feature in feature_collection["features"]: - original_props = feature["properties"] - new_props = {} - - # Copy UID - if "uid" in original_props: - new_props["uid"] = original_props["uid"] - - agri_year_totals = {} - - # Parse precipitation date keys - for key, val in original_props.items(): - try: - date = datetime.strptime(key, "%Y-%m-%d") - season_key = get_season_key(date) - if not season_key: - continue - - agri_key = get_agri_year_key(season_key) - if not agri_key: - continue - - agri_start = int(agri_key.split("-")[0]) - if not (start_year <= agri_start <= end_year): - continue - - season = season_key.split("_")[0] # kharif, rabi etc - full_key = f"{season}_{agri_key}" - - agri_year_totals[full_key] = agri_year_totals.get(full_key, 0) + float( - val - ) - - except Exception: - continue - - # Add all seasonal totals to new_props - for agri_key, total in agri_year_totals.items(): - new_props[f"precipitation_{agri_key}"] = total - - # Create EE Feature - geom_ee = ee.Geometry(feature["geometry"]) - feature_ee = ee.Feature(geom_ee, new_props) - features_ee.append(feature_ee) - - # Left side FC - mws_fc = ee.FeatureCollection(features_ee) - - return mws_fc - - -def generate_geojson_with_ci_and_ndvi(zoi_asset, ci_asset, ndvi_asset, proj_id): - # Load project - proj_obj = Project.objects.get(pk=proj_id) - - # Build CI and NDVI asset paths - asset_path_ci = ( - get_gee_dir_path( - [proj_obj.name], asset_path=GEE_PATHS["WATER_REJ"]["GEE_ASSET_PATH"] - ) - + ci_asset - ) - - asset_path_ndvi = ( - get_gee_dir_path( - [proj_obj.name], asset_path=GEE_PATHS["WATER_REJ"]["GEE_ASSET_PATH"] - ) - + ndvi_asset - ) - - # Load FeatureCollections - zoi = ee.FeatureCollection(zoi_asset) - ci = ee.FeatureCollection(asset_path_ci) - ndvi = ee.FeatureCollection(asset_path_ndvi) - - # ------------------------- - # STEP 1: Join ZOI with Cropping Intensity - # ------------------------- - join = ee.Join.inner() - filter = ee.Filter.intersects(leftField=".geo", rightField=".geo") - zoi_ci_joined = join.apply(zoi, ci, filter) - - def merge_zoi_ci(pair): - zoi_feat = ee.Feature(pair.get("primary")) - ci_feat = ee.Feature(pair.get("secondary")) - merged_props = zoi_feat.toDictionary().combine(ci_feat.toDictionary(), True) - return ee.Feature(zoi_feat.geometry(), merged_props) - - zoi_with_ci = ee.FeatureCollection(zoi_ci_joined.map(merge_zoi_ci)) - - # ------------------------- - # STEP 2: Join ZOI+CI with NDVI - # ------------------------- - zoi_ndvi_joined = join.apply(zoi_with_ci, ndvi, filter) - - def merge_zoi_ci_ndvi(pair): - ci_feat = ee.Feature(pair.get("primary")) - ndvi_feat = ee.Feature(pair.get("secondary")) - merged_props = ci_feat.toDictionary().combine(ndvi_feat.toDictionary(), True) - return ee.Feature(ci_feat.geometry(), merged_props) - - final_merged = ee.FeatureCollection(zoi_ndvi_joined.map(merge_zoi_ci_ndvi)) - - # ------------------------- - # STEP 3: Export or Push to GeoServer - # ------------------------- - layer_name = f"WaterRejapp_zoi_{proj_obj.name}_{proj_obj.id}" - sync_project_fc_to_geoserver(final_merged, proj_obj.name, layer_name, "waterrej") - - -def get_directory_size(path): - total_size = 0 - for dirpath, dirnames, filenames in os.walk(path): - for filename in filenames: - file_path = os.path.join(dirpath, filename) - if os.path.isfile(file_path): - total_size += os.path.getsize(file_path) - return total_size - - -def generate_geojson_with_ci_ndvi_ndmi( - zoi_asset, ci_asset, ndvi_asset, ndmi_asset, proj_id -): - - # Load project - proj_obj = Project.objects.get(pk=proj_id) - - zoi = ee.FeatureCollection(zoi_asset) - print("Number of features zoi:", zoi.size().getInfo()) - - ci = ee.FeatureCollection(ci_asset) - print("Number of features zoi:", ci.size().getInfo()) - ndvi = ee.FeatureCollection(ndmi_asset) - print("Number of features zoi:", ndvi.size().getInfo()) - ndmi = ee.FeatureCollection(ndmi_asset) - print("Number of features zoi:", ndmi.size().getInfo()) - - # ------------------------- - # STEP 1: Join ZOI with CI - # ------------------------- - join = ee.Join.inner() - filter = ee.Filter.intersects(leftField=".geo", rightField=".geo") - zoi_ci_joined = join.apply(zoi, ci, filter) - - def merge_zoi_ci(pair): - zoi_feat = ee.Feature(pair.get("primary")) - ci_feat = ee.Feature(pair.get("secondary")) - merged_props = zoi_feat.toDictionary().combine(ci_feat.toDictionary(), True) - return ee.Feature(zoi_feat.geometry(), merged_props) # ✅ keep ZOI geom - - zoi_with_ci = ee.FeatureCollection(zoi_ci_joined.map(merge_zoi_ci)) - - # ------------------------- - # STEP 2: Join with NDVI - # ------------------------- - zoi_ndvi_joined = join.apply(zoi_with_ci, ndvi, filter) - - def merge_zoi_ci_ndvi(pair): - prev_feat = ee.Feature(pair.get("primary")) - ndvi_feat = ee.Feature(pair.get("secondary")) - merged_props = prev_feat.toDictionary().combine(ndvi_feat.toDictionary(), True) - return ee.Feature(prev_feat.geometry(), merged_props) # ✅ still ZOI geom - - zoi_ci_ndvi = ee.FeatureCollection(zoi_ndvi_joined.map(merge_zoi_ci_ndvi)) - - # ------------------------- - # STEP 3: Join with NDMI - # ------------------------- - zoi_ndmi_joined = join.apply(zoi_ci_ndvi, ndmi, filter) - - def merge_zoi_ci_ndvi_ndmi(pair): - prev_feat = ee.Feature(pair.get("primary")) - ndmi_feat = ee.Feature(pair.get("secondary")) - merged_props = prev_feat.toDictionary().combine(ndmi_feat.toDictionary(), True) - return ee.Feature(prev_feat.geometry(), merged_props) # ✅ keep ZOI geom - - final_merged = ee.FeatureCollection(zoi_ndmi_joined.map(merge_zoi_ci_ndvi_ndmi)) - - # ------------------------- - # STEP 4: Export or Push to GeoServer - # ------------------------- - layer_name = f"WaterRejapp_zoi_{proj_obj.name}_{proj_obj.id}" - print(layer_name) - sync_project_fc_to_geoserver(final_merged, proj_obj.name, layer_name, "waterrej") - - -def generate_geojson_with_ci_ndvi(zoi_asset, ci_asset, ndvi_asset, proj_id): - # Load project - proj_obj = Project.objects.get(pk=proj_id) - - # Initialize Earth Engine - ee_initialize(4) - - # Load FeatureCollections - zoi = ee.FeatureCollection(zoi_asset) - ci = ee.FeatureCollection(ci_asset) - ndvi = ee.FeatureCollection(ndvi_asset) - - print("ZOI:", zoi.size().getInfo()) - print("CI:", ci.size().getInfo()) - print("NDVI:", ndvi.size().getInfo()) - - # Common join logic on UID - join = ee.Join.inner() - uid_filter = ee.Filter.equals(leftField="UID", rightField="UID") - - # --- Join ZOI + CI --- - zoi_ci_joined = join.apply(zoi, ci, uid_filter) - - def merge_zoi_ci(pair): - zoi_feat = ee.Feature(pair.get("primary")) - ci_feat = ee.Feature(pair.get("secondary")) - merged_props = zoi_feat.toDictionary().combine(ci_feat.toDictionary(), True) - # Keep ZOI geometry only - return ee.Feature(zoi_feat.geometry(), merged_props) - - zoi_with_ci = ee.FeatureCollection(zoi_ci_joined.map(merge_zoi_ci)) - - # --- Join with NDVI --- - zoi_ndvi_joined = join.apply(zoi_with_ci, ndvi, uid_filter) - - def merge_with_ndvi(pair): - base_feat = ee.Feature(pair.get("primary")) - ndvi_feat = ee.Feature(pair.get("secondary")) - merged_props = base_feat.toDictionary().combine(ndvi_feat.toDictionary(), True) - # Always retain ZOI geometry - return ee.Feature(base_feat.geometry(), merged_props) - - merged_final = ee.FeatureCollection(zoi_ndvi_joined.map(merge_with_ndvi)) - - # --- Ensure ZOI geometry retained in all features --- - merged_final = merged_final.map( - lambda f: ee.Feature( - f.setGeometry( - ee.Feature( - zoi.filter(ee.Filter.eq("UID", f.get("UID"))).first() - ).geometry() - ) - ) - ) - - layer_name = f"WaterRejapp_zoi_{proj_obj.name}_{proj_obj.id}" - print(layer_name) - - sync_project_fc_to_geoserver(merged_final, proj_obj.name, layer_name, "waterrej") - - -def save_layer_info_to_db( - state, - district, - block, - layer_name, - asset_id, - dataset_name, - sync_to_geoserver=False, - layer_version="1.0", - algorithm=None, - algorithm_version="1.0", - misc=None, - is_override=False, -): - print("inside the save_layer_info_to_db function") - - dataset = Dataset.objects.get(name=dataset_name) - - try: - state_obj = StateSOI.objects.get(state_name__iexact=state) - district_obj = DistrictSOI.objects.get( - district_name__iexact=district, state=state_obj - ) - block_obj = TehsilSOI.objects.get( - tehsil_name__iexact=block, district=district_obj - ) - except Exception as e: - print("Error fetching in state district block:", e) - return - - is_public = is_asset_public(asset_id) - - # Check if there’s an existing layer - existing_layer = ( - Layer.objects.filter( - dataset=dataset, - layer_name=layer_name, - state=state_obj, - district=district_obj, - block=block_obj, - ) - .order_by("-layer_version") - .first() - ) - - if existing_layer: - if existing_layer.algorithm_version != algorithm_version: - # Algorithm version changed --> create new record with incremented layer_version - new_layer_version = str(float(existing_layer.layer_version) + 1) - print( - f"Algorithm version changed. Creating new layer version: {new_layer_version}" - ) - layer_obj = Layer.objects.create( - dataset=dataset, - layer_name=layer_name, - state=state_obj, - district=district_obj, - block=block_obj, - layer_version=new_layer_version, - algorithm=algorithm, - algorithm_version=algorithm_version, - is_sync_to_geoserver=sync_to_geoserver, - is_public_gee_asset=is_public, - is_override=is_override, - misc=misc, - gee_asset_path=asset_id, - ) - else: - # Algorithm version is same --> update existing layer - print("Algorithm version same. Updating existing layer.") - for field, value in { - "algorithm": algorithm, - "algorithm_version": algorithm_version, - "is_sync_to_geoserver": sync_to_geoserver, - "is_public_gee_asset": is_public, - "is_override": is_override, - "misc": misc, - "gee_asset_path": asset_id, - }.items(): - setattr(existing_layer, field, value) - existing_layer.save() - layer_obj = existing_layer - else: - # No existing record --> create a new one - print("No existing layer found. Creating new one.") - layer_obj = Layer.objects.create( - dataset=dataset, - layer_name=layer_name, - state=state_obj, - district=district_obj, - block=block_obj, - layer_version=layer_version, - algorithm=algorithm, - algorithm_version=algorithm_version, - is_sync_to_geoserver=sync_to_geoserver, - is_public_gee_asset=is_public, - is_override=is_override, - misc=misc, - gee_asset_path=asset_id, - ) - - print(f"Saved layer info (id={layer_obj.id}, version={layer_obj.layer_version})") - return layer_obj.id - - -def get_existing_end_year(dataset_name, layer_name): - """fetch objects from db on the basis of dataset name and layer_name""" - dataset = Dataset.objects.get(name=dataset_name) - layer_obj = Layer.objects.get(dataset=dataset, layer_name=layer_name) - existing_end_date = layer_obj.misc["end_year"] - print("existing_end_date", existing_end_date) - return existing_end_date - - -def get_layer_object(state, district, block, layer_name, dataset_name): - state_obj = StateSOI.objects.get(state_name__iexact=state) - district_obj = DistrictSOI.objects.get( - district_name__iexact=district, state=state_obj - ) - block_obj = TehsilSOI.objects.get(tehsil_name__iexact=block, district=district_obj) - layer_obj = ( - Layer.objects.filter( - state=state_obj, - district=district_obj, - block=block_obj, - layer_name=layer_name, - dataset__name=dataset_name, - ) - .order_by("-layer_version") - .first() - ) - return layer_obj - - -def update_dashboard_geojson( - state=None, - district=None, - block=None, - layer_name=None, - workspace_name=None, - proj_id=None, -): - if state and block and block: - print(f"🔄 Updating GeoJSON for {state}, {district}, {block}") - - # Get related objects - state_obj = StateSOI.objects.get(state_name=state) - district_obj = DistrictSOI.objects.get(district_name=district) - tehsil_obj = TehsilSOI.objects.get(tehsil_name=block) # fixed typo - - # Get or create main record - obj, created = State_Disritct_Block_Properties.objects.get_or_create( - state=state_obj, district=district_obj, tehsil=tehsil_obj - ) - else: - obj = Project.objects.get(pk=proj_id) - - # Map suffix to json_key - suffix_to_key = { - "wb": "wb_geojson", - "zoi": "zoi_geojson", - "mws": "mws_geojson", - } - - # Detect which key this layer corresponds to - json_key = None - for suffix, key in suffix_to_key.items(): - if layer_name == f"{state}_{district}_{block}_{suffix}": - json_key = key - break - - if not json_key: - print(f"⚠️ Layer name {layer_name} did not match any known type.") - return - - # Construct GeoServer URL - waterrej_url = ( - f"https://geoserver.core-stack.org:8443/geoserver/waterrej/ows?" - f"service=WFS&version=1.0.0&request=GetFeature&typeName={workspace_name}:{layer_name}" - f"&outputFormat=application%2Fjson" - ) - - # Load existing dashboard_geojson or create new - if proj_id: - misc = obj.dashboard_geojson or {} - else: - misc = obj.geojson_path or {} - - # Ensure waterrej section exists - if "waterrej" not in misc: - misc["waterrej"] = {} - - # Update or add this specific json_key - misc["waterrej"][json_key] = waterrej_url - - # Save the updated JSON field - obj.dashboard_geojson = misc - obj.save() - - print(f"✅ Added/Updated {json_key} for {state}, {district}, {block}") - - -def clean_geometry(geom): - """ - Clean geometry: - - Dissolve multipolygon → single polygon - - Remove holes automatically - - Fix invalid topology - - Buffer tiny polygons - """ - - # 1. Dissolve multi-polygons and remove holes - geom = geom.dissolve(maxError=1) - - # 2. Fix invalid rings by simplifying slightly (NEVER buffer(0)) - geom = geom.simplify(1) - - # 3. Buffer polygons smaller than 1 pixel (< 900 m²) - area = geom.area() - geom = ee.Algorithms.If( - area.lt(900), - geom.buffer(15), - geom, # ensure raster pixel center is captured - ) - - return ee.Geometry(geom) - - -def safe_reduce_max(image, geom, scale=30): - geom = clean_geometry(geom) - - val = ( - image.unmask(0) - .reduceRegion( - reducer=ee.Reducer.max(), - geometry=geom, - scale=scale, - maxPixels=1e13, - tileScale=4, - bestEffort=True, - ) - .get("b1") - ) - - return ee.Number(ee.Algorithms.If(val, val, 0)) - - -# ------------------------------------------------------ -# SAFE REDUCE MAX FUNCTION -# ------------------------------------------------------ -def safe_reduce_max(image, geom, scale=30): - geom = clean_geometry(geom) - - result = ( - image.unmask(0) - .reduceRegion( - reducer=ee.Reducer.max(), - geometry=geom, - scale=scale, - maxPixels=1e13, - tileScale=4, - bestEffort=True, - ) - .get("b1") - ) - - # Convert null → 0 - return ee.Number(ee.Algorithms.If(result, result, 0)) - - -# ------------------------------------------------------ -# MAIN FUNCTION TO PROCESS SWB LAYER -# ------------------------------------------------------ -def generate_swb_layer_with_max_so_catchment( - roi=None, - app_type="MWS", - asset_suffix=None, - asset_folder=None, - gee_account_id=None, -): - ee_initialize(gee_account_id) - - # Build asset paths - base_path = get_gee_dir_path( - asset_folder, asset_path=GEE_PATHS[app_type]["GEE_ASSET_PATH"] - ) - - so_asset = f"{base_path}stream_order_{asset_suffix}_raster" - ca_asset = f"{base_path}catchment_area_{asset_suffix}_raster" - - # Load rasters - stream_order_band = ee.Image(so_asset).select("b1") - catchment_band = ee.Image(ca_asset).select("b1") - - # Processing per waterbody - def compute_for_feature(feature): - geom = feature.geometry() - - max_so = safe_reduce_max(stream_order_band, geom, scale=30) - max_ca = safe_reduce_max(catchment_band, geom, scale=30) - - return feature.set( - { - "max_stream_order": max_so, - "max_catchment_area": max_ca, - } - ) - - # Map over the feature collection - return roi.map(compute_for_feature) - - -def _get_prod_backend_url(): - return getattr(settings, "PROD_BACKEND_URL", "").rstrip("/") - - -def _get_prod_api_key(): - return getattr(settings, "PROD_BACKEND_API_KEY", "") - - -def _sync_layer_to_prod_db(payload: dict): - prod_url = _get_prod_backend_url() - if not prod_url: - return None - - endpoint = prod_url + "/api/v1/sync_layer_remote/" - try: - response = requests.post( - endpoint, - json=payload, - headers={"X-Api-Key": _get_prod_api_key()}, - timeout=30, - ) - if response.status_code not in (200, 201): - logger.warning( - "Prod DB sync returned %s for layer %s: %s", - response.status_code, - payload.get("layer_name"), - response.text, - ) - return None - layer_id = response.json().get("layer_id") - logger.info( - "Layer %s synced to prod DB (id=%s).", payload.get("layer_name"), layer_id - ) - return layer_id - except requests.RequestException as e: - logger.error( - "Failed to sync layer %s to prod DB: %s", payload.get("layer_name"), e - ) - return None - - -def _update_layer_sync_remote( - layer_id, sync_to_geoserver=None, is_stac_specs_generated=None -): - prod_url = _get_prod_backend_url() - if not prod_url or layer_id is None: - return - - endpoint = prod_url + "/api/v1/update_layer_sync_remote/" - payload = { - "layer_id": layer_id, - "sync_to_geoserver": sync_to_geoserver, - "is_stac_specs_generated": is_stac_specs_generated, - } - try: - response = requests.post( - endpoint, - json=payload, - headers={"X-Api-Key": _get_prod_api_key()}, - timeout=30, - ) - if response.status_code not in (200, 201): - logger.warning( - "Prod layer sync status update returned %s for layer %s: %s", - response.status_code, - layer_id, - response.text, - ) - else: - logger.info("Layer sync status updated on prod DB for id=%s.", layer_id) - except requests.RequestException as e: - logger.error( - "Failed to update layer sync status on prod DB for id=%s: %s", layer_id, e - ) - - -def update_layer_sync_status( - layer_id, sync_to_geoserver=None, is_stac_specs_generated=None -): - if _get_prod_backend_url(): - _update_layer_sync_remote( - layer_id, - sync_to_geoserver=sync_to_geoserver, - is_stac_specs_generated=is_stac_specs_generated, - ) - return layer_id - - try: - layer_obj = Layer.objects.filter(id=layer_id).first() - if layer_obj is None: - return None - - update_fields = [] - if sync_to_geoserver is not None: - layer_obj.is_sync_to_geoserver = sync_to_geoserver - update_fields.append("is_sync_to_geoserver") - if is_stac_specs_generated is not None: - layer_obj.is_stac_specs_generated = is_stac_specs_generated - update_fields.append("is_stac_specs_generated") - - # `save(update_fields=...)` fires the post_save signal so the STAC - # auto-trigger handler in `computing.signals` can pick up the flip. - if update_fields: - layer_obj.save(update_fields=update_fields) - print( - f"Updated {update_fields} for layer ID: {layer_id} " - f"(sync={sync_to_geoserver}, stac={is_stac_specs_generated})" - ) - return layer_id - - except Exception as e: - print(f"Error updating layer sync status: {e}") - - -def _is_cache_valid(cache: dict, workspace: str) -> bool: - if workspace not in cache: - return False - age = time.time() - cache[workspace]["cached_at"] - if age > 3600: - logger.info(f"Cache expired for {workspace} (age: {int(age)}s)") - return False - return True - - -def _set_cache(cache: dict, workspace: str, data: set): - cache[workspace] = { - "data": data, - "cached_at": time.time(), - } - - -def send_report_email( - result, - report_type: str = "missing_layers", - recipients: list = None, -) -> bool: - """ - Generic reusable function to email a JSON report. - report_type: "missing_layers" or "missing_excel_files" - """ - if recipients is None: - recipients = getattr(settings, "MISSING_LAYER_RECIPIENTS", []) - - if isinstance(recipients, str): - recipients = [recipients] - - if not recipients: - logger.error("No recipients configured for report email.") - return False - - if report_type == "missing_layers": - subject = "Missing Layers Report" - attachment_name = "missing_layers.json" - - strict_result = result.get("Mandatory", {}) - can_be_empty_result = result.get("can_be_empty", {}) - - # Filter out workspaces with zero missing layers - strict_filtered = { - ws: data for ws, data in strict_result.items() if data.get("missing_layers") - } - can_be_empty_filtered = { - ws: data - for ws, data in can_be_empty_result.items() - if data.get("missing_layers") - } - - strict_summary = [] - total_strict_missing = 0 - for layer, data in strict_filtered.items(): - count = len(data.get("missing_layers", [])) - total_strict_missing += count - strict_summary.append(f"{layer}: {count}") - - can_be_empty_summary = [] - total_can_be_empty_missing = 0 - for layer, data in can_be_empty_filtered.items(): - count = len(data.get("missing_layers", [])) - total_can_be_empty_missing += count - can_be_empty_summary.append(f"{layer}: {count}") - - total_missing = total_strict_missing + total_can_be_empty_missing - - body = ( - "Missing Layers Report\n\n" - f"Total Missing: {total_missing}\n" - f" - Mandatory (needs attention): {total_strict_missing}\n" - f" - Can-Be-Empty (may be legitimately absent): {total_can_be_empty_missing}\n\n" - "---- Mandatory Workspaces (data expected everywhere) ----\n" - + ( - "\n".join(strict_summary) - if strict_summary - else "None — nothing missing" - ) - + "\n\n" - "---- Can-Be-Empty Workspaces (some locations may legitimately have no data) ----\n" - + ( - "\n".join(can_be_empty_summary) - if can_be_empty_summary - else "None — nothing missing" - ) - + "\n\nDetailed report attached." - ) - result = { - "Mandatory": strict_filtered, - "can_be_empty": can_be_empty_filtered, - } - - elif report_type == "missing_excel_files": - subject = "Missing Stats Excel/JSON Files Report" - attachment_name = "missing_excel_files.json" - total_locations = len(result) - total_missing_files = sum(len(loc.get("missing_files", [])) for loc in result) - total_xlsx_issues = sum(1 for loc in result if loc.get("xlsx_issues")) - body = ( - "Missing Stats Excel/JSON Files Report\n\n" - f"Tehsils which files(json/excel) are missing: {total_locations}\n" - f"Total missing files: {total_missing_files}\n" - f"Tehsils with xlsx sheet missing: {total_xlsx_issues}\n\n" - "Detailed report attached." - ) - - else: - logger.error(f"Unknown report_type: {report_type}") - return False - - attachment_content = json.dumps(result, indent=4) - max_retries = 3 - - for attempt in range(max_retries): - connection = None - try: - connection = get_connection(timeout=120) - connection.open() - email = EmailMessage( - subject=subject, - body=body, - from_email=settings.EMAIL_HOST_USER, - to=recipients, - connection=connection, - ) - email.attach( - attachment_name, - attachment_content, - "application/json", - ) - email.send() - logger.info(f"{subject} sent to {recipients}") - logger.info( - f"Attachment size: " - f"{len(attachment_content.encode('utf-8')) / 1024:.2f} KB" - ) - return True - except Exception as e: - logger.exception(f"Attempt {attempt + 1}/{max_retries} failed: {e}") - if attempt < max_retries - 1: - wait_time = 5 * (attempt + 1) - logger.info(f"Retrying after {wait_time} seconds...") - time.sleep(wait_time) - else: - logger.error("All attempts to send email failed.") - return False - finally: - if connection: - try: - connection.close() - except Exception: - pass - return False +import copy +import json +import logging +import os +import shutil +import zipfile +from datetime import datetime, timedelta + +import ee +import fiona +import geopandas as gpd +import requests +from django.conf import settings +from shapely.geometry import shape +from shapely.validation import explain_validity + +from computing.models import Dataset, Layer +from geoadmin.models import ( + DistrictSOI, + State_Disritct_Block_Properties, + StateSOI, + TehsilSOI, +) +from projects.models import Project +from utilities.constants import ( + ADMIN_BOUNDARY_OUTPUT_DIR, + GEE_ASSET_PATH, + GEE_HELPER_PATH, + GEE_PATHS, + SHAPEFILE_DIR, +) +from utilities.gee_utils import ( + check_task_status, + ee_initialize, + get_gee_asset_path, + get_gee_dir_path, + get_geojson_from_gcs, + is_asset_public, + is_gee_asset_exists, + sync_vector_to_gcs, + valid_gee_text, +) +from utilities.geoserver_utils import Geoserver + +logger = logging.getLogger(__name__) + + +def generate_shape_files(path): + gdf = gpd.read_file(path + ".json") + if os.path.exists(path): + # Only replace the target shapefile directory. Removing the parent + # state/workspace directory here corrupts sibling outputs on reruns. + shutil.rmtree(path) + + os.makedirs(os.path.dirname(path), exist_ok=True) + gdf.to_file( + path, + driver="ESRI Shapefile", + ) + return path + + +def convert_to_zip(dir_name, file_type): + if file_type == "gpkg": + with zipfile.ZipFile(dir_name + ".zip", "w", zipfile.ZIP_DEFLATED) as zipf: + zipf.write(dir_name + ".gpkg", arcname=os.path.basename(dir_name + ".gpkg")) + return dir_name + ".zip" + else: + return shutil.make_archive(dir_name, "zip", dir_name + "/") + + +def push_shape_to_geoserver( + path, store_name=None, workspace=None, layer_name=None, file_type="shp" +): + geo = Geoserver() + + print(f"layer_name: {layer_name}") + if layer_name: + try: + print(f"Attempting to delete store: {layer_name}") + geo.delete_vector_store(workspace=workspace, store=layer_name) + print(f"Successfully deleted store: {layer_name}") + except Exception as e: + print(f"Store does not exist or error deleting: {str(e)}") + + zip_path = convert_to_zip(path, file_type) + print(f"Zip path: {zip_path}") + print(f"Store name: {store_name}") + print(f"Workspace: {workspace}") + + response = geo.create_shp_datastore( + path=zip_path, + store_name=store_name, + workspace=workspace, + file_extension=file_type, + ) + print(f"Response: {response}") + return response + + +def kml_to_geojson(state_name, district_name, block_name, kml_path): + fiona.drvsupport.supported_drivers["kml"] = ( + "rw" # enable KML support which is disabled by default + ) + fiona.drvsupport.supported_drivers["KML"] = ( + "rw" # enable KML support which is disabled by default + ) + gdf = gpd.read_file(kml_path) + geometry_types = gdf.geometry.geometry.type.unique() + state_dir = os.path.join(ADMIN_BOUNDARY_OUTPUT_DIR, state_name) + + for gtype in geometry_types: + df = gdf.loc[gdf.geometry.geometry.type == gtype] + path = os.path.join(state_dir, f"{district_name}_{block_name}_{gtype}") + df.to_file(path + ".json", driver="GeoJSON") + generate_shape_files(path) + push_shape_to_geoserver(path, workspace="test_workspace") + + +def convert_kml_to_shapefile(kml_path, output_dir, shapefile_name): + if not os.path.exists(output_dir + "/" + shapefile_name): + os.makedirs(output_dir + "/" + shapefile_name) + + shapefile_path = os.path.join( + output_dir + "/" + shapefile_name, shapefile_name + ".shp" + ) + print("path path", shapefile_path) + cmd = f"ogr2ogr -f 'ESRI Shapefile' {shapefile_path} {kml_path}" # output.shp input.kml + os.system(command=cmd) + + return output_dir + "/" + shapefile_name + + +def kml_to_shp(state_name, district_name, block_name, kml_path): + shapefile_name = f"{district_name}_{block_name}" + shapefile_layer_path = convert_kml_to_shapefile( + kml_path, SHAPEFILE_DIR, shapefile_name + ) + + push_shape_to_geoserver(shapefile_layer_path, workspace="customkml") + + # os.remove(kml_path) + # shutil.rmtree(shapefile_layer_path) + os.remove(shapefile_layer_path + ".zip") + + +def sync_layer_to_geoserver(state_name, fc, layer_name, workspace): + state_dir = os.path.join("data/fc_to_shape", state_name) + if not os.path.exists(state_dir): + os.mkdir(state_dir) + path = os.path.join(state_dir, f"{layer_name}") + # Write the feature collection into json file + with open(path + ".json", "w") as f: + try: + f.write(f"{json.dumps(fc)}") + except Exception as e: + print(e) + + path = generate_shape_files(path) + return push_shape_to_geoserver(path, workspace=workspace, layer_name=layer_name) + + +def sync_fc_to_geoserver(fc, shp_folder, layer_name, workspace, style_name=None): + try: + geojson_fc = fc.getInfo() + except Exception as e: + print("Exception in getInfo()", e) + task_id = sync_vector_to_gcs(fc, layer_name, "GeoJSON") + check_task_status([task_id]) + + geojson_fc = get_geojson_from_gcs(layer_name) + geo = Geoserver() + if len(geojson_fc["features"]) > 0: + state_dir = os.path.join("data/fc_to_shape", shp_folder) + if not os.path.exists(state_dir): + os.mkdir(state_dir) + path = os.path.join(state_dir, f"{layer_name}") + + # Convert to GeoDataFrame + gdf = gpd.GeoDataFrame.from_features(geojson_fc["features"]) + + # Set CRS (Earth Engine uses EPSG:4326 by default) + gdf.crs = "EPSG:4326" + + gdf = fix_invalid_geometry_in_gdf(gdf) + + # Save as GeoPackage + gdf.to_file(path + ".gpkg", driver="GPKG") + res = push_shape_to_geoserver(path, workspace=workspace, file_type="gpkg") + if style_name: + style_res = geo.publish_style( + layer_name=layer_name, style_name=style_name, workspace=workspace + ) + print("Style response:", style_res) + return res + else: + return "No features in FeatureCollection" + + +def sync_project_fc_to_geoserver(fc, project_name, layer_name, workspace): + print("inside") + print(layer_name) + try: + geojson_fc = fc.getInfo() + except Exception as e: + print("Exception in getInfo()", e) + task_id = sync_vector_to_gcs(fc, layer_name, "GeoJSON") + check_task_status([task_id]) + + geojson_fc = get_geojson_from_gcs(layer_name) + print(len(geojson_fc["features"])) + if len(geojson_fc["features"]) > 0: + state_dir = os.path.join("data/fc_to_shape", project_name) + if not os.path.exists(state_dir): + os.mkdir(state_dir) + path = os.path.join(state_dir, f"{layer_name}") + + # Convert to GeoDataFrame + gdf = gpd.GeoDataFrame.from_features(geojson_fc["features"]) + + # Set CRS (Earth Engine uses EPSG:4326 by default) + gdf.crs = "EPSG:4326" + + gdf = fix_invalid_geometry_in_gdf(gdf) + + # Save as GeoPackage + gdf.to_file(path + ".gpkg", driver="GPKG") + print("pushed to geoserver") + return push_shape_to_geoserver( + path, workspace=workspace, layer_name=layer_name, file_type="gpkg" + ) + else: + print("no features found") + return + + +def to_camelcase(text): + words = text.split() + camelcase = words[0].lower() + for word in words[1:]: + camelcase += word.capitalize() + return camelcase + + +def create_chunk(aoi, description, chunk_size): + size = aoi.size().getInfo() + parts = size // chunk_size + # task_ids = [] + rois = [] + descs = [] + for part in range(parts + 1): + start = part * chunk_size + end = start + chunk_size + block_name_for_parts = description + "_" + str(start) + "-" + str(end) + roi = ee.FeatureCollection(aoi.toList(aoi.size()).slice(start, end)) + if roi.size().getInfo() > 0: + descs.append(block_name_for_parts) + rois.append(roi) + + return rois, descs + + +def merge_chunks( + aoi, + folder_list, + description, + chunk_size, + chunk_asset_path=GEE_HELPER_PATH, + merge_asset_path=GEE_ASSET_PATH, + merge_asset_id=None, +): + print("Merge Chunk task initiated") + ee_initialize() + size = aoi.size().getInfo() + parts = size // chunk_size + assets = [] + for part in range(parts + 1): + start = part * chunk_size + end = start + chunk_size + block_name_for_parts = description + "_" + str(start) + "-" + str(end) + src_asset_id = ( + get_gee_dir_path(folder_list, chunk_asset_path) + block_name_for_parts + ) + if is_gee_asset_exists(src_asset_id): + assets.append(ee.FeatureCollection(src_asset_id)) + + asset = ee.FeatureCollection(assets).flatten() + + asset_id = merge_asset_id or ( + get_gee_dir_path(folder_list, merge_asset_path) + description + ) + try: + # Export an ee.FeatureCollection as an Earth Engine asset. + task = ee.batch.Export.table.toAsset( + **{ + "collection": asset, + "description": description, + "assetId": asset_id, + } + ) + + task.start() + print("Successfully started the merge chunk", task.status()) + return task.status()["id"] + except Exception as e: + print(f"Error occurred in running merge task: {e}") + return None + + +def fix_invalid_geometry_in_gdf(gdf): + invalid = gdf[~gdf.is_valid] + if not invalid.empty: + print("Invalid geometries found:") + for idx, geom in invalid.geometry.items(): + print(f"Index {idx}: {explain_validity(geom)}") + gdf.loc[idx, "geometry"] = gdf.loc[idx, "geometry"].buffer(0) + + return gdf + + +def get_season_key(date): + """Return season key like 'rabi_2017-2018' based on Indian cropping seasons.""" + month = date.month + year = date.year + next_year = year + 1 + + if month in [1, 2]: + return f"rabi_{year - 1}-{year}" # Jan–Feb → Rabi of previous year + elif month in [11, 12]: + return f"rabi_{year}-{next_year}" # Nov–Dec → Rabi starting this year + elif month in [3, 4, 5, 6]: + return f"zaid_{year}-{next_year}" + elif month in [7, 8, 9, 10]: + return f"kharif_{year}-{next_year}" + else: + return None + + +def get_agri_year_key(season_key): + """Convert a season key to agricultural year key (e.g., rabi_2017-2018 → 2017-2018).""" + season, years = season_key.split("_") + start_year, end_year = map(int, years.split("-")) + + if season in ["kharif", "rabi"]: + return f"{start_year}-{end_year}" + elif season == "zaid": + return f"{start_year - 1}-{start_year}" # Zaid 2018-2019 → Agri year 2017-2018 + else: + return None + + +def calculate_precipitation_season( + geojson_filepath, draught_asset_id, start_year=2017, end_year=2024 +): + + # Load the GeoJSON file + with open(geojson_filepath, "r") as f: + feature_collection = json.load(f) + + features_ee = [] + + for feature in feature_collection["features"]: + original_props = feature["properties"] + new_props = {} + + # Copy UID + if "uid" in original_props: + new_props["uid"] = original_props["uid"] + + agri_year_totals = {} + + # Parse precipitation date keys + for key, val in original_props.items(): + try: + date = datetime.strptime(key, "%Y-%m-%d") + season_key = get_season_key(date) + if not season_key: + continue + + agri_key = get_agri_year_key(season_key) + if not agri_key: + continue + + agri_start = int(agri_key.split("-")[0]) + if not (start_year <= agri_start <= end_year): + continue + + season = season_key.split("_")[0] # kharif, rabi etc + full_key = f"{season}_{agri_key}" + + agri_year_totals[full_key] = agri_year_totals.get(full_key, 0) + float( + val + ) + + except Exception: + continue + + # Add all seasonal totals to new_props + for agri_key, total in agri_year_totals.items(): + new_props[f"precipitation_{agri_key}"] = total + + # Create EE Feature + geom_ee = ee.Geometry(feature["geometry"]) + feature_ee = ee.Feature(geom_ee, new_props) + features_ee.append(feature_ee) + + # Left side FC + mws_fc = ee.FeatureCollection(features_ee) + + return mws_fc + + +def generate_geojson_with_ci_and_ndvi(zoi_asset, ci_asset, ndvi_asset, proj_id): + # Load project + proj_obj = Project.objects.get(pk=proj_id) + + # Build CI and NDVI asset paths + asset_path_ci = ( + get_gee_dir_path( + [proj_obj.name], asset_path=GEE_PATHS["WATER_REJ"]["GEE_ASSET_PATH"] + ) + + ci_asset + ) + + asset_path_ndvi = ( + get_gee_dir_path( + [proj_obj.name], asset_path=GEE_PATHS["WATER_REJ"]["GEE_ASSET_PATH"] + ) + + ndvi_asset + ) + + # Load FeatureCollections + zoi = ee.FeatureCollection(zoi_asset) + ci = ee.FeatureCollection(asset_path_ci) + ndvi = ee.FeatureCollection(asset_path_ndvi) + + # ------------------------- + # STEP 1: Join ZOI with Cropping Intensity + # ------------------------- + join = ee.Join.inner() + filter = ee.Filter.intersects(leftField=".geo", rightField=".geo") + zoi_ci_joined = join.apply(zoi, ci, filter) + + def merge_zoi_ci(pair): + zoi_feat = ee.Feature(pair.get("primary")) + ci_feat = ee.Feature(pair.get("secondary")) + merged_props = zoi_feat.toDictionary().combine(ci_feat.toDictionary(), True) + return ee.Feature(zoi_feat.geometry(), merged_props) + + zoi_with_ci = ee.FeatureCollection(zoi_ci_joined.map(merge_zoi_ci)) + + # ------------------------- + # STEP 2: Join ZOI+CI with NDVI + # ------------------------- + zoi_ndvi_joined = join.apply(zoi_with_ci, ndvi, filter) + + def merge_zoi_ci_ndvi(pair): + ci_feat = ee.Feature(pair.get("primary")) + ndvi_feat = ee.Feature(pair.get("secondary")) + merged_props = ci_feat.toDictionary().combine(ndvi_feat.toDictionary(), True) + return ee.Feature(ci_feat.geometry(), merged_props) + + final_merged = ee.FeatureCollection(zoi_ndvi_joined.map(merge_zoi_ci_ndvi)) + + # ------------------------- + # STEP 3: Export or Push to GeoServer + # ------------------------- + layer_name = f"WaterRejapp_zoi_{proj_obj.name}_{proj_obj.id}" + sync_project_fc_to_geoserver(final_merged, proj_obj.name, layer_name, "waterrej") + + +def get_directory_size(path): + total_size = 0 + for dirpath, dirnames, filenames in os.walk(path): + for filename in filenames: + file_path = os.path.join(dirpath, filename) + if os.path.isfile(file_path): + total_size += os.path.getsize(file_path) + return total_size + + +def generate_geojson_with_ci_ndvi_ndmi( + zoi_asset, ci_asset, ndvi_asset, ndmi_asset, proj_id +): + + # Load project + proj_obj = Project.objects.get(pk=proj_id) + + zoi = ee.FeatureCollection(zoi_asset) + print("Number of features zoi:", zoi.size().getInfo()) + + ci = ee.FeatureCollection(ci_asset) + print("Number of features zoi:", ci.size().getInfo()) + ndvi = ee.FeatureCollection(ndmi_asset) + print("Number of features zoi:", ndvi.size().getInfo()) + ndmi = ee.FeatureCollection(ndmi_asset) + print("Number of features zoi:", ndmi.size().getInfo()) + + # ------------------------- + # STEP 1: Join ZOI with CI + # ------------------------- + join = ee.Join.inner() + filter = ee.Filter.intersects(leftField=".geo", rightField=".geo") + zoi_ci_joined = join.apply(zoi, ci, filter) + + def merge_zoi_ci(pair): + zoi_feat = ee.Feature(pair.get("primary")) + ci_feat = ee.Feature(pair.get("secondary")) + merged_props = zoi_feat.toDictionary().combine(ci_feat.toDictionary(), True) + return ee.Feature(zoi_feat.geometry(), merged_props) # ✅ keep ZOI geom + + zoi_with_ci = ee.FeatureCollection(zoi_ci_joined.map(merge_zoi_ci)) + + # ------------------------- + # STEP 2: Join with NDVI + # ------------------------- + zoi_ndvi_joined = join.apply(zoi_with_ci, ndvi, filter) + + def merge_zoi_ci_ndvi(pair): + prev_feat = ee.Feature(pair.get("primary")) + ndvi_feat = ee.Feature(pair.get("secondary")) + merged_props = prev_feat.toDictionary().combine(ndvi_feat.toDictionary(), True) + return ee.Feature(prev_feat.geometry(), merged_props) # ✅ still ZOI geom + + zoi_ci_ndvi = ee.FeatureCollection(zoi_ndvi_joined.map(merge_zoi_ci_ndvi)) + + # ------------------------- + # STEP 3: Join with NDMI + # ------------------------- + zoi_ndmi_joined = join.apply(zoi_ci_ndvi, ndmi, filter) + + def merge_zoi_ci_ndvi_ndmi(pair): + prev_feat = ee.Feature(pair.get("primary")) + ndmi_feat = ee.Feature(pair.get("secondary")) + merged_props = prev_feat.toDictionary().combine(ndmi_feat.toDictionary(), True) + return ee.Feature(prev_feat.geometry(), merged_props) # ✅ keep ZOI geom + + final_merged = ee.FeatureCollection(zoi_ndmi_joined.map(merge_zoi_ci_ndvi_ndmi)) + + # ------------------------- + # STEP 4: Export or Push to GeoServer + # ------------------------- + layer_name = f"WaterRejapp_zoi_{proj_obj.name}_{proj_obj.id}" + print(layer_name) + sync_project_fc_to_geoserver(final_merged, proj_obj.name, layer_name, "waterrej") + + +def generate_geojson_with_ci_ndvi(zoi_asset, ci_asset, ndvi_asset, proj_id): + # Load project + proj_obj = Project.objects.get(pk=proj_id) + + # Initialize Earth Engine + ee_initialize(4) + + # Load FeatureCollections + zoi = ee.FeatureCollection(zoi_asset) + ci = ee.FeatureCollection(ci_asset) + ndvi = ee.FeatureCollection(ndvi_asset) + + print("ZOI:", zoi.size().getInfo()) + print("CI:", ci.size().getInfo()) + print("NDVI:", ndvi.size().getInfo()) + + # Common join logic on UID + join = ee.Join.inner() + uid_filter = ee.Filter.equals(leftField="UID", rightField="UID") + + # --- Join ZOI + CI --- + zoi_ci_joined = join.apply(zoi, ci, uid_filter) + + def merge_zoi_ci(pair): + zoi_feat = ee.Feature(pair.get("primary")) + ci_feat = ee.Feature(pair.get("secondary")) + merged_props = zoi_feat.toDictionary().combine(ci_feat.toDictionary(), True) + # Keep ZOI geometry only + return ee.Feature(zoi_feat.geometry(), merged_props) + + zoi_with_ci = ee.FeatureCollection(zoi_ci_joined.map(merge_zoi_ci)) + + # --- Join with NDVI --- + zoi_ndvi_joined = join.apply(zoi_with_ci, ndvi, uid_filter) + + def merge_with_ndvi(pair): + base_feat = ee.Feature(pair.get("primary")) + ndvi_feat = ee.Feature(pair.get("secondary")) + merged_props = base_feat.toDictionary().combine(ndvi_feat.toDictionary(), True) + # Always retain ZOI geometry + return ee.Feature(base_feat.geometry(), merged_props) + + merged_final = ee.FeatureCollection(zoi_ndvi_joined.map(merge_with_ndvi)) + + # --- Ensure ZOI geometry retained in all features --- + merged_final = merged_final.map( + lambda f: ee.Feature( + f.setGeometry( + ee.Feature( + zoi.filter(ee.Filter.eq("UID", f.get("UID"))).first() + ).geometry() + ) + ) + ) + + layer_name = f"WaterRejapp_zoi_{proj_obj.name}_{proj_obj.id}" + print(layer_name) + + sync_project_fc_to_geoserver(merged_final, proj_obj.name, layer_name, "waterrej") + + +def save_layer_info_to_db( + state, + district, + block, + layer_name, + asset_id, + dataset_name, + sync_to_geoserver=False, + layer_version="1.0", + algorithm=None, + algorithm_version="1.0", + misc=None, + is_override=False, + is_gee_asset=True, +): + print("inside the save_layer_info_to_db function") + + dataset = Dataset.objects.get(name=dataset_name) + + try: + state_obj = StateSOI.objects.get(state_name__iexact=state) + district_obj = DistrictSOI.objects.get( + district_name__iexact=district, state=state_obj + ) + block_obj = TehsilSOI.objects.get( + tehsil_name__iexact=block, district=district_obj + ) + except Exception as e: + print("Error fetching in state district block:", e) + return + + is_public = is_asset_public(asset_id) if is_gee_asset else False + + # Check if there’s an existing layer + existing_layer = ( + Layer.objects.filter( + dataset=dataset, + layer_name=layer_name, + state=state_obj, + district=district_obj, + block=block_obj, + ) + .order_by("-layer_version") + .first() + ) + + if existing_layer: + if existing_layer.algorithm_version != algorithm_version: + # Algorithm version changed --> create new record with incremented layer_version + new_layer_version = str(float(existing_layer.layer_version) + 1) + print( + f"Algorithm version changed. Creating new layer version: {new_layer_version}" + ) + layer_obj = Layer.objects.create( + dataset=dataset, + layer_name=layer_name, + state=state_obj, + district=district_obj, + block=block_obj, + layer_version=new_layer_version, + algorithm=algorithm, + algorithm_version=algorithm_version, + is_sync_to_geoserver=sync_to_geoserver, + is_public_gee_asset=is_public, + is_override=is_override, + misc=misc, + gee_asset_path=asset_id, + ) + else: + # Algorithm version is same --> update existing layer + print("Algorithm version same. Updating existing layer.") + for field, value in { + "algorithm": algorithm, + "algorithm_version": algorithm_version, + "is_sync_to_geoserver": sync_to_geoserver, + "is_public_gee_asset": is_public, + "is_override": is_override, + "misc": misc, + "gee_asset_path": asset_id, + }.items(): + setattr(existing_layer, field, value) + existing_layer.save() + layer_obj = existing_layer + else: + # No existing record --> create a new one + print("No existing layer found. Creating new one.") + layer_obj = Layer.objects.create( + dataset=dataset, + layer_name=layer_name, + state=state_obj, + district=district_obj, + block=block_obj, + layer_version=layer_version, + algorithm=algorithm, + algorithm_version=algorithm_version, + is_sync_to_geoserver=sync_to_geoserver, + is_public_gee_asset=is_public, + is_override=is_override, + misc=misc, + gee_asset_path=asset_id, + ) + + print(f"Saved layer info (id={layer_obj.id}, version={layer_obj.layer_version})") + return layer_obj.id + + +def get_existing_end_year(dataset_name, layer_name): + """fetch objects from db on the basis of dataset name and layer_name""" + dataset = Dataset.objects.get(name=dataset_name) + layer_obj = Layer.objects.get(dataset=dataset, layer_name=layer_name) + existing_end_date = layer_obj.misc["end_year"] + print("existing_end_date", existing_end_date) + return existing_end_date + + +def get_layer_object(state, district, block, layer_name, dataset_name): + state_obj = StateSOI.objects.get(state_name__iexact=state) + district_obj = DistrictSOI.objects.get( + district_name__iexact=district, state=state_obj + ) + block_obj = TehsilSOI.objects.get(tehsil_name__iexact=block, district=district_obj) + layer_obj = ( + Layer.objects.filter( + state=state_obj, + district=district_obj, + block=block_obj, + layer_name=layer_name, + dataset__name=dataset_name, + ) + .order_by("-layer_version") + .first() + ) + return layer_obj + + +def update_dashboard_geojson( + state=None, + district=None, + block=None, + layer_name=None, + workspace_name=None, + proj_id=None, +): + if state and block and block: + print(f"🔄 Updating GeoJSON for {state}, {district}, {block}") + + # Get related objects + state_obj = StateSOI.objects.get(state_name=state) + district_obj = DistrictSOI.objects.get(district_name=district) + tehsil_obj = TehsilSOI.objects.get(tehsil_name=block) # fixed typo + + # Get or create main record + obj, created = State_Disritct_Block_Properties.objects.get_or_create( + state=state_obj, district=district_obj, tehsil=tehsil_obj + ) + else: + obj = Project.objects.get(pk=proj_id) + + # Map suffix to json_key + suffix_to_key = { + "wb": "wb_geojson", + "zoi": "zoi_geojson", + "mws": "mws_geojson", + } + + # Detect which key this layer corresponds to + json_key = None + for suffix, key in suffix_to_key.items(): + if layer_name == f"{state}_{district}_{block}_{suffix}": + json_key = key + break + + if not json_key: + print(f"⚠️ Layer name {layer_name} did not match any known type.") + return + + # Construct GeoServer URL + waterrej_url = ( + f"https://geoserver.core-stack.org:8443/geoserver/waterrej/ows?" + f"service=WFS&version=1.0.0&request=GetFeature&typeName={workspace_name}:{layer_name}" + f"&outputFormat=application%2Fjson" + ) + + # Load existing dashboard_geojson or create new + if proj_id: + misc = obj.dashboard_geojson or {} + else: + misc = obj.geojson_path or {} + + # Ensure waterrej section exists + if "waterrej" not in misc: + misc["waterrej"] = {} + + # Update or add this specific json_key + misc["waterrej"][json_key] = waterrej_url + + # Save the updated JSON field + obj.dashboard_geojson = misc + obj.save() + + print(f"✅ Added/Updated {json_key} for {state}, {district}, {block}") + + +def clean_geometry(geom): + """ + Clean geometry: + - Dissolve multipolygon → single polygon + - Remove holes automatically + - Fix invalid topology + - Buffer tiny polygons + """ + + # 1. Dissolve multi-polygons and remove holes + geom = geom.dissolve(maxError=1) + + # 2. Fix invalid rings by simplifying slightly (NEVER buffer(0)) + geom = geom.simplify(1) + + # 3. Buffer polygons smaller than 1 pixel (< 900 m²) + area = geom.area() + geom = ee.Algorithms.If( + area.lt(900), + geom.buffer(15), + geom, # ensure raster pixel center is captured + ) + + return ee.Geometry(geom) + + +def safe_reduce_max(image, geom, scale=30): + geom = clean_geometry(geom) + + val = ( + image.unmask(0) + .reduceRegion( + reducer=ee.Reducer.max(), + geometry=geom, + scale=scale, + maxPixels=1e13, + tileScale=4, + bestEffort=True, + ) + .get("b1") + ) + + return ee.Number(ee.Algorithms.If(val, val, 0)) + + +# ------------------------------------------------------ +# SAFE REDUCE MAX FUNCTION +# ------------------------------------------------------ +def safe_reduce_max(image, geom, scale=30): + geom = clean_geometry(geom) + + result = ( + image.unmask(0) + .reduceRegion( + reducer=ee.Reducer.max(), + geometry=geom, + scale=scale, + maxPixels=1e13, + tileScale=4, + bestEffort=True, + ) + .get("b1") + ) + + # Convert null → 0 + return ee.Number(ee.Algorithms.If(result, result, 0)) + + +# ------------------------------------------------------ +# MAIN FUNCTION TO PROCESS SWB LAYER +# ------------------------------------------------------ +def generate_swb_layer_with_max_so_catchment( + roi=None, + app_type="MWS", + asset_suffix=None, + asset_folder=None, + gee_account_id=None, +): + ee_initialize(gee_account_id) + + # Build asset paths + base_path = get_gee_dir_path( + asset_folder, asset_path=GEE_PATHS[app_type]["GEE_ASSET_PATH"] + ) + + so_asset = f"{base_path}stream_order_{asset_suffix}_raster" + ca_asset = f"{base_path}catchment_area_{asset_suffix}_raster" + + # Load rasters + stream_order_band = ee.Image(so_asset).select("b1") + catchment_band = ee.Image(ca_asset).select("b1") + + # Processing per waterbody + def compute_for_feature(feature): + geom = feature.geometry() + + max_so = safe_reduce_max(stream_order_band, geom, scale=30) + max_ca = safe_reduce_max(catchment_band, geom, scale=30) + + return feature.set( + { + "max_stream_order": max_so, + "max_catchment_area": max_ca, + } + ) + + # Map over the feature collection + return roi.map(compute_for_feature) + + +def _get_prod_backend_url(): + return getattr(settings, "PROD_BACKEND_URL", "").rstrip("/") + + +def _get_prod_api_key(): + return getattr(settings, "PROD_BACKEND_API_KEY", "") + + +def _sync_layer_to_prod_db(payload: dict): + prod_url = _get_prod_backend_url() + if not prod_url: + return None + + endpoint = prod_url + "/api/v1/sync_layer_remote/" + try: + response = requests.post( + endpoint, + json=payload, + headers={"X-Api-Key": _get_prod_api_key()}, + timeout=30, + ) + if response.status_code not in (200, 201): + logger.warning( + "Prod DB sync returned %s for layer %s: %s", + response.status_code, + payload.get("layer_name"), + response.text, + ) + return None + layer_id = response.json().get("layer_id") + logger.info( + "Layer %s synced to prod DB (id=%s).", payload.get("layer_name"), layer_id + ) + return layer_id + except requests.RequestException as e: + logger.error( + "Failed to sync layer %s to prod DB: %s", payload.get("layer_name"), e + ) + return None + + +def _update_layer_sync_remote( + layer_id, sync_to_geoserver=None, is_stac_specs_generated=None +): + prod_url = _get_prod_backend_url() + if not prod_url or layer_id is None: + return + + endpoint = prod_url + "/api/v1/update_layer_sync_remote/" + payload = { + "layer_id": layer_id, + "sync_to_geoserver": sync_to_geoserver, + "is_stac_specs_generated": is_stac_specs_generated, + } + try: + response = requests.post( + endpoint, + json=payload, + headers={"X-Api-Key": _get_prod_api_key()}, + timeout=30, + ) + if response.status_code not in (200, 201): + logger.warning( + "Prod layer sync status update returned %s for layer %s: %s", + response.status_code, + layer_id, + response.text, + ) + else: + logger.info("Layer sync status updated on prod DB for id=%s.", layer_id) + except requests.RequestException as e: + logger.error( + "Failed to update layer sync status on prod DB for id=%s: %s", layer_id, e + ) + + +def update_layer_sync_status( + layer_id, sync_to_geoserver=None, is_stac_specs_generated=None +): + if _get_prod_backend_url(): + _update_layer_sync_remote( + layer_id, + sync_to_geoserver=sync_to_geoserver, + is_stac_specs_generated=is_stac_specs_generated, + ) + return layer_id + + try: + layer_obj = Layer.objects.filter(id=layer_id).first() + if layer_obj is None: + return None + + update_fields = [] + if sync_to_geoserver is not None: + layer_obj.is_sync_to_geoserver = sync_to_geoserver + update_fields.append("is_sync_to_geoserver") + if is_stac_specs_generated is not None: + layer_obj.is_stac_specs_generated = is_stac_specs_generated + update_fields.append("is_stac_specs_generated") + + # `save(update_fields=...)` fires the post_save signal so the STAC + # auto-trigger handler in `computing.signals` can pick up the flip. + if update_fields: + layer_obj.save(update_fields=update_fields) + print( + f"Updated {update_fields} for layer ID: {layer_id} " + f"(sync={sync_to_geoserver}, stac={is_stac_specs_generated})" + ) + return layer_id + + except Exception as e: + print(f"Error updating layer sync status: {e}") diff --git a/utilities/constants.py b/utilities/constants.py index 7f5a4fad..b0bb5dd4 100644 --- a/utilities/constants.py +++ b/utilities/constants.py @@ -9,6 +9,7 @@ NREGA_ASSETS_OUTPUT_DIR = "data/nrega_assets/output" ANTYODAYA_2020 = "data/antyodaya/output/pan_india_antyodaya_2020.gpkg" +LIVESTOCKS = "data/livestock/pan_india_livestock.gpkg" MERGE_MWS_PATH = "data/merge_mws"