This repository was archived by the owner on May 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpems.py
More file actions
77 lines (59 loc) · 2.13 KB
/
pems.py
File metadata and controls
77 lines (59 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from datetime import datetime
from typing import Dict
import anaximander as nx
import os
from requests import Session
from config import WINDOW_SIZE, WINDOW_PERIOD
# Root URL of the PeMS site; from_link_source sends its listing request here
# and prefixes it to the relative file URLs found in the listing.
BASE_URL = "https://pems.dot.ca.gov"
# Value sent as the "district_id" query parameter of the listing request.
DISTRICT_ID = 4
class PeMSStation(nx.Entity):
    """Entity describing a single PeMS station.

    The station is identified by ``id``; the remaining attributes are plain
    data fields declared through ``nx.data()``.
    """

    # Identifier field of the entity.
    id: int = nx.id()

    # Descriptive data fields.
    # NOTE(review): the unit of `length` is not stated anywhere in this
    # file -- confirm against the data source.
    district: int = nx.data()
    freeway: str = nx.data()
    direction: str = nx.data()
    length: float = nx.data()
class Speed(nx.Measurement):
    """Speed measurement, declared with unit "mph"."""

    unit = "mph"
    # Presumably a lower bound ("greater than or equal to 0") enforced by
    # nx.Measurement -- confirm against the framework.
    ge = 0
class Flow(nx.Measurement):
    """Flow measurement, declared with unit "vph"."""

    # Presumably "vehicles per hour" -- confirm.
    unit = "vph"
    # Presumably a lower bound ("greater than or equal to 0") enforced by
    # nx.Measurement -- confirm against the framework.
    ge = 0
class PeMSSample(nx.Sample):
    """Sample of PeMS station readings, keyed by station and timestamp.

    Each sample carries the average speed and the total flow reported for a
    station. ``from_link_source`` is the source that fetches the raw file
    for a given day from the PeMS site.
    """

    station_id: int = nx.key()
    timestamp: datetime = nx.timestamp()
    average_speed: Speed = nx.data(ge=0)
    total_flow: Flow = nx.data(ge=0)

    # Same value as the module-level BASE_URL; kept as a class attribute so
    # that PeMSSample.BASE_URL remains available.
    BASE_URL = "https://pems.dot.ca.gov"

    # NOTE(review): `scheduler`, `csv` and `authentication` are not imported
    # in the visible part of this file -- confirm where they come from.
    # NOTE(review): the credentials are read from the environment when the
    # class body is executed, so a missing USERNAME/PASSWORD variable raises
    # KeyError at import time.
    @nx.source
    @scheduler("1d")
    @csv
    @authentication(
        username=os.environ['USERNAME'],
        password=os.environ['PASSWORD'],
        url=BASE_URL,
    )
    def from_link_source(cls, session: Session, scheduled_at: datetime):
        """Fetch the PeMS file whose name contains the scheduled date.

        Args:
            session: HTTP session used for both the listing request and the
                file download.
            scheduled_at: Moment the run was scheduled for; its date,
                formatted as ``YYYY_MM_DD``, is searched for in file names.

        Returns:
            The response of the download request for the first file whose
            name contains the date, or ``None`` when no file matches.

        Raises:
            requests.HTTPError: If the listing request returns an error
                status.
        """
        day_str = scheduled_at.strftime('%Y_%m_%d')

        # Ask for the listing of files available for the scheduled year.
        listing = session.get(
            BASE_URL,
            params={
                "srq": "clearinghouse",
                "district_id": DISTRICT_ID,
                "yy": day_str[:4],
                "type": "station_5min",
                "returnformat": "text",
            },
        )
        listing.raise_for_status()

        # The "data" mapping holds groups of file descriptors (presumably
        # one group per month -- confirm against the PeMS response).
        for files in listing.json()['data'].values():
            for file in files:
                # Plain substring test: wrapping this boolean in any()
                # raises TypeError because a bool is not iterable.
                if day_str in file['file_name']:
                    print(f"Upload {file['file_name']}.")
                    return session.get(BASE_URL + file['url'])

        # No file for the requested day was listed.
        return None
class SegmentPeMsJournal(nx.Journal):
    """Journal of speeds aggregated per segment, derived from PeMSSample."""

    # Lookup passed to PeMSSample.map for the "station_id" field. It is only
    # annotated here and never assigned in this file.
    # NOTE(review): confirm where the value is provided; the double
    # underscore also makes the name subject to Python name mangling.
    __station_to_segment: Dict

    segment_id: int = nx.key()
    timestamp: datetime = nx.timestamp()
    aggregated_speed: Speed = nx.data(ge=0)

    @nx.source
    def from_sample(cls):
        """Build the per-segment pipeline on top of PeMSSample.

        Steps, in order: map each sample's ``station_id`` to ``segment_ids``
        through the station-to-segment lookup, split ``segment_ids`` into
        individual ``segment_id`` values, group by ``segment_id``, aggregate
        with ``get_pems_feature`` and apply a sliding window of
        ``WINDOW_SIZE`` / ``WINDOW_PERIOD``.

        Returns:
            The object produced by the final ``sliding_window`` call. The
            pipeline used to be built and then discarded; it is now returned
            so the ``nx.source`` decorator can receive it, in line with
            ``PeMSSample.from_link_source``, which returns its data.
            NOTE(review): confirm this matches the framework's contract.
        """
        # NOTE(review): `get_pems_feature` is not defined or imported in the
        # visible part of this file -- confirm where it comes from.
        return (
            PeMSSample.map(
                cls.__station_to_segment,
                field="station_id",
                new_field="segment_ids",
            )
            .splitter(field="segment_ids", new_field="segment_id")
            .group_by_key(key="segment_id")
            .agg(get_pems_feature)
            .sliding_window(WINDOW_SIZE, WINDOW_PERIOD)
        )