Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dynamodb-streams-lambda-sns/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
*.js
!jest.config.js
*.d.ts
node_modules

# CDK asset staging directory
.cdk.staging
cdk.out
.DS_Store
6 changes: 6 additions & 0 deletions dynamodb-streams-lambda-sns/.npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*.ts
!*.d.ts

# CDK asset staging directory
.cdk.staging
cdk.out
108 changes: 108 additions & 0 deletions dynamodb-streams-lambda-sns/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# DynamoDB Stream Integration with Lambda and SNS

## Overview

This repository provides both L2 and L3 constructs example usage for working with DynamoDB streams [AWS Cloud Development Kit (CDK)](https://aws.amazon.com/cdk/) with TypeScript. It showcases the integration of DynamoDB streams with AWS Lambda and Amazon SNS (Simple Notification Service), providing an example of real-time data processing and notification workflows.

This solution demonstrates a use case for real-time notifications: alerting users about low inventory of an item in the system.

## Architecture Diagram

![Architecture Diagram](images/architecture.jpg)

## Features

- L2 (low-level) construct for fine-grained control over DynamoDB streams
- [L3 (high-level)](https://docs.aws.amazon.com/solutions/latest/constructs/aws_dynamodbstreams_lambda.html) construct for simplified, best-practice implementations of DynamoDB streams
- Integration with Lambda functions for stream processing
- Implements an SQS Dead Letter Queue (DLQ) for the Lambda function failure handling
- Shows how to use Amazon SNS to distribute stream processing results or notifications.


## Build, Deploy and Testing

### Prerequisites

Before you begin, ensure you have met the following requirements:

* You have installed the latest version of [Node.js and npm](https://nodejs.org/en/download/)
* You have installed the [AWS CLI](https://aws.amazon.com/cli/) and configured it with your credentials
* You have installed the [AWS CDK Toolkit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) globally
* You have an AWS account and have set up your [AWS credentials](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
* You have [bootstrapped your AWS account](https://docs.aws.amazon.com/cdk/latest/guide/bootstrapping.html) for CDK



### Build
To build this app, you need to be in this example's root folder. Then run the following:

```bash
npm install
npm run build
```

This will install the dependencies for this example.

### Deploy

Run `cdk deploy`. This will deploy the Stack to your AWS Account.

Post deployment, you should see the table arn, lambda function arn and sns topic arn on the output of your CLI.

## Testing
```bash
npm run test
```

## Usage

### Configuring SNS Notification Subscription

1. After deploying the stack, locate the SNS topic Amazon Resource Name (ARN) from the CLI output.

2. To subscribe an email address to the SNS topic:

```bash
aws sns subscribe --topic-arn <your-sns-topic-arn> --protocol email --notification-endpoint your-email@example.com
```
Replace <your-sns-topic-arn> with the actual ARN of your SNS topic, and your-email@example.com with the email address you want to subscribe.

3. Check your email inbox for a confirmation message from AWS. Click the link in the email to confirm your subscription.

### Creating an Item in DynamoDB with id, itemName, and count
To trigger the stream processing and email notification, you need to create an item in your DynamoDB table with the fields id, itemName, and count. You can do this using the AWS CLI or AWS Management Console.

Example item.json provided in this repo:
```bash
{
"id": {
"S": "1"
},
"count": {
"N": "10"
},
"itemName": {
"S": "Coffee Beans"
}
}
```

1. Use the following command to put the item into your DynamoDB table:

```bash
aws dynamodb put-item --table-name <your-table-name> --item file://item.json
```

Replace <your-table-name> with the actual name of your DynamoDB table.

2. Whenever you update the count field of the item to 0, the DynamoDB stream will trigger the Lambda function, which will process the data and send a notification to the subscribed email address via SNS.



## Cleanup

To avoid incurring future charges, please destroy the resources when they are no longer needed:

```bash
cdk destroy
```
6 changes: 6 additions & 0 deletions dynamodb-streams-lambda-sns/bin/ddb-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env node
import * as cdk from 'aws-cdk-lib';
import { DdbStreamStack } from '../lib/ddb-stream-stack';

const app = new cdk.App();
new DdbStreamStack(app, 'DdbStreamStack', {});
72 changes: 72 additions & 0 deletions dynamodb-streams-lambda-sns/cdk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"app": "npx ts-node --prefer-ts-exts bin/ddb-stream.ts",
"watch": {
"include": [
"**"
],
"exclude": [
"README.md",
"cdk*.json",
"**/*.d.ts",
"**/*.js",
"tsconfig.json",
"package*.json",
"yarn.lock",
"node_modules",
"test"
]
},
"context": {
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/core:target-partitions": [
"aws",
"aws-cn"
],
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
"@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
"@aws-cdk/aws-iam:minimizePolicies": true,
"@aws-cdk/core:validateSnapshotRemovalPolicy": true,
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
"@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
"@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
"@aws-cdk/core:enablePartitionLiterals": true,
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
"@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
"@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
"@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
"@aws-cdk/aws-route53-patters:useCertificate": true,
"@aws-cdk/customresources:installLatestAwsSdkDefault": false,
"@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true,
"@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true,
"@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true,
"@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true,
"@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true,
"@aws-cdk/aws-redshift:columnId": true,
"@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true,
"@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true,
"@aws-cdk/aws-apigateway:requestValidatorUniqueId": true,
"@aws-cdk/aws-kms:aliasNameRef": true,
"@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true,
"@aws-cdk/core:includePrefixInUniqueNameGeneration": true,
"@aws-cdk/aws-efs:denyAnonymousAccess": true,
"@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true,
"@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true,
"@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true,
"@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true,
"@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true,
"@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true,
"@aws-cdk/aws-codepipeline-actions:useNewDefaultBranchForCodeCommitSource": true,
"@aws-cdk/aws-cloudwatch-actions:changeLambdaPermissionLogicalIdForLambdaAction": true,
"@aws-cdk/aws-codepipeline:crossAccountKeysDefaultValueToFalse": true,
"@aws-cdk/aws-codepipeline:defaultPipelineTypeToV2": true,
"@aws-cdk/aws-kms:reduceCrossAccountRegionPolicyScope": true,
"@aws-cdk/aws-eks:nodegroupNameAttribute": true,
"@aws-cdk/aws-ec2:ebsDefaultGp3Volume": true,
"@aws-cdk/aws-ecs:removeDefaultDeploymentAlarm": true,
"@aws-cdk/custom-resources:logApiResponseDataPropertyTrueDefault": false,
"@aws-cdk/aws-s3:keepNotificationInImportedBucket": false
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 11 additions & 0 deletions dynamodb-streams-lambda-sns/item.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"id": {
"S": "1"
},
"count": {
"N": "10"
},
"itemName": {
"S": "Coffee Beans"
}
}
9 changes: 9 additions & 0 deletions dynamodb-streams-lambda-sns/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module.exports = {
testEnvironment: 'node',
roots: ['<rootDir>/test'],
testMatch: ['**/*.test.ts'],
transform: {
'^.+\\.tsx?$': 'ts-jest'
},
silent: true
};
93 changes: 93 additions & 0 deletions dynamodb-streams-lambda-sns/lib/ddb-stream-stack.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { DynamoDBStreamsToLambda } from '@aws-solutions-constructs/aws-dynamodbstreams-lambda';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as sqs from 'aws-cdk-lib/aws-sqs';

export class DdbStreamStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const aws_sns_kms_key = kms.Alias.fromAliasName(
this,
"aws-managed-sns-kms-key",
"alias/aws/sns",
)

const snsTopic = new sns.Topic(this, 'ddb-stream-topic', {
topicName: 'ddb-stream-topic',
displayName: 'SNS Topic for DDB streams',
enforceSSL: true,
masterKey: aws_sns_kms_key,
});

//L2 CDK Construct
const deadLetterQueueL2 = new sqs.Queue(this, 'ddb-stream-l2-dlq', {
queueName: 'ddb-stream-l2-dlq',
encryption: sqs.QueueEncryption.KMS_MANAGED,
retentionPeriod: cdk.Duration.days(4), // Adjust retention period as needed
});

const itemL2Table = new dynamodb.Table(this, 'itemL2Table', {
partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
encryption: dynamodb.TableEncryption.AWS_MANAGED,
//If you wish to retain the table after running cdk destroy, comment out the line below
removalPolicy: cdk.RemovalPolicy.DESTROY
});

const itemL2TableLambdaFunction = new lambda.Function(this, 'itemL2TableLambdaFunction', {
runtime: lambda.Runtime.NODEJS_24_X,
handler: 'index.handler',
tracing: lambda.Tracing.ACTIVE,
code: lambda.Code.fromAsset('resources/lambda'),
environment: {
SNS_TOPIC_ARN: snsTopic.topicArn,
AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
},
});
itemL2TableLambdaFunction.addEventSource(new lambdaEventSources.DynamoEventSource(itemL2Table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
onFailure: new lambdaEventSources.SqsDlq(deadLetterQueueL2),
bisectBatchOnError: true,
maxRecordAge: cdk.Duration.hours(24),
retryAttempts: 500,
}));

deadLetterQueueL2.grantSendMessages(itemL2TableLambdaFunction);

itemL2Table.grantStreamRead(itemL2TableLambdaFunction);

//L3 CDK Construct
const itemL3Table = new DynamoDBStreamsToLambda(this, 'itemL3Table', {
dynamoTableProps: {
partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
//If you wish to retain the table after running cdk destroy, comment out the line below
removalPolicy: cdk.RemovalPolicy.DESTROY
},
lambdaFunctionProps: {
code: lambda.Code.fromAsset('resources/lambda'),
runtime: lambda.Runtime.NODEJS_24_X,
handler: 'index.handler',
environment: {
SNS_TOPIC_ARN: snsTopic.topicArn,
},
},
});

snsTopic.grantPublish(itemL2TableLambdaFunction);
snsTopic.grantPublish(itemL3Table.lambdaFunction);

new cdk.CfnOutput(this, 'itemL2TableLambdaFunctionArn', { value: itemL2TableLambdaFunction.functionArn });
new cdk.CfnOutput(this, 'itemL3TableLambdaFunctionArn', { value: itemL3Table.lambdaFunction.functionArn });
new cdk.CfnOutput(this, 'l3TableArn', { value: itemL3Table.dynamoTableInterface.tableArn });
new cdk.CfnOutput(this, 'l2TableArn', { value: itemL2Table.tableArn });
new cdk.CfnOutput(this, 'topicArn', { value: snsTopic.topicArn });
new cdk.CfnOutput(this, 'l2DLQArn', { value: deadLetterQueueL2.queueArn })
}
}
27 changes: 27 additions & 0 deletions dynamodb-streams-lambda-sns/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"name": "ddb-stream-lambda-sns",
"version": "0.1.0",
"bin": {
"ddb-stream": "bin/ddb-stream.js"
},
"scripts": {
"build": "tsc",
"watch": "tsc -w",
"test": "jest",
"cdk": "cdk"
},
"devDependencies": {
"@types/jest": "^30.0.0",
"@types/node": "25.0.3",
"aws-cdk": "2.1100.1",
"jest": "^30.2.0",
"ts-jest": "^29.4.6",
"ts-node": "^10.9.2",
"typescript": "~5.9.3"
},
"dependencies": {
"@aws-solutions-constructs/aws-dynamodbstreams-lambda": "^2.96.0",
"aws-cdk-lib": "2.233.0",
"constructs": "^10.4.4"
}
}
44 changes: 44 additions & 0 deletions dynamodb-streams-lambda-sns/resources/lambda/index.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { SNSClient, PublishCommand } from "@aws-sdk/client-sns";

const snsClient = new SNSClient();

/**
* Lambda function handler to monitor DynamoDB stream events for inventory changes
* Sends email notifications when an item's count reaches zero
* @param {Object} event - DynamoDB Stream event
* @returns {Object} - Status of the execution
*/

export const handler = async (event) => {
try {
for (const record of event.Records) {
// Only process MODIFY events
if (record.eventName === "MODIFY") {
const newImage = record.dynamodb.NewImage;
const oldImage = record.dynamodb.OldImage;

const newCount = newImage.count ? parseInt(newImage.count.N) : null;
const oldCount = oldImage?.count ? parseInt(oldImage.count.N) : null;

// Check if count changed to 0 from a non-zero value
if (newCount === 0 && oldCount > 0) {
const itemName = newImage.itemName ? newImage.itemName.S : "Unknown item";

const params = {
Message: `Alert: ${itemName} has reached zero inventory! Previous count was ${oldCount}.`,
Subject: `Stock Alert - ${itemName} Out of Stock`,
TopicArn: process.env.SNS_TOPIC_ARN
};

await snsClient.send(new PublishCommand(params));

console.log(`Notification sent for ${itemName} - count dropped to 0 from ${oldCount}`);
}
}
}
return { statusCode: 200, body: 'Processing complete.' };
} catch (error) {
console.error('Error processing records:', error);
throw error;
}
};
Loading