@@ -2,28 +2,42 @@ import { OutgoingQueueAdapter, QueueMessage } from "@mqueue/queue";
22import { MessageAttributeValue } from "@aws-sdk/client-sqs" ;
33import * as AWS from "@aws-sdk/client-sqs" ;
44
5+ export type SQSOutgoingQueueBeforeSendHook = (
6+ request : AWS . SendMessageRequest ,
7+ ) => Promise < AWS . SendMessageRequest > | AWS . SendMessageRequest ;
8+
59export interface SQSOutgoingQueueConnectOptions {
610 sdk ?: typeof AWS ;
711 clientConfig ?: AWS . SQSClientConfig ;
12+ beforeSend ?: SQSOutgoingQueueBeforeSendHook ;
813}
914
1015export default class SQSOutgoingQueue implements OutgoingQueueAdapter {
1116 public type = "sqs" ;
1217
18+ protected _beforeSend ?: SQSOutgoingQueueBeforeSendHook ;
19+
1320 constructor (
1421 public client : AWS . SQS ,
1522 protected _queueURL : string ,
16- ) { }
23+ { beforeSend } : { beforeSend ?: SQSOutgoingQueueBeforeSendHook } = { } ,
24+ ) {
25+ this . _beforeSend = beforeSend ;
26+ }
1727
1828 public static async connect (
1929 url : string ,
20- { clientConfig, sdk = AWS } : SQSOutgoingQueueConnectOptions = { } ,
30+ {
31+ clientConfig,
32+ sdk = AWS ,
33+ beforeSend,
34+ } : SQSOutgoingQueueConnectOptions = { } ,
2135 ) {
2236 const connection = new sdk . SQS ( {
2337 ...clientConfig ,
2438 } ) ;
2539
26- return new this ( connection , url ) ;
40+ return new this ( connection , url , { beforeSend } ) ;
2741 }
2842
2943 public async healthcheck ( ) {
@@ -53,10 +67,16 @@ export default class SQSOutgoingQueue implements OutgoingQueueAdapter {
5367 } ;
5468 }
5569
56- await this . client . sendMessage ( {
70+ let options : AWS . SendMessageRequest = {
5771 QueueUrl : this . _queueURL ,
5872 MessageBody : message . body . toString ( ) ,
5973 MessageAttributes : messageAttributes ,
60- } ) ;
74+ } ;
75+
76+ if ( this . _beforeSend ) {
77+ options = await this . _beforeSend ( options ) ;
78+ }
79+
80+ await this . client . sendMessage ( options ) ;
6181 }
6282}
0 commit comments