-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
94 lines (72 loc) · 2.78 KB
/
main.py
File metadata and controls
94 lines (72 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import uvicorn
import httpx
from fastapi import FastAPI, Request, BackgroundTasks
from starlette.middleware import Middleware
from utils import (
InteractionType,
InteractionResponseType,
InteractionResponseFlags,
CustomHeaderMiddleware,
)
app = FastAPI(middleware=[Middleware(CustomHeaderMiddleware)])
async def process_interaction(json_data: dict):
"""
Runs AFTER the interaction has been ACKed.
Sends responses using webhook follow-ups.
"""
interaction_token = json_data["token"]
application_id = json_data["application_id"]
webhook_url = (
f"https://discord.com/api/v10/webhooks/"
f"{application_id}/{interaction_token}"
)
result = None
async with httpx.AsyncClient() as client:
# Slash commands
if json_data["type"] == InteractionType.APPLICATION_COMMAND:
from core import CommandHandler
handler = CommandHandler(json_data)
result = await handler.execute()
# Message components (buttons, selects)
elif json_data["type"] == InteractionType.MESSAGE_COMPONENT or json_data["type"]==InteractionType.MODAL_SUBMIT:
from core import ComponentHandler
handler = ComponentHandler(json_data)
result = await handler.execute()
# Send handler result if present
if result:
await client.post(webhook_url, json=result)
else:
import json
with open("sample2.json", "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=4, sort_keys=True)
# Fallback message
await client.post(
webhook_url,
json={
"content": "Hmmm, I am not programmed to respond to that.The command could be temporarily disabled.",
"flags": InteractionResponseFlags.EPHEMERAL,
},
)
@app.post("/interactions")
async def interactions(request: Request, background: BackgroundTasks):
# Modals need immediate response
json_data = await request.json()
if json_data.get('data').get('custom_id')=="recruit":
from modals import text_input
await text_input(json_data)
return
# 1️⃣ PING — respond immediately
if json_data["type"] == InteractionType.PING:
return {"type": InteractionResponseType.PONG}
# 2️⃣ ACK immediately (DEFER)
background.add_task(process_interaction, json_data)
return {
"type": InteractionResponseType.DEFERRED_CHANNEL_MESSAGE_WITH_SOURCE
}
@app.post("/recruit")
async def recruit():
from recruit import sendSoldier
await sendSoldier()
return "Success",200
if __name__ == "__main__":
uvicorn.run(app)