-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgithub_webhook.py
More file actions
126 lines (100 loc) · 3.8 KB
/
Copy pathgithub_webhook.py
File metadata and controls
126 lines (100 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
"""GitHub webhook listener — auto-pull job-hunter repo on push to main."""
import hashlib
import hmac
import json
import logging
import os
import subprocess
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
# Runtime configuration: every value may be overridden through the environment.
# Local checkout that is updated when a qualifying push arrives.
REPO_PATH = os.getenv("WEBHOOK_REPO_PATH", "/root/DEV/job_1")
# Only pushes to this branch trigger a pull.
BRANCH = os.getenv("WEBHOOK_BRANCH", "main")
# TCP port the listener binds to (must parse as an integer).
PORT = int(os.getenv("WEBHOOK_PORT", "9000"))
# File holding the shared secret used to authenticate webhook deliveries.
SECRET_PATH = os.getenv(
    "WEBHOOK_SECRET_PATH",
    "/root/.config/github-webhook/secret",
)

# Log to stdout at INFO level with a timestamped, single-line format.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("github-webhook")
def load_secret() -> bytes:
    """Return the webhook secret stored in the file at ``SECRET_PATH``.

    The file is read in binary mode and leading/trailing whitespace
    (such as a trailing newline) is removed from the result.

    Raises:
        OSError: If the secret file cannot be opened or read.
    """
    with open(SECRET_PATH, "rb") as secret_file:
        raw_secret = secret_file.read()
    return raw_secret.strip()
# Read the secret once, when the module is loaded; if the secret file cannot
# be opened the exception propagates and the script does not start.
SECRET = load_secret()
def verify(signature_header: str | None, body: bytes) -> bool:
if not signature_header or not signature_header.startswith("sha256="):
return False
expected = "sha256=" + hmac.new(SECRET, body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature_header)
def git_pull() -> tuple[bool, str]:
    """Fast-forward the checkout at ``REPO_PATH`` from ``origin/BRANCH``.

    Runs ``git pull --ff-only`` with a 60 second timeout.

    Returns:
        A ``(succeeded, message)`` pair. ``succeeded`` is ``True`` when git
        exits with status 0. ``message`` is git's combined stdout and stderr
        with surrounding whitespace stripped, or a short description when
        the command timed out or could not be run at all.
    """
    command = ["git", "-C", REPO_PATH, "pull", "--ff-only", "origin", BRANCH]
    try:
        proc = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return False, "git pull timed out"
    except Exception as exc:
        # Best-effort boundary: report any launch failure to the caller
        # instead of letting it escape into the HTTP handler.
        return False, f"git pull error: {exc}"
    else:
        combined = (proc.stdout + proc.stderr).strip()
        return proc.returncode == 0, combined
class Handler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
log.info("%s - %s", self.address_string(), fmt % args)
def _respond(self, code: int, body: str) -> None:
self.send_response(code)
self.send_header("Content-Type", "text/plain")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body.encode())
def do_GET(self):
if self.path == "/health":
self._respond(200, "ok\n")
else:
self._respond(404, "not found\n")
def do_POST(self):
if self.path != "/webhook":
self._respond(404, "not found\n")
return
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length) if length > 0 else b""
signature = self.headers.get("X-Hub-Signature-256")
if not verify(signature, body):
log.warning("signature check failed from %s", self.client_address[0])
self._respond(401, "invalid signature\n")
return
event = self.headers.get("X-GitHub-Event", "")
if event == "ping":
log.info("ping received")
self._respond(200, "pong\n")
return
if event != "push":
log.info("ignoring event: %s", event)
self._respond(200, f"ignored: {event}\n")
return
try:
payload = json.loads(body)
except json.JSONDecodeError:
self._respond(400, "bad json\n")
return
ref = payload.get("ref", "")
expected_ref = f"refs/heads/{BRANCH}"
if ref != expected_ref:
log.info("ignoring ref: %s (expected %s)", ref, expected_ref)
self._respond(200, f"ignored ref: {ref}\n")
return
log.info("push to %s received, pulling...", ref)
ok, output = git_pull()
log.info("git pull %s: %s", "ok" if ok else "FAILED", output)
self._respond(200 if ok else 500, output + "\n")
def main() -> None:
    """Log the effective configuration and serve requests until terminated.

    The server binds to all interfaces (``0.0.0.0``) on ``PORT`` and
    dispatches every request to ``Handler``.
    """
    bind_address = ("0.0.0.0", PORT)
    log.info(
        "github-webhook listening on 0.0.0.0:%d (repo=%s branch=%s)",
        PORT,
        REPO_PATH,
        BRANCH,
    )
    server = HTTPServer(bind_address, Handler)
    server.serve_forever()
# Start the listener only when this file is executed as a script.
if __name__ == "__main__":
    main()