Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
user_input_data/inventories/*
user_input_data/
!user_input_data/inventories/sample-bucket/
app/out.json
app/items.json
Expand Down
2 changes: 1 addition & 1 deletion app/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM centos/python-38-centos7
FROM arm64v8/python:3.9

# Set the working directory in the container
WORKDIR /app
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ services:
- postgres_data:/var/lib/postgresql/data
- ./rdb:/docker-entrypoint-initdb.d
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
interval: 5s
timeout: 5s
retries: 5
Expand Down Expand Up @@ -77,4 +77,4 @@ volumes:
driver_opts:
o: bind
type: none
device: ./user_input_data
device: ./user_input_data
2 changes: 1 addition & 1 deletion ingestor/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM centos/python-38-centos7
FROM arm64v8/python:3.9

# Set the working directory in the container
WORKDIR /app
Expand Down
182 changes: 161 additions & 21 deletions ingestor/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import psycopg2
from psycopg2.extras import execute_values
import logging
Expand Down Expand Up @@ -45,6 +47,16 @@ def __init__(self):
self.chunk_size = int(os.getenv("CHUNK_SIZE", 5000))
# Maximum number of chunks to process
self.max_chunks = int(os.getenv("MAX_CHUNKS", 2))

# Set up Dask cluster
n_workers = 4
threads_per_worker = 2
logging.info(f"Initializing Dask cluster with {n_workers} workers, {threads_per_worker} threads per worker")

# Start Dask client
self.cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
self.client = Client(self.cluster)
logging.info(f"Dask dashboard available at: {self.client.dashboard_link}")

self.conn, self.cur = self.__connect_to_db()

Expand All @@ -70,9 +82,12 @@ def __del__(self):
try:
self.cur.close()
self.conn.close()
# Close Dask client and cluster
self.client.close()
self.cluster.close()
except:
pass
logging.info("Database connection closed.")
logging.info("Database connection and Dask cluster closed.")

def refresh_materialized_views(self):
# Refresh the materialized views
Expand Down Expand Up @@ -110,35 +125,159 @@ def ingest_csv_dir(self, csv_dir):
header_columns = self._get_header_columns_for_csv(
os.path.join(csv_dir, "manifest.json"))
logging.info(f"Header columns: {header_columns}")

# Get all CSV files for parallel processing
csv_files = []
for filename in os.listdir(csv_dir):
if filename.endswith(".csv"):
if filename == ".DS_Store":
continue
if filename.endswith(".csv.gz"):
csv_filepath = os.path.join(csv_dir, filename)
logging.info(f"Processing file: {csv_filepath}")
self.insert_csv_to_db(csv_filepath, header_columns)
csv_files.append(csv_filepath)

# Process files in parallel using Dask
if csv_files:
self.process_files_with_dask(csv_files, header_columns)

def ingest_inventory_files(self):
logging.info(os.listdir(self.inventories_dir))
# Process each CSV file in the directory
for bucket_dir in os.listdir(self.inventories_dir):
logging.info(f"Processing bucket directory: {bucket_dir}")
bucket_dir = os.path.join(self.inventories_dir, bucket_dir)
for inventory_type in os.listdir(bucket_dir):
logging.info(f"Processing inventory type: {inventory_type}")
if inventory_type in self.file_handlers:
inventory_type_dir = os.path.join(
bucket_dir, inventory_type)
try:
self.file_handlers[inventory_type](inventory_type_dir)
except Exception as e:
logging.error(
f"Failed to process {inventory_type_dir}: {e}")
raise e
else:
logging.warning(
f"Unsupported inventory type: {inventory_type}")
if bucket_dir == ".DS_Store":
continue
else:
logging.info(f"Processing bucket directory: {bucket_dir}")
bucket_dir = os.path.join(self.inventories_dir, bucket_dir)
for inventory_type in os.listdir(bucket_dir):
logging.info(f"Processing inventory type: {inventory_type}")
if inventory_type == ".DS_Store":
continue
else:
if inventory_type in self.file_handlers:
inventory_type_dir = os.path.join(
bucket_dir, inventory_type)
try:
self.file_handlers[inventory_type](inventory_type_dir)
except Exception as e:
logging.error(
f"Failed to process {inventory_type_dir}: {e}")
raise e
else:
logging.warning(
f"Unsupported inventory type: {inventory_type}")

def _add_header_to_csv_from_manifest_json(self, filepath):
    """Unimplemented stub; currently a no-op.

    NOTE(review): the name suggests this was meant to prepend a header row
    to *filepath* derived from the adjacent manifest.json — confirm intent
    before implementing. Header columns are currently obtained separately
    via ``_get_header_columns_for_csv`` in ``ingest_csv_dir``.
    """
    pass

def process_files_with_dask(self, file_list, header_columns):
    """Fan the given CSV files out to the Dask cluster and ingest them in parallel.

    Each file becomes one Dask task running ``process_single_file`` on a
    worker; the call blocks until every task has completed, then logs the
    combined number of rows inserted.
    """
    logging.info(f"Processing {len(file_list)} CSV files with Dask")

    # One future per file. The DB params dict is copied so every worker
    # receives its own independent mapping.
    futures = [
        self.client.submit(
            self.process_single_file,
            path,
            header_columns,
            self._db_params.copy(),
            self.chunk_size,
            self.max_chunks,
        )
        for path in file_list
    ]

    # Block until all workers finish; each task returns its row count.
    rows_per_file = self.client.gather(futures)
    total_rows = sum(rows_per_file)
    logging.info(f"Total rows inserted: {total_rows}")

@staticmethod
def process_single_file(filepath, header_columns, db_params, chunk_size, max_chunks):
    """Process a single gzipped CSV file and insert its rows into the
    ``inventory`` table.

    Static so it can be serialized by Dask and executed on a worker.

    Args:
        filepath: Path to a headerless ``.csv.gz`` inventory file.
        header_columns: Column names to apply to the CSV.
        db_params: Keyword arguments for ``psycopg2.connect``.
        chunk_size: Rows per pandas chunk.
        max_chunks: Maximum number of chunks to process; <= 0 means no limit.

    Returns:
        int: Total number of rows inserted from this file.
    """
    logging.info(f"Processing file: {filepath}")

    total_rows = 0
    is_limited_chunks = max_chunks > 0
    current_chunk = 0

    # Helpers defined once per file (the original redefined them on every
    # chunk iteration).

    def normalize_key(x):
        # Collapse runs of slashes ("a//b" -> "a/b"); empty string for
        # missing keys.
        if pd.isna(x) or x is None:
            return ""
        return re.sub(r'\/\/+', '/', str(x))

    def extract_suffix(x):
        # File extension after the last ".", but only when that dot falls
        # in the final path component; otherwise None.
        if pd.isna(x) or x is None:
            return None
        x_str = str(x)
        return x_str.split(".")[-1] if "." in x_str and x_str.rfind(".") > x_str.rfind("/") else None

    def calculate_depth(x):
        # Depth = number of "/" separators in the (normalized) key.
        if pd.isna(x) or x is None:
            return 0
        return str(x).count("/")

    # One connection for the whole file. The previous version opened a new
    # connection per chunk via ``with psycopg2.connect(...)`` — psycopg2's
    # connection context manager only ends the transaction, it never closes
    # the connection, so that leaked one connection per chunk.
    conn = psycopg2.connect(**db_params)
    try:
        # Process the file in chunks
        with pd.read_csv(
            filepath,
            header=None,
            names=header_columns,
            usecols=range(len(header_columns)),
            compression='gzip',
            chunksize=chunk_size
        ) as reader:
            for chunk in reader:
                current_chunk += 1

                # Stop if we've reached the maximum number of chunks
                if is_limited_chunks and current_chunk > max_chunks:
                    break

                # Drop the column object_access_control_list
                if "object_access_control_list" in chunk.columns:
                    chunk.drop(columns=["object_access_control_list"], inplace=True)

                try:
                    # Normalize the key column (remove double slashes),
                    # then derive suffix/depth from the normalized key.
                    chunk = chunk.assign(key=chunk.key.apply(normalize_key))
                    chunk = chunk.assign(suffix=chunk.key.apply(extract_suffix))
                    chunk = chunk.assign(depth=chunk.key.apply(calculate_depth))

                    # Normalize column names (make them lowercase)
                    chunk.columns = [c.lower() for c in chunk.columns]

                    # Prepare data tuple list for insertion (NaN -> NULL)
                    tuples = [tuple(x) for x in chunk.to_numpy(na_value=None)]

                    # Compose the query dynamically based on the CSV columns.
                    # NOTE(review): column names are interpolated unescaped —
                    # they come from the manifest, but consider
                    # psycopg2.sql.Identifier if that source is untrusted.
                    cols = ",".join(list(chunk.columns))
                    query = f"INSERT INTO inventory ({cols}) VALUES %s"

                    # Insert data into database
                    with conn.cursor() as cur:
                        execute_values(cur, query, tuples)
                    conn.commit()

                    # Update row count
                    total_rows += len(tuples)
                    logging.info(f"Inserted {len(tuples)} rows from {os.path.basename(filepath)} (Chunk {current_chunk})")

                except Exception as e:
                    # Best-effort per chunk: roll back the failed transaction
                    # so the shared connection stays usable, log, continue.
                    conn.rollback()
                    logging.error(f"Error processing chunk {current_chunk} from {filepath}: {e}")
    finally:
        conn.close()

    logging.info(f"Completed file {os.path.basename(filepath)} - Inserted {total_rows} rows")
    return total_rows

def insert_csv_to_db(self, filepath, header_columns):
"""Insert CSV file into the database table in chunks"""
# Read the CSV in chunks
Expand All @@ -150,6 +289,7 @@ def insert_csv_to_db(self, filepath, header_columns):
header=None,
names=header_columns,
usecols=range(len(header_columns)),
low_memory=False # Added to prevent DtypeWarning
) as reader:
for i, chunk in enumerate(reader):
logging.info(f"max_chunks: {max_chunks}")
Expand Down Expand Up @@ -200,7 +340,7 @@ def insert_csv_to_db(self, filepath, header_columns):
def main():
ingestor = Ingestor()
try:
ingestor.ingest_inventory_files()
#ingestor.ingest_inventory_files()
ingestor.refresh_materialized_views()
except Exception as e:
logging.error(f"Failed to ingest inventory files: {e}")
Expand Down
3 changes: 2 additions & 1 deletion ingestor/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
dask
pandas>=1.5.0
plotly>=5.9.0
psycopg2-binary==2.9.3
matplotlib==3.5.1
flask==2.0.3
werkzeug==2.0.3
inflection>=0.5.1
inflection>=0.5.1