-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_fetcher_datasets.py
More file actions
57 lines (45 loc) Β· 2.25 KB
/
data_fetcher_datasets.py
File metadata and controls
57 lines (45 loc) Β· 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# --- data_fetcher_datasets.py ---
"""Module for fetching dataset data from Hugging Face."""
import os
import time
import duckdb
import pandas as pd
from utils import log_progress, log_memory_usage
from config_datasets import HF_PARQUET_URL, RAW_DATA_COLUMNS_TO_FETCH
def fetch_raw_data():
    """Fetch raw dataset data from Hugging Face, selecting only necessary columns.

    Builds a DuckDB query over the remote parquet file at ``HF_PARQUET_URL``,
    projecting only the columns listed in ``RAW_DATA_COLUMNS_TO_FETCH`` to
    minimize transfer. Respects the ``TEST_DATA_LIMIT`` environment variable:
    when set to a numeric string, a ``LIMIT`` clause caps the row count.

    Returns:
        tuple: ``(df_raw, data_download_timestamp)`` where ``df_raw`` is a
        pandas DataFrame of the fetched rows and ``data_download_timestamp``
        is a timezone-aware (UTC) pandas Timestamp taken right after the
        download completed.

    Raises:
        ValueError: if the fetched DataFrame is ``None`` or empty.
        Exception: any DuckDB / network error is logged and re-raised.
    """
    log_progress("π Starting DATASET data fetch from Hugging Face")
    log_progress(f"Source URL: {HF_PARQUET_URL}")
    fetch_start_time = time.time()
    try:
        # Double-quote each column name so reserved words or special
        # characters in column names cannot break the SQL projection.
        columns_to_select = ", ".join(f'"{col}"' for col in RAW_DATA_COLUMNS_TO_FETCH)
        query = f"SELECT {columns_to_select} FROM read_parquet('{HF_PARQUET_URL}')"
        log_progress(f"Optimized query will fetch {len(RAW_DATA_COLUMNS_TO_FETCH)} specific columns.")
        limit = os.environ.get('TEST_DATA_LIMIT')
        # isdigit() guards against non-numeric values ending up in the SQL.
        if limit and limit.isdigit():
            query += f" LIMIT {int(limit)}"
            log_progress(f"π§ͺ Applying test limit: Fetching only {limit} rows.")
        log_progress("β³ Executing DuckDB query to fetch remote dataset data...")
        df_raw = duckdb.sql(query).df()
        # Timestamp the download immediately after it finishes (UTC-aware).
        data_download_timestamp = pd.Timestamp.now(tz='UTC')
        fetch_time = time.time() - fetch_start_time
        # BUG FIX: this f-string was split across two physical lines in the
        # original (a syntax error); the message now lives on one line.
        log_progress(f"β Dataset data fetch completed in {fetch_time:.2f}s")
        if df_raw is None or df_raw.empty:
            raise ValueError("Fetched dataset data is empty or None")
        log_progress(f"π Rows: {len(df_raw):,}, Columns: {len(df_raw.columns)}")
        log_memory_usage()
        return df_raw, data_download_timestamp
    except Exception as e:
        log_progress(f"β ERROR: Could not fetch dataset data: {e}")
        raise
def validate_raw_data(df_raw):
    """Perform validation on raw dataset data.

    Checks that the critical ``id`` column exists and logs the number of
    duplicate IDs (duplicates are reported, not rejected).

    Args:
        df_raw: pandas DataFrame of freshly fetched dataset rows.

    Returns:
        bool: ``True`` if validation passes.

    Raises:
        ValueError: if the ``id`` column is missing.
    """
    log_progress("π Validating raw dataset data quality...")
    if 'id' not in df_raw.columns:
        raise ValueError("Critical 'id' column is missing from fetched dataset data.")
    log_progress(f" - Duplicate IDs: {df_raw['id'].duplicated().sum():,}")
    # BUG FIX: this string literal was split across two physical lines in the
    # original (a syntax error); the message now lives on one line.
    log_progress("β Dataset data validation completed.")
    return True