Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions BSM/Downloader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .downloader import *

# Public API of the package. These must be names that ``.downloader`` really
# defines; listing a missing name makes ``from BSM.Downloader import *`` raise
# AttributeError.
__all__ = ['BaseDownloader', 'Downloader']
222 changes: 222 additions & 0 deletions BSM/Downloader/downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import os
import aiohttp
from tqdm.asyncio import tqdm
import asyncio
import sqlite3
import json
import aiofiles
import requests

class BaseDownloader:
    """Shared plumbing for downloaders that read their links from a SQLite table.

    Subclasses must override :meth:`download_file`.
    """

    def __init__(self, database_path, table_name, save_root):
        """Store where the links live and where downloads are written.

        Args:
            database_path: Path handed to ``sqlite3.connect``.
            table_name: Name of the table holding the download links.
            save_root: Root directory for downloaded files.
        """
        self.database_path = database_path
        self.table_name = table_name
        self.save_root = save_root

    def create_connection(self):
        """Open and return a new SQLite connection to ``database_path``."""
        return sqlite3.connect(self.database_path)

    async def check_file_exists(self, file_path):
        """Return True if ``file_path`` exists on disk."""
        return os.path.exists(file_path)

    def get_response_headers(self, url, timeout=30):
        """Fetch only the response headers of ``url`` without following redirects.

        Args:
            url: Address to request.
            timeout: Seconds to wait for the server before giving up, so a
                stalled server cannot block the caller forever.

        Returns:
            The response headers, or an empty dict if the request failed
            (the error is printed, not raised).
        """
        headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
        }
        try:
            # stream=True defers the body: when the server answers 200 instead
            # of redirecting we must not pull the whole file into memory just
            # to look at the headers. The context manager releases the
            # connection as soon as the headers have been read.
            with requests.get(url, headers=headers, allow_redirects=False,
                              stream=True, timeout=timeout) as response:
                return response.headers
        except Exception as e:
            # Best effort by design: callers fall back to the original URL.
            print(f"An error occurred: {e}")
            return {}

    async def download_file(self, session, url, save_dir, semaphore, progress=None, overall_progress=None):
        """Download ``url`` into ``save_dir``; must be implemented by subclasses."""
        raise NotImplementedError("This method should be overridden by subclasses")

