From 1ee37d6ff9e1ed3c5fbf5e320fb078e96e9c7f92 Mon Sep 17 00:00:00 2001 From: shreddd Date: Tue, 16 Sep 2025 16:27:41 -0700 Subject: [PATCH] Redo index creation to happen once Switch to id as unique field --- src/ingest_data.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/ingest_data.py b/src/ingest_data.py index 93cedf5..375fbb3 100644 --- a/src/ingest_data.py +++ b/src/ingest_data.py @@ -125,14 +125,6 @@ def insert_entity(self, entity: Dict) -> Optional[str]: ) return None - # Create indexes for common query patterns - self.db.entities.create_index("uri", unique=True) - self.db.entities.create_index("ber_data_source") - self.db.entities.create_index("data_type") - - # Create 2dsphere index for geospatial queries on coordinates - self.db.entities.create_index([("geojson", GEOSPHERE)]) - # Insert with upsert to handle potential duplicates based on URI result = self.db.entities.update_one( {"uri": entity["uri"]}, {"$set": entity}, upsert=True @@ -152,6 +144,21 @@ def insert_entity(self, entity: Dict) -> Optional[str]: logger.error(f"Error inserting entity: {e}") return None + def create_indexes(self) -> None: + """Create indexes for the 'entities' collection.""" + assert self.db is not None, "Connection to database has not been established" + try: + logger.info("Creating indexes on 'entities' collection") + self.db.entities.create_index("uri") + # TODO: enforce unique index on id once ess-dive implements unique ids + self.db.entities.create_index("id", unique=True) + self.db.entities.create_index("ber_data_source") + self.db.entities.create_index("data_type") + self.db.entities.create_index([("geojson", GEOSPHERE)]) + logger.info("Indexes created successfully") + except PyMongoError as e: + logger.error(f"Error creating indexes: {e}") + def ingest_file(self, filepath: str) -> Dict[str, int]: """Ingest entities from a JSON file.""" stats = {"processed": 0, "valid": 0, "invalid": 0, "inserted": 0, "error": 0} @@ -233,6 +240,8 @@ def main(): "error": 0, } + ingestor.create_indexes() # Create indexes before ingesting data + # Process a single file or all JSON files in a directory if os.path.isdir(args.input): for filename in os.listdir(args.input):