-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbullfunc.py
More file actions
166 lines (154 loc) · 5.43 KB
/
bullfunc.py
File metadata and controls
166 lines (154 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import contextlib
import functools
import sqlite3 as sql
import time
from typing import Any, Dict, List, Optional
class Bulletin:
    """
    One row of the ``bulletins`` table in the SQLite database.

    An instance is a snapshot of the columns ``id``, ``origin``,
    ``timestamp``, ``title``, ``body`` and ``expires`` as they were when the
    row was read; it is not refreshed when the row changes afterwards.
    ``timestamp`` and ``expires`` are exposed as whole seconds (``int``), or
    ``None`` when the column is NULL, regardless of which constructor loaded
    the row.
    """

    # Path of the SQLite database file. A class attribute so that it can be
    # overridden in one place instead of being repeated in every method.
    DB_PATH = "myop.db"

    # Columns that edit() may write. Column names are interpolated into the
    # SQL text, so they must only ever come from this fixed list.
    _EDITABLE_COLUMNS = (
        "title",
        "body",
        "origin",
        "timestamp",
        "timestampstr",
        "expires",
    )

    @classmethod
    def _connect(cls) -> sql.Connection:
        """
        Open a connection to DB_PATH whose rows allow access by column name.

        The caller owns the connection and must close it; every method here
        wraps it in ``contextlib.closing`` because sqlite3's own connection
        context manager only ends the transaction and never closes.
        """
        conn = sql.connect(cls.DB_PATH)
        conn.row_factory = sql.Row
        return conn

    # NOTE: the parameter is named ``id`` (shadowing the builtin) because
    # callers may pass it by keyword; renaming it would break them.
    def __init__(self, id: int):
        """
        Initialize a Bulletin object given a row ID.

        If the row's ``expires`` time has already passed, the row is deleted
        from the table before the error is raised.

        Raises:
            ReferenceError: if no row has this ID, or if the row has expired.
        """
        with contextlib.closing(self._connect()) as conn:
            row = conn.execute(
                "SELECT * FROM bulletins WHERE id = ?;", (id,)
            ).fetchone()
            if row is None:
                raise ReferenceError("Bulletin does not exist")
            expires = row["expires"]
            if expires is not None and int(expires) <= int(time.time()):
                # Commit the clean-up before raising so it is not lost when
                # the connection is closed on the way out.
                conn.execute("DELETE FROM bulletins WHERE id = ?", (id,))
                conn.commit()
                raise ReferenceError("Bulletin has expired")
        # Same loader as from_row(), so attributes have identical types no
        # matter how the object was built.
        self._load_from_row(row)
        self._deleted = False

    def isntdeleted(func):
        """
        Wrapper to ensure property self._deleted is not true before executing
        a function on a row that doesn't exist.
        ONLY WORKS ON INSTANCE METHODS,
        NOT CLASS OR STATIC METHODS.

        Raises:
            ReferenceError: (from the wrapped method) if the object was
                deleted through delete().
        """
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if getattr(self, "_deleted", False):
                raise ReferenceError("Bulletin was deleted")
            return func(self, *args, **kwargs)
        return wrapper

    @isntdeleted
    def edit(self, **kwargs) -> "Bulletin":
        """
        Update columns of this bulletin's row.

        Keyword names are column names and must be listed in
        _EDITABLE_COLUMNS; values are bound as SQL parameters.

        Returns:
            A freshly loaded Bulletin for the same ID. ``self`` is returned
            unchanged when no keyword is given, and is NOT refreshed
            otherwise.

        Raises:
            ValueError: if a keyword is not an editable column (nothing is
                written in that case).
            ReferenceError: if the object was deleted, or if the row cannot
                be reloaded afterwards (missing or expired).
        """
        for key in kwargs:
            if key not in self._EDITABLE_COLUMNS:
                raise ValueError(f"Key {key} not allowed")
        if not kwargs:
            return self
        assignments = ", ".join(f"{key} = ?" for key in kwargs)
        statement = f"UPDATE bulletins SET {assignments} WHERE id = ?"
        with contextlib.closing(self._connect()) as conn:
            conn.execute(statement, [*kwargs.values(), self.id])
            conn.commit()
        # Reload only after the writing connection has been closed.
        return type(self)(self.id)

    @isntdeleted
    def delete(self) -> "Bulletin":
        """
        Deletes the row and marks the object as deleted.

        Plain attributes (``title``, ``body``, ...) are still accessible,
        but methods guarded by ``isntdeleted`` raise ReferenceError from
        now on.

        Returns:
            ``self``.
        """
        with contextlib.closing(self._connect()) as conn:
            conn.execute("DELETE FROM bulletins WHERE id = ?", (self.id,))
            conn.commit()
        self._deleted = True
        return self

    @isntdeleted
    def to_dict(self) -> Dict[str, Any]:
        """
        Return the bulletin's attributes as a plain dictionary.

        Raises:
            ReferenceError: if the object was deleted.
        """
        return {
            "origin": self.origin,
            "title": self.title,
            "body": self.body,
            "timestamp": self.timestamp,
            "expires": self.expires,
            "id": self.id,
        }

    @classmethod
    def new_bulletin(cls, title: str, origin: str, expiresin: int, body: str = "") -> "Bulletin":
        """
        Given the parameters of a bulletin, add a new bulletin to the table.
        Returns a Bulletin object.
        Note: expiresin is an integer in minutes.

        Raises:
            ReferenceError: if the new row is already expired when it is
                loaded back (e.g. ``expiresin`` of 0 or less); the row is
                removed again in that case.
        """
        # One clock reading, so that expires is exactly expiresin minutes
        # after the stored timestamp.
        now = time.time()
        expires = now + 60 * int(expiresin)
        with contextlib.closing(cls._connect()) as conn:
            cur = conn.execute(
                "INSERT INTO bulletins (title, body, origin, timestamp, expires) VALUES (?,?,?,?,?)",
                (title, body, origin, now, expires),
            )
            conn.commit()
            new_id = cur.lastrowid
        return cls(new_id)

    @classmethod
    def from_row(cls, row: sql.Row) -> "Bulletin":
        """
        Build a Bulletin from an already fetched row without querying the
        database. No expiry check is performed here.
        """
        obj = cls.__new__(cls)  # bypass __init__, which would query again
        obj._load_from_row(row)
        obj._deleted = False
        return obj

    def _load_from_row(self, row: sql.Row) -> None:
        """
        Copy the columns of ``row`` onto this object.

        ``timestamp``, ``expires`` and ``id`` are converted to ``int``;
        a NULL ``timestamp`` or ``expires`` is kept as ``None``.
        """
        self.origin = row["origin"]
        self.title = row["title"]
        self.body = row["body"]
        self.timestamp = int(row["timestamp"]) if row["timestamp"] is not None else None
        self.expires = int(row["expires"]) if row["expires"] is not None else None
        self.id = int(row["id"])

    @classmethod
    def purge_expired(cls, now: Optional[int] = None) -> int:
        """
        Delete every bulletin whose ``expires`` value is at or before ``now``.

        Args:
            now: reference time in seconds; defaults to the current time
                truncated to whole seconds.

        Returns:
            The number of rows deleted.
        """
        if now is None:
            now = int(time.time())
        with contextlib.closing(cls._connect()) as conn:
            cur = conn.execute(
                "DELETE FROM bulletins WHERE expires IS NOT NULL AND expires <= ?;",
                (now,),
            )
            conn.commit()
            return cur.rowcount

    @classmethod
    def get_all_bulletins(cls) -> "List[Bulletin]":
        """
        Purge expired bulletins, then return all remaining ones, newest
        ``timestamp`` first.
        """
        cls.purge_expired()
        with contextlib.closing(cls._connect()) as conn:
            rows = conn.execute(
                "SELECT * FROM bulletins ORDER BY timestamp DESC;"
            ).fetchall()
        return [cls.from_row(row) for row in rows]

    @classmethod
    def filter_user(cls, user: str) -> "List[Bulletin]":
        """
        Purge expired bulletins, then return those whose ``origin`` equals
        ``user``.

        Objects are built straight from the fetched rows, so the whole
        lookup is a single SELECT instead of one extra query per bulletin.
        """
        cls.purge_expired()
        with contextlib.closing(cls._connect()) as conn:
            rows = conn.execute(
                "SELECT * FROM bulletins WHERE origin = ?", (user,)
            ).fetchall()
        return [cls.from_row(row) for row in rows]