class Downloader(BaseDownloader):
    """Concurrent, resumable downloader for links stored in a SQLite table.

    ``downloader_type`` selects how a stored link becomes a request:

    * ``'hca'`` - optionally replaces ``dcp44`` in the link with the ``dcp``
      keyword argument, then uses the ``Location`` header of the link (if
      any) as the real download address.
    * ``'scp'`` - rewrites ``/api/v1/site/studies`` to ``/data/public`` and
      sends the ``cookie`` keyword argument with every request.
    * ``'cxg'`` - uses the link unchanged.
    """

    # Browser-like headers sent with every download request.
    _REQUEST_HEADERS = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36"
    }
    # Number of bytes read from the response per write.
    _CHUNK_SIZE = 1024 * 1024
    # Only links starting with one of these prefixes are scheduled.
    _LINK_PREFIXES = ('https://', 'ftp://')

    def __init__(self, database_path, table_name, save_root, num_workers=1, downloader_type='hca', timeout=7200, **kwargs):
        """Configure the downloader.

        Args:
            database_path: Path of the SQLite database holding the links.
            table_name: Table to read ``internal_id`` / ``download_links`` from.
            save_root: Root directory; each record gets its own sub-directory.
            num_workers: Maximum number of simultaneous downloads.
            downloader_type: One of ``'hca'``, ``'scp'`` or ``'cxg'``.
            timeout: Per-request timeout in seconds.
            **kwargs: ``dcp`` (for ``'hca'``) or ``cookie`` (for ``'scp'``).

        Raises:
            ValueError: If ``downloader_type`` is unknown, or the ``'scp'``
                cookie is neither a dict nor a readable JSON file path.
        """
        super().__init__(database_path, table_name, save_root)
        self.num_workers = num_workers
        self.downloader_type = downloader_type
        self.timeout = timeout
        if downloader_type == 'hca':
            self.dcp = kwargs.get('dcp')
            self.cookie = None
        elif downloader_type == 'scp':
            self.cookie = self._process_cookie(kwargs.get('cookie'))
            self.dcp = None
        elif downloader_type == 'cxg':
            self.cookie = None
            self.dcp = None
        else:
            raise ValueError(f"unsupported database type: {downloader_type}")

    @staticmethod
    def _process_cookie(cookie):
        """Return the cookie as a dictionary.

        Args:
            cookie: Either a dictionary (returned as is) or the path of a
                JSON file to load.

        Raises:
            ValueError: If the JSON file cannot be loaded, or ``cookie`` is
                neither a dictionary nor the path of an existing file.
        """
        if isinstance(cookie, dict):
            return cookie
        if isinstance(cookie, str) and os.path.isfile(cookie):
            try:
                with open(cookie, 'r') as f:
                    return json.load(f)
            except Exception as e:
                raise ValueError(f"Failed to load cookie from JSON file at {cookie}: {e}") from e
        raise ValueError(f"Invalid cookie format. Expected a dictionary or a valid JSON file path, got {type(cookie)}")

    def _client_timeout(self):
        """Return ``self.timeout`` as the ``aiohttp.ClientTimeout`` the client expects."""
        if isinstance(self.timeout, aiohttp.ClientTimeout):
            return self.timeout
        return aiohttp.ClientTimeout(total=self.timeout)

    async def _resolve_target(self, url, save_dir):
        """Return ``(final_url, file_path)`` for ``url`` according to ``downloader_type``."""
        if self.downloader_type == 'hca':
            if self.dcp is not None:
                url = url.replace('dcp44', self.dcp)
            # get_response_headers() uses the blocking ``requests`` library;
            # run it in a worker thread so other downloads keep progressing.
            loop = asyncio.get_running_loop()
            response_headers = await loop.run_in_executor(None, self.get_response_headers, url)
            final_url = response_headers.get('Location', url)
            file_name = final_url.split('/')[-1].split('?')[0]
        elif self.downloader_type == 'scp':
            final_url = url.replace('/api/v1/site/studies', '/data/public')
            file_name = final_url.split('/')[-1].split('?')[-1].replace('filename=', '')
        else:
            final_url = url
            file_name = final_url.split('/')[-1].split('?')[-1]
        return final_url, os.path.join(save_dir, file_name)

    async def download_file(self, session, url, save_dir, semaphore, progress=None, overall_progress=None):
        """Download one link into ``save_dir``, resuming a partial file if present.

        Errors are printed rather than raised so one failing link does not
        abort the remaining downloads.

        Args:
            session: ``aiohttp.ClientSession`` used for the request.
            url: Link as stored in the database.
            save_dir: Directory the file is written to.
            semaphore: Limits how many downloads run at the same time.
            progress: Unused; kept for interface compatibility.
            overall_progress: Optional progress bar advanced by exactly one
                when this link is finished, whatever the outcome.
        """
        async with semaphore:
            try:
                final_url, file_path = await self._resolve_target(url, save_dir)
                headers = dict(self._REQUEST_HEADERS)

                if await self.check_file_exists(file_path):
                    wrote = os.path.getsize(file_path)
                    headers['Range'] = f'bytes={wrote}-'
                    print(f"Resuming download of {file_path} from byte {wrote}")
                else:
                    wrote = 0

                request_kwargs = {'headers': headers, 'timeout': self._client_timeout()}
                if self.downloader_type == 'scp' and self.cookie:
                    request_kwargs['cookies'] = self.cookie

                async with session.get(final_url, **request_kwargs) as response:
                    if response.status == 416:
                        # Requested range starts at/after the end of the file.
                        print(f"The local copy of {file_path} is complete.")
                        return  # the ``finally`` below advances overall_progress once

                    # Never write an error page (404, 403, 5xx ...) into the data file.
                    response.raise_for_status()

                    total_size = None
                    if response.status == 206:  # Partial Content: append to what we have
                        mode = 'ab'
                        content_range = response.headers.get('Content-Range')
                        if content_range:
                            total_text = content_range.partition('/')[-1].strip()
                            # The total may be "*" when the server does not know it.
                            if total_text.isdigit():
                                total_size = int(total_text)
                    else:
                        # The server sent the whole file (it ignored any Range
                        # header), so appending would corrupt a partial copy:
                        # start over from byte 0.
                        mode = 'wb'
                        wrote = 0
                        content_length = response.headers.get('Content-Length')
                        if content_length is not None:
                            total_size = int(content_length)

                    if total_size is None:
                        print("Warning: Unable to determine total size. Downloading without progress tracking.")

                    # The context manager closes the bar even if the transfer fails.
                    with tqdm(total=total_size, unit='B', unit_scale=True,
                              desc=f"Downloading {file_path}", initial=wrote, leave=False) as pbar:
                        async with aiofiles.open(file_path, mode) as f:
                            async for chunk in response.content.iter_chunked(self._CHUNK_SIZE):
                                await f.write(chunk)
                                wrote += len(chunk)
                                pbar.update(len(chunk))

                    if total_size is not None and wrote != total_size:
                        print(
                            f"ERROR: Incomplete download detected for {file_path}. Expected {total_size} bytes, got {wrote}.")
                    else:
                        print(f"Successfully downloaded {file_path}")

            except Exception as e:
                print(f"Error occurred while downloading {url}: {e}")

            finally:
                # ``is not None`` on purpose: the truth value of a tqdm bar
                # depends on its total and is not a presence check.
                if overall_progress is not None:
                    overall_progress.update(1)

    async def main(self):
        """Read all links from the database and download them concurrently.

        Each record's files go to ``<save_root>/<internal_id>/``. Records whose
        ``download_links`` value is not valid JSON, or is neither a dict nor a
        list, are reported and skipped.
        """
        conn = self.create_connection()
        try:
            cursor = conn.cursor()
            # NOTE: a table name cannot be bound as a SQL parameter, so
            # ``table_name`` is interpolated and must come from a trusted source.
            cursor.execute(f"SELECT internal_id as id, download_links as link FROM {self.table_name} WHERE download_links IS NOT NULL")
            rows = cursor.fetchall()
        finally:
            conn.close()

        # Collect the downloads first so the overall bar's total matches the
        # number of downloads that will really run.
        jobs = []
        for record_id, link_json in rows:
            try:
                link_data = json.loads(link_json)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON for ID {record_id}: {e}")
                continue

            save_dir = os.path.join(self.save_root, str(record_id))
            os.makedirs(save_dir, exist_ok=True)

            if isinstance(link_data, dict):
                candidates = link_data.values()
            elif isinstance(link_data, list):
                candidates = link_data
            else:
                print(f"Unsupported data type for ID {record_id}: Expected dict or list, got {type(link_data)}")
                continue

            for link in candidates:
                if isinstance(link, str) and link.startswith(self._LINK_PREFIXES):
                    jobs.append((link, save_dir))

        semaphore = asyncio.Semaphore(self.num_workers)
        with tqdm(total=len(jobs), desc="Overall Progress") as overall_progress:
            async with aiohttp.ClientSession() as session:
                await asyncio.gather(*(
                    self.download_file(session, link, save_dir, semaphore, overall_progress=overall_progress)
                    for link, save_dir in jobs
                ))

