diff --git a/.gitignore b/.gitignore
index f646b9a..56e055b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,4 @@
-user_input_data/inventories/*
+user_input_data/
 !user_input_data/inventories/sample-bucket/
 app/out.json
 app/items.json
diff --git a/app/Dockerfile b/app/Dockerfile
index a0ef174..9bc7f71 100644
--- a/app/Dockerfile
+++ b/app/Dockerfile
@@ -1,4 +1,4 @@
-FROM centos/python-38-centos7
+FROM arm64v8/python:3.9
 
 # Set the working directory in the container
 WORKDIR /app
diff --git a/docker-compose.yml b/docker-compose.yml
index 3179fe5..bf23457 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,7 +11,7 @@ services:
       - postgres_data:/var/lib/postgresql/data
       - ./rdb:/docker-entrypoint-initdb.d
     healthcheck:
-      test: ["CMD-SHELL", "pg_isready -U postgres"]
+      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
       interval: 5s
       timeout: 5s
       retries: 5
@@ -77,4 +77,4 @@ volumes:
     driver_opts:
       o: bind
       type: none
-      device: ./user_input_data
\ No newline at end of file
+      device: ./user_input_data
diff --git a/ingestor/Dockerfile b/ingestor/Dockerfile
index bc5c313..8f2d33f 100644
--- a/ingestor/Dockerfile
+++ b/ingestor/Dockerfile
@@ -1,4 +1,4 @@
-FROM centos/python-38-centos7
+FROM arm64v8/python:3.9
 
 # Set the working directory in the container
 WORKDIR /app
diff --git a/ingestor/ingestor.py b/ingestor/ingestor.py
index 39e2a28..78d9740 100755
--- a/ingestor/ingestor.py
+++ b/ingestor/ingestor.py
@@ -2,6 +2,8 @@
 
 import os
 import pandas as pd
+import dask.dataframe as dd
+from dask.distributed import Client, LocalCluster
 import psycopg2
 from psycopg2.extras import execute_values
 import logging
@@ -45,6 +47,16 @@ def __init__(self):
         self.chunk_size = int(os.getenv("CHUNK_SIZE", 5000))
         # Maximum number of chunks to process
         self.max_chunks = int(os.getenv("MAX_CHUNKS", 2))
+
+        # Set up Dask cluster
+        n_workers = 4
+        threads_per_worker = 2
+        logging.info(f"Initializing Dask cluster with {n_workers} workers, {threads_per_worker} threads per worker")
+
+        # Start Dask client
+        self.cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
+        self.client = Client(self.cluster)
+        logging.info(f"Dask dashboard available at: {self.client.dashboard_link}")
 
         self.conn, self.cur = self.__connect_to_db()
 
@@ -70,9 +82,12 @@ def __del__(self):
         try:
             self.cur.close()
             self.conn.close()
+            # Close Dask client and cluster
+            self.client.close()
+            self.cluster.close()
         except:
             pass
-        logging.info("Database connection closed.")
+        logging.info("Database connection and Dask cluster closed.")
 
     def refresh_materialized_views(self):
         # Refresh the materialized views
@@ -110,35 +125,159 @@ def ingest_csv_dir(self, csv_dir):
         header_columns = self._get_header_columns_for_csv(
             os.path.join(csv_dir, "manifest.json"))
         logging.info(f"Header columns: {header_columns}")
+
+        # Get all CSV files for parallel processing
+        csv_files = []
         for filename in os.listdir(csv_dir):
-            if filename.endswith(".csv"):
+            if filename == ".DS_Store":
+                continue
+            if filename.endswith(".csv.gz"):
                 csv_filepath = os.path.join(csv_dir, filename)
-                logging.info(f"Processing file: {csv_filepath}")
-                self.insert_csv_to_db(csv_filepath, header_columns)
+                csv_files.append(csv_filepath)
+
+        # Process files in parallel using Dask
+        if csv_files:
+            self.process_files_with_dask(csv_files, header_columns)
 
     def ingest_inventory_files(self):
+        logging.info(os.listdir(self.inventories_dir))
         # Process each CSV file in the directory
         for bucket_dir in os.listdir(self.inventories_dir):
-            logging.info(f"Processing bucket directory: {bucket_dir}")
-            bucket_dir = os.path.join(self.inventories_dir, bucket_dir)
-            for inventory_type in os.listdir(bucket_dir):
-                logging.info(f"Processing inventory type: {inventory_type}")
-                if inventory_type in self.file_handlers:
-                    inventory_type_dir = os.path.join(
-                        bucket_dir, inventory_type)
-                    try:
-                        self.file_handlers[inventory_type](inventory_type_dir)
-                    except Exception as e:
-                        logging.error(
-                            f"Failed to process {inventory_type_dir}: {e}")
-                        raise e
-                else:
-                    logging.warning(
-                        f"Unsupported inventory type: {inventory_type}")
+            if bucket_dir == ".DS_Store":
+                continue
+            else:
+                logging.info(f"Processing bucket directory: {bucket_dir}")
+                bucket_dir = os.path.join(self.inventories_dir, bucket_dir)
+                for inventory_type in os.listdir(bucket_dir):
+                    logging.info(f"Processing inventory type: {inventory_type}")
+                    if inventory_type == ".DS_Store":
+                        continue
+                    else:
+                        if inventory_type in self.file_handlers:
+                            inventory_type_dir = os.path.join(
+                                bucket_dir, inventory_type)
+                            try:
+                                self.file_handlers[inventory_type](inventory_type_dir)
+                            except Exception as e:
+                                logging.error(
+                                    f"Failed to process {inventory_type_dir}: {e}")
+                                raise e
+                        else:
+                            logging.warning(
+                                f"Unsupported inventory type: {inventory_type}")
 
     def _add_header_to_csv_from_manifest_json(self, filepath):
         pass
 
+    def process_files_with_dask(self, file_list, header_columns):
+        """Process multiple CSV files in parallel with Dask"""
+        logging.info(f"Processing {len(file_list)} CSV files with Dask")
+
+        # Create a list of tasks, one for each file
+        tasks = []
+        for filepath in file_list:
+            # Submit task to Dask for parallel processing
+            task = self.client.submit(
+                self.process_single_file,
+                filepath,
+                header_columns,
+                self._db_params.copy(),
+                self.chunk_size,
+                self.max_chunks
+            )
+            tasks.append(task)
+
+        # Wait for all tasks to complete
+        results = self.client.gather(tasks)
+        total_rows = sum(results)
+        logging.info(f"Total rows inserted: {total_rows}")
+
+    @staticmethod
+    def process_single_file(filepath, header_columns, db_params, chunk_size, max_chunks):
+        """Process a single CSV file and insert its contents into the database.
+        This is a static method so it can be serialized and sent to workers."""
+        logging.info(f"Processing file: {filepath}")
+
+        total_rows = 0
+        is_limited_chunks = max_chunks > 0
+        current_chunk = 0
+
+        # Process the file in chunks
+        with pd.read_csv(
+            filepath,
+            header=None,
+            names=header_columns,
+            usecols=range(len(header_columns)),
+            compression='gzip',
+            chunksize=chunk_size
+        ) as reader:
+            for chunk in reader:
+                current_chunk += 1
+
+                # Stop if we've reached the maximum number of chunks
+                if is_limited_chunks and current_chunk > max_chunks:
+                    break
+
+                # Drop the column object_access_control_list
+                if "object_access_control_list" in chunk.columns:
+                    chunk.drop(columns=["object_access_control_list"], inplace=True)
+
+                # Process the data
+                try:
+                    # Normalize the key column (remove double slashes)
+                    def normalize_key(x):
+                        if pd.isna(x) or x is None:
+                            return ""
+                        return re.sub(r'\/\/+', '/', str(x))
+
+                    key_normalized = chunk.key.apply(normalize_key)
+                    chunk = chunk.assign(key=key_normalized)
+
+                    # Add a suffix column based on the key
+                    def extract_suffix(x):
+                        if pd.isna(x) or x is None:
+                            return None
+                        x_str = str(x)
+                        return x_str.split(".")[-1] if "." in x_str and x_str.rfind(".") > x_str.rfind("/") else None
+
+                    suffix_series = chunk.key.apply(extract_suffix)
+                    chunk = chunk.assign(suffix=suffix_series)
+
+                    # Add depth column based on the key
+                    def calculate_depth(x):
+                        if pd.isna(x) or x is None:
+                            return 0
+                        return str(x).count("/")
+
+                    depth_series = chunk.key.apply(calculate_depth)
+                    chunk = chunk.assign(depth=depth_series)
+
+                    # Normalize column names (make them lowercase)
+                    chunk.columns = [c.lower() for c in chunk.columns]
+
+                    # Prepare data tuple list for insertion
+                    tuples = [tuple(x) for x in chunk.to_numpy(na_value=None)]
+
+                    # Compose the query dynamically based on the CSV columns
+                    cols = ",".join(list(chunk.columns))
+                    query = f"INSERT INTO inventory ({cols}) VALUES %s"
+
+                    # Insert data into database
+                    with psycopg2.connect(**db_params) as conn:
+                        with conn.cursor() as cur:
+                            execute_values(cur, query, tuples)
+                        conn.commit()
+
+                    # Update row count
+                    total_rows += len(tuples)
+                    logging.info(f"Inserted {len(tuples)} rows from {os.path.basename(filepath)} (Chunk {current_chunk})")
+
+                except Exception as e:
+                    logging.error(f"Error processing chunk {current_chunk} from {filepath}: {e}")
+
+        logging.info(f"Completed file {os.path.basename(filepath)} - Inserted {total_rows} rows")
+        return total_rows
+
     def insert_csv_to_db(self, filepath, header_columns):
         """Insert CSV file into the database table in chunks"""
         # Read the CSV in chunks
@@ -150,6 +289,7 @@ def insert_csv_to_db(self, filepath, header_columns):
             header=None,
             names=header_columns,
             usecols=range(len(header_columns)),
+            low_memory=False  # Added to prevent DtypeWarning
         ) as reader:
             for i, chunk in enumerate(reader):
                 logging.info(f"max_chunks: {max_chunks}")
@@ -200,7 +340,7 @@ def insert_csv_to_db(self, filepath, header_columns):
 def main():
     ingestor = Ingestor()
     try:
-        ingestor.ingest_inventory_files()
+        #ingestor.ingest_inventory_files()
         ingestor.refresh_materialized_views()
     except Exception as e:
         logging.error(f"Failed to ingest inventory files: {e}")
diff --git a/ingestor/requirements.txt b/ingestor/requirements.txt
index bcb6ed6..61b3cf2 100644
--- a/ingestor/requirements.txt
+++ b/ingestor/requirements.txt
@@ -1,7 +1,8 @@
+dask[distributed]
 pandas>=1.5.0
 plotly>=5.9.0
 psycopg2-binary==2.9.3
 matplotlib==3.5.1
 flask==2.0.3
 werkzeug==2.0.3
-inflection>=0.5.1
\ No newline at end of file
+inflection>=0.5.1