-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathentrypoint.py
More file actions
140 lines (115 loc) · 3.64 KB
/
entrypoint.py
File metadata and controls
140 lines (115 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
import os
import sys
import argparse
import time
import pprint
from dotenv import find_dotenv, load_dotenv
import pandas as pd
from requests import RequestException
from d3b_utils.requests_retry import Session
# Pretty-printer shared by the rest of the script.
pp = pprint.PrettyPrinter(indent=4)

# API version path segment used when building CAVATICA endpoint URLs.
API_VERSION = "v2"

# Load variables from a .env file into the process environment, but only
# when find_dotenv() returned a non-empty (truthy) path.
DOTENV_PATH = find_dotenv()
if DOTENV_PATH:
    load_dotenv(DOTENV_PATH)

# Credentials and target dataset, read from the environment after the
# optional .env load above; each is None when the variable is unset.
X_SBG_Auth_Token = os.environ.get("X_SBG_Auth_Token")
DATASET_ID = os.environ.get("DATASET_ID")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
class CustomParser(argparse.ArgumentParser):
    """ArgumentParser whose error handler prints to stderr and exits with 2.

    The full help text is printed after the error message, except when the
    error is reported while an ``argparse.ArgumentError`` is being handled.
    """

    def error(self, message):
        """Write *message* to stderr, maybe print help, then exit with status 2."""
        sys.stderr.write(f"\nerror: {message}\n\n")
        _, active_exc, _ = sys.exc_info()
        show_help = not isinstance(active_exc, argparse.ArgumentError)
        if show_help:
            self.print_help()
        sys.exit(2)
# Command-line interface: two positional file paths plus optional delimiter,
# API URL and checksum-type selections.
parser = CustomParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

# Required arguments
parser.add_argument("source_file_path", help="Source tabular file")
parser.add_argument("target_file_path", help="Target tabular file")

# Optional arguments
parser.add_argument(
    "--sep",
    default="t",
    choices=["t", "c"],
    required=False,
    help="Delimiter to use; t (for tsv) or c (for csv)",
)
parser.add_argument(
    "--cavatica_drs_api_url",
    default="https://cavatica-api.sbgenomics.com/",
    required=False,
    help="CAVATICA DRS API URL",
)
parser.add_argument(
    "--hash_types",
    nargs="+",
    default=["ETag", "MD5", "SHA-1"],
    choices=["ETag", "MD5", "SHA-1"],
    required=False,
    help="Hash types",
)

# Parse arguments
args = parser.parse_args()
source_file_path = args.source_file_path
target_file_path = args.target_file_path
# Translate the --sep choice into the delimiter used for both reading the
# source file and writing the target file. `choices` guarantees the key
# exists. The delimiter must be derived from --sep, not from a file path.
sep = {"t": "\t", "c": ","}[args.sep]
cavatica_drs_api_url = args.cavatica_drs_api_url
hash_types = args.hash_types
# Make the INFO-level progress messages below visible; without configuring
# logging, the root logger's default WARNING level drops every logging.info.
logging.basicConfig(level=logging.INFO)


def _build_drs_payload(row, hash_types, dataset_id):
    """Return the JSON body that indexes one source row as a DRS object.

    Reads the ``Bucket``, ``Key`` and ``Size`` columns of *row*, plus one
    column per entry in *hash_types*. The ``ETag`` hash is sent with type
    ``"ETAG"``; other hash type names are sent unchanged.
    """
    return {
        "datasetId": dataset_id,
        "name": row["Key"].split("/")[-1],
        "size": row["Size"],
        "checksums": [
            {
                "type": {"ETag": "ETAG"}.get(hash_type, hash_type),
                "checksum": row[hash_type],
            }
            for hash_type in hash_types
        ],
        "locationUrls": [f"s3://{row['Bucket']}/{row['Key']}"],
    }


def _register_drs_object(session, index_url, headers, payload):
    """POST *payload* to *index_url* and report the outcome.

    Returns:
        ``(True, drs_uri)`` when the request succeeds (``drs_uri`` is the
        response's ``drsUri`` field, possibly ``None``), or ``(False, None)``
        on failure.

    Request failures are logged rather than raised, so one bad row does not
    abort the run and lose the results already collected. This covers
    connection errors, HTTP error statuses and undecodable JSON bodies.
    """
    try:
        resp = session.post(index_url, headers=headers, json=payload)
        resp.raise_for_status()
        drs_uri = resp.json().get("drsUri")
    except (RequestException, ValueError) as e:
        # Connection and JSON-decode errors carry no response object, so
        # fall back to the exception text instead of dereferencing None.
        response = getattr(e, "response", None)
        detail = response.text if response is not None else e
        logging.error(f" ❌ Failed to register {payload}: {detail}")
        return False, None
    logging.info(f" 🍱 Registered {payload}; Access URL: {drs_uri}")
    return True, drs_uri


# Import in source file
source_df = pd.read_csv(source_file_path, sep=sep)
target_df = source_df.copy()
expects, success = len(source_df), 0
logging.info(f"🚀 Start registering {expects} files to {cavatica_drs_api_url}!")
start = time.time()

# Loop-invariant request pieces: one session, endpoint URL and header set
# shared by every row, instead of building a new session per request.
session = Session()
index_url = (
    f"{cavatica_drs_api_url.rstrip('/')}/{API_VERSION}/drs-internal/objects/index/"
)
headers = {
    "X-SBG-Auth-Token": X_SBG_Auth_Token,
    "X-SBG-Advance-Access": "advance",
    "Content-Type": "application/json",
}

# Index DRS objects; keep one entry per row (None on failure) so the list
# lines up with target_df's rows.
access_url_list = []
for _, row in source_df.iterrows():
    payload = _build_drs_payload(row, hash_types, DATASET_ID)
    ok, drs_uri = _register_drs_object(session, index_url, headers, payload)
    if ok:
        success += 1
    access_url_list.append(drs_uri)

# Export to target file
target_df["Access URL"] = access_url_list
target_df.to_csv(target_file_path, sep=sep, index=False)

# Time the process
elapsed = time.time() - start
m, s = divmod(elapsed, 60)
h, m = divmod(m, 60)
logging.info(f"✅ Time elapsed: {int(h)} hours {int(m)} minutes {s:.2f} seconds")
logging.info(
    f"🎉 {success} / {expects} files have been registered to {cavatica_drs_api_url}!"
)