if __name__ == "__main__":
    # Settings common to both example downloaders (placeholder values).
    shared_settings = dict(
        database_path="path/to/your/database.db",
        table_name="your_table_name",
        save_root="path/to/save/directory",
        num_workers=4,  # number of concurrent downloads
        timeout=7200,  # optional timeout setting
    )

    # HCA downloader example; ``dcp`` is the HCA-specific parameter.
    hca_downloader = Downloader(
        downloader_type="hca",
        dcp="your_dcp_value",
        **shared_settings,
    )

    # SCP downloader example; ``cookie`` is the SCP-specific parameter.
    scp_downloader = Downloader(
        downloader_type="scp",
        cookie={"your": "cookie"},
        **shared_settings,
    )

    # Run the downloaders one after the other.
    for example_downloader in (hca_downloader, scp_downloader):
        asyncio.run(example_downloader.main())
5 changes: 3 additions & 2 deletions BSM/Fetcher/SingleCellDBs/cellxgene.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ def __init__(self, domain_name="cellxgene.cziscience.com/curation/v1"):
self.headers = {"Content-Type": "application/json"}

def fetch_dataset(self):
    """Request ``self.datasets_url`` and return the decoded JSON payload.

    Raises whatever ``raise_for_status`` raises when the server answers
    with an error status.
    """
    self.logger.info('fetching all cellxgene datasets')
    response = requests.get(url=self.datasets_url, headers=self.headers)
    response.raise_for_status()
    return response.json()

