forked from gsathya/centinel-server
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathscheduler.py
More file actions
269 lines (222 loc) · 9.71 KB
/
scheduler.py
File metadata and controls
269 lines (222 loc) · 9.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
#!/usr/bin/env python
#
# Georgia Tech Spring 2014
#
# scheduler.py: script to manage experiments and data by finding
# appropriate clients, copying files into those directories, and
# potentially modifying the scheduling frequency information
#
# Note: you *MUST NOT* start more than 1 instance of this program at
# the same time. Doing so could cause race conditions in our scheduler
# and could result in your experiments not being run at the expected
# frequency
import argparse
from datetime import datetime, timedelta
import json
import os
import os.path
import sys
import config
from centinel.models import Client
# constants
# Size, in days, of the window used to decide whether a client is still
# active: find_clients() skips every client whose last_seen timestamp
# is older than this many days.
DAYS_SINCE_ACTIVE = 30
def parse_args(argv=None):
    """Parse and validate the command line arguments of the scheduler

    Params:
    argv- optional list of argument strings to parse. If this is None,
        argparse reads the arguments from sys.argv

    Returns the argparse namespace with the attributes country,
    num_clients (int or None), data, experiment, frequency (int or
    None) and remove (bool).

    Note: argparse prints a usage message and exits the program when
    the arguments are invalid. This includes non-numeric values for
    --num-clients or --frequency and giving a frequency without an
    experiment

    """
    parser = argparse.ArgumentParser()
    # adjacent string literals are joined as-is, so the first piece
    # needs its trailing space
    country_help = ('Two letter country code of the country to run the '
                    'experiment in')
    parser.add_argument('--country', '-c', help=country_help, required=True)
    client_help = ("Number of clients in the country to run the measurement "
                   "on. If this is not specified, the experiment will be "
                   "scheduled on all clients in the country")
    # parsed as an int so that it can be compared against the number
    # of clients selected so far
    parser.add_argument('--num-clients', '-n', help=client_help,
                        default=None, type=int)
    data_help = ("Data file for the clients to use. Note that this must be "
                 "paired with an experiment file that has the same name "
                 "(specified at the top of the class in the experiment file)")
    parser.add_argument('--data', '-d', help=data_help, default=None)
    exp_help = ("Experiment file for the clients to use. This should be a "
                "Python file following the format as other experiments "
                "(see the example template or ping.py for how to do this)")
    parser.add_argument('--experiment', '-e', help=exp_help, default=None)
    freq_help = ("How often the experiment should be run in minutes, i.e. "
                 "enter 60 to run the experiment every hour. If no "
                 "frequency information is specified, the measurement will "
                 "run every time the client does a measurement")
    # validated here so that a bad value is rejected before any file
    # has been copied
    parser.add_argument('--frequency', '-f', help=freq_help, default=None,
                        type=int)
    remove_help = ("Remove the experiment or data file with the specified "
                   "name from the target clients. Note that this option will "
                   "be applied to both data and experiments if both the -d "
                   "and -e options are specified")
    parser.add_argument('--remove', '-r', help=remove_help, default=False,
                        action='store_true')

    args = parser.parse_args(argv)
    # a frequency only makes sense for an experiment named in the same call
    if args.frequency is not None and args.experiment is None:
        parser.error("Specifying the frequency is only a valid option if you "
                     "are specifying an experiment to run. You must specify "
                     "a new experiment to schedule as well with the -e option")
    return args
def find_clients(country, num_clients):
    """Find num_clients active clients in the target country

    Params:
    country- two letter country code of target country
    num_clients- number of clients to get, as an int or a numeric
        string. If this is None, then we get all the active clients
        for that country

    Returns a list with the usernames of the selected clients.

    Raises ValueError if num_clients is neither None nor convertible
    to an int.

    Note: we define active here as having seen the client in the past
    DAYS_SINCE_ACTIVE days

    """
    # values coming from the command line may be strings, and comparing
    # a count against a string does not enforce the limit
    limit = None if num_clients is None else int(num_clients)
    # compute the cutoff once so that every client is judged against
    # the same instant
    cutoff = datetime.now() - timedelta(days=DAYS_SINCE_ACTIVE)

    clients = []
    for client in Client.query.filter_by(country=country).all():
        if limit is not None and len(clients) >= limit:
            break
        # NOTE(review): assumes last_seen may be unset (None) for some
        # rows; those clients are treated as inactive instead of
        # failing the comparison below
        if client.last_seen is None or client.last_seen < cutoff:
            continue
        clients.append(client.username)
    return clients
def copy_data(clients, data):
    """Install a data file into the input directory of each client

    Params:
    clients- usernames of the clients that should receive the file
    data- path of the data file to copy into each user's directory

    Note: nothing is copied when the data file does not exist, and
    clients without an input directory are reported and skipped

    """
    if not os.path.exists(data):
        print("Error: invalid data file to copy from")
        return

    # read the source once and reuse the content for every client
    with open(data, 'r') as source:
        payload = source.read()
    target_name = os.path.basename(data)

    for username in clients:
        user_dir = os.path.join(config.inputs_dir, username)
        if os.path.exists(user_dir):
            with open(os.path.join(user_dir, target_name), 'w') as target:
                target.write(payload)
        else:
            print("Error: client data directory does not exist! "
                  "%s" % (user_dir))
def remove_data(clients, data):
    """Delete a data file from the input directory of each client

    Params:
    clients- usernames of the clients to remove the data file from
    data- the data file to remove. Only the basename is used, so a
        full path may be given

    """
    target_name = os.path.basename(data)
    # lazily build the per-client path of the file to delete
    candidates = (os.path.join(config.inputs_dir, username, target_name)
                  for username in clients)
    for path in candidates:
        # clients that never received the file are silently skipped
        if os.path.exists(path):
            os.remove(path)
