import datetime as dt
import subprocess
import os
import boto3
from app.archive_agent import ArchiveAgent, get_wait_time_from_env
from app.context import create_database, add_log_entry
import app.search_consent as search_consent
import app.config as config
# Module-level S3 handles, created once at import time through boto3's default
# session (credentials and region come from boto3's standard lookup chain).
# Both the low-level client and the resource API are used below.
s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')
def start_archive_agent():
    """Create the archive agent, start it in the background and log its status.

    The agent is bound to ``config.DATABASE``; its wait time is read from the
    environment via ``get_wait_time_from_env``. The status logged here is
    whatever the agent reports right after ``start_async`` returns.
    """
    add_log_entry('starting archive agent')
    wait_time = get_wait_time_from_env()
    archive_agent = ArchiveAgent(conn=config.DATABASE, wait_time=wait_time)
    archive_agent.start_async()
    add_log_entry(archive_agent.get_status())
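def is_current():
    """Determine whether the existing SSL certificates are current.

    Notes:
        'current' means that the certificate directory
        ``config.LEDIR/config.EBS_CERT_PREFIX`` exists, is non-empty, and none
        of its direct entries has a modification time
        ``config.LE_REFRESH_RATE`` or more days in the past. Only the top
        level of the directory is examined, and sub-directories are judged by
        their own mtime like files.

    Returns:
        success flag as bool
    """
    cert_dir = os.path.join(config.LEDIR, config.EBS_CERT_PREFIX)
    # isdir rather than exists: a stray file of that name would otherwise make
    # listdir raise instead of reporting "not current".
    if not os.path.isdir(cert_dir):
        return False
    entries = os.listdir(cert_dir)
    # An existing but empty certificate directory holds no usable certificate,
    # so it is treated as stale rather than (vacuously) current.
    if not entries:
        return False
    now = dt.datetime.now()

    def age_in_days(name):
        """Whole days since `name` (inside cert_dir) was last modified."""
        modified = dt.datetime.fromtimestamp(
            os.path.getmtime(os.path.join(cert_dir, name))
        )
        return (now - modified).days

    # Generator instead of a materialised list: the scan stops at the first
    # stale entry.
    return not any(
        age_in_days(name) >= config.LE_REFRESH_RATE for name in entries
    )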
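def get_certs_from_s3():
    """Download the SSL certificates that have been backed up to S3.

    Every object under ``config.EBS_CERT_PREFIX`` in ``config.EBS_BUCKET`` is
    written below ``config.LEDIR`` at the path given by its key, which is the
    layout ``backup_certs`` produces. S3 "folder" marker keys (trailing ``/``)
    are skipped, as is any key whose local path would resolve outside
    ``config.LEDIR``. The listing is paginated, so more than one page of
    objects is handled.
    """
    local_root = os.path.realpath(config.LEDIR)
    bucket = s3_resource.Bucket(config.EBS_BUCKET)
    # Paginate instead of a single list_objects_v2 call, which is capped at
    # one page of results. The Delimiter value is kept as it was; NOTE(review):
    # a ',' delimiter looks unintended (keys containing ',' would be collapsed
    # into CommonPrefixes) — confirm whether '/' or no delimiter was meant.
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(
        Bucket=config.EBS_BUCKET, Delimiter=',', Prefix=config.EBS_CERT_PREFIX
    )
    for page in pages:
        # A prefix with no objects has no 'Contents' key at all.
        for obj in page.get('Contents', []):
            key = obj['Key']
            # Skip folder markers explicitly rather than dropping the first
            # listed object, which was only correct when the marker existed.
            if key.endswith('/'):
                continue
            local_path = os.path.realpath(os.path.join(config.LEDIR, key))
            # A key with '..' components (or an absolute path) must not be
            # allowed to write outside the certificate tree.
            if os.path.commonpath([local_root, local_path]) != local_root:
                add_log_entry(f'skipping S3 key outside LEDIR: {key}')
                continue
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            bucket.download_file(key, local_path)
    add_log_entry('ssl certificates downloaded from S3')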
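def backup_certs():
    """Backup the issued SSL certificates to S3.

    Every file below ``config.LEDIR/config.EBS_CERT_PREFIX`` is uploaded to
    ``config.EBS_BUCKET`` under its path relative to ``config.LEDIR`` (so each
    key starts with ``EBS_CERT_PREFIX``), which is the layout
    ``get_certs_from_s3`` restores from.

    NOTE(review): private keys leave the host here; the bucket must not be
    readable beyond this application's role and should use server-side
    encryption.
    """
    root = os.path.join(config.LEDIR, config.EBS_CERT_PREFIX)
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            local_path = os.path.join(dirpath, name)
            # Key relative to LEDIR rather than "drop the first two path
            # components": identical when LEDIR is a single top-level
            # directory such as '/etc', and still round-trips through
            # get_certs_from_s3 for deeper LEDIR values.
            key = os.path.relpath(local_path, config.LEDIR).replace(os.sep, '/')
            s3_client.upload_file(local_path, config.EBS_BUCKET, key)
    add_log_entry('ssl certificates backed up to S3')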
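def configure_ssl_certs():
    """Get a new set of SSL certificates from LetsEncrypt.

    Runs from ``config.WORKING_DIR``: stale local certificates are first
    restored from the S3 backup, the ``httpd`` service is stopped (presumably
    so certbot's ``--standalone`` listener can bind), ``certbot-auto`` is
    fetched from S3 and executed with ``--keep-until-expiring`` so still-valid
    certificates are left untouched, the resulting certificates are backed up
    to S3 and ``httpd`` is started again. Each external command's return code
    is only logged; no failure aborts the sequence.
    """
    os.chdir(config.WORKING_DIR)
    # NOTE(review): indentation was lost in the copy this was reviewed from;
    # this reading guards only the S3 restore with is_current(). Confirm
    # against the repository.
    if not is_current():
        get_certs_from_s3()
    code = subprocess.call(['service', 'httpd', 'stop'])
    add_log_entry(f'stopping http service finished with return code {code}')
    # The directories are the same values certbot is pointed at below (the
    # earlier version created 'log' while passing certbot './logs').
    # makedirs(exist_ok=True) tolerates existing directories by itself.
    config_dir = './config'
    logs_dir = './logs'
    for directory in (config_dir, logs_dir):
        os.makedirs(directory, exist_ok=True)
    # get the bot from s3, make it executable
    # NOTE(review): certbot-auto is downloaded into the working directory and
    # then run with sudo without any integrity check; verify a pinned digest
    # of the script before executing it, and keep WORKING_DIR writable only by
    # this service's user.
    bot_path = './certbot-auto'
    bucket = s3_resource.Bucket(config.EBS_BUCKET)
    bucket.download_file(config.CERTBOT_KEY, bot_path)
    code = subprocess.call(['chmod', 'a+x', bot_path])
    add_log_entry(f'changing certbot mode finished with return code {code}')
    # Every option is one --name=value argv token. The earlier
    # '--domains <fqdn>' / '--logs-dir ./logs' form put a space inside a
    # single argument, which certbot's parser does not split.
    cli_args = [
        '--non-interactive',
        f'--email={config.CERTBOT_EMAIL}',
        '--debug',
        '--agree-tos',
        '--standalone',
        f'--domains={config.FQDN}',
        '--keep-until-expiring',
        f'--logs-dir={logs_dir}',
        f'--config-dir={config_dir}',
    ]
    code = subprocess.call(['sudo', bot_path, 'certonly'] + cli_args)
    add_log_entry(f'running certbot finished with return code {code}')
    backup_certs()
    # One argv entry per word: the earlier single 'service httpd start' string
    # is looked up as an executable of that name and never starts httpd.
    code = subprocess.call(['service', 'httpd', 'start'])
    add_log_entry(f'attempt to start httpd service finished with code {code}')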
# Runs at import time: make sure the application database exists before any
# of the entry points below uses it.
create_database(config.DATABASE)
if __name__ == '__main__':
    # Executed directly: development server, plain HTTP, bound to localhost
    # only. NOTE(review): if create_app builds a Flask/Werkzeug app, debug=True
    # enables the interactive debugger — it must stay unreachable from other
    # hosts.
    application = search_consent.create_app(config, ssl=False, debug=True)
    application.run(host='localhost', port=8080)
else:
    # Imported (e.g. by a WSGI server): provision certificates, start the
    # background archive agent and expose the app as `application`.
    configure_ssl_certs()
    start_archive_agent()
    application = search_consent.create_app(config)