-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapplication.py
More file actions
63 lines (56 loc) · 1.53 KB
/
application.py
File metadata and controls
63 lines (56 loc) · 1.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import tinyws
import tinyws.server
import uvicorn
import uvloop
from core import (
ControllerManager, RedisDatabaseManager,
PostgressDatabaseManager
)
from exceptions import GeneralException
from controllers import (
PartyController, ChatController,
)
from dotenv import dotenv_values
env = dotenv_values()
redis_database = RedisDatabaseManager(
database_host=env["REDIS_HOST"],
database_port=env["REDIS_PORT"]
)
postgress_database = PostgressDatabaseManager(
host=env["POSTGRESS_HOST"],
port=env["POSTGRESS_PORT"],
POSTGRESS_USERNAME=env["POSTGRESS_USERNAME"],
password=env["POSTGRESS_PASSWORD"]
)
redis_database.set_database(postgress_database)
party_controller = PartyController("/party")
chat_controller = ChatController("/chat")
controller = ControllerManager(
controllers=[
party_controller,
chat_controller,
],
redis_database=redis_database
)
@tinyws.server.app()
async def application(websocket: tinyws.WebSocket) -> None:
"""The main handler of the application."""
await websocket.accept()
while True:
try:
# read the data.
await controller.on_request(websocket)
except Exception as e:
if issubclass(e.__class__, (GeneralException, )):
print(e)
await websocket.send_json(
e.json_error()
)
else:
await controller.on_disconnect(websocket)
break
uvloop.install()
uvicorn.run(
app=application,
ws="wsproto",
)