Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions rainfall/Realtime Rainfall ETL.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Realtime Rainfall ETL\n",
"An hourly scheduled notebook that performs and ETL process on recent RTRR and RTRG data and loads it into the appropriate feature layers in the realtimte timeseries map. The goal is to constantly updating data for the timeseries animation. We only delete/load the data that is necessary rather than loading all 72 hours of the data every hour.\n",
"\n",
"Parameters:\n",
"\n",
"CHUNK_SIZE - size of the chunks when adding and deleting features on AGOL - don't exceed 250\n",
"\n",
"NUM_OF_HOURS_ON_VIEW = number of hours that are displayed on the web map - defaults to 72 to view the last 72 hours worth of rainfall data\n",
"\n",
"HOURS_TO_UPDATE - the number of hours before the current time to go back and add to the web map - if 0 it will query for the latest pixel timestamp and add points after that\n",
"\n",
"\n",
"Environment: conda activate arcgispro-py3\n"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [],
"source": [
"import requests\n",
"from datetime import datetime, timedelta, timezone\n",
"import pytz\n",
"import pandas as pd\n",
"from urllib.parse import urljoin\n",
"from arcgis.geometry import Polygon, Point\n",
"from arcgis.gis import GIS\n",
"from tqdm import tqdm"
]
},
{
"cell_type": "code",
"execution_count": 85,
"metadata": {},
"outputs": [],
"source": [
"CHUNK_SIZE = 50 #don't exceed 250\n",
"NUM_OF_HOURS_ON_VIEW = 72\n",
"HOURS_TO_UPDATE = 1\n",
"\n",
"RAINFALL_DATA_API_URL = \"https://trwwapi.herokuapp.com\"\n",
"REALTIME_TIMESERIES_ITEM_ID = '412c957447f64e0a8975034f399725b5'\n",
"REALTIME_TIMESERIES_POINTS_LAYER_INDEX = 0\n",
"\n",
"TZINFO = pytz.timezone('UTC')"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"gis = GIS(\"home\") #Need the CM_3RWW profile - admin role"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"realtime_timeseries_item = gis.content.get(REALTIME_TIMESERIES_ITEM_ID)\n",
"realtime_timeseries_points = realtime_timeseries_item.layers[REALTIME_TIMESERIES_POINTS_LAYER_INDEX]\n",
"\n",
"end_datetime = datetime.now(tz=TZINFO)\n",
"delete_datetime = end_datetime-timedelta(hours=NUM_OF_HOURS_ON_VIEW)\n"
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [],
"source": [
"def get_latest_ts_from_points_layer(layer = realtime_timeseries_points):\n",
" query = layer.query(where=\"1=1\", out_fields=\"ts\", order_by_fields=\"ts DESC\")\n",
" if not query.features: #No features\n",
" return None \n",
" latest_timestamp = query.features[0].attributes[\"ts\"]\n",
" latest_datetime = datetime.fromtimestamp(latest_timestamp / 1000, tz= TZINFO) #Need to divide on Windows\n",
" return latest_datetime\n"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [],
"source": [
"def get_latest_ts_from_points_layer(layer = realtime_timeseries_points):\n",
" # Making sid = 1 to imporve performance in not having to sort through all the data - may lead to slight time errors\n",
" query = layer.query(where=\"sid='1'\", out_fields=\"ts\", order_by_fields=\"ts DESC\")\n",
" if not query.features: #No features\n",
" return None \n",
" latest_timestamp = query.features[0].attributes[\"ts\"]\n",
" latest_datetime = datetime.fromtimestamp(latest_timestamp / 1000, tz= TZINFO) #Need to divide on Windows\n",
" return latest_datetime\n",
"\n",
"def get_hours_before():\n",
" if 1 <= HOURS_TO_UPDATE <= NUM_OF_HOURS_ON_VIEW:\n",
" return HOURS_TO_UPDATE\n",
" latest_point = get_latest_ts_from_points_layer()\n",
" if not latest_point:\n",
" return NUM_OF_HOURS_ON_VIEW\n",
" time_difference = end_datetime - latest_point\n",
" hours_difference = (time_difference.seconds * 60 * 60)\n",
" if hours_difference > NUM_OF_HOURS_ON_VIEW:\n",
" return NUM_OF_HOURS_ON_VIEW\n",
" return hours_difference\n",
"\n",
"def get_points_df_from_3rww_api():\n",
" request_url = urljoin(RAINFALL_DATA_API_URL, \"rainfall/v3/realtime/ago/\")\n",
" hours_ago = get_hours_before()\n",
" response = requests.get(\n",
" url = request_url,\n",
" params = {\"hours\": hours_ago}\n",
" )\n",
" response_json = response.json()\n",
" points_df = pd.DataFrame(response_json[\"data\"], columns=response_json[\"columns\"] )\n",
" points_df[\"SHAPE\"] = points_df.apply(lambda r: {\"x\": r[\"x\"], \"y\": r[\"y\"], \"spatialReference\" : {\"wkid\": 4326}}, axis=1)\n",
" points_df.drop(columns=[\"x\", \"y\"])\n",
"\n",
" return points_df\n",
"\n",
"def add_points_to_AGOL_realtime_timeseries_layer(layer = realtime_timeseries_points, df_to_add = None):\n",
" all_added = []\n",
" for i in tqdm(range(0, len(df_to_add.index), CHUNK_SIZE)):\n",
" #print(f\"adding {i} to {i+chunks}\")\n",
" adds = (df_to_add.loc[i:i+CHUNK_SIZE,]).copy()\n",
" add_results = layer.edit_features(\n",
" adds=adds.spatial.to_featureset(), \n",
" rollback_on_failure=True\n",
" )\n",
" added = [x for x in add_results.get('addResults', []) if x.get('success')]\n",
" if added:\n",
" all_added.extend(added)\n",
" else:\n",
" print(f\"Error adding records @ batch {i+CHUNK_SIZE}\")\n",
" print(add_results)\n",
"\n",
"def get_object_ids_to_remove(delete_datetime):\n",
" query_string = f\"ts < '{delete_datetime}'\"\n",
" delete_points_query = realtime_timeseries_points.query(where = query_string, return_ids_only=True)\n",
" delete_points_objectids_list = [str(objectId) for objectId in delete_points_query[\"objectIds\"]]\n",
" return delete_points_objectids_list\n",
" \n",
"def delete_points_from_AGOL_realtime_timeseries_layer(layer = realtime_timeseries_points, delete_points_objectids_list = []):\n",
" all_removed = []\n",
" for i in tqdm(range(0, len(delete_points_objectids_list), CHUNK_SIZE)):\n",
" delete_results = layer.edit_features(\n",
" deletes = delete_points_objectids_list[i:i+CHUNK_SIZE],\n",
" rollback_on_failure = True,\n",
" )\n",
" removed = [x for x in delete_results.get('deleteResults', []) if x.get('success')]\n",
" if removed:\n",
" all_removed.extend(removed)\n",
" else:\n",
" print(f\"Error removing records @ batch {i+CHUNK_SIZE}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def add_and_delete_points_on_AGOL_realtime_timeseries_layer():\n",
" delete_object_ids = get_object_ids_to_remove(delete_datetime)\n",
" delete_points_from_AGOL_realtime_timeseries_layer(delete_points_objectids_list=delete_object_ids)\n",
"\n",
" points_df = get_points_df_from_3rww_api()\n",
" add_points_to_AGOL_realtime_timeseries_layer(df_to_add=points_df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"add_and_delete_points_on_AGOL_realtime_timeseries_layer()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "arcgispro-py3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}