def copy_exps(clients, exp):
    """Copy the given experiment so that it gets used by the clients

    Params:
    clients- the clients to copy the experiment file to
    exp- path of the experiment to copy into each user's directory

    Note: nothing is copied when the experiment file does not exist,
    and clients without an experiments directory are reported and
    skipped

    """
    if not os.path.exists(exp):
        print("Error: invalid experiment to copy from")
        return

    # read the source once and reuse the content for every client
    with open(exp, 'r') as file_p:
        content = file_p.read()
    basename = os.path.basename(exp)

    for client in clients:
        client_experiments_dir = os.path.join(config.experiments_dir, client)
        if not os.path.exists(client_experiments_dir):
            # report the directory that was actually checked; this
            # message used to reference a name that does not exist in
            # this function and raised a NameError instead of printing
            print("Error: client experiments directory does not exist! "
                  "%s" % (client_experiments_dir))
            continue
        filename = os.path.join(client_experiments_dir, basename)
        with open(filename, 'w') as file_p:
            file_p.write(content)
def remove_exps(clients, exp):
    """Delete an experiment from the experiments directory of each client

    Params:
    clients- usernames of the clients to remove the experiment from
    exp- the experiment file to remove. Only the basename is used, so
        a full path may be given

    """
    target_name = os.path.basename(exp)
    for username in clients:
        installed = os.path.join(config.experiments_dir, username,
                                 target_name)
        # clients that do not have the experiment are silently skipped
        if not os.path.exists(installed):
            continue
        os.remove(installed)
def copy_frequency(clients, freq, exp):
    """Schedule the given experiment to run at the frequency specified

    Params:
    clients- the clients to update the frequencies for
    freq- how many minutes should elapse between runs, i.e. enter 60 to
        run every hour
    exp- the experiment to adjust the frequency for, either as a path
        or as a bare file name

    Raises ValueError if freq cannot be converted to an int.

    Note: the entry is stored in each client's scheduler.info file
    under the experiment name without its extension, with last_run
    reset to 0 and the frequency converted to seconds

    """
    # experiments are installed under their basename, so the existence
    # check has to use the basename too, even if exp is a longer path
    exp_basename = os.path.basename(exp)
    exp_name, _ = os.path.splitext(exp_basename)
    # convert up front so that a bad value fails before any file is
    # rewritten
    frequency = int(freq) * 60

    for client in clients:
        # if the experiment doesn't exist for that user, then don't
        # adjust the frequency
        exp_file = os.path.join(config.experiments_dir, client, exp_basename)
        if not os.path.exists(exp_file):
            continue
        filename = os.path.join(config.experiments_dir, client,
                                "scheduler.info")
        freqs = {}
        if os.path.exists(filename):
            with open(filename, 'r') as file_p:
                freqs = json.load(file_p)
        freqs[exp_name] = {'last_run': 0, 'frequency': frequency}
        # Note: as mentioned in the first few introductory lines, this
        # section presents a race condition if another instance of the
        # scheduler is running at the same time and your experiment
        # may not be scheduled
        with open(filename, 'w') as file_p:
            json.dump(freqs, file_p)
def remove_frequency(clients, exp):
    """Remove the given experiment from the scheduler

    Params:
    clients- the clients to update the frequencies for
    exp- the experiment to remove the frequency information for,
        either as a path or as a bare file name

    Note: clients without a scheduler.info file are skipped, and a
    scheduler.info file that ends up with no entries is deleted

    """
    basename = os.path.basename(exp)
    # copy_frequency stores entries under the experiment name without
    # its extension; the full basename is also cleared in case an entry
    # was ever written under that key
    exp_name, _ = os.path.splitext(basename)

    for client in clients:
        filename = os.path.join(config.experiments_dir, client,
                                "scheduler.info")
        # nothing is scheduled for this client, so there is nothing to
        # remove and no file to delete
        if not os.path.exists(filename):
            continue
        with open(filename, 'r') as file_p:
            freqs = json.load(file_p)
        for key in (exp_name, basename):
            freqs.pop(key, None)
        if not freqs:
            os.remove(filename)
            # keep going: the remaining clients still need to be updated
            continue
        # Note: as mentioned in the first few introductory lines, this
        # section presents a race condition if another instance of the
        # scheduler is running at the same time and your experiment
        # may not be scheduled
        with open(filename, 'w') as file_p:
            json.dump(freqs, file_p)
if __name__ == "__main__":
    # Every option already carries a default from the argument parser,
    # so the parsed values can be used as they are
    args = parse_args()

    # select the clients/ probes that this request applies to
    clients = find_clients(args.country, args.num_clients)

    want_data = args.data is not None
    want_experiment = args.experiment is not None

    # with nothing to copy or remove, just report the selected clients
    if not (want_data or want_experiment):
        print("You have not specified a data file or experiment to run on "
              "the clients, so I will just print the clients usernames that "
              "you would have scheduled experiments on")
        for username in clients:
            print("{0}".format(username))
        sys.exit(0)

    # install or remove the data file
    if want_data:
        data_action = remove_data if args.remove else copy_data
        data_action(clients, args.data)

    # install or remove the experiment; removing it also drops its
    # entry from the schedule
    if want_experiment:
        if args.remove:
            remove_exps(clients, args.experiment)
            remove_frequency(clients, args.experiment)
        else:
            copy_exps(clients, args.experiment)

    # frequency information is only recorded when adding an experiment
    if args.frequency is not None and not args.remove:
        copy_frequency(clients, args.frequency, args.experiment)