-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathdirectory.py
More file actions
executable file
·1421 lines (1271 loc) · 65 KB
/
directory.py
File metadata and controls
executable file
·1421 lines (1271 loc) · 65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# vim:ts=4:sw=4:tw=0:sts=4:et
import copy
import logging
import os
import os.path
import time
from pathlib import Path
from typing import Any, Optional
import networkx as nx
import pandas as pd
from diskcache import Cache
from molgenis_emx2_pyclient import Client
from molgenis_emx2_pyclient.exceptions import NoSuchTableException
from nncontacts import NNContacts
# NOTE(review): left disabled; enable for verbose debugging of directory retrieval.
#logging.basicConfig(level=logging.DEBUG)
# Module-wide logger used by all Directory loading/validation messages.
log = logging.getLogger("BBMRI Directory")
# Directory containing this source file; not referenced in this chunk —
# presumably used by code elsewhere in the module.
REPO_ROOT = Path(__file__).resolve().parent
def _cache_root() -> Path:
"""Return the base directory for persistent caches."""
cache_root = os.environ.get("DIRECTORY_CACHE_ROOT")
if cache_root:
return Path(cache_root)
return Path.cwd()
def _repo_cache_dir(*parts: str) -> str:
    """Return a cache path (as a string) anchored to the configured cache root.

    The root comes from :func:`_cache_root`; ``parts`` are joined beneath it.
    """
    root = _cache_root()
    return str(root.joinpath(*parts))
def get_directory_ontology_table(
    table_name: str,
    *,
    directory_url: Optional[str] = None,
    purge_cache: bool = False,
) -> pd.DataFrame:
    """Return a cached DirectoryOntologies table, refreshing it live when needed.

    Cache entries are keyed by both table name and Directory base URL so
    alternate Directory instances do not accidentally reuse ontology rows from
    the default public service.

    Args:
        table_name: Name of the table inside the DirectoryOntologies schema.
        directory_url: Base URL of the Directory instance; defaults to the
            public BBMRI-ERIC service.
        purge_cache: When True, drop any cached copy before refreshing.

    Raises:
        RuntimeError: When the live service cannot be reached and no cached
            copy of the table exists.
    """
    base_url = directory_url if directory_url else "https://directory.bbmri-eric.eu"
    cache_dir = _repo_cache_dir("data-check-cache", "directory-DirectoryOntologies")
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir)
    cache = Cache(cache_dir)
    cache_key = f"table:{base_url}:{table_name}"
    try:
        if purge_cache and cache_key in cache:
            del cache[cache_key]
        hit = cache.get(cache_key)
        if isinstance(hit, pd.DataFrame):
            log.info("Using cached DirectoryOntologies/%s table.", table_name)
            return hit
        log.info("Retrieving DirectoryOntologies/%s from %s", table_name, base_url)
        with Client(base_url, schema="DirectoryOntologies") as session:
            live_df = session.get(table=table_name, as_df=True)
            cache[cache_key] = live_df
            return live_df
    except Exception as exc:
        # Live refresh failed: fall back to any cached copy before giving up.
        fallback = cache.get(cache_key)
        if isinstance(fallback, pd.DataFrame):
            log.warning(
                "Unable to refresh DirectoryOntologies/%s; reusing cached copy instead: %s",
                table_name,
                exc,
            )
            return fallback
        raise RuntimeError(
            f"Unable to load DirectoryOntologies table {table_name!r} and no cached copy is available."
        ) from exc
    finally:
        cache.close()
