-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
163 lines (132 loc) · 5.34 KB
/
main.py
File metadata and controls
163 lines (132 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import json
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, UUID4
import uuid
from uuid import UUID
from fastapi import FastAPI
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from apscheduler.schedulers.background import BackgroundScheduler
import requests
import os
# Path of the JSON snapshot that mirrors the in-memory timers on disk.
PERSISTENT_FILE = "/tmp/timers.txt"

# Root logger: INFO and above, each record prefixed with time, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
class Timer(BaseModel):
    # A scheduled webhook call. Instances are stored in the module-level
    # `timers` dict and serialized to PERSISTENT_FILE via model_dump().
    # (Documented with comments rather than a docstring so the generated
    # schema is left untouched.)
    uuid: UUID4  # identifier generated in set_timer and returned to the client
    creation_time: datetime  # datetime.now() at the moment the timer was created
    webhook_call_time: datetime  # moment at or after which the webhook is POSTed
    webhook_url: str  # URL that receives the POST request
class TimerRequest(BaseModel):
    # Request body of POST /set-timer: a delay split into hours/minutes/seconds
    # plus the URL to call once that delay has elapsed.
    # NOTE(review): nothing here rejects negative values; a negative total
    # produces a call time in the past — confirm whether that is intended.
    hours: int  # hours component of the delay
    minutes: int  # minutes component of the delay
    seconds: int  # seconds component of the delay
    webhook_url: str  # URL to POST to when the timer fires
# In-memory registry of pending timers. Keys are UUID objects (not strings):
# entries are inserted under `timer.uuid` and reloaded from disk as `UUID(k)`.
timers: dict[UUID, Timer] = {}
# The task to run
def check_webhooks_to_send():
    """Call the webhook of every timer that is due and drop it afterwards.

    Registered as the periodic scheduler job. Each timer is handled
    independently: if calling its webhook or updating storage raises, the
    error is logged and the loop moves on, so one bad endpoint cannot prevent
    the other due timers from firing. A timer whose webhook call raised is
    not removed and is therefore attempted again on the next run.
    """
    logging.info("Checking timers that should call the webhook")
    # Use list() to create a snapshot to avoid dict size change during iteration
    for timer in list(timers.values()):
        if not time_to_call_webhook(timer):
            continue
        logging.info(f"Time to call webhook for timer {timer}")
        try:
            call_webhook(timer)
            write_or_remove_from_memories(timer, "remove")
        except Exception:
            # Keep going: the remaining due timers must still be processed.
            logging.exception(
                f"Failed to process timer {timer.uuid}; continuing with remaining timers"
            )
def time_to_call_webhook(timer: Timer) -> bool:
    """Tell whether the timer's scheduled webhook time has been reached.

    The comparison is made against ``datetime.now()``; a timer whose call
    time equals the current time counts as due.
    """
    current_time = datetime.now()
    return timer.webhook_call_time <= current_time
def call_webhook(timer: Timer):
    """Send a body-less POST request to the timer's webhook URL.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed, including when the endpoint does not answer within
            the timeout.
    """
    # requests applies no timeout by default; without one an unresponsive
    # endpoint would block the scheduler job indefinitely.
    requests.post(timer.webhook_url, timeout=10)
def write_or_remove_from_memories(timer: Timer, mode: str):
    """Add or remove a timer in memory, then rewrite the on-disk snapshot.

    Args:
        timer: the timer to store or discard.
        mode: ``"write"`` to insert/overwrite the timer under its UUID,
            ``"remove"`` to drop it (a missing entry is not an error).

    Raises:
        HTTPException: status 500 if the mode is unknown or the snapshot
            cannot be written; ``detail`` carries the underlying error text.
    """
    try:
        if mode == "write":
            logging.info(f'Adding timer {timer.uuid} to volatile and persistent memory')
            timers[timer.uuid] = timer
        elif mode == "remove":
            logging.info(f'Removing timer {timer.uuid} from volatile and persistent memory')
            timers.pop(timer.uuid, None)
        else:
            # Previously an unknown mode silently rewrote the file unchanged.
            raise ValueError(f"Unknown mode {mode!r}; expected 'write' or 'remove'")

        # Serialize the entire timers dict; default=str covers UUID/datetime values.
        timers_data = {str(k): v.model_dump() for k, v in timers.items()}

        # Write to a sibling temp file and swap it in, so a crash mid-write
        # cannot leave a truncated snapshot that would break the next startup.
        tmp_path = f"{PERSISTENT_FILE}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(timers_data, f, indent=2, default=str)
        os.replace(tmp_path, PERSISTENT_FILE)
    except Exception as e:
        logging.error(f'Could not {mode} timer from persistent memory: {e}')
        raise HTTPException(
            status_code=500,
            detail=f'{e}'
        ) from e
def load_timers_from_disk():
    """Populate the in-memory timers from the on-disk snapshot, if one exists.

    A missing file is not an error: the function logs that fact and returns,
    leaving ``timers`` as it was. Keys in the file are parsed back into UUID
    objects and values are validated as ``Timer`` models.

    Raises:
        Exception: whatever reading, JSON decoding or validation raised; it
            is logged with its traceback and then re-raised unchanged.
    """
    if not os.path.exists(PERSISTENT_FILE):
        logging.info("No persistent file found, starting with empty timers")
        return
    try:
        with open(PERSISTENT_FILE, "r") as f:
            loaded_data = json.load(f)
        loaded_timers = {UUID(k): Timer.model_validate(v) for k, v in loaded_data.items()}
    except Exception:
        # logging.exception records the traceback, which the plain error log dropped.
        logging.exception("Could not get timers persisted on disk")
        raise
    timers.update(loaded_timers)
    # Routed through logging (not print) so it follows the configured format.
    logging.info(f'File had the following timers: {loaded_timers}')
# Set up the scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(check_webhooks_to_send, 'interval', seconds=10) #TODO: set this every second so we don't miss
scheduler.start()

# Upon app startup check disk for timers
load_timers_from_disk()


# Ensure the scheduler shuts down properly on application exit.
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: stop the background scheduler on shutdown.

    Nothing is done before the ``yield`` (the scheduler is already started at
    import time); the code after it runs when the application shuts down.
    """
    try:
        yield
    finally:
        scheduler.shutdown()


# The lifespan handler only takes effect when it is handed to the app, so it
# must be defined before the FastAPI instance is created.
app = FastAPI(lifespan=lifespan)
@app.post("/set-timer")
def set_timer(request: TimerRequest):
    """Create a timer that POSTs to ``request.webhook_url`` after the given delay.

    The call time is the current time plus the requested hours, minutes and
    seconds. The new timer is stored in memory and persisted to disk.

    Returns:
        ``{"id": <uuid>}`` with the identifier of the new timer.

    Raises:
        HTTPException: status 500 if anything fails while creating or
            storing the timer.
    """
    try:
        creation_time = datetime.now()
        webhook_call_time = calculate_call_time(creation_time, request.hours, request.minutes, request.seconds)
        timer = Timer(uuid=uuid.uuid4(),
                      creation_time=creation_time,
                      webhook_call_time=webhook_call_time,
                      webhook_url=request.webhook_url)
        seconds_left = datetime_difference_seconds(timer.creation_time, timer.webhook_call_time)
        logging.info(
            f"Received Timer object {timer}, will send POST request to {timer.webhook_url} at {timer.webhook_call_time.date()} "
            f"which is in {seconds_left} seconds")
        write_or_remove_from_memories(timer, "write")
        return {"id": timer.uuid}
    except Exception as e:
        # The f prefix was missing, so the log used to show a literal "{e}".
        logging.error(f"Could not set timer, error encountered: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Could not set timer: {str(e)}"
        ) from e
def calculate_call_time(creation_time: datetime, hours: int, minutes: int, seconds: int) -> datetime:
    """Return *creation_time* shifted forward by the given hours, minutes and seconds."""
    delay = timedelta(hours=hours, minutes=minutes, seconds=seconds)
    return creation_time + delay
def datetime_difference_seconds(first_date: datetime, second_date: datetime) -> int:
    """Return the whole seconds from *first_date* to *second_date*.

    The result is negative when *second_date* is earlier; fractional seconds
    are truncated toward zero.
    """
    elapsed = second_date - first_date
    whole_seconds = int(elapsed.total_seconds())
    return whole_seconds
@app.post("/get-timer-status")
def get_timer_status(id: UUID4):
    """Report how many whole seconds remain before a timer's webhook is called.

    Args:
        id: UUID of the timer to look up.

    Returns:
        ``{"id": id, "timeLeft": seconds}`` while more than zero whole
        seconds remain; ``{"0"}`` once the call time has been reached; and a
        one-element set holding a "not found" message when no timer with
        that UUID is known.

    Raises:
        HTTPException: status 500 if the lookup or computation fails.
    """
    try:
        timer = timers.get(id)
        if timer is None:
            logging.error(f'No uuid {id} exists')
            # NOTE(review): this is a set literal, returned as a normal
            # response rather than an error status — confirm that clients
            # rely on this shape before changing it.
            return {f'UUID {id} not found'}

        time_now = datetime.now()
        webhook_time = timer.webhook_call_time
        seconds_left = datetime_difference_seconds(time_now, webhook_time)
        logging.info(f'Time left between {time_now} and {webhook_time} is {seconds_left}')
        if seconds_left > 0:
            return {"id": id, "timeLeft": seconds_left}

        logging.info('Webhook expired')
        # NOTE(review): also a set literal, not a dict — kept for compatibility.
        return {"0"}
    except Exception as e:
        # The f prefix was missing (literal "{e}" was logged) and "status" was misspelled.
        logging.error(f"Could not get timer status: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get timer status: {str(e)}"
        ) from e