From f36ab920b2155973bd7b324efe65db44683aa1ee Mon Sep 17 00:00:00 2001 From: Mahith Edula Date: Thu, 6 Feb 2025 09:37:34 -0500 Subject: [PATCH] Finished Realtime Rainfall notebook - pushed on AGOL --- rainfall/Realtime Rainfall ETL.ipynb | 214 +++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 rainfall/Realtime Rainfall ETL.ipynb diff --git a/rainfall/Realtime Rainfall ETL.ipynb b/rainfall/Realtime Rainfall ETL.ipynb new file mode 100644 index 0000000..bd7deab --- /dev/null +++ b/rainfall/Realtime Rainfall ETL.ipynb @@ -0,0 +1,214 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Realtime Rainfall ETL\n", + "An hourly scheduled notebook that performs and ETL process on recent RTRR and RTRG data and loads it into the appropriate feature layers in the realtimte timeseries map. The goal is to constantly updating data for the timeseries animation. We only delete/load the data that is necessary rather than loading all 72 hours of the data every hour.\n", + "\n", + "Parameters:\n", + "\n", + "CHUNK_SIZE - size of the chunks when adding and deleting features on AGOL - don't exceed 250\n", + "\n", + "NUM_OF_HOURS_ON_VIEW = number of hours that are displayed on the web map - defaults to 72 to view the last 72 hours worth of rainfall data\n", + "\n", + "HOURS_TO_UPDATE - the number of hours before the current time to go back and add to the web map - if 0 it will query for the latest pixel timestamp and add points after that\n", + "\n", + "\n", + "Environment: conda activate arcgispro-py3\n" + ] + }, + { + "cell_type": "code", + "execution_count": 49, + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "from datetime import datetime, timedelta, timezone\n", + "import pytz\n", + "import pandas as pd\n", + "from urllib.parse import urljoin\n", + "from arcgis.geometry import Polygon, Point\n", + "from arcgis.gis import GIS\n", + "from tqdm import tqdm" + ] + }, + { + "cell_type": "code", + "execution_count": 85, + "metadata": {}, + "outputs": [], + "source": [ + "CHUNK_SIZE = 50 #don't exceed 250\n", + "NUM_OF_HOURS_ON_VIEW = 72\n", + "HOURS_TO_UPDATE = 1\n", + "\n", + "RAINFALL_DATA_API_URL = \"https://trwwapi.herokuapp.com\"\n", + "REALTIME_TIMESERIES_ITEM_ID = '412c957447f64e0a8975034f399725b5'\n", + "REALTIME_TIMESERIES_POINTS_LAYER_INDEX = 0\n", + "\n", + "TZINFO = pytz.timezone('UTC')" + ] + }, + { + "cell_type": "code", + "execution_count": 60, + "metadata": {}, + "outputs": [], + "source": [ + "gis = GIS(\"home\") #Need the CM_3RWW profile - admin role" + ] + }, + { + "cell_type": "code", + "execution_count": 61, + "metadata": {}, + "outputs": [], + "source": [ + "realtime_timeseries_item = gis.content.get(REALTIME_TIMESERIES_ITEM_ID)\n", + "realtime_timeseries_points = realtime_timeseries_item.layers[REALTIME_TIMESERIES_POINTS_LAYER_INDEX]\n", + "\n", + "end_datetime = datetime.now(tz=TZINFO)\n", + "delete_datetime = end_datetime-timedelta(hours=NUM_OF_HOURS_ON_VIEW)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "metadata": {}, + "outputs": [], + "source": [ + "def get_latest_ts_from_points_layer(layer = realtime_timeseries_points):\n", + " query = layer.query(where=\"1=1\", out_fields=\"ts\", order_by_fields=\"ts DESC\")\n", + " if not query.features: #No features\n", + " return None \n", + " latest_timestamp = query.features[0].attributes[\"ts\"]\n", + " latest_datetime = datetime.fromtimestamp(latest_timestamp / 1000, tz= TZINFO) #Need to divide on Windows\n", + " return latest_datetime\n" + ] + }, + { + "cell_type": "code", + "execution_count": 69, + "metadata": {}, + "outputs": [], + "source": [ + "def get_latest_ts_from_points_layer(layer = realtime_timeseries_points):\n", + " # Making sid = 1 to imporve performance in not having to sort through all the data - may lead to slight time errors\n", + " query = layer.query(where=\"sid='1'\", out_fields=\"ts\", order_by_fields=\"ts DESC\")\n", + " if not query.features: #No features\n", + " return None \n", + " latest_timestamp = query.features[0].attributes[\"ts\"]\n", + " latest_datetime = datetime.fromtimestamp(latest_timestamp / 1000, tz= TZINFO) #Need to divide on Windows\n", + " return latest_datetime\n", + "\n", + "def get_hours_before():\n", + " if 1 <= HOURS_TO_UPDATE <= NUM_OF_HOURS_ON_VIEW:\n", + " return HOURS_TO_UPDATE\n", + " latest_point = get_latest_ts_from_points_layer()\n", + " if not latest_point:\n", + " return NUM_OF_HOURS_ON_VIEW\n", + " time_difference = end_datetime - latest_point\n", + " hours_difference = (time_difference.seconds * 60 * 60)\n", + " if hours_difference > NUM_OF_HOURS_ON_VIEW:\n", + " return NUM_OF_HOURS_ON_VIEW\n", + " return hours_difference\n", + "\n", + "def get_points_df_from_3rww_api():\n", + " request_url = urljoin(RAINFALL_DATA_API_URL, \"rainfall/v3/realtime/ago/\")\n", + " hours_ago = get_hours_before()\n", + " response = requests.get(\n", + " url = request_url,\n", + " params = {\"hours\": hours_ago}\n", + " )\n", + " response_json = response.json()\n", + " points_df = pd.DataFrame(response_json[\"data\"], columns=response_json[\"columns\"] )\n", + " points_df[\"SHAPE\"] = points_df.apply(lambda r: {\"x\": r[\"x\"], \"y\": r[\"y\"], \"spatialReference\" : {\"wkid\": 4326}}, axis=1)\n", + " points_df.drop(columns=[\"x\", \"y\"])\n", + "\n", + " return points_df\n", + "\n", + "def add_points_to_AGOL_realtime_timeseries_layer(layer = realtime_timeseries_points, df_to_add = None):\n", + " all_added = []\n", + " for i in tqdm(range(0, len(df_to_add.index), CHUNK_SIZE)):\n", + " #print(f\"adding {i} to {i+chunks}\")\n", + " adds = (df_to_add.loc[i:i+CHUNK_SIZE,]).copy()\n", + " add_results = layer.edit_features(\n", + " adds=adds.spatial.to_featureset(), \n", + " rollback_on_failure=True\n", + " )\n", + " added = [x for x in add_results.get('addResults', []) if x.get('success')]\n", + " if added:\n", + " all_added.extend(added)\n", + " else:\n", + " print(f\"Error adding records @ batch {i+CHUNK_SIZE}\")\n", + " print(add_results)\n", + "\n", + "def get_object_ids_to_remove(delete_datetime):\n", + " query_string = f\"ts < '{delete_datetime}'\"\n", + " delete_points_query = realtime_timeseries_points.query(where = query_string, return_ids_only=True)\n", + " delete_points_objectids_list = [str(objectId) for objectId in delete_points_query[\"objectIds\"]]\n", + " return delete_points_objectids_list\n", + " \n", + "def delete_points_from_AGOL_realtime_timeseries_layer(layer = realtime_timeseries_points, delete_points_objectids_list = []):\n", + " all_removed = []\n", + " for i in tqdm(range(0, len(delete_points_objectids_list), CHUNK_SIZE)):\n", + " delete_results = layer.edit_features(\n", + " deletes = delete_points_objectids_list[i:i+CHUNK_SIZE],\n", + " rollback_on_failure = True,\n", + " )\n", + " removed = [x for x in delete_results.get('deleteResults', []) if x.get('success')]\n", + " if removed:\n", + " all_removed.extend(removed)\n", + " else:\n", + " print(f\"Error removing records @ batch {i+CHUNK_SIZE}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def add_and_delete_points_on_AGOL_realtime_timeseries_layer():\n", + " delete_object_ids = get_object_ids_to_remove(delete_datetime)\n", + " delete_points_from_AGOL_realtime_timeseries_layer(delete_points_objectids_list=delete_object_ids)\n", + "\n", + " points_df = get_points_df_from_3rww_api()\n", + " add_points_to_AGOL_realtime_timeseries_layer(df_to_add=points_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "add_and_delete_points_on_AGOL_realtime_timeseries_layer()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "arcgispro-py3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}