-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
485 lines (380 loc) · 16.9 KB
/
Copy pathdatabase.py
File metadata and controls
485 lines (380 loc) · 16.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
import sqlite3, os, hashlib, pathlib
from typing import List, Optional
from encryption import encrypt, decrypt_account_list, decrypt_nested_account_list, encrypt_account_dict
# Exceptions
class VaultAlreadyExistsException(Exception):
pass
class InvalidPasswordException(Exception):
pass
class InvalidVaultException(Exception):
pass
class AccountAlreadyExistsException(Exception):
pass
class AccountDoesNotExistException(Exception):
pass
def custom_hash(input_string):
'''Returns a hashed password to store in the db'''
sha256 = hashlib.sha256()
sha256.update(input_string.encode('utf-8'))
return sha256.hexdigest()
def db_name(name:str):
'''Returns the name version that can be a table name in the db'''
name_elements = name.split(" ")
shaped_name = "_".join(name_elements).lower()
return shaped_name
def db_path() -> str | pathlib.Path:
'''Returns the db path for the conn'''
def db_folder() -> str:
"""Returns the default directory of the folder of the database
Returns:
(str | Path): The folder name of the Database
"""
return pathlib.Path(os.getcwd())
DB_NAME = "database.db"
DB_FOLDER = db_folder()
DB_PATH = os.path.join(DB_FOLDER, DB_NAME)
return DB_PATH
def show_name(name:str) -> str:
'''Returns a string that reverses the the effect of db_name'''
name_elements = name.split("_")
shaped_name = " ".join(name_elements).capitalize()
return shaped_name
def create_db() -> None:
"""
Creates 2 tables.
1. The default_vault (stores multiple accounts and a vault_id for each account)
2. The vaults (stores all the vault_names with their id)
3. The default vault table, named "default_vault" where the first accounts of the user can saved
"""
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
# Create the Users table
cur.execute("""
CREATE TABLE IF NOT EXISTS users (
row_id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
vaults_ids TEXT -- Stores a JSON
);
""")
# Create the Vaults table
cur.execute("""
CREATE TABLE IF NOT EXISTS vaults (
vault_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
password TEXT NOT NULL
);
""")
# Create the default vault table
try:
cur.execute("""
CREATE TABLE default_vault (
account_id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT,
email TEXT,
password TEXT,
service TEXT
);
""")
# Add the default_table into the vaults table with a random password
cur.execute("""INSERT INTO vaults (name, password) VALUES (?, ?)""", ("default_vault", custom_hash("thanos"))) #! The key here has to change
except sqlite3.OperationalError as e: # For the case that default_vault exists already and for some reason (probably I will be stupid) the create_db gets called again
print(e)
conn.commit()
def create_vault_table(vault_name:str, vault_pass:str) -> None:
"""Creates a new vault table in the database and adds its id to the vaults table
Args:
vault_name (str): The name of the new vault in regular form
vault_pass (str): The password of the vault
Returns:
_description_: None
Raises:
_type_: VaultAlreadyExistsException
"""
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
# Create the new vault table
try:
# Start a transaction
cur.execute("BEGIN TRANSACTION;")
# Check if the vault already exists
cur.execute("SELECT COUNT(*) FROM vaults WHERE name = ?", (db_name(vault_name),))
if cur.fetchone()[0] > 0:
raise VaultAlreadyExistsException(f"{db_name(vault_name)} table already exists")
# Insert the new vault into the 'vaults' table
cur.execute("INSERT INTO vaults (name, password) VALUES (?, ?)", (db_name(vault_name), custom_hash(vault_pass),))
# Create a new table for the vault with the corresponding 'vault_id'
cur.execute(f"""
CREATE TABLE {db_name(vault_name)} (
account_id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT,
email TEXT,
password TEXT,
service TEXT
)
""")
# Commit the transaction
conn.commit()
except sqlite3.OperationalError as e:
print(f"An error occurred: {e}")
# Rollback the transaction if an error occurs
conn.rollback()
except VaultAlreadyExistsException as e:
print(e)
conn.rollback()
def get_all_vaults() -> List[str]:
'''Returns a list with all the vaults saved in the database'''
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
query = '''SELECT name FROM sqlite_master WHERE type='table';'''
cur.execute(query)
vaults = cur.fetchall()
vaults = [show_name(x[0]) for x in vaults if (x[0] != "sqlite_sequence")]
return vaults
def check_for_valid_vault_password(vault_name:str, vault_pass:str) -> bool:
"""Checks if the hash of vault_pass is equal to the hash that is being stored in the vaults table
Args:
vault_name (str): The name of the vault in regular form
vault_pass (str): The vault of the password to be tested if matched
Returns:
bool: true or false
Raises:
_description_: InvalidPasswordException
"""
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
cur.execute("SELECT password FROM vaults WHERE name = ?", (db_name(vault_name), ) )
hashed_pass = cur.fetchone()
try:
if not hashed_pass:
# The vault doesn't exists
raise InvalidPasswordException("The vault doesn't exists")
if custom_hash(vault_pass) == hashed_pass[0]:
return True
except InvalidPasswordException as e:
pass
return False
def check_for_duplicate_account(vault_name: str, account_dict:dict[str, str]) -> bool:
"""Returns true if the account already exists in vault with name vault_name else returns false.
A duplicate is considered an account with the same combination of email and service as most online
services do not allow to create a 2 account with 2 emails.
Args:
vault_name (str): Name of the vault
account_dict (dict[str, str]): The dictionary containing the username email password and service as keys
NOTE it's important to pass the encrypted data here
Returns:
bool: True or False
"""
email = account_dict['email']
service = account_dict['service']
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
try:
# Check if the vault table exists
cur.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{db_name(vault_name)}';")
if cur.fetchone() is None:
raise InvalidVaultException(f"Vault with ID '{db_name(vault_name)}' does not exist.")
# Search for the account by email
cur.execute(f"""
SELECT account_id, username, email, password, service
FROM {db_name(vault_name)}
WHERE email LIKE ? AND service LIKE ?;
""", (email, service, ))
# Fetch the matching account
result = cur.fetchone()
if result is None:
return False # The account does not exists in the vault
return True # The account was found
except sqlite3.Error as e:
print(f"An error occurred: {e}")
return False
except InvalidVaultException as e:
print(e)
return False
def save_account(vault_name:str, vault_pass:str, account_dict: dict) -> Optional[str | dict]:
"""Unpacks the account_dict of the account details and saves them into the database.
Args:
vault_name (str): The name of the vault
vault_pass (str): The password of the vault
account_dict (dict): A list in the form of [username, email, password, service]
Returns:
str | dict: Returns the error messages | encrypted account_dict
"""
# NOTE LOOK into the format
username = account_dict['username']
email = account_dict['email']
password = account_dict['password']
service = account_dict['service']
key = custom_hash(vault_pass)
enc_account_dict = {
"username": encrypt(key, username),
"email": encrypt(key, email),
"password": encrypt(key, password),
"service": encrypt(key, service)
}
enc_username = enc_account_dict["username"]
enc_email = enc_account_dict["email"]
enc_password = enc_account_dict["password"]
enc_service = enc_account_dict["service"]
try:
# Check if the vault password is matching
is_vault_pass_valid = check_for_valid_vault_password(vault_name, vault_pass)
if not is_vault_pass_valid:
raise InvalidPasswordException("Wrong Password")
# Check if the account already exists in the vault
is_account_duplicate = check_for_duplicate_account(vault_name, enc_account_dict)
if is_account_duplicate:
raise AccountAlreadyExistsException("The account is already stored in the vault")
except InvalidPasswordException as e:
print(e)
return str(e)
except AccountAlreadyExistsException as e:
print(e)
return str(e)
try:
# Connect to the db and insert the account into the correct vault
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
cur.execute("BEGIN TRANSACTION;")
cur.execute(f"""INSERT INTO {db_name(vault_name)}
(username, email, password, service)
VALUES (?,?,?,?)"""
,(enc_username, enc_email, enc_password, enc_service,)
)
conn.commit()
except sqlite3.OperationalError as e:
print(e)
return str(e)
# Actual storing of the account in the corresponding vault of the db
def get_vault_id(vault_name:str) -> int:
"""Gets the vault_id of the vault_name specified
Args:
vault_name (str): The regular name of the vault table
Returns:
int: The vault_id
"""
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
cur.execute(f"""SELECT vault_id FROM vaults WHERE name = ?""", (db_name(vault_name), ) )
vault_id = cur.fetchone()[0]
if not vault_id:
raise InvalidVaultException("This vaults does not exist in vaults")
print(f"vault_name={db_name(vault_name)}, vault_id={vault_id}", )
return vault_id
def get_all_accounts_from_vault(vault_name:str, vault_pass:str) -> list[dict[str, str]]:
"""Decrypts all of the contents of the vault and returns them as a list of nicely mapped dictionaries
Args:
vault_name (str): The name of the vault in regular format
vault_pass (str): The password of the vault
Returns:
_type_: list[dict[str, str]]
_description_: Returns a list of the decrypted accounts of the vault
"""
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
key = custom_hash(vault_pass)
cur.execute(f'''SELECT * FROM {db_name(vault_name)}''')
contents = cur.fetchall()
dec_content = decrypt_nested_account_list(key, contents)
return dec_content
def get_account_from_vault(vault_name: str, vault_pass: str, account_dict: dict[str, str]) -> dict[str, str]:
"""Returns the decrypted version of the account stored in the db. The account_dict
Args:
vault_name (str): The name of the vault
vault_pass (str): The password of the vault
account_dict (dict[str, str]): The standard account_dict with
(account_id:optional, username:optional, email:VITAL, password:optional, service:VITAL) as
its keys. NOTE you need to input the unencrypted dictionary
Returns:
_type_: dict[str, str]
_description_: Returns the unencrypted account off of the vault. If it exists. If not returns the equivalent error message
"""
# Check if vault_pass is valid
is_valid_vault_pass = check_for_valid_vault_password(vault_name, vault_pass)
try:
if not is_valid_vault_pass:
raise InvalidPasswordException("The password of the vault is not correct.")
except InvalidPasswordException as e:
print(e)
return str(e)
# Encrypt the dictionary for the search. I only really need email and service for that purpose
key = custom_hash(vault_pass)
enc_email = encrypt(key, account_dict["email"])
enc_service = encrypt(key, account_dict["service"])
enc_account_dict = {
"email": enc_email,
"service": enc_service
}
# Check if the account exists
does_account_exists_in_vault = check_for_duplicate_account(vault_name, enc_account_dict)
try:
if not does_account_exists_in_vault:
raise AccountDoesNotExistException("The account you are looking for does not exist.")
except AccountDoesNotExistException as e:
return str(e)
# Connect to the db
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
# Search for the matching encrypted account
cur.execute(f"""SELECT * FROM {db_name(vault_name)}
WHERE email LIKE ? AND service LIKE ?""",
(enc_email, enc_service, ) )
enc_account_list = cur.fetchone()
# Decrypt the account
account_dict = decrypt_account_list(key, enc_account_list)
return account_dict
# Return the decrypted account
def drop_vault(vault_name:str):
'''Deletes a vault(table) from the database'''
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
# is_correct_pass = check_for_valid_vault_password(vault_name, vault_pass)
# if not is_correct_pass:
# return
cur.execute("BEGIN TRANSACTION;")
try:
cur.execute("DELETE FROM vaults WHERE name = ?", (db_name(vault_name), ))
cur.execute(f'''DROP TABLE {db_name(vault_name)}''')
except sqlite3.Error as e:
print(e)
conn.rollback()
conn.commit()
def drop_all_vaults():
'''Drops all the vaults'''
vaults = get_all_vaults()
for vault in vaults:
drop_vault(vault)
def delete_account(vault_name:str, vault_pass:str, account_dict:dict[str, str]) -> str | None:
"""Deletes the account provided from the vault, if it exists.
Args:
vault_name (str): The name of the vault
vault_pass (str): The password of the vault
account_dict (_type_): A dictionary with the following keys -->
(account_id:optional, username:optional, email:VITAL, password:optional, service:VITAL)
Returns:
_type_: str
_description_: Returns only when there is an exception and it returns the error message
"""
key = custom_hash(vault_pass)
enc_account_dict = encrypt_account_dict(key, account_dict)
enc_email_to_delete = enc_account_dict["email"]
enc_service_to_delete = enc_account_dict["service"]
# Check if the account is stored in the vault
is_account_in_vault = check_for_duplicate_account(vault_name, enc_account_dict)
try:
if not is_account_in_vault:
raise AccountDoesNotExistException("The account you are trying to delete doesn't exist.")
except AccountDoesNotExistException as e:
print(e)
return str(e)
with sqlite3.connect(db_path()) as conn:
cur = conn.cursor()
cur.execute("BEGIN TRANSACTION;")
try:
cur.execute(f"""DELETE FROM {db_name(vault_name)}
WHERE email LIKE ? AND service LIKE ?"""
, (enc_email_to_delete, enc_service_to_delete, ) )
except sqlite3.OperationalError as e:
conn.rollback()
return str(e)
conn.commit()