forked from NCIOCPL/cdr-lib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcdrpub.py
More file actions
2100 lines (1836 loc) · 80.6 KB
/
cdrpub.py
File metadata and controls
2100 lines (1836 loc) · 80.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Process a queued publishing job
The top-level entry point for this module is `Control.publish()`, which
calls `Control.__publish()`, wrapped in a try block to facilitate handling
of all failures in a central place. The latter method decides which of the
three basic publishing job types is being run (scripted, export, or push)
and handles the job appropriately. For scripted jobs the work is simply
handed off to the specified script by launching a separate process. For
an overview of the logic for the other two job types, see the methods
`Control.export_docs()` and `Control.push_docs()`.
"""
import argparse
import base64
import csv
import datetime
import glob
import io
import json
import os
import re
import subprocess
import shutil
import threading
import time
from lxml import etree, html
from lxml.html import builder as B
from PIL import Image
import cdr
from cdrapi import db
from cdrapi.docs import Doc
from cdrapi.publishing import Job, DrupalClient
from cdrapi.settings import Tier
from cdrapi.users import Session
from urllib.parse import urlparse
from copy import deepcopy
class Control:
    """
    Top-level object for CDR publishing job processing
    """

    # Sender address used for outgoing job notification messages.
    SENDER = "NCIPDQOperator@mail.nih.gov"

    # Base name for push subsystems; the export subsystem's name gets
    # appended when a push job is created (see `create_push_job()`).
    PUSH = "Push_Documents_To_Cancer.Gov"

    # Working table in which a push job's documents are staged.
    PUSH_STAGE = "pub_proc_cg_work"

    # Values stored in the `pub_proc.status` column.
    RUN = "In process"
    SUCCESS = "Success"
    FAILURE = "Failure"
    WAIT = "Waiting user approval"
    COMPLETED_STATUSES = SUCCESS, FAILURE

    # Patterns presumably used to normalize document XML before
    # comparing exported and previously-pushed copies -- the code
    # using them (`normalize()`) is not visible in this chunk.
    XMLDECL = re.compile(r"<\?xml[^?]+\?>\s*")
    DOCTYPE = re.compile(r"<!DOCTYPE[^>]*>\s*")
    NORMALIZE_SPACE = re.compile(r"\s+")

    # Document types never queued for push (see `stage_push_job()`).
    EXCLUDED = ["Country"]

    # Seconds to pause before retrying a failed database insert.
    FAILURE_SLEEP = 5

    # Group name for rows fetched from the control-values table.
    PUB = "Publishing"

    # Fallbacks for export batch size and process count; the control
    # table may override the integer default with a string, which is
    # why `launch_exporters()` applies int() to these values.
    DEFAULT_BATCHSIZE = cdr.getControlValue(PUB, "batchsize", default=25)
    DEFAULT_NUMPROCS = cdr.getControlValue(PUB, "numprocs", default=8)

    # File extensions supported for Media documents, mapped to the
    # MIME type recorded in the wrapper XML (see `wrap_media_file()`).
    MEDIA_TYPES = dict(
        jpg="image/jpeg",
        gif="image/gif",
        mp3="audio/mpeg"
    )

    # Limits and labels apparently used when preparing content for the
    # CMS -- the code using them is not visible in this part of the
    # file; verify against the rest of the module.
    BROWSER_TITLE_MAX = 100
    CTHP_CARD_TITLE_MAX = 100
    DESCRIPTION_MAX = 600
    ABOUT_THIS = "_section_AboutThis_1"
    IN_THIS_SECTION = dict(
        en="In This Section",
        es="En esta secci\xf3n",
    )
def __init__(self, job_id, **opts):
"""
Stash away what we'll need for running a publishing job
"""
self.__job_id = job_id
self.__opts = opts
    # ------------------------------------------------------------------
    # TOP-LEVEL PROCESSING METHODS.
    # ------------------------------------------------------------------

    def publish(self):
        """
        Run the publishing job

        Processing is passed off to a separate private method, which
        we wrap in a `try` block to catch all exceptions.
        """

        try:
            self.__publish()
        except Exception as e:
            # Record the failure, set the job's status accordingly,
            # move any partial output out of the way, and notify the
            # operator.
            self.logger.exception("Job %d failure", self.job.id)
            self.update_status(self.FAILURE, str(e))
            if self.work_dir and os.path.isdir(self.work_dir):
                os.rename(self.work_dir, self.failure_dir)
            self.notify("Job failed: {}".format(e))
    def __publish(self):
        """
        Run the publishing job

        The heavy lifting is extracted out to this private method so we
        can trap all exceptions.

        There are basically three kinds of publishing jobs:
           1. scripted (the script embodies all of the logic for the job)
           2. export (write filtered documents to the file system)
           3. push (push results of an export job to the CMS and Akamai)

        Push jobs have a `SubSetName` parameter to identify the type of
        the corresponding export job. We use the presence of this parameter
        to recognize push jobs.
        """

        # Announce ourselves.
        self.update_status(self.RUN, "Job started")

        # 1. External script jobs completely handed off here.
        if self.job.subsystem.script:
            script = self.job.subsystem.script
            # Relative script paths are resolved against the CDR drive.
            if not os.path.isabs(script):
                script = "{}:/cdr/{}".format(self.tier.drive, script)
            if not os.path.isfile(script):
                message = "Processing script {!r} not found".format(script)
                raise Exception(message)
            # The job ID is passed as the script's only argument.
            command = "{} {:d}".format(script, self.job.id)
            # Make sure Python scripts get run by the Python interpreter.
            if script.endswith(".py") and "python" not in command.lower():
                command = " ".join([cdr.PYTHON, command])
            self.logger.info("Launching %s", command)
            os.system(command)

        # The job is an export job or a push job.
        else:
            start = datetime.datetime.now()
            self.logger.debug("Job %d parms=%s", self.job.id, self.job.parms)
            args = self.job.id, self.job.subsystem.options
            self.logger.debug("Job %d opts=%s", *args)
            # IDs of documents already handled, shared by the export and
            # push code paths.
            self.processed = set()

            # 2. Export jobs don't have a `SubSetName` parameter.
            if "SubSetName" not in self.job.parms:
                self.export_docs()
                verb = "Exported"
                count = len(self.processed)

            # 3. Otherwise, this is a push job.
            else:
                verb = "Pushed"
                count = self.push_docs()

            # Report the job's completion.
            elapsed = (datetime.datetime.now() - start).total_seconds()
            args = verb, count, elapsed
            message = "{} {:d} documents in {:.2f} seconds".format(*args)
            # 'ReportOnly' jobs are deliberately marked as failures so
            # their output is never treated as a real publishing run.
            if self.job.parms.get("ReportOnly") == "Yes":
                message += " (status set to failure for 'ReportOnly' job)"
                self.update_status(self.FAILURE, message)
            else:
                self.update_status(self.SUCCESS, message)

            # If documents were removed include this info in the push message.
            removed = self.count_removed_docs()
            if removed and verb == "Pushed":
                message += f"\nRemoved {removed} documents"
            self.notify(message, with_link=True)

            # Record documents published for the first time.
            self.record_first_pub()
    # ------------------------------------------------------------------
    # METHODS FOR EXPORT JOBS START HERE.
    # ------------------------------------------------------------------

    def export_docs(self):
        """
        Filter and store the job's documents in the file system

        There are two paths for identifying documents to be exported.
        One is by manually identifying each document by its unique ID.
        The other is to run queries stored in the publishing control
        document. In theory both could be used for the same job, but
        in practice it's one or the other.

        After exporting the documents, we create the corresponding
        push job (assuming at least one document was successfully
        exported, and we haven't been told to skip the push job).

        Processing steps:
           0. Housekeeping preparation
           1. Export any manually selected documents
           2. Export any query-selected documents
           3. Check error thresholds
           4. Write the media manifest if appropriate
           5. Rename the output directory
           6. Create push job if appropriate
        """

        # 0. Housekeeping preparation
        self.post_message("Start filtering/validating")
        self.prep_export()

        # 1. Export any manually selected documents
        self.publish_user_selected_documents()

        # 2. Export any query-selected documents
        self.publish_query_selected_documents()

        # 3. Make sure we haven't blown any error threshold limits.
        self.check_error_thresholds()

        # 4. Write the media manifest if appropriate
        self.write_media_manifest()

        # 5. Rename the working directory to its permanent name
        if os.path.isdir(self.work_dir):
            os.rename(self.work_dir, self.output_dir)

        # 6. Create the push job if appropriate
        if not self.job.no_output:
            if self.job.parms.get("ReportOnly") != "Yes":
                self.create_push_job()
    def record_first_pub(self):
        """
        Populate the `document.first_pub` column where appropriate

        Avoid populating the column for documents which pre-date
        the CDR, because we have no way of knowing when those
        were first published, as the legacy Oracle system did not
        capture that information.
        """

        # Stamp each document this job published successfully (and for
        # the first time) with the job's start date/time.
        self.cursor.execute("""\
            UPDATE document
               SET document.first_pub = pub_proc.started
              FROM pub_proc
              JOIN pub_proc_doc
                ON pub_proc_doc.pub_proc = pub_proc.id
              JOIN document
                ON document.id = pub_proc_doc.doc_id
             WHERE pub_proc.id = ?
               AND pub_proc.status = ?
               AND pub_proc_doc.removed != 'Y'
               AND pub_proc_doc.failure IS NULL
               AND document.first_pub IS NULL
               AND document.first_pub_knowable = 'Y'""",
                            (self.job.id, self.SUCCESS))
        count = self.cursor.rowcount
        # Only bother committing (and logging) if anything changed.
        if count:
            self.logger.info("Set first_pub for %d document(s)", count)
            self.conn.commit()
def prep_export(self):
"""
Clean up from any previous jobs with this name/ID
If we're running on a lower tier which has been refreshed from
the production database, it's possible that the current job
number was already used. If so, move any directories left over
from such jobs out of the way. Let exceptions bubble up.
Similarly, we need to clear out the `export_spec` table
in case a previous attempt to run this job left rows in it.
Finally, create a way to remember which rows we have already
added to this table in this run, and a lock for controlling
access to things that can be changed by multiple threads.
"""
output_dir = self.output_dir
if "SubSetName" not in self.job.parms and output_dir:
for path in glob.glob(output_dir + "*"):
if os.path.isdir(path) and "-" not in os.path.basename(path):
stat = os.stat(path)
localtime = time.localtime(stat.st_mtime)
stamp = time.strftime("%Y%m%d%H%M%S", localtime)
new = "{}-{}".format(path, stamp)
self.logger.warning("Renaming {} to {}".format(path, new))
os.rename(path, new)
delete = "DELETE FROM export_spec WHERE job_id = ?"
self.cursor.execute(delete, (self.job.id,))
self.conn.commit()
self.spec_ids = set()
self.export_failed = False
self.lock = threading.Lock()
def publish_user_selected_documents(self):
"""
Export documents manually selected for this job
"""
self.logger.info("Processing user-selected documents")
for i, spec in enumerate(self.job.subsystem.specifications):
self.spec_id = i + 1
self.docs = []
for doc in self.job.docs:
if doc.id in self.processed:
continue
if spec.user_select_doctypes:
if doc.doctype.name not in spec.user_select_doctypes:
continue
self.docs.append("{}/{}".format(doc.id, doc.version))
self.processed.add(doc.id)
if self.docs:
self.launch_exporters(spec)
# Mark any documents left behind as failed.
for doc in self.job.docs:
if doc.id not in self.processed:
args = doc.doctype.name, doc.cdr_id
message = "{} doc {} not allowed by this job".format(*args)
self.logger.error(message)
self.cursor.execute("""\
UPDATE pub_proc_doc
SET failure = 'Y', messages = ?
WHERE pub_proc = ?
AND doc_id = ?""", (message, self.job.id, doc.id))
self.conn.commit()
def publish_query_selected_documents(self):
"""
Export documents not explicitly selected by the user for this job
Use multi-processing for performance.
"""
self.post_message("selecting documents")
for i, spec in enumerate(self.job.subsystem.specifications):
if spec.query is not None:
self.spec_id = i + 1
self.post_message("selecting {} documents".format(spec.name))
start = datetime.datetime.now()
docs = spec.select_documents(self)
elapsed = (datetime.datetime.now() - start).total_seconds()
name = "{} ".format(spec.name) if spec.name else ""
args = len(docs), name, elapsed
msg = "{:d} {}docs selected in {:.2f} seconds".format(*args)
self.post_message(msg)
self.docs = []
for doc in docs:
if doc.id not in self.processed:
self.docs.append("{:d}/{}".format(doc.id, doc.version))
self.processed.add(doc.id)
if self.docs:
self.launch_exporters(spec)
    def launch_exporters(self, spec):
        """
        Pass off the export work to separate processes

        Pass:
            spec - reference to `Job.Subsystem.Specification` object
                   which controls how we prepare the document
        """

        # Communicate spec settings to processes via database tables.
        # Only insert the row the first time we see this spec.
        if self.spec_id not in self.spec_ids:
            filters = [(f.filters, f.parameters) for f in spec.filters]
            values = [self.job.id, self.spec_id, repr(filters)]
            cols = ["job_id", "spec_id", "filters"]
            # The subdirectory column is optional.
            if spec.subdirectory:
                values.append(spec.subdirectory)
                cols.append("subdir")
            args = ", ".join(cols), ", ".join(["?"] * len(cols))
            insert = "INSERT INTO export_spec ({}) VALUES ({})".format(*args)
            self.cursor.execute(insert, tuple(values))
            self.conn.commit()
            self.spec_ids.add(self.spec_id)

        # Determine how many processes to launch and batch size for each.
        # Per-spec control-table values override the class defaults, and
        # explicit constructor options override everything.
        # Shared counter presumably advanced by the worker threads as
        # batches are handed out -- `Thread` is not visible here.
        self.next = 0
        name = "{}-batchsize".format(spec.name)
        default = self.DEFAULT_BATCHSIZE
        # Control values may come back as strings; normalize with int().
        batchsize = cdr.getControlValue(self.PUB, name, default=default)
        self.batchsize = int(batchsize)
        name = "{}-numprocs".format(spec.name)
        default = self.__opts.get("numprocs") or self.DEFAULT_NUMPROCS
        numprocs = cdr.getControlValue(self.PUB, name, default=default)
        # Never launch more processes than there are documents.
        numprocs = min(int(numprocs), len(self.docs))
        # Shrink the batches when there's not enough work to go around.
        if self.batchsize * numprocs > len(self.docs):
            self.batchsize = len(self.docs) // numprocs
        if "batchsize" in self.__opts:
            self.batchsize = self.__opts["batchsize"]
        if "numprocs" in self.__opts:
            numprocs = self.__opts["numprocs"]

        # Create a separate thread to manage each external process.
        self.logger.info("Using %d parallel processes", numprocs)
        threads = []
        start = datetime.datetime.now()
        for _ in range(numprocs):
            threads.append(self.Thread(self))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = (datetime.datetime.now() - start).total_seconds()
        args = len(self.docs), spec.name, elapsed
        self.logger.info("exported %d %s docs in %.2f seconds", *args)
def check_error_thresholds(self):
"""
Make sure we haven't exceeded error thresholds
"""
if self.export_failed:
raise Exception("Export multiprocessing failure")
query = db.Query("pub_proc_doc d", "t.name", "COUNT(*) AS errors")
query.join("doc_version v", "v.id = d.doc_id", "v.num = d.doc_version")
query.join("doc_type t", "t.id = v.doc_type")
query.where("d.failure = 'Y'")
query.where(query.Condition("d.pub_proc", self.job.id))
query.group("t.name")
rows = query.execute(self.cursor).fetchall()
total_errors = 0
errors = dict([tuple(row) for row in rows])
for doctype in errors:
total_errors += errors[doctype]
name = "Max{}Errors".format(doctype)
threshold = self.job.parms.get(name)
if threshold is not None and int(threshold) < errors[doctype]:
args = threshold, doctype, errors[doctype]
message = "{} {} errors allowed; {:d} found".format(*args)
raise Exception(message)
threshold = self.job.subsystem.threshold
if threshold is not None and threshold < total_errors:
args = threshold, total_errors
message = "{:d} total errors allowed; {:d} found".format(*args)
raise Exception(message)
if len(self.processed) - total_errors < 1:
raise Exception("All documents failed export")
def write_media_manifest(self):
"""
Store information about each Media document exported by this job
The information is stored using comma-separated value format
"""
query = db.Query("media_manifest", "filename", "blob_date", "title")
query.where(query.Condition("job_id", self.job.id))
query.order("doc_id")
rows = query.execute(self.cursor).fetchall()
if rows and os.path.isdir(self.work_dir):
values = [(row[0], str(row[1])[:10], row[2]) for row in rows]
path = os.path.join(self.work_dir, "media_catalog.txt")
with open(path, "w", newline="", encoding="utf-8") as fp:
opts = dict(delimiter=",", quotechar='"')
writer = csv.writer(fp, **opts)
writer.writerows(values)
def create_push_job(self):
"""
Queue up a job to push the documents we just exported
"""
# First make sure there's something to push.
query = db.Query("pub_proc_doc", "COUNT(*) AS exported")
query.where(query.Condition("pub_proc", self.job.id))
query.where("failure IS NULL")
if query.execute(self.cursor).fetchone().exported:
parms = dict(
DrupalServer=self.job.parms.get("DrupalServer"),
InteractiveMode=self.job.parms.get("InteractiveMode", "No")
)
opts = dict(
system="Primary",
subsystem=f"{self.PUSH}_{self.job.subsystem.name}",
parms=parms,
email=self.job.email,
no_output=True
)
job = Job(self.session, **opts)
job.create()
self.logger.info("Job %d created push job %d", self.job.id, job.id)
else:
self.logger.warning("Job %d has nothing to push", self.job.id)
# ------------------------------------------------------------------
# METHODS FOR PUSH JOBS START HERE.
# ------------------------------------------------------------------
def push_docs(self):
"""
Send the most recent export job's document to the CMS and Akamai.
We also record the most recent copy of each document we have
exported, including documents which are not sent to the Drupal
CMS or to Akamai, in the pub_proc_cg table.
Processing steps:
1. Find the matching export job and queue its docs for pushing
2. Send the queued documents the CMS and to Akamai
3. Update the `pub_proc_cg` and `pub_proc_doc` tables
Return:
integer for count of "pushed" documents (excluding removals)
"""
# 1. Find the matching export job and queue its docs for pushing
self.prep_push()
# 2. Send the queued documents to the CMS and Akamai
self.send_docs()
# 3. Update the `pub_proc_cg` and `pub_proc_doc` tables
return self.record_pushed_docs()
    def prep_push(self):
        """
        Find the corresponding export job and queue its docs for pushing
        """

        # Find the export job we need to push.
        export_job = self.ExportJob(self)

        # If this is a (drastic and VERY rare) full load, clear the decks.
        # By 'rare' I mean there have only been three in the past couple of
        # decades, and the last one was in 2007.
        if self.job.parms.get("PubType") == "Full Load":
            # T-SQL allows DELETE without the FROM keyword.
            self.cursor.execute("DELETE pub_proc_cg")
            self.conn.commit()

        # Prepare the working table, unless we're trying again for this job.
        if self.job.parms.get("RerunFailedPush") == "Yes":
            # Adopt the rows staged by the earlier (failed) push attempt.
            update = "UPDATE pub_proc_cg_work SET cg_job = ?"
            job_id = self.job.id
            self.cursor.execute(update, (job_id,))
            self.conn.commit()
            self.logger.info("Job %d reprocessing existing work queue", job_id)
        else:
            self.stage_push_job(export_job)

        # Some push jobs require explicit release by the operator.
        if self.job.parms.get("InteractiveMode") == "Yes":
            self.wait_for_approval()
    def stage_push_job(self, export_job):
        """
        Populate the `pub_proc_cg_work` table with documents to be pushed

        Pass:
            export_job - reference to `Control.ExportJob` object
        """

        # Use a separate connection with a long timeout.
        conn = db.connect(timeout=1000)
        cursor = conn.cursor()
        self.logger.info("Job %d clearing %s", self.job.id, self.PUSH_STAGE)
        cursor.execute(f"DELETE FROM {self.PUSH_STAGE}")
        conn.commit()
        push_id = str(self.job.id)

        # For 'Hotfix (Remove)' jobs all docs in pub_proc_doc are removals.
        # Leaving the `xml` column NULL is what flags these as removals.
        if self.job.parms["PubType"] == "Hotfix (Remove)":
            args = self.job.id, self.PUSH_STAGE
            self.logger.info("Job %d populating %s for Hotfix (Remove)", *args)
            # `push_id` is a literal value in the SELECT column list.
            cols = "d.id", "p.doc_version", "p.pub_proc", push_id, "t.name"
            query = db.Query("pub_proc_doc p", *cols)
            query.join("document d", "d.id = p.doc_id")
            query.join("doc_type t", "t.id = d.doc_type")
            query.where("p.pub_proc = {:d}".format(export_job.job_id))
            cols = "id", "num", "vendor_job", "cg_job", "doc_type"
            args = self.PUSH_STAGE, ", ".join(cols), query
            insert = "INSERT INTO {} ({}) {}".format(*args)
            self.cursor.execute(insert)
            self.conn.commit()
            return

        # Fetch the documents which need to be replaced on cancer.gov.
        # Compare what we sent last time with what we've got now for each doc.
        doc_type = "t.name AS doc_type"
        cols = "c.id", doc_type, "d.subdir", "d.doc_version", "c.force_push"
        query = db.Query("pub_proc_cg c", *cols)
        query.join("pub_proc_doc d", "d.doc_id = c.id")
        query.join("doc_version v", "v.id = c.id", "v.num = d.doc_version")
        query.join("doc_type t", "t.id = v.doc_type")
        query.where(query.Condition("d.pub_proc", export_job.job_id))
        query.where(query.Condition("t.name", self.EXCLUDED, "NOT IN"))
        query.where("d.failure IS NULL")
        rows = query.execute(self.cursor).fetchall()
        # Template for the staged rows; the per-document values are
        # filled in inside the loops below.
        fields = dict(
            vendor_job=export_job.job_id,
            cg_job=self.job.id,
            id=None,
            doc_type=None,
            xml=None,
            num=None,
        )
        names = sorted(fields)
        placeholders = ", ".join(["?"] * len(names))
        args = self.PUSH_STAGE, ", ".join(names), placeholders
        insert = "INSERT INTO {} ({}) VALUES ({})".format(*args)
        push_all = self.job.parms.get("PushAllDocs") == "Yes"
        self.logger.info("Queuing changed documents for push")
        for row in rows:
            # Skip documents already queued or reserved for PDQ data
            # partners (self.partners_only is defined elsewhere).
            if row.id in self.processed or row.id in self.partners_only:
                continue
            self.processed.add(row.id)
            directory = export_job.directory
            subdir = (row.subdir or "").strip()
            if subdir:
                directory = "{}/{}".format(export_job.directory, subdir)
            # Media documents are stored as binary files; wrap them in
            # an XML document for the push.
            if row.doc_type == "Media":
                exported = self.wrap_media_file(directory, row.id)
            else:
                path = "{}/CDR{:d}.xml".format(directory, row.id)
                with open(path, encoding="utf-8") as fp:
                    exported = fp.read()
            # Only send the document if it differs from the copy sent
            # last time (or the push has been explicitly forced).
            needs_push = push_all or row.force_push == "Y"
            if not needs_push:
                query = db.Query("pub_proc_cg", "xml")
                query.where(query.Condition("id", row.id))
                pushed = query.execute(self.cursor).fetchone().xml
                if self.normalize(pushed) != self.normalize(exported):
                    needs_push = True
            if needs_push:
                fields["id"] = row.id
                fields["doc_type"] = row.doc_type
                fields["xml"] = exported
                fields["num"] = row.doc_version
                values = [fields[name] for name in names]
                self.logger.info("Queueing changed doc CDR%d for push", row.id)
                # Retry once after a short pause in case the failure
                # was transient.
                try:
                    self.cursor.execute(insert, values)
                except Exception:
                    self.logger.exception("First insert failed; trying again")
                    time.sleep(self.FAILURE_SLEEP)
                    self.cursor.execute(insert, values)
                self.conn.commit()

        # Queue up documents which are new.
        self.logger.info("Queuing new documents for push")
        cols = "v.id", doc_type, "d.subdir", "d.doc_version"
        query = db.Query("pub_proc_doc d", *cols)
        query.join("doc_version v", "v.id = d.doc_id", "v.num = d.doc_version")
        query.join("doc_type t", "t.id = v.doc_type")
        query.outer("pub_proc_cg c", "c.id = v.id")
        query.where("d.pub_proc = {:d}".format(export_job.job_id))
        query.where("d.failure IS NULL")
        # No pub_proc_cg row means the doc isn't on cancer.gov yet.
        query.where("c.id IS NULL")
        rows = query.execute(self.cursor).fetchall()
        for row in rows:
            if row.id in self.processed or row.id in self.partners_only:
                continue
            self.processed.add(row.id)
            directory = export_job.directory
            subdir = (row.subdir or "").strip()
            if subdir:
                directory = "{}/{}".format(export_job.directory, subdir)
            if row.doc_type == "Media":
                exported = self.wrap_media_file(directory, row.id)
            else:
                path = "{}/CDR{:d}.xml".format(directory, row.id)
                with open(path, encoding="utf-8") as fp:
                    exported = fp.read()
            fields["id"] = row.id
            fields["doc_type"] = row.doc_type
            fields["xml"] = exported
            fields["num"] = row.doc_version
            values = [fields[name] for name in names]
            self.logger.info("Queueing new doc CDR%d for push", row.id)
            try:
                self.cursor.execute(insert, values)
            except Exception:
                self.logger.exception("First insert failed; trying again")
                time.sleep(self.FAILURE_SLEEP)
                self.cursor.execute(insert, values)
            self.conn.commit()

        # Don't prune documents not included in a hotfix job.
        if self.job.parms["PubType"].startswith("Hotfix"):
            return

        # Don't prune documents if the number of documents was restricted.
        if self.job.parms.get("NumDocs"):
            return

        # Handle documents which have been dropped for doctypes published.
        query = db.Query("pub_proc_doc d", "v.doc_type").unique()
        query.join("doc_version v", "v.id = d.doc_id", "v.num = d.doc_version")
        query.where(query.Condition("d.pub_proc", export_job.job_id))
        types = [row.doc_type for row in query.execute(self.cursor).fetchall()]
        if not types:
            return
        # Queue removals for documents whose active_status is no longer
        # 'A', as well as (if applicable) documents now reserved for
        # PDQ partners only.
        condition = "a.active_status <> 'A'"
        if self.partners_only:
            partners_only_ids = ", ".join(str(id) for id in self.partners_only)
            condition = f"({condition} OR c.id in ({partners_only_ids}))"
        types = ", ".join([str(t) for t in types])
        export_id = str(export_job.job_id)
        # `export_id` and `push_id` are literal values in the SELECT list.
        cols = "v.id", "v.num", export_id, push_id, "t.name"
        query = db.Query("pub_proc_doc d", *cols).unique()
        query.join("doc_version v", "v.id = d.doc_id", "v.num = d.doc_version")
        query.join("all_docs a", "a.id = v.id")
        query.join("pub_proc_cg c", "c.id = v.id", "c.pub_proc = d.pub_proc")
        query.join("doc_type t", "t.id = v.doc_type")
        query.outer("pub_proc_cg_work w", "w.id = c.id")
        query.where(condition)
        # Skip documents already staged for this push.
        query.where("w.id IS NULL")
        query.where("v.doc_type IN ({})".format(types))
        cols = "id", "num", "vendor_job", "cg_job", "doc_type"
        args = self.PUSH_STAGE, ", ".join(cols), query
        insert = "INSERT INTO {} ({})\n{}".format(*args)
        self.logger.info("Queueing dropped documents")
        # Same retry-once strategy as for the inserts above.
        try:
            self.cursor.execute(insert)
            count = self.cursor.rowcount
            if count:
                self.logger.info("Queued %d dropped documents", count)
        except Exception:
            self.logger.exception("First insert failed; trying again")
            time.sleep(self.FAILURE_SLEEP)
            self.cursor.execute(insert)
        self.conn.commit()
def wrap_media_file(self, directory, doc_id):
"""
Wrap the Media document's encoded blob in an XML document
Pass:
directory - string for location path for stored binary file
doc_id - integer for the CDR document's unique ID
Return:
serialized Media XML document
"""
paths = glob.glob(f"{directory}/CDR{doc_id:010d}.*")
if not paths:
raise Exception(f"Media file for CDR{doc_id} not found")
path = paths[0].replace("\\", "/")
base, extension = os.path.splitext(path)
extension = extension.replace(".", "")
if extension not in self.MEDIA_TYPES:
raise Exception(f"Media type not supported for {path}")
media_type = self.MEDIA_TYPES[extension]
with open(path, "rb") as fp:
media_bytes = fp.read()
encoded = base64.encodebytes(media_bytes).decode("ascii")
template = "<Media Type='{}' Size='{:d}' Encoding='base64'>{}</Media>"
return template.format(media_type, len(media_bytes), encoded)
def wait_for_approval(self):
"""
Allow the operator to review the queued push job before releasing it
"""
self.update_status(self.WAIT, "Waiting for push job release")
query = db.Query("pub_proc", "status")
query.where(query.Condition("id", self.job.id))
body = f"Push job {self.job.id:d} is waiting for approval."
self.notify(body)
while True:
status = query.execute(self.cursor).fetchone().status
if status == self.RUN:
self.post_message("Job resumed by user")
break
if status == self.FAILURE:
raise Exception("Job killed by user")
if status == self.WAIT:
time.sleep(10)
else:
message = f"Unexpected status {status} for job {self.job.id}"
raise Exception(message)
    def send_docs(self):
        """
        Send the documents for the push job to the CMS and to Akamai.
        """

        # Identify the documents to be sent to the Drupal CMS.
        types = tuple(DrupalClient.TYPES)
        query = db.Query("pub_proc_cg_work", "id", "doc_type")
        query.where("xml IS NOT NULL")
        query.where(query.Condition("doc_type", types, "IN"))
        rows = query.execute(self.cursor).fetchall()
        send_to_cms = dict([tuple(row) for row in rows])
        self.logger.info("%d docs to be pushed to Drupal CMS", len(rows))

        # Identify the documents to be removed from the Drupal CMS
        # (a NULL `xml` column marks a queued row as a removal).
        query = db.Query("pub_proc_cg_work", "id", "doc_type")
        query.where("xml IS NULL")
        query.where(query.Condition("doc_type", types, "IN"))
        remove_from_cms = dict()
        for row in query.execute(self.cursor).fetchall():
            remove_from_cms[row.id] = row.doc_type
            args = row.doc_type, row.id
            self.logger.info("removing %s CDR%d from Drupal CMS", *args)

        # Send the PDQ summaries to the Drupal CMS.
        if send_to_cms or remove_from_cms:
            source = "pub_proc_cg_work"
            server = self.job.parms.get("DrupalServer")
            # Fall back on the DrupalClient's own default base URL when
            # the job has no DrupalServer parameter.
            base = "https://{}".format(server) if server else None
            opts = dict(
                send=send_to_cms,
                remove=remove_from_cms,
                table=source,
                logger=self.logger,
                base=base,
            )
            self.update_cms(self.session, **opts)

        # Identify the pushed media documents (doc ID -> version).
        query = db.Query("pub_proc_cg_work", "id", "num")
        query.where("xml IS NOT NULL")
        query.where("doc_type = 'Media'")
        rows = query.execute(self.cursor).fetchall()
        media = dict([tuple(row) for row in rows])
        self.logger.info("%d media document(s) to be pushed", len(media))

        # Identify the media documents to be removed; a None version
        # flags the document as a removal.
        query = db.Query("pub_proc_cg_work", "id")
        query.where("xml IS NULL")
        query.where("doc_type = 'Media'")
        rows = query.execute(self.cursor).fetchall()
        for row in rows:
            media[row.id] = None
        self.logger.info("%d media document(s) to be removed", len(rows))

        # Make sure Akamai has any changes to the media files.
        if media:
            self.Media.sync(self.session, self.logger, media)
@classmethod
def update_cms(cls, session, **opts):
    """
    Send new/modified summaries to Drupal and remove dropped content.

    Failure of any of these documents will cause the entire job to
    be marked as a failure (and almost always leave the content
    pushed to Drupal in a `draft` state). I say "almost" because
    the edge case is that the `publish()` call might fail between
    batches. Nothing we can do about that very unlikely problem.

    Implemented as a class method so that we can invoke this
    functionality without creating a real publishing job.

    Required positional argument:
      session - object to be used in database queries, logging, etc.

    Optional keyword arguments:
      send - dictionary of cdr_id -> document type for summaries to send
      remove - similar dictionary for summaries being dropped
      table - where to get the exported XML (default is pub_proc_cg)
      logger - override session.logger for recording activity
      base - front portion of PDQ API URL
      auth - optional credentials for Drupal client (name, pw tuple)
      dumpfile - optional path for file in which to store docs

    Raise:
      `Exception` if unable to perform complete update successfully
    """

    # Record what we're about to do.
    dumpfile = opts.get("dumpfile")
    client_opts = dict(
        logger=opts.get("logger"),
        base=opts.get("base"),
        auth=opts.get("auth"),
    )
    client = DrupalClient(session, **client_opts)
    send = opts.get("send") or dict()
    remove = opts.get("remove") or dict()
    client.logger.info("Sending %d documents and removing %d",
                       len(send), len(remove))
    start = datetime.datetime.now()

    def dump(values):
        # Optionally append each document's values as one JSON line.
        if dumpfile:
            with open(dumpfile, "a") as fp:
                fp.write("{}\n".format(json.dumps(values)))

    # Compile the XSL/T filters we'll need.
    filters = dict()
    for name in ("Cancer", "Drug"):
        title = "{} Information Summary for Drupal CMS".format(name)
        key = "DrugInformationSummary" if name == "Drug" else "Summary"
        filters[key] = Doc.load_single_filter(session, title)

    # Map each summary document to its normalized language so we
    # can order the push/remove passes below.
    table = opts.get("table", "pub_proc_cg")
    query = db.Query("query_term_pub", "doc_id", "value")
    query.where("path = '/Summary/SummaryMetaData/SummaryLanguage'")
    rows = query.execute(session.cursor).fetchall()
    summary_language = {row[0]: row[1].lower().strip() for row in rows}

    def language(doc_id):
        # Documents without a language row are treated as English.
        return summary_language.get(doc_id, "english")

    # First pass: push the English content, deferring the Spanish
    # content to a second pass.
    # The pylint tool doesn't understand how list expansion works.
    # Cf. bug https://github.com/PyCQA/pylint/issues/2820
    # pylint: disable=no-value-for-parameter
    spanish = set()
    pushed = []
    for doc_id in sorted(send):
        if language(doc_id) != "english":
            spanish.add(doc_id)
            continue
        doctype = send[doc_id]
        xsl = filters[doctype]
        root = cls.fetch_exported_doc(session, doc_id, table)
        args = session, doc_id, xsl, root
        if doctype == "Summary":
            values = cls.assemble_values_for_cis(*args)
        else:
            values = cls.assemble_values_for_dis(*args)
        dump(values)
        pushed.append((doc_id, client.push(values), "en"))

    # Second pass: the translated (Spanish) content.
    xsl = filters["Summary"]
    for doc_id in sorted(spanish):
        root = cls.fetch_exported_doc(session, doc_id, table)
        values = cls.assemble_values_for_cis(session, doc_id, xsl, root)
        dump(values)
        pushed.append((doc_id, client.push(values), "es"))

    # Drop the documents being removed, non-English before English.
    for doc_id in remove:
        if language(doc_id) != "english":
            client.remove(doc_id)
    for doc_id in remove:
        if language(doc_id) == "english":
            client.remove(doc_id)

    # Switch pushed docs from draft to published.
    errors = client.publish(pushed)
    if errors:
        raise Exception(f"{len(errors)} Drupal publish errors; see logs")

    # Record how long it took.
    elapsed = (datetime.datetime.now() - start).total_seconds()
    client.logger.info("Sent %d and removed %d in %f seconds",
                       len(send), len(remove), elapsed)
@classmethod
def assemble_values_for_cis(cls, session, doc_id, xsl, root):
"""
Get the pieces of the summary needed by the Drupal CMS
Pass:
session - object to be used in database queries, logging, etc.
doc_id - CDR ID for the PDQ summary
xsl - compiled filter for generating HTML for the summary
root - parsed xml for the exported document
Return:
dictionary of values suitable for shipping to Drupal API
"""
# Tease out pieces which need a little bit of logic.
meta = root.find("SummaryMetaData")
node = meta.find("SummaryURL")
if node is None:
raise Exception("CDR{:d} has no SummaryURL".format(doc_id))
langs = dict(English="en", Spanish="es")
langcode = langs[Doc.get_text(meta.find("SummaryLanguage"))]
try:
url = urlparse(node.get("xref")).path
except Exception:
raise Exception(f"CDR{doc_id:d}: bad or missing summary URL")
if not url:
raise Exception(f"CDR{doc_id:d}: missing summary URL")
if url.startswith("/espanol"):
url = url[len("/espanol"):]
browser_title = cthp_card_title = translation_of = None
for node in root.findall("AltTitle"):
if node.get("TitleType") == "Browser":
browser_title = Doc.get_text(node)
elif node.get("TitleType") == "CancerTypeHomePage":
cthp_card_title = Doc.get_text(node)
if not cthp_card_title:
cthp_card_title = browser_title
node = root.find("TranslationOf")
if node is not None:
translation_of = Doc.extract_id(node.get("ref"))
svpc = suppress_otp = 0
if root.get("SVPC") == "Yes":
svpc = 1
if root.get("SuppressOnThisPageSection") == "Yes":
suppress_otp = 1
partner_merge_set = root.get("PartnerMergeSet") == "Yes"
# Pull out the summary sections into sequence of separate dictionaries.
intro_text_index = None
for i, node in enumerate(root.findall("SummarySection")):
types = []
for child in node.findall("SectMetaData/SectionType"):
types.append(Doc.get_text(child, ""))
if "Introductory Text" in types and not partner_merge_set:
if intro_text_index is not None:
error = "CDR{} has multiple introductory text sections"
raise Exception(error.format(doc_id))