def fetch_collections(self):
self.logger.info('fetching all cellxgene collections')
res = requests.get(url=self.collections_url, headers=self.headers)
res.raise_for_status()
data = res.json()
Expand All @@ -27,7 +29,6 @@ def fetch_collections(self):
def fetch(self, db_name):
collections = self.fetch_collections()
datasets = self.fetch_dataset()

merged_datasets = []
for collection in collections:
collection_datasets = collection.get('datasets', [])
Expand All @@ -40,5 +41,5 @@ def fetch(self, db_name):

json_manager = JsonManager(db_name)
json_manager.save(merged_datasets)
self.logger.info("Data saved successfully to JSON file.")
self.logger.info(f"Data saved successfully to {db_name} file.")

16 changes: 6 additions & 10 deletions BSM/Fetcher/SingleCellDBs/exploredata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@


class ExploreDataFetcher(SingleCellDBFetcher):
def __init__(self, project_url=r'https://service.azul.data.humancellatlas.org/index/projects?size=100&catalog=dcp44&order=asc&sort=projectTitle&filters=%7B%7D',
files_url=r'https://service.azul.data.humancellatlas.org/index/files'):
def __init__(self, project_url=None, files_url=None, dcp_num='dcp44'):
super().__init__()
self.project_meta_data = []
self.project_meta_data_with_url = []
self.project_url = project_url
self.files_url = files_url
self.dcp_num = "dcp44"
self.dcp_num = dcp_num
self.project_url = rf'https://service.azul.data.humancellatlas.org/index/projects?size=100&catalog={self.dcp_num}&order=asc&sort=projectTitle&filters=%7B%7D' if project_url is None else project_url
self.files_url = r'https://service.azul.data.humancellatlas.org/index/files' if files_url is None else files_url
self.headers = {'Accept': 'application/json, text/plain, */*'}

def fetch(self, file_name):
Expand Down Expand Up @@ -48,14 +47,14 @@ def fetch_project(self):

response = requests.get(url)
response.raise_for_status()
# 解析JSON数据

data = response.json()
total = data['pagination']['total']
with tqdm(total=total, desc='Fetching Project Data', initial=data['pagination']['count']) as pbar:
while url:
hits = data.get('hits', [])
self.project_meta_data.extend(hits)
# 获取下一页的URL

pagination = data.get('pagination', {})
url = pagination.get('next', None)
if url:
Expand Down Expand Up @@ -86,9 +85,7 @@ def fetch_url(self, projects):
with tqdm(total=total, desc='Fetching Project URLs', initial=data['pagination']['count']) as pbar:
while url:

# 提取“hits”字段
hits = data.get('hits', [])
# 处理每个hit的files字段
for hit in hits:
files = hit.get('files', [])
for file in files:
Expand All @@ -98,7 +95,6 @@ def fetch_url(self, projects):
file.pop('format')
aggregated_data[file_format].append(file)

# 获取下一页的URL
pagination = data.get('pagination', {})
url = pagination.get('next', None)
if url:
Expand Down
1 change: 0 additions & 1 deletion BSM/Fetcher/SingleCellDBs/fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

class SingleCellDBFetcher(object):
def __init__(self):
# 设置日志记录器
self.logger = logging.getLogger(__name__)

def fetch(self, db_name):
Expand Down
8 changes: 6 additions & 2 deletions BSM/Fetcher/SingleCellDBs/single_cell_portal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import requests
from tqdm import tqdm

from BSM.Fetcher.SingleCellDBs.fetchers import SingleCellDBFetcher
from BSM.Fetcher.utils import JsonManager

Expand All @@ -15,18 +17,20 @@ def fetch(self, db_name):
if response.status_code == 200:
studies = response.json()
final_data = []
for study in studies:
for study in tqdm(studies):
accessions = study.get('accession', 'N/A')
study_url = f"{self.datasets_url}/{accessions}"
response = requests.get(study_url, headers=self.headers, verify=False)
if response.status_code == 200:
study_data = response.json()
final_data.append(study_data)
self.logger.info(f"Data saved successfully to {accessions}.json file.")
# self.logger.info()
tqdm.write(f"Data saved successfully to {accessions}.json file.")
else:
self.logger.error(f"Failed to retrieve study {accessions}. Status code: {response.status_code}")
manager = JsonManager(db_name)
manager.save(final_data)
self.logger.info(f"Data saved successfully to {db_name} file.")
else:
self.logger.error(f"Failed to retrieve studies. Status code: {response.status_code}")

Expand Down
Loading
Loading