Skip to content

Commit 2cd2840

Browse files
Add ability to reset indexes when populating documents
1 parent c0f4c69 commit 2cd2840

4 files changed

Lines changed: 331 additions & 218 deletions

File tree

backend/cosmetology-app/lambdas/python/search/handlers/manage_opensearch_indices.py

Lines changed: 2 additions & 216 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
from cc_common.config import config, logger
44
from cc_common.exceptions import CCInternalException
55
from custom_resource_handler import CustomResourceHandler, CustomResourceResponse
6-
from opensearch_client import OpenSearchClient
7-
8-
# Initial index version for new deployments
9-
INITIAL_INDEX_VERSION = 'v1'
6+
from opensearch_client import INITIAL_INDEX_VERSION, OpenSearchClient
107

118
# Readiness check configuration
129
# OpenSearch domains may take time to become responsive after CloudFormation reports them as created.
@@ -53,8 +50,7 @@ def on_create(self, properties: dict) -> CustomResourceResponse | None:
5350
index_name = f'compact_{compact}_providers_{INITIAL_INDEX_VERSION}'
5451
# Create alias name (e.g., compact_cosm_providers)
5552
alias_name = f'compact_{compact}_providers'
56-
self._create_provider_index_with_alias(
57-
client=client,
53+
client.create_provider_index_with_alias(
5854
index_name=index_name,
5955
alias_name=alias_name,
6056
number_of_shards=number_of_shards,
@@ -144,215 +140,5 @@ def _wait_for_domain_ready(self) -> OpenSearchClient:
144140
f'Last error: {last_exception}'
145141
)
146142

147-
def _create_provider_index_with_alias(
148-
self,
149-
client: OpenSearchClient,
150-
index_name: str,
151-
alias_name: str,
152-
number_of_shards: int,
153-
number_of_replicas: int,
154-
) -> None:
155-
"""
156-
Create the provider index and alias in OpenSearch if they don't exist.
157-
158-
:param client: The OpenSearch client
159-
:param index_name: The versioned index name (e.g., compact_cosm_providers_v1)
160-
:param alias_name: The alias name (e.g., compact_cosm_providers)
161-
:param number_of_shards: Number of primary shards for the index
162-
:param number_of_replicas: Number of replica shards for the index
163-
"""
164-
# Check if the alias already exists (meaning an index version is already set up)
165-
if client.alias_exists(alias_name):
166-
logger.info(f"Alias '{alias_name}' already exists. Skipping index and alias creation.")
167-
return
168-
169-
# Check if the index already exists (edge case: index exists but alias doesn't)
170-
if client.index_exists(index_name):
171-
logger.info(f"Index '{index_name}' already exists. Creating alias only.")
172-
client.create_alias(index_name, alias_name)
173-
logger.info(f"Alias '{alias_name}' -> '{index_name}' created successfully.")
174-
return
175-
176-
# Create the index with the specified configuration
177-
logger.info(f"Creating index '{index_name}'...")
178-
index_mapping = self._get_provider_index_mapping(number_of_shards, number_of_replicas)
179-
client.create_index(index_name, index_mapping)
180-
logger.info(f"Index '{index_name}' created successfully.")
181-
182-
# Create the alias pointing to the new index
183-
logger.info(f"Creating alias '{alias_name}' -> '{index_name}'...")
184-
client.create_alias(index_name, alias_name)
185-
logger.info(f"Alias '{alias_name}' -> '{index_name}' created successfully.")
186-
187-
def _get_provider_index_mapping(self, number_of_shards: int, number_of_replicas: int) -> dict:
188-
"""
189-
Define the index mapping for provider documents.
190-
191-
:param number_of_shards: Number of primary shards for the index
192-
:param number_of_replicas: Number of replica shards for the index
193-
:return: The index mapping dictionary
194-
"""
195-
# Nested schema for AdverseAction
196-
adverse_action_properties = {
197-
'type': {'type': 'keyword'},
198-
'adverseActionId': {'type': 'keyword'},
199-
'compact': {'type': 'keyword'},
200-
'jurisdiction': {'type': 'keyword'},
201-
'providerId': {'type': 'keyword'},
202-
'licenseType': {'type': 'keyword'},
203-
'licenseTypeAbbreviation': {'type': 'keyword'},
204-
'actionAgainst': {'type': 'keyword'},
205-
'effectiveStartDate': {'type': 'date'},
206-
'creationDate': {'type': 'date'},
207-
'effectiveLiftDate': {'type': 'date'},
208-
'dateOfUpdate': {'type': 'date'},
209-
'encumbranceType': {'type': 'keyword'},
210-
'clinicalPrivilegeActionCategories': {'type': 'keyword'},
211-
'clinicalPrivilegeActionCategory': {'type': 'keyword'},
212-
'submittingUser': {'type': 'keyword'},
213-
'liftingUser': {'type': 'keyword'},
214-
}
215-
216-
# Nested schema for Investigation
217-
investigation_properties = {
218-
'type': {'type': 'keyword'},
219-
'investigationId': {'type': 'keyword'},
220-
'compact': {'type': 'keyword'},
221-
'jurisdiction': {'type': 'keyword'},
222-
'licenseType': {'type': 'keyword'},
223-
'status': {'type': 'keyword'},
224-
'dateOfUpdate': {'type': 'date'},
225-
}
226-
227-
# Nested schema for License
228-
license_properties = {
229-
'providerId': {'type': 'keyword'},
230-
'type': {'type': 'keyword'},
231-
'dateOfUpdate': {'type': 'date'},
232-
'compact': {'type': 'keyword'},
233-
'jurisdiction': {'type': 'keyword'},
234-
'licenseType': {'type': 'keyword'},
235-
'licenseStatusName': {'type': 'keyword'},
236-
'licenseStatus': {'type': 'keyword'},
237-
'jurisdictionUploadedLicenseStatus': {'type': 'keyword'},
238-
'compactEligibility': {'type': 'keyword'},
239-
'jurisdictionUploadedCompactEligibility': {'type': 'keyword'},
240-
'licenseNumber': {'type': 'keyword'},
241-
'givenName': {
242-
'type': 'text',
243-
'analyzer': 'custom_ascii_analyzer',
244-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
245-
},
246-
'middleName': {
247-
'type': 'text',
248-
'analyzer': 'custom_ascii_analyzer',
249-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
250-
},
251-
'familyName': {
252-
'type': 'text',
253-
'analyzer': 'custom_ascii_analyzer',
254-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
255-
},
256-
'suffix': {'type': 'keyword'},
257-
'dateOfIssuance': {'type': 'date'},
258-
'dateOfRenewal': {'type': 'date'},
259-
'dateOfExpiration': {'type': 'date'},
260-
'dateOfBirth': {'type': 'date'},
261-
'homeAddressStreet1': {'type': 'text'},
262-
'homeAddressStreet2': {'type': 'text'},
263-
'homeAddressCity': {
264-
'type': 'text',
265-
'analyzer': 'custom_ascii_analyzer',
266-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
267-
},
268-
'homeAddressState': {'type': 'keyword'},
269-
'homeAddressPostalCode': {'type': 'keyword'},
270-
'emailAddress': {'type': 'keyword'},
271-
'phoneNumber': {'type': 'keyword'},
272-
'adverseActions': {'type': 'nested', 'properties': adverse_action_properties},
273-
'investigations': {'type': 'nested', 'properties': investigation_properties},
274-
'investigationStatus': {'type': 'keyword'},
275-
}
276-
277-
# Nested schema for Privilege
278-
privilege_properties = {
279-
'type': {'type': 'keyword'},
280-
'providerId': {'type': 'keyword'},
281-
'compact': {'type': 'keyword'},
282-
'jurisdiction': {'type': 'keyword'},
283-
'licenseJurisdiction': {'type': 'keyword'},
284-
'licenseType': {'type': 'keyword'},
285-
'dateOfIssuance': {'type': 'date'},
286-
'dateOfRenewal': {'type': 'date'},
287-
'dateOfExpiration': {'type': 'date'},
288-
'dateOfUpdate': {'type': 'date'},
289-
'adverseActions': {'type': 'nested', 'properties': adverse_action_properties},
290-
'investigations': {'type': 'nested', 'properties': investigation_properties},
291-
'administratorSetStatus': {'type': 'keyword'},
292-
'compactTransactionId': {'type': 'keyword'},
293-
'privilegeId': {'type': 'keyword'},
294-
'status': {'type': 'keyword'},
295-
'investigationStatus': {'type': 'keyword'},
296-
}
297-
298-
return {
299-
'settings': {
300-
'index': {
301-
'number_of_shards': number_of_shards,
302-
'number_of_replicas': number_of_replicas,
303-
},
304-
'analysis': {
305-
# this custom analyzer is recommended by Opensearch when you have international character
306-
# sets, and you want to support searching by their closest ASCII equivalents.
307-
# See https://docs.opensearch.org/latest/analyzers/token-filters/asciifolding/
308-
'filter': {'custom_ascii_folding': {'type': 'asciifolding', 'preserve_original': True}},
309-
'analyzer': {
310-
'custom_ascii_analyzer': {
311-
'type': 'custom',
312-
'tokenizer': 'standard',
313-
'filter': ['lowercase', 'custom_ascii_folding'],
314-
}
315-
},
316-
},
317-
},
318-
'mappings': {
319-
'properties': {
320-
# Top-level provider fields
321-
'providerId': {'type': 'keyword'},
322-
'type': {'type': 'keyword'},
323-
'dateOfUpdate': {'type': 'date'},
324-
'compact': {'type': 'keyword'},
325-
'licenseJurisdiction': {'type': 'keyword'},
326-
'licenseStatus': {'type': 'keyword'},
327-
'compactEligibility': {'type': 'keyword'},
328-
'givenName': {
329-
'type': 'text',
330-
'analyzer': 'custom_ascii_analyzer',
331-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
332-
},
333-
'middleName': {
334-
'type': 'text',
335-
'analyzer': 'custom_ascii_analyzer',
336-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
337-
},
338-
'familyName': {
339-
'type': 'text',
340-
'analyzer': 'custom_ascii_analyzer',
341-
'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}},
342-
},
343-
'suffix': {'type': 'keyword'},
344-
'dateOfExpiration': {'type': 'date'},
345-
'jurisdictionUploadedLicenseStatus': {'type': 'keyword'},
346-
'jurisdictionUploadedCompactEligibility': {'type': 'keyword'},
347-
'providerFamGivMid': {'type': 'keyword'},
348-
'providerDateOfUpdate': {'type': 'date'},
349-
'birthMonthDay': {'type': 'keyword'},
350-
# Nested arrays
351-
'licenses': {'type': 'nested', 'properties': license_properties},
352-
'privileges': {'type': 'nested', 'properties': privilege_properties},
353-
}
354-
},
355-
}
356-
357143

358144
on_event = OpenSearchIndexManager('opensearch-index-manager')

backend/cosmetology-app/lambdas/python/search/handlers/populate_provider_documents.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@
1919
"startingLastKey": {"pk": "...", "sk": "..."}
2020
}
2121
22+
Optional parameters:
23+
24+
- resetIndexes: If true, deletes and recreates all compact provider indexes before
25+
indexing (uses numberOfShards / numberOfReplicas). Run during low traffic; do not
26+
combine with resumption (startingCompact / startingLastKey) for the same run.
27+
- numberOfShards: Primary shard count for recreated indexes (default: 1).
28+
- numberOfReplicas: Replica shard count for recreated indexes (default: 0).
29+
2230
Race Condition Consideration:
2331
A potential race condition can occur when running this function while provider
2432
data is being actively updated:
@@ -39,7 +47,7 @@
3947
from cc_common.config import config, logger
4048
from cc_common.exceptions import CCInternalException
4149
from marshmallow import ValidationError
42-
from opensearch_client import OpenSearchClient
50+
from opensearch_client import INITIAL_INDEX_VERSION, OpenSearchClient
4351
from utils import generate_provider_opensearch_documents
4452

