-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
95 lines (75 loc) · 2.78 KB
/
auth.py
File metadata and controls
95 lines (75 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import binascii
import hashlib
from typing import Optional, Tuple
from uuid import uuid4
from sqlmodel import select, or_
from starlette.authentication import (
AuthCredentials,
AuthenticationBackend,
SimpleUser,
)
from db import *
class BasicAuthBackend(AuthenticationBackend):
    """Starlette authentication backend backed by ``AuthUser`` / ``AuthSession``.

    Credentials are looked up in this order:

    1. A ``sessionid`` stored under the ``"auth"`` key of ``conn.session``,
       matched against ``AuthSession`` and joined to its ``AuthUser``.
    2. Only when the session has no ``"auth"`` entry: ``user`` and ``pass``
       request headers, matched against ``AuthUser`` by name and password
       hash.

    Each method returns an ``(AuthCredentials, SimpleUser)`` pair on success
    and ``None`` when the request cannot be authenticated.
    """

    @staticmethod
    def _credentials_for(auth_user):
        """Build the ``(AuthCredentials, SimpleUser)`` pair for *auth_user*.

        ``auth_user.scopes`` is treated as a comma-separated string. Empty
        entries are dropped so that an empty scopes value yields no scopes
        instead of the single bogus scope ``""`` that ``"".split(",")``
        would produce; a ``None`` value is treated like an empty string.
        """
        scopes = [
            scope for scope in (auth_user.scopes or "").split(",") if scope
        ]
        return AuthCredentials(scopes), SimpleUser(auth_user.user)

    def _simple_auth(self, db: SessionDep, auth):
        """Authenticate from a mapping holding ``"user"`` and ``"pass"``.

        Args:
            db: Open database session used to run the lookup.
            auth: Mapping (``authenticate`` passes the request headers) with
                the user name under ``"user"`` and the plain-text password
                under ``"pass"``.

        Returns:
            ``(AuthCredentials, SimpleUser)`` when a user with that name and
            password hash exists, otherwise ``None``.
        """
        # Stored passwords are hashes, so hash the candidate before comparing.
        password_hash = hash_password(auth["pass"])
        auth_user = db.exec(
            select(AuthUser)
            .where(AuthUser.user == auth["user"])
            .where(AuthUser.password == password_hash)
        ).first()
        if auth_user is None:
            return None
        return self._credentials_for(auth_user)

    async def authenticate(self, conn):
        """Resolve the credentials for the incoming connection.

        Args:
            conn: The connection being authenticated; its ``session`` and
                ``headers`` are read.

        Returns:
            ``(AuthCredentials, SimpleUser)`` on success, ``None`` otherwise.
            When the session carries an ``"auth"`` entry that has no
            ``sessionid`` or whose ``sessionid`` is unknown, the session is
            cleared before returning ``None``.
        """
        with Session(engine) as db:
            auth = conn.session.get("auth")
            if auth is None:
                # No session login: fall back to header credentials.
                headers = conn.headers
                if "user" in headers and "pass" in headers:
                    return self._simple_auth(db, headers)
                print("no auth in session, skipping")
                return None

            if "sessionid" not in auth:
                print("no sessionid in session, skipping")
                conn.session.clear()
                return None

            auth_user = db.exec(
                select(AuthUser)
                .join(AuthSession, AuthUser.user == AuthSession.user)
                .where(AuthSession.sessionid == auth["sessionid"])
            ).first()
            if auth_user is None:
                # Stale or forged session id: drop it so it is not retried.
                print("user has sessionid but it's not valid")
                conn.session.clear()
                return None
            return self._credentials_for(auth_user)
def hash_password(password: str) -> str:
    """Return the hex-encoded PBKDF2-HMAC-SHA256 digest of *password*.

    The key is derived with the fixed salt ``b"salt"`` and 100000
    iterations, so the same password always maps to the same 64-character
    lowercase hex string.
    """
    # NOTE(review): the salt is a constant shared by every password; moving
    # to per-user salts would require storing the salt and re-hashing
    # existing passwords, so it is left unchanged here.
    derived_key = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        b"salt",
        100000,
    )
    return derived_key.hex()
def try_login(
    userid: str, password: str, session: SessionDep
) -> Tuple[Optional[AuthUser], Optional[AuthSession]]:
    """Verify a login attempt and open a new auth session on success.

    Args:
        userid: Login identifier; matched against either ``AuthUser.user``
            or the ``User.name`` of the joined ``User`` row.
        password: Plain-text password; hashed with ``hash_password`` before
            being compared to ``AuthUser.password``.
        session: Database session used for the lookup and for persisting
            the new ``AuthSession``.

    Returns:
        ``(auth_user, auth_session)`` when the credentials match, where
        ``auth_session`` is a newly committed row with a fresh random
        ``sessionid``. ``(None, None)`` when no user matches; the result is
        always a 2-tuple so callers can unpack it unconditionally.
    """
    password_hash = hash_password(password)
    db_user = session.exec(
        select(AuthUser)
        .join(User, User.user == AuthUser.user)
        .where(or_(AuthUser.user == userid, User.name == userid))
        .where(AuthUser.password == password_hash)
    ).first()
    if db_user is None:
        return None, None

    # Each successful login gets its own session row keyed by a random id.
    new_session = AuthSession(sessionid=uuid4().hex, user=db_user.user)
    session.add(new_session)
    session.commit()
    return db_user, new_session
def try_logout(sessionid: str, session: SessionDep) -> bool:
    """Delete the auth session identified by *sessionid*.

    Args:
        sessionid: Identifier of the ``AuthSession`` row to remove.
        session: Database session used for the lookup and the delete.

    Returns:
        ``True`` when a matching row was found, deleted and committed;
        ``False`` when no such session exists (nothing is written).
    """
    lookup = select(AuthSession).where(AuthSession.sessionid == sessionid)
    existing = session.exec(lookup).first()
    if existing is not None:
        session.delete(existing)
        session.commit()
        return True
    return False