class Directory:
"""Access, cache, and graph-model BBMRI Directory data for downstream checks."""
    def __init__(
        self,
        schema: str = "ERIC",
        purgeCaches=None,
        debug: bool = False,
        pp=None,
        username=None,
        password=None,
        token: Optional[str] = None,
        directory_url: Optional[str] = None,
        include_withdrawn_entities: bool = False,
        only_withdrawn_entities: bool = False,
    ):
        """Initialize a directory snapshot and build query/helper graphs.

        Args:
            schema: Directory schema (staging area) name.
            purgeCaches: Cache names to purge before loading data.
            debug: Enable additional debug output.
            pp: Pretty-printer object used in debug mode.
            username: Username for session authentication.
            password: Password for session authentication.
            token: Access token for token-based authentication.
            directory_url: Base URL of the Directory instance to query.
            include_withdrawn_entities: When False, public biobank/collection
                accessors exclude entities that are withdrawn explicitly or
                inherit withdrawal from a parent biobank/collection.
            only_withdrawn_entities: When True, public biobank/collection
                accessors return only withdrawn entities. Implies
                include_withdrawn_entities.
        """
        # Avoid the shared-mutable-default pitfall for purgeCaches.
        if purgeCaches is None:
            purgeCaches = list()
        self.__pp = pp
        self.__package = schema
        self.only_withdrawn_entities = only_withdrawn_entities
        # only_withdrawn implies include_withdrawn (see docstring).
        self.include_withdrawn_entities = include_withdrawn_entities or only_withdrawn_entities
        # Pristine entity snapshot for AI-cache checksums; filled lazily by
        # prepare_ai_cache_checksum_state().
        self._ai_checksum_snapshot = {}
        log.debug('Checking data in schema: ' + schema)
        # Sanitize the schema name so it is safe to embed in a directory name.
        schema_cache_suffix = "".join(ch if ch.isalnum() or ch in {"-", "_"} else "_" for ch in str(schema))
        cache_dir = _repo_cache_dir("data-check-cache", f'directory-{schema_cache_suffix}')
        if not os.path.exists(cache_dir):
            os.makedirs(cache_dir)
        # NOTE(review): this Cache handle is never close()d in this method —
        # presumably acceptable because diskcache persists to disk; confirm.
        cache = Cache(cache_dir)
        if 'directory' in purgeCaches:
            cache.clear()
        #self.__directoryURL = "https://directory-acc.molgenis.net/"
        self.__directoryURL = directory_url or "https://directory.bbmri-eric.eu"
        log.info('Retrieving directory content from ' + self.__directoryURL)
        client_kwargs = {}
        if token is not None:
            client_kwargs["token"] = token
        if self._has_complete_cached_snapshot(cache):
            # Offline-friendly path: reuse the cached snapshot and only try a
            # live backfill of the optional quality tables.
            self._load_cached_snapshot(cache, schema)
            self._refresh_missing_optional_quality_tables(
                cache=cache,
                schema=schema,
                client_kwargs=client_kwargs,
                username=username,
                password=password,
                token=token,
            )
        else:
            try:
                with Client(self.__directoryURL, **client_kwargs) as session:
                    if username is not None and password is not None:
                        log.info("Logging in to MOLGENIS with a user account.")
                        log.debug('username: ' + username)
                        # NOTE(review): logging a plaintext password, even at
                        # DEBUG level, is a credential-leak risk — consider removing.
                        log.debug('password: ' + password)
                        session.signin(username, password)
                    elif token is None:
                        log.warning("Continuing without authorization.")
                    session.set_schema(schema)
                    self._load_live_snapshot(session, cache, schema, debug)
            except Exception as exc:
                # Live load failed: fall back to a cached snapshot if the cache
                # turns out to be complete (e.g. populated by a previous run).
                if self._has_complete_cached_snapshot(cache):
                    log.warning(
                        "Unable to reach or refresh the live Directory for schema %s; reusing cached snapshot instead: %s",
                        schema,
                        exc,
                    )
                    self._load_cached_snapshot(cache, schema)
                else:
                    raise RuntimeError(
                        f"Unable to reach Directory schema {schema!r} and no complete cached snapshot is available."
                    ) from exc
        log.info(' ... all entities retrieved')
        self.contactHashmap = {}
        log.info('Processing directory data')
        # Graph containing only biobanks and collections
        self.directoryGraph = nx.DiGraph()
        # DAG containing only biobanks and collections
        self.directoryCollectionsDAG = nx.DiGraph()
        # Graph/DAG containing only biobanks and services
        self.directoryServicesGraph = nx.DiGraph()
        self.directoryServicesDAG = nx.DiGraph()
        # Graph/DAG containing biobanks, collections, and studies
        self.directoryStudiesGraph = nx.DiGraph()
        self.directoryStudiesDAG = nx.DiGraph()
        # Weighted graph linking contacts to biobanks/collections/networks
        self.contactGraph = nx.DiGraph()
        # Graph linking networks to biobanks/collections
        self.networkGraph = nx.DiGraph()
        # --- node creation: every entity ID must be unique within each graph ---
        for c in self.contacts:
            log.debug(f'Processing contact {c["id"]} into the graph')
            if self.contactGraph.has_node(c['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in contactGraph: ' + c['id'])
            # XXX temporary hack -- adding contactID prefix
            self.contactGraph.add_node(c['id'], data=c)
            self.contactHashmap[c['id']] = c
            log.debug(f'Contact {c["id"]} added into contactHashmap')
        for b in self.biobanks:
            log.debug(f'Processing biobank {b["id"]} into the graph')
            if self.directoryGraph.has_node(b['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in directoryGraph: ' + b['id'])
            # Biobanks appear as nodes in every structural graph.
            self.directoryGraph.add_node(b['id'], data=b)
            self.directoryCollectionsDAG.add_node(b['id'], data=b)
            self.directoryServicesGraph.add_node(b['id'], data=b)
            self.directoryServicesDAG.add_node(b['id'], data=b)
            self.directoryStudiesGraph.add_node(b['id'], data=b)
            self.directoryStudiesDAG.add_node(b['id'], data=b)
            if self.contactGraph.has_node(b['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in contactGraph: ' + b['id'])
            self.contactGraph.add_node(b['id'], data=b)
            if self.networkGraph.has_node(b['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in networkGraph: ' + b['id'])
            self.networkGraph.add_node(b['id'], data=b)
        for c in self.collections:
            log.debug(f'Processing collection {c["id"]} into the graph')
            if self.directoryGraph.has_node(c['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found: ' + c['id'])
            self.directoryGraph.add_node(c['id'], data=c)
            self.directoryCollectionsDAG.add_node(c['id'], data=c)
            self.directoryStudiesGraph.add_node(c['id'], data=c)
            self.directoryStudiesDAG.add_node(c['id'], data=c)
            if self.contactGraph.has_node(c['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in contactGraph: ' + c['id'])
            self.contactGraph.add_node(c['id'], data=c)
            if self.networkGraph.has_node(c['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in networkGraph: ' + c['id'])
            self.networkGraph.add_node(c['id'], data=c)
        for service in self.services:
            log.debug(f'Processing service {service["id"]} into the graph')
            if self.directoryServicesGraph.has_node(service['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in directoryServicesGraph: ' + service['id'])
            self.directoryServicesGraph.add_node(service['id'], data=service)
            self.directoryServicesDAG.add_node(service['id'], data=service)
        for study in self.studies:
            log.debug(f'Processing study {study["id"]} into the graph')
            if self.directoryStudiesGraph.has_node(study['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in directoryStudiesGraph: ' + study['id'])
            self.directoryStudiesGraph.add_node(study['id'], data=study)
            self.directoryStudiesDAG.add_node(study['id'], data=study)
        for n in self.networks:
            log.debug(f'Processing network {n["id"]} into the graph')
            if self.contactGraph.has_node(n['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in contactGraph: ' + n['id'])
            self.contactGraph.add_node(n['id'], data=n)
            if self.networkGraph.has_node(n['id']):
                raise Exception('DirectoryStructure', 'Conflicting ID found in networkGraph: ' + n['id'])
            self.networkGraph.add_node(n['id'], data=n)
        # Group fact records by their owning collection ID.
        self.collectionFactMap = {}
        for f in self.facts:
            if not f['collection']['id'] in self.collectionFactMap:
                self.collectionFactMap[f['collection']['id']] = [ f ]
            else:
                self.collectionFactMap[f['collection']['id']].append(f)
        # Index services by ID and by owning biobank.
        self.serviceHashmap = {}
        self.biobankServiceMap = {}
        for service in self.services:
            self.serviceHashmap[service['id']] = service
            biobank = service.get('biobank')
            if biobank and 'id' in biobank:
                self.biobankServiceMap.setdefault(biobank['id'], []).append(service)
        # Index studies by ID and cross-link them with collections and biobanks.
        self.studyHashmap = {}
        self.collectionStudyMap = {}
        self.studyCollectionIdMap = {}
        self.biobankStudyMap = {}
        for study in self.studies:
            self.studyHashmap[study['id']] = study
        for collection in self.collections:
            collection_id = collection['id']
            biobank_id = collection['biobank']['id']
            # Guard against duplicate study references within one collection.
            seen_study_ids = set()
            for study_id in self.getListOfEntityAttributeIds(collection, 'studies'):
                if not study_id or study_id in seen_study_ids:
                    continue
                seen_study_ids.add(study_id)
                study = self.studyHashmap.get(study_id)
                if study is None:
                    # Dangling reference: warn and skip rather than abort.
                    log.warning(
                        "Collection %r refers non-existent study ID %r in studies field.",
                        collection_id,
                        study_id,
                    )
                    continue
                self.collectionStudyMap.setdefault(collection_id, []).append(study)
                self.studyCollectionIdMap.setdefault(study_id, []).append(collection_id)
                existing_studies = self.biobankStudyMap.setdefault(biobank_id, [])
                if study not in existing_studies:
                    existing_studies.append(study)
        # check forward pointers from biobanks
        for b in self.biobanks:
            for c in b.get('collections', []):
                if not self.directoryGraph.has_node(c['id']):
                    raise Exception('DirectoryStructure', 'Biobank refers non-existent collection ID: ' + c['id'])
        # add biobank contact and network edges
        for b in self.biobanks:
            if 'contact' in b:
                self.contactGraph.add_edge(b['id'], b['contact']['id'])
            # NOTE(review): network links are taken from each contact record on
            # the biobank; confirm this is intended (vs. b.get('networks')).
            for c in b.get('contacts', []):
                for n in c.get('networks', []):
                    self.networkGraph.add_edge(b['id'], n['id'])
        # now we have all the collections created and checked duplicates, so we create edges
        for c in self.collections:
            if 'parent_collection' in c:
                # some child collection
                self.directoryGraph.add_edge(c['id'], c['parent_collection']['id'])
                self.directoryStudiesGraph.add_edge(c['id'], c['parent_collection']['id'])
                self.directoryStudiesGraph.add_edge(c['parent_collection']['id'], c['id'])
                self.directoryStudiesDAG.add_edge(c['parent_collection']['id'], c['id'])
            else:
                # some of root collections of a biobank
                # we add both edges as we can't extract this information from the biobank level (it contains pointers to all the child collections)
                self.directoryGraph.add_edge(c['id'], c['biobank']['id'])
                self.directoryGraph.add_edge(c['biobank']['id'], c['id'])
                self.directoryCollectionsDAG.add_edge(c['biobank']['id'], c['id'])
                self.directoryStudiesGraph.add_edge(c['id'], c['biobank']['id'])
                self.directoryStudiesGraph.add_edge(c['biobank']['id'], c['id'])
                self.directoryStudiesDAG.add_edge(c['biobank']['id'], c['id'])
            # link this collection to its sub-collections (applies to both cases)
            for sb in c.get('sub_collections', []):
                self.directoryGraph.add_edge(c['id'], sb['id'])
                self.directoryCollectionsDAG.add_edge(c['id'], sb['id'])
                self.directoryStudiesGraph.add_edge(c['id'], sb['id'])
                self.directoryStudiesGraph.add_edge(sb['id'], c['id'])
                self.directoryStudiesDAG.add_edge(c['id'], sb['id'])
            if 'contact' in c:
                self.contactGraph.add_edge(c['id'],c['contact']['id'])
            for n in c.get('networks', []):
                self.networkGraph.add_edge(c['id'], n['id'])
        # link services to their owning biobank (skipping services without one)
        for service in self.services:
            biobank = service.get('biobank')
            if biobank is None or 'id' not in biobank:
                continue
            if not self.directoryServicesGraph.has_node(biobank['id']):
                raise Exception('DirectoryStructure', 'Service refers non-existent biobank ID: ' + biobank['id'])
            self.directoryServicesGraph.add_edge(service['id'], biobank['id'])
            self.directoryServicesGraph.add_edge(biobank['id'], service['id'])
            self.directoryServicesDAG.add_edge(biobank['id'], service['id'])
        # link studies to the collections that referenced them
        for study_id, collection_ids in self.studyCollectionIdMap.items():
            for collection_id in collection_ids:
                if not self.directoryStudiesGraph.has_node(collection_id):
                    raise Exception('DirectoryStructure', 'Study refers non-existent collection ID: ' + collection_id)
                self.directoryStudiesGraph.add_edge(study_id, collection_id)
                self.directoryStudiesGraph.add_edge(collection_id, study_id)
                self.directoryStudiesDAG.add_edge(collection_id, study_id)
        # processing network edges
        for n in self.networks:
            for b in n.get('biobanks', []):
                self.networkGraph.add_edge(n['id'], b['id'])
            # TODO remove once the datamodel is fixed
            for c in n.get('contacts', []):
                self.contactGraph.add_edge(n['id'], c['id'])
            if 'contact' in n:
                self.contactGraph.add_edge(n['id'], n['contact']['id'])
            for c in n.get('collections', []):
                self.networkGraph.add_edge(n['id'], c['id'])
        # processing edges from contacts
        for c in self.contacts:
            for b in c.get('biobanks', []):
                self.contactGraph.add_edge(c['id'], b['id'])
            for coll in c.get('collections', []):
                self.contactGraph.add_edge(c['id'], coll['id'])
            for n in c.get('networks', []):
                self.contactGraph.add_edge(c['id'], n['id'])
        log.info('Checks of directory data as graphs')
        # now we check if all the edges in the graph are in both directions
        # NOTE(review): these loops add edges while iterating .edges(); with
        # networkx this can raise "dictionary changed size during iteration" —
        # consider iterating over list(G.edges()) instead. TODO confirm.
        for e in self.directoryGraph.edges():
            if not self.directoryGraph.has_edge(e[1],e[0]):
                #raise Exception('DirectoryStructure', 'directoryGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                log.warning('DirectoryStructure - directoryGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                self.directoryGraph.add_edge(e[1],e[0])
        for e in self.contactGraph.edges():
            if not self.contactGraph.has_edge(e[1],e[0]):
                #raise Exception('DirectoryStructure', 'contactGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                log.warning('DirectoryStructure - contactGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                self.contactGraph.add_edge(e[1],e[0])
        for e in self.directoryServicesGraph.edges():
            if not self.directoryServicesGraph.has_edge(e[1],e[0]):
                log.warning('DirectoryStructure - directoryServicesGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                self.directoryServicesGraph.add_edge(e[1], e[0])
        for e in self.directoryStudiesGraph.edges():
            if not self.directoryStudiesGraph.has_edge(e[1],e[0]):
                log.warning('DirectoryStructure - directoryStudiesGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                self.directoryStudiesGraph.add_edge(e[1], e[0])
        for e in self.networkGraph.edges():
            if not self.networkGraph.has_edge(e[1],e[0]):
                #raise Exception('DirectoryStructure', 'networkGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                log.warning('DirectoryStructure - networkGraph: Missing edge: ' + e[1] + ' to ' + e[0])
                self.networkGraph.add_edge(e[1],e[0])
        # now make graphs immutable
        nx.freeze(self.directoryGraph)
        nx.freeze(self.directoryCollectionsDAG)
        nx.freeze(self.directoryServicesGraph)
        nx.freeze(self.directoryServicesDAG)
        nx.freeze(self.directoryStudiesGraph)
        nx.freeze(self.directoryStudiesDAG)
        nx.freeze(self.contactGraph)
        nx.freeze(self.networkGraph)
        # we check that DAG is indeed DAG :-)
        if not nx.algorithms.dag.is_directed_acyclic_graph(self.directoryCollectionsDAG):
            raise Exception('DirectoryStructure', 'Collection DAG is not DAG')
        if not nx.algorithms.dag.is_directed_acyclic_graph(self.directoryServicesDAG):
            raise Exception('DirectoryStructure', 'Service DAG is not DAG')
        if not nx.algorithms.dag.is_directed_acyclic_graph(self.directoryStudiesDAG):
            raise Exception('DirectoryStructure', 'Study DAG is not DAG')
        log.info('Directory structure initialized')
        self.__orphacodesmapper = None
        # Memo for isCollectionWithdrawn(): collection ID -> bool.
        self._collection_withdrawn_cache = {}
@staticmethod
def _load_quality_table(session: Client, table_name: str, schema: str) -> pd.DataFrame:
"""Load an optional quality-info table or return an empty DataFrame when absent."""
try:
return session.get(table=table_name, as_df=True)
except NoSuchTableException:
log.info("Skipping optional quality table %s in schema %s.", table_name, schema)
return pd.DataFrame()
@staticmethod
def _has_complete_cached_snapshot(cache: Cache) -> bool:
"""Return whether the cache contains the minimum full snapshot needed for offline reuse."""
required_keys = ("biobanks", "collections", "contacts", "networks", "facts")
return all(key in cache for key in required_keys)
@staticmethod
def _get_cached_dataframe(cache: Cache, key: str) -> pd.DataFrame:
"""Return a cached DataFrame value or an empty DataFrame when the cache key is absent."""
if key in cache:
cached_value = cache[key]
if isinstance(cached_value, pd.DataFrame):
return cached_value
return pd.DataFrame()
@staticmethod
def _get_missing_optional_quality_cache_keys(cache: Cache) -> list[tuple[str, str]]:
"""Return missing optional quality cache keys and their live table names."""
quality_tables = [
("quality_info_biobanks", "QualityInfoBiobanks"),
("quality_info_collections", "QualityInfoCollections"),
]
return [
(cache_key, table_name)
for cache_key, table_name in quality_tables
if cache_key not in cache
]
def _load_cached_snapshot(self, cache: Cache, schema: str) -> None:
"""Populate Directory tables from an existing cache snapshot without using the live API."""
log.info("Using cached directory snapshot for schema %s.", schema)
log.info(' ... retrieving biobanks')
self.biobanks = cache['biobanks']
log.info(f' ... retrieved {len(self.biobanks)} biobanks from cache')
log.info(' ... retrieving collections')
self.qualBBtable = self._get_cached_dataframe(cache, 'quality_info_biobanks')
if self.qualBBtable.empty and 'quality_info_biobanks' not in cache:
log.info("Cached snapshot has no QualityInfoBiobanks table for schema %s.", schema)
self.qualColltable = self._get_cached_dataframe(cache, 'quality_info_collections')
if self.qualColltable.empty and 'quality_info_collections' not in cache:
log.info("Cached snapshot has no QualityInfoCollections table for schema %s.", schema)
self.collections = cache['collections']
log.info(f' ... retrieved {len(self.collections)} collections from cache')
log.info(' ... retrieving contacts')
self.contacts = cache['contacts']
log.info(f' ... retrieved {len(self.contacts)} contacts from cache')
log.info(' ... retrieving networks')
self.networks = cache['networks']
log.info(f' ... retrieved {len(self.networks)} networks from cache')
self.facts = cache['facts']
log.info(f' ... retrieved {len(self.facts)} facts from cache')
log.info(' ... retrieving services')
self.services = cache['services'] if 'services' in cache else []
if 'services' in cache:
log.info(f' ... retrieved {len(self.services)} services from cache')
else:
log.info(' ... cached snapshot has no services table; using empty list')
log.info(' ... retrieving studies')
self.studies = cache['studies'] if 'studies' in cache else []
if 'studies' in cache:
log.info(f' ... retrieved {len(self.studies)} studies from cache')
else:
log.info(' ... cached snapshot has no studies table; using empty list')
def _refresh_missing_optional_quality_tables(
self,
cache: Cache,
schema: str,
client_kwargs: dict[str, Any],
username: Optional[str],
password: Optional[str],
token: Optional[str],
) -> None:
"""Backfill missing optional quality tables without refetching the full snapshot."""
missing_quality_tables = self._get_missing_optional_quality_cache_keys(cache)
if not missing_quality_tables:
return
log.info(
"Cached snapshot for schema %s is missing optional quality tables; attempting live backfill.",
schema,
)
try:
with Client(self.__directoryURL, **client_kwargs) as session:
if username is not None and password is not None:
log.info("Logging in to MOLGENIS with a user account.")
log.debug('username: ' + username)
log.debug('password: ' + password)
session.signin(username, password)
elif token is None:
log.warning("Continuing without authorization.")
session.set_schema(schema)
for cache_key, table_name in missing_quality_tables:
quality_df = self._load_quality_table(session, table_name, schema)
cache[cache_key] = quality_df
if cache_key == "quality_info_biobanks":
self.qualBBtable = quality_df
elif cache_key == "quality_info_collections":
self.qualColltable = quality_df
except Exception as exc:
log.warning(
"Unable to backfill optional quality tables for schema %s; continuing with cached snapshot: %s",
schema,
exc,
)
    def _load_live_snapshot(self, session: Client, cache: Cache, schema: str, debug: bool) -> None:
        """Populate Directory tables from the live API and cache the retrieved snapshot.

        Each table is served from the cache when present; otherwise it is
        fetched via GraphQL, timed, and written back to the cache. Services
        and studies are treated as optional: fetch failures there degrade to
        empty lists instead of aborting the snapshot.
        """
        log.info(' ... retrieving biobanks')
        if 'biobanks' in cache:
            self.biobanks = cache['biobanks']
            log.info(f' ... retrieved {len(self.biobanks)} biobanks from cache')
        else:
            start_time = time.perf_counter()
            self.biobanks = session.get_graphql(table="Biobanks")
            cache['biobanks'] = self.biobanks
            end_time = time.perf_counter()
            log.info(f' ... retrieved {len(self.biobanks)} biobanks in ' + "%0.3f" % (end_time-start_time) + 's')
        log.info(' ... retrieving collections')
        # Quality tables are always refetched (not cache-guarded) and then
        # cached, so a later cached run can distinguish absent vs. empty.
        self.qualBBtable = self._load_quality_table(session, 'QualityInfoBiobanks', schema)
        cache['quality_info_biobanks'] = self.qualBBtable
        self.qualColltable = self._load_quality_table(session, 'QualityInfoCollections', schema)
        cache['quality_info_collections'] = self.qualColltable
        if 'collections' in cache:
            self.collections = cache['collections']
            log.info(f' ... retrieved {len(self.collections)} collections from cache')
        else:
            start_time = time.perf_counter()
            self.collections = session.get_graphql(table="Collections")
            cache['collections'] = self.collections
            end_time = time.perf_counter()
            # Optional pretty-printed dump of every collection in debug mode.
            if debug and self.__pp is not None:
                for c in self.collections:
                    self.__pp.pprint(c)
            log.info(f' ... retrieved {len(self.collections)} collections in ' + "%0.3f" % (end_time-start_time) + 's')
        log.info(' ... retrieving contacts')
        if 'contacts' in cache:
            self.contacts = cache['contacts']
            log.info(f' ... retrieved {len(self.contacts)} contacts from cache')
        else:
            start_time = time.perf_counter()
            self.contacts = session.get_graphql(table="Persons")
            cache['contacts'] = self.contacts
            end_time = time.perf_counter()
            log.info(f' ... retrieved {len(self.contacts)} contacts in ' + "%0.3f" % (end_time-start_time) + 's')
        log.info(' ... retrieving networks')
        if 'networks' in cache:
            self.networks = cache['networks']
            log.info(f' ... retrieved {len(self.networks)} networks from cache')
        else:
            start_time = time.perf_counter()
            self.networks = session.get_graphql("Networks")
            cache['networks'] = self.networks
            end_time = time.perf_counter()
            log.info(f' ... retrieved {len(self.networks)} networks in ' + "%0.3f" % (end_time-start_time) + 's')
        if 'facts' in cache:
            self.facts = cache['facts']
            log.info(f' ... retrieved {len(self.facts)} facts from cache')
        else:
            start_time = time.perf_counter()
            self.facts = session.get_graphql("CollectionFacts")
            cache['facts'] = self.facts
            end_time = time.perf_counter()
            log.info(f' ... retrieved {len(self.facts)} facts in ' + "%0.3f" % (end_time-start_time) + 's')
        log.info(' ... retrieving services')
        if 'services' in cache:
            self.services = cache['services']
            log.info(f' ... retrieved {len(self.services)} services from cache')
        else:
            # Services are optional: log and continue with an empty list on failure.
            try:
                start_time = time.perf_counter()
                self.services = session.get_graphql("Services")
                cache['services'] = self.services
                end_time = time.perf_counter()
                log.info(f' ... retrieved {len(self.services)} services in ' + "%0.3f" % (end_time-start_time) + 's')
            except Exception as exc:
                log.warning('Unable to retrieve services: %s', exc)
                self.services = []
        log.info(' ... retrieving studies')
        if 'studies' in cache:
            self.studies = cache['studies']
            log.info(f' ... retrieved {len(self.studies)} studies from cache')
        else:
            # Studies are optional as well; same degradation as services.
            try:
                start_time = time.perf_counter()
                self.studies = session.get_graphql("Studies")
                cache['studies'] = self.studies
                end_time = time.perf_counter()
                log.info(f' ... retrieved {len(self.studies)} studies in ' + "%0.3f" % (end_time-start_time) + 's')
            except Exception as exc:
                log.warning('Unable to retrieve studies: %s', exc)
                self.studies = []
def prepare_ai_cache_checksum_state(self):
"""Capture pristine entities for AI-cache checksum validation.
QC plugins may mutate in-memory Directory entities during a run. AI
cache validation must therefore compare cached checksums against the
original Directory snapshot, not the post-plugin mutated state.
"""
if self._ai_checksum_snapshot:
return
self._ai_checksum_snapshot = {
"BIOBANK": {
biobank["id"]: copy.deepcopy(biobank) for biobank in self.biobanks
},
"COLLECTION": {
collection["id"]: copy.deepcopy(collection)
for collection in self.collections
},
}
def get_ai_checksum_entity(self, entity_type: str, entity_id: str) -> Optional[dict[str, Any]]:
"""Return the pristine snapshot entity used for AI-cache checksums."""
if not self._ai_checksum_snapshot:
self.prepare_ai_cache_checksum_state()
return self._ai_checksum_snapshot.get(entity_type, {}).get(entity_id)
def setOrphaCodesMapper(self, o):
"""Attach an OrphaCodes mapper implementation."""
self.__orphacodesmapper = o
def issetOrphaCodesMapper(self) -> bool:
"""Return whether an OrphaCodes mapper is configured."""
return self.__orphacodesmapper is not None
def getOrphaCodesMapper(self):
"""Return the configured OrphaCodes mapper."""
return self.__orphacodesmapper
def getSchema(self) -> str:
"""Return the configured Directory schema/staging-area name."""
return self.__package
def getDirectoryUrl(self) -> str:
"""Return the configured Directory base URL."""
return self.__directoryURL
@staticmethod
def _is_explicitly_withdrawn(entity: Optional[dict[str, Any]]) -> bool:
"""Return whether an entity is explicitly marked as withdrawn."""
if not entity:
return False
return bool(entity.get("withdrawn"))
def isBiobankWithdrawn(self, biobankID: str) -> bool:
"""Return whether a biobank is explicitly marked as withdrawn."""
biobank = self.directoryGraph.nodes[biobankID]['data']
return self._is_explicitly_withdrawn(biobank)
def isCollectionWithdrawn(self, collectionID: str) -> bool:
"""Return whether a collection is withdrawn, including inherited state."""
if collectionID in self._collection_withdrawn_cache:
return self._collection_withdrawn_cache[collectionID]
collection = self.directoryGraph.nodes[collectionID]['data']
withdrawn = self._is_explicitly_withdrawn(collection)
if not withdrawn:
withdrawn = self.isBiobankWithdrawn(collection['biobank']['id'])
if not withdrawn and 'parent_collection' in collection:
withdrawn = self.isCollectionWithdrawn(collection['parent_collection']['id'])
self._collection_withdrawn_cache[collectionID] = withdrawn
return withdrawn
def _matches_withdrawn_scope(self, is_withdrawn: bool) -> bool:
"""Return whether an entity matches the configured withdrawn scope."""
if self.only_withdrawn_entities:
return is_withdrawn
if self.include_withdrawn_entities:
return True
return not is_withdrawn
def getBiobanks(self):
"""Return all loaded biobanks."""
return [
biobank for biobank in self.biobanks
if self._matches_withdrawn_scope(self.isBiobankWithdrawn(biobank['id']))
]
@staticmethod
def _normalize_quality_entity_reference(value: Any) -> str:
"""Return a comparable entity identifier from a quality-table reference cell."""
if isinstance(value, dict):
value = value.get("id", "")
return str(value) if value is not None else ""
@staticmethod
def _filter_quality_table_by_entity_ids(
df: pd.DataFrame,
entity_column: str,
allowed_ids: set[str],
) -> pd.DataFrame:
"""Return only the quality rows whose entity reference matches ``allowed_ids``."""
if df.empty or entity_column not in df.columns:
return df.copy()
mask = df[entity_column].apply(Directory._normalize_quality_entity_reference).isin(allowed_ids)
return df.loc[mask].copy()
@staticmethod
def _reshape_quality_table(
df: pd.DataFrame,
entity_column: str,
assess_level_column: str,
) -> pd.DataFrame:
"""Return one wide quality row per entity from the raw quality rows."""
required_columns = {"id", entity_column, "quality_standard", assess_level_column}
missing_columns = sorted(required_columns.difference(df.columns))
if df.empty or missing_columns:
return pd.DataFrame(columns=[entity_column])
pivoted_df = df.pivot(
index=["id", entity_column],
columns="quality_standard",
values=assess_level_column,
).reset_index()
pivoted_df = pivoted_df.drop(columns="id")
pivoted_df.columns.name = None
final_df = pivoted_df.groupby(entity_column, as_index=False).first()
return final_df[[entity_column] + sorted(col for col in final_df.columns if col != entity_column)]
@staticmethod
def _rename_quality_standard_columns(
df: pd.DataFrame,
quality_standards_ontology: pd.DataFrame,
) -> pd.DataFrame:
"""Rename quality-standard code columns to ontology labels when available."""
if df.empty:
return df.copy()
if quality_standards_ontology.empty or not {"name", "label"}.issubset(quality_standards_ontology.columns):
return df.copy()
renamed_df = df.copy()
mapping = {
row["name"]: row["label"]
for _, row in quality_standards_ontology.iterrows()
if row.get("name") not in (None, "") and row.get("label") not in (None, "")
}
return renamed_df.rename(columns=mapping)
def _resolve_quality_scope(self, scope: str) -> str:
"""Validate and normalize a quality-table scope selector."""
allowed_scopes = {"configured", "active", "withdrawn", "all"}
if scope not in allowed_scopes:
raise ValueError(
f"Unsupported quality scope {scope!r}; expected one of {sorted(allowed_scopes)}."
)
return scope
def _get_quality_allowed_entity_ids(self, entity_type: str, scope: str) -> Optional[set[str]]:
"""Return the entity ids visible under a given quality-table scope."""
scope = self._resolve_quality_scope(scope)
if scope == "all":
return None
if entity_type == "biobank":
entities = self.biobanks
configured_entities = self.getBiobanks()
is_withdrawn = self.isBiobankWithdrawn
elif entity_type == "collection":
entities = self.collections
configured_entities = self.getCollections()
is_withdrawn = self.isCollectionWithdrawn
else:
raise ValueError(f"Unsupported quality entity type {entity_type!r}.")
if scope == "configured":
return {entity["id"] for entity in configured_entities}
include_withdrawn = scope == "withdrawn"
return {entity["id"] for entity in entities if is_withdrawn(entity["id"]) is include_withdrawn}
def getQualityStandardsOntology(self, purge_cache: bool = False) -> pd.DataFrame:
    """Fetch the cached QualityStandards ontology table for this target.

    Args:
        purge_cache: When True, force a refresh of the cached table.
    """
    ontology_name = "QualityStandards"
    return get_directory_ontology_table(
        ontology_name,
        directory_url=self.__directoryURL,
        purge_cache=purge_cache,
    )
def getBiobankQualityInfo(self, scope: str = "configured") -> pd.DataFrame:
"""Return biobank quality-info rows filtered by the requested scope.
Args:
scope: One of ``configured``, ``active``, ``withdrawn``, or ``all``.
``configured`` follows this ``Directory`` instance's withdrawn
flags, while the explicit scopes ignore those constructor flags.
"""
allowed_ids = self._get_quality_allowed_entity_ids("biobank", scope)
raw_df = self.qualBBtable.copy()
if allowed_ids is None:
return raw_df
return self._filter_quality_table_by_entity_ids(raw_df, "biobank", allowed_ids)
def getCollectionQualityInfo(self, scope: str = "configured") -> pd.DataFrame:
"""Return collection quality-info rows filtered by the requested scope.
Args:
scope: One of ``configured``, ``active``, ``withdrawn``, or ``all``.
``configured`` follows this ``Directory`` instance's withdrawn
flags, while the explicit scopes ignore those constructor flags.
"""
allowed_ids = self._get_quality_allowed_entity_ids("collection", scope)
raw_df = self.qualColltable.copy()
if allowed_ids is None:
return raw_df
return self._filter_quality_table_by_entity_ids(raw_df, "collection", allowed_ids)
def getBiobankQualityInfoWide(
self,
scope: str = "configured",
*,
use_ontology_labels: bool = False,
purge_ontology_cache: bool = False,
quality_standards_ontology: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
"""Return one wide quality-information row per biobank."""
quality_df = self._reshape_quality_table(
self.getBiobankQualityInfo(scope=scope),
"biobank",
"assess_level_bio",
)
if use_ontology_labels:
if quality_standards_ontology is None:
quality_standards_ontology = self.getQualityStandardsOntology(
purge_cache=purge_ontology_cache
)
quality_df = self._rename_quality_standard_columns(
quality_df,
quality_standards_ontology,
)
return quality_df
def getCollectionQualityInfoWide(
self,
scope: str = "configured",
*,
use_ontology_labels: bool = False,
purge_ontology_cache: bool = False,
quality_standards_ontology: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
"""Return one wide quality-information row per collection."""
quality_df = self._reshape_quality_table(
self.getCollectionQualityInfo(scope=scope),
"collection",
"assess_level_col",
)
if use_ontology_labels:
if quality_standards_ontology is None:
quality_standards_ontology = self.getQualityStandardsOntology(
purge_cache=purge_ontology_cache
)
quality_df = self._rename_quality_standard_columns(
quality_df,
quality_standards_ontology,
)
return quality_df
def getQualBB(self):
"""Return the raw cached biobank quality-info table without scope filtering.
Prefer ``getBiobankQualityInfo(...)`` or
``getBiobankQualityInfoWide(...)`` in new code.
"""
return self.qualBBtable.copy()
def getQualColl(self):
"""Return the raw cached collection quality-info table without scope filtering.
Prefer ``getCollectionQualityInfo(...)`` or
``getCollectionQualityInfoWide(...)`` in new code.
"""
return self.qualColltable.copy()
def _get_loaded_biobank_by_id(self, biobankID: str) -> Optional[dict[str, Any]]:
"""Return a loaded biobank regardless of withdrawn scope, or None when absent."""
if self.directoryGraph.has_node(biobankID):
biobank = self.directoryGraph.nodes[biobankID]['data']
if 'country' in biobank or 'contact' in biobank:
return biobank
return None
def _get_loaded_collection_by_id(self, collectionID: str) -> Optional[dict[str, Any]]:
"""Return a loaded collection regardless of withdrawn scope, or None when absent."""
if self.directoryGraph.has_node(collectionID):
collection = self.directoryGraph.nodes[collectionID]['data']
if 'biobank' in collection:
return collection
return None
def _get_visible_collection_by_id(self, collectionID: str) -> Optional[dict[str, Any]]:
"""Return a collection visible under the current withdrawn scope, or None."""
collection = self._get_loaded_collection_by_id(collectionID)
if collection is None:
return None
if not self._matches_withdrawn_scope(self.isCollectionWithdrawn(collectionID)):
return None
return collection
def getBiobankById(self, biobankId: str, raise_on_missing: bool = False) -> Optional[dict[str, Any]]:
"""Return a biobank by id.
Args:
biobankId: Biobank identifier.
raise_on_missing: Raise KeyError when not found.
Returns:
Matching biobank or None when not found and raise_on_missing is False.
"""
for b in self.biobanks:
if b['id'] == biobankId:
if not self._matches_withdrawn_scope(self.isBiobankWithdrawn(biobankId)):
break
return b
if raise_on_missing:
raise KeyError(f"Biobank {biobankId!r} not found in loaded directory snapshot.")
log.warning("Biobank %r not found in loaded directory snapshot.", biobankId)
return None
def getBiobanksCount(self):
"""Return the number of loaded biobanks."""
return len(self.getBiobanks())
@staticmethod
def _extract_country_code(value) -> str:
"""Return a country/staging code from a scalar or EMX-style wrapper."""
if isinstance(value, dict):
value = value.get("id", "")
return str(value).strip().upper() if value is not None else ""
def getBiobankNN(self, biobankID: str):
    """Return the node/staging-area code routing a biobank.

    The routing node is derived from the entity-id prefix rather than the
    biobank country, so non-member/global staging areas such as EXT/EU stay
    grouped under their staging area even when the hosted biobank's country
    is a member-state code such as US/VN/DE. The reported country code is
    only used as a fallback when no staging area can be derived from the id.
    """
    biobank_record = self.directoryGraph.nodes[biobankID]['data']
    staging_area = NNContacts.extract_staging_area(biobankID)
    return staging_area or self._extract_country_code(biobank_record.get('country'))
def getBiobankCountry(self, biobankID: str):
"""Return the reported country code for a biobank id."""
biobank = self.directoryGraph.nodes[biobankID]['data']
return self._extract_country_code(biobank.get('country'))
def getCollections(self):
"""Return all loaded collections."""
return [
collection for collection in self.collections
if self._matches_withdrawn_scope(self.isCollectionWithdrawn(collection['id']))
]
def getCollectionById(self, collectionId: str, raise_on_missing: bool = False) -> Optional[dict[str, Any]]:
"""Return a collection by id.
Args:
collectionId: Collection identifier.
raise_on_missing: Raise KeyError when not found.
Returns:
Matching collection or None when not found and raise_on_missing is False.
"""
for c in self.collections:
if c['id'] == collectionId:
if not self._matches_withdrawn_scope(self.isCollectionWithdrawn(collectionId)):