4553
# Batch size for DynamoDB pagination
@@ -64,12 +72,37 @@ def populate_provider_documents(event: dict, context: LambdaContext):
6472
:param event: Lambda event with optional parameters:
6573
- startingCompact: The compact to start/resume processing from
6674
- startingLastKey: The DynamoDB pagination key to resume from
75+
- resetIndexes: If true, delete and recreate all compact indexes first
76+
- numberOfShards: Shards for recreated indexes (default 1)
77+
- numberOfReplicas: Replicas for recreated indexes (default 0)
6778
:param context: Lambda context
6879
:return: Summary of indexing operation, including pagination info if incomplete
6980
"""
7081
data_client = config.data_client
7182
opensearch_client = OpenSearchClient()
7283

84+
reset_indexes = bool(event.get('resetIndexes', False))
85+
number_of_shards = int(event.get('numberOfShards', 1))
86+
number_of_replicas = int(event.get('numberOfReplicas', 0))
87+
88+
if reset_indexes:
89+
logger.info(
90+
'resetIndexes=True: deleting and recreating all compact indexes',
91+
number_of_shards=number_of_shards,
92+
number_of_replicas=number_of_replicas,
93+
)
94+
for compact in config.compacts:
95+
alias_name = f'compact_{compact}_providers'
96+
index_name = f'compact_{compact}_providers_{INITIAL_INDEX_VERSION}'
97+
opensearch_client.delete_provider_index_with_alias(alias_name=alias_name)
98+
opensearch_client.create_provider_index_with_alias(
99+
index_name=index_name,
100+
alias_name=alias_name,
101+
number_of_shards=number_of_shards,
102+
number_of_replicas=number_of_replicas,
103+
)
104+
logger.info('Index reset complete. Proceeding with population.')
105+
73106
# Get optional pagination parameters from event for resumption (normal mode)
74107
starting_compact = event.get('startingCompact')
75108
starting_last_key = event.get('startingLastKey')

0 commit comments

Comments
 (0)