diff --git a/capepy/aws/dynamodb.py b/capepy/aws/dynamodb.py index ed032f9..f9fe661 100644 --- a/capepy/aws/dynamodb.py +++ b/capepy/aws/dynamodb.py @@ -1,5 +1,6 @@ import os +from boto3.dynamodb.conditions import ConditionExpressionBuilder, Key from botocore.exceptions import ClientError from capepy.aws.meta import Boto3Object @@ -68,10 +69,55 @@ def get_item(self, key): return ret + def query_items(self, index_name, key_cond_exp): + """Retrieve items from the loaded table using a secondary index. + + Args: + index_name: The name of the index to query. + key_cond_exp: The key condition expression to use in the query. + + Returns: + The items value from the DyamoDB table. Any valid query results + will come back as a list (including an empty list for no match), + None is returned if there is an error. + """ + ret = None + + try: + response = self.table.query( + IndexName=index_name, KeyConditionExpression=key_cond_exp + ) + ret = response.get("Items") + + except ClientError as err: + code, message = decode_error(err) + + # the key condition is not real easy to print in a good way, but we + # do our best. + ceb = ConditionExpressionBuilder() + kce = ceb.build_expression(key_cond_exp, is_key_condition=True) + + kce_str = ( + f"Expression: {kce.condition_expression} " + f"Expression Attr Names: {kce.attribute_name_placeholders} " + f"Expression Attr Values: {kce.attribute_value_placeholders} " + ) + + self.logger.error( + f"Couldn't get DynamoDB entries for in index '{index_name}' " + f"with key condition expression '{kce_str}' in table {self.name}. " + f"{code} {message}" + ) + + return ret + class PipelineTable(Table): """A DynamoDB table with specific structure for organizing analysis pipelines.""" + # name of the index that we query with name and version + NAME_VER_GSI = "PipelineNameVerIndex" + def __init__(self, table_name=None): """Constructor to fetch and initialize a DynamoDB analysis pipeline table. @@ -82,22 +128,25 @@ def __init__(self, table_name=None): table_name = os.getenv("DAP_REG_DDB_TABLE") super().__init__(table_name) - def get_pipeline(self, pipeline_name, pipeline_version): - """Retrieve a specific pipeline from the table. + def get_pipelines_by_name(self, pipeline_name, pipeline_version=None): + """Retrieve pipelines from the table matching name and optional version. Args: pipeline_name: The name of the pipeline. - pipeline_version: The version of the pipeline. + pipeline_version: The optional version of the pipeline. Returns: - The retrieved pipeline item. + The retrieved pipeline item(s) in a list (which will be empty in the + case of no match) or None on error. """ - key = {"pipeline_name": pipeline_name} + kce = Key("pipeline_name").eq(pipeline_name) + if pipeline_version is not None: - key["version"] = pipeline_version - return self.get_item(key) + kce = kce & Key("version").eq(pipeline_version) + + return self.query_items(self.NAME_VER_GSI, kce) - def get_pipeline_by_id(self, pipeline_id): + def get_pipeline(self, pipeline_id): """Retrieve a specific pipeline from the table. Args: