This repository was archived by the owner on Dec 15, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_manager.py
More file actions
88 lines (71 loc) · 2.66 KB
/
data_manager.py
File metadata and controls
88 lines (71 loc) · 2.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import functools
import os

import psycopg2
import psycopg2.extras
def establish_connection(connection_data=None):
    """
    Create a database connection.

    The DATABASE_URL environment variable takes precedence whenever it is set, which keeps
    existing deployments working exactly as before. When it is not set, the connection is
    built from :connection_data: instead of failing on a missing connection string.

    :connection_data: optional dict of connection keyword arguments (dbname, user, host,
        password). Defaults to the result of get_connection_data().
    :returns: psycopg2 connection with autocommit enabled, or None when the database
        reports a connection error (the error is printed, not raised)
    """
    connect_str = os.environ.get('DATABASE_URL')
    try:
        if connect_str is not None:
            conn = psycopg2.connect(connect_str)
        else:
            if connection_data is None:
                connection_data = get_connection_data()
            # psycopg2 drops keyword arguments whose value is None, so unset
            # MY_PSQL_* variables simply fall back to the libpq defaults.
            conn = psycopg2.connect(**connection_data)
        conn.autocommit = True
    except psycopg2.DatabaseError as e:
        # Best-effort contract kept from the original: report and return None.
        print("Cannot connect to database.")
        print(e)
        return None
    return conn
def get_connection_data(db_name=None):
    """
    Build the connection dictionary from the environment variables that start with the
    :MY_PSQL_: prefix (MY_PSQL_DBNAME, MY_PSQL_USER, MY_PSQL_HOST, MY_PSQL_PASSWORD).

    :db_name: optional database name. When it is None the MY_PSQL_DBNAME environment
        variable is used instead.
    :returns: dict with the keys dbname, user, host and password; a value is None when the
        corresponding environment variable is not defined
    """
    read_env = os.environ.get
    resolved_name = read_env('MY_PSQL_DBNAME') if db_name is None else db_name

    connection_data = {'dbname': resolved_name}
    env_names = (
        ('user', 'MY_PSQL_USER'),
        ('host', 'MY_PSQL_HOST'),
        ('password', 'MY_PSQL_PASSWORD'),
    )
    for key, env_name in env_names:
        connection_data[key] = read_env(env_name)
    return connection_data
def execute_select(statement, variables=None, fetchall=True):
    """
    Execute SELECT statement optionally parameterized.
    Use fetchall=False to get back only one row (fetchone).

    Example:
    > execute_select('SELECT * FROM shows WHERE title = %(title)s;', variables={'title': 'Codecool'})

    :statement: SELECT statement, optionally containing psycopg2 placeholders
    :variables: optional parameters bound to the placeholders by the driver
    :fetchall: when True return every row, otherwise only the first one
    :returns: list of dict-like rows (RealDictCursor) when fetchall is True, otherwise the
        result of fetchone()
    :raises: psycopg2.OperationalError when no connection could be established
    """
    conn = establish_connection()
    if conn is None:
        # establish_connection() signals failure by returning None; fail with a clear
        # database error instead of an obscure context-manager error on None.
        raise psycopg2.OperationalError("Cannot connect to database.")
    try:
        # The connection's own `with` block only ends the transaction; it does not
        # close the connection, hence the explicit close() in `finally`.
        with conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(statement, variables)
                return cursor.fetchall() if fetchall else cursor.fetchone()
    finally:
        conn.close()
def get_connection_string():
    """
    Give back the connection string stored in the DATABASE_URL environment variable.

    :returns: the value of DATABASE_URL
    :raises: KeyError when DATABASE_URL is not defined or is empty
    """
    database_url = os.environ.get('DATABASE_URL')
    if not database_url:
        raise KeyError('Some necessary environment variable(s) are not defined')
    return database_url
def open_database():
    """
    Open a database connection from the string given by get_connection_string().

    :returns: psycopg2 connection with autocommit enabled
    :raises: psycopg2.DatabaseError when connecting fails (a message is printed first);
        errors raised by get_connection_string() are passed through unchanged
    """
    try:
        db_connection = psycopg2.connect(get_connection_string())
        db_connection.autocommit = True
    except psycopg2.DatabaseError:
        print('Database connection problem')
        raise
    else:
        return db_connection
def connection_handler(function):
    """
    Decorator that supplies a ready-to-use dictionary cursor to :function:.

    For every call a new connection is opened with open_database(), a RealDictCursor is
    created on it and passed to :function: as the first positional argument, followed by
    the caller's own arguments. The cursor and the connection are always closed afterwards,
    even when :function: raises.

    :function: callable whose first parameter is the cursor
    :returns: wrapper with the metadata (name, docstring) of :function:, returning
        whatever :function: returns
    """
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        connection = open_database()
        try:
            dict_cur = connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            try:
                return function(dict_cur, *args, **kwargs)
            finally:
                dict_cur.close()
        finally:
            # Runs on errors too, so a failing query no longer leaks the connection.
            connection.close()
    return wrapper