Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions capepy/aws/dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os

from boto3.dynamodb.conditions import ConditionExpressionBuilder, Key
from botocore.exceptions import ClientError

from capepy.aws.meta import Boto3Object
Expand Down Expand Up @@ -68,10 +69,55 @@ def get_item(self, key):

return ret

def query_items(self, index_name, key_cond_exp):
"""Retrieve items from the loaded table using a secondary index.

Args:
index_name: The name of the index to query.
key_cond_exp: The key condition expression to use in the query.

Returns:
The items value from the DyamoDB table. Any valid query results
will come back as a list (including an empty list for no match),
None is returned if there is an error.
"""
ret = None

try:
response = self.table.query(
IndexName=index_name, KeyConditionExpression=key_cond_exp
)
ret = response.get("Items")

except ClientError as err:
code, message = decode_error(err)

# the key condition is not real easy to print in a good way, but we
# do our best.
ceb = ConditionExpressionBuilder()
kce = ceb.build_expression(key_cond_exp, is_key_condition=True)

kce_str = (
f"Expression: {kce.condition_expression} "
f"Expression Attr Names: {kce.attribute_name_placeholders} "
f"Expression Attr Values: {kce.attribute_value_placeholders} "
)

self.logger.error(
f"Couldn't get DynamoDB entries for in index '{index_name}' "
f"with key condition expression '{kce_str}' in table {self.name}. "
f"{code} {message}"
)

return ret


class PipelineTable(Table):
"""A DynamoDB table with specific structure for organizing analysis pipelines."""

# name of the index that we query with name and version
NAME_VER_GSI = "PipelineNameVerIndex"

def __init__(self, table_name=None):
"""Constructor to fetch and initialize a DynamoDB analysis pipeline table.

Expand All @@ -82,22 +128,25 @@ def __init__(self, table_name=None):
table_name = os.getenv("DAP_REG_DDB_TABLE")
super().__init__(table_name)

def get_pipeline(self, pipeline_name, pipeline_version):
"""Retrieve a specific pipeline from the table.
def get_pipelines_by_name(self, pipeline_name, pipeline_version=None):
"""Retrieve pipelines from the table matching name and optional version.

Args:
pipeline_name: The name of the pipeline.
pipeline_version: The version of the pipeline.
pipeline_version: The optional version of the pipeline.

Returns:
The retrieved pipeline item.
The retrieved pipeline item(s) in a list (which will be empty in the
case of no match) or None on error.
"""
key = {"pipeline_name": pipeline_name}
kce = Key("pipeline_name").eq(pipeline_name)

if pipeline_version is not None:
key["version"] = pipeline_version
return self.get_item(key)
kce = kce & Key("version").eq(pipeline_version)

return self.query_items(self.NAME_VER_GSI, kce)

def get_pipeline_by_id(self, pipeline_id):
def get_pipeline(self, pipeline_id):
"""Retrieve a specific pipeline from the table.

Args:
Expand Down
Loading