Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/ingest_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,6 @@ def insert_entity(self, entity: Dict) -> Optional[str]:
)
return None

# Create indexes for common query patterns
self.db.entities.create_index("uri", unique=True)
self.db.entities.create_index("ber_data_source")
self.db.entities.create_index("data_type")

# Create 2dsphere index for geospatial queries on coordinates
self.db.entities.create_index([("geojson", GEOSPHERE)])

# Insert with upsert to handle potential duplicates based on URI
result = self.db.entities.update_one(
{"uri": entity["uri"]}, {"$set": entity}, upsert=True
Expand All @@ -152,6 +144,21 @@ def insert_entity(self, entity: Dict) -> Optional[str]:
logger.error(f"Error inserting entity: {e}")
return None

def create_indexes(self) -> None:
    """Create the indexes used to query the 'entities' collection.

    Builds single-field indexes on ``uri`` (unique), ``id``,
    ``ber_data_source`` and ``data_type``, plus a geospatial index on
    ``geojson``.  Errors raised by PyMongo are logged and not propagated.

    Raises:
        AssertionError: if ``self.db`` is None, i.e. no database connection
            has been established yet.
    """
    assert self.db is not None, "Connection to database has not been established"
    try:
        logger.info("Creating indexes on 'entities' collection")
        entities = self.db.entities
        # Entities are upserted by URI, so URI is the field that must stay
        # unique; this matches the index the insert path used to create.
        entities.create_index("uri", unique=True)
        # TODO: enforce unique index on id once ess-dive implements unique ids
        # Until then the id index must be non-unique: a unique index would
        # fail to build on duplicate ids and abort the remaining indexes.
        entities.create_index("id")
        entities.create_index("ber_data_source")
        entities.create_index("data_type")
        # Geospatial index so coordinate queries on 'geojson' are supported.
        entities.create_index([("geojson", GEOSPHERE)])
        logger.info("Indexes created successfully")
    except PyMongoError as e:
        logger.error(f"Error creating indexes: {e}")

def ingest_file(self, filepath: str) -> Dict[str, int]:
"""Ingest entities from a JSON file."""
stats = {"processed": 0, "valid": 0, "invalid": 0, "inserted": 0, "error": 0}
Expand Down Expand Up @@ -233,6 +240,8 @@ def main():
"error": 0,
}

ingestor.create_indexes() # Create indexes before ingesting data

# Process a single file or all JSON files in a directory
if os.path.isdir(args.input):
for filename in os.listdir(args.input):
Expand Down