-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobserve.py
More file actions
executable file
·57 lines (43 loc) · 1.61 KB
/
observe.py
File metadata and controls
executable file
·57 lines (43 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3
import os
import requests
import sys
import signal
from requests.auth import HTTPBasicAuth
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
OBSERVER_PATH = os.environ["OBSERVER_PATH"]
NTFY_USER = os.environ["NTFY_USER"]
NTFY_PASS = os.environ["NTFY_PASS"]
NTFY_URL = os.environ["NTFY_URL"]
def on_created(event):
print(f"[NEW FILE] {event.src_path}")
with open(event.src_path, "rb") as f:
requests.put(NTFY_URL,
auth=HTTPBasicAuth(NTFY_USER, NTFY_PASS),
data=f,
headers={"Filename": os.path.basename(event.src_path)})
def handler(signum, frame):
signame = signal.Signals(signum).name
print(f"[SIGNAL] {signame} ({signum})")
def main(argc: int, argv: list[str]) -> None:
event_handler = PatternMatchingEventHandler(patterns=["*"],
ignore_patterns=None,
ignore_directories=False,
case_sensitive=True)
event_handler.on_created = on_created
observer = Observer()
observer.schedule(event_handler=event_handler,
path=OBSERVER_PATH,
recursive=False)
observer.start()
print("Observer started.")
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
signal.pause()
print("Stopping Observer...")
observer.stop()
observer.join()
print("Observer stopped.")
if __name__ == "__main__":
main(len(sys.argv), sys.argv)