Skip to content

Commit 2de95da

Browse files
authored
(+semver: feature) Add AMQP exchange subscription and publishing (#16)
* (+semver: feature) Add AMQP exchange subscription and publishing * (+semver: fix) Per feedback, moved counter inside of worker func
1 parent 26a42f5 commit 2de95da

9 files changed

Lines changed: 662 additions & 1 deletion

.vscode/settings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{
2-
"go.testTimeout": "120s"
2+
"go.testTimeout": "120s",
3+
"cSpell.words": ["amqp"]
34
}

amqp/exchangePublisher.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package amqp
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"github.com/streadway/amqp"
6+
)
7+
8+
// ExchangePublisher is a service for publishing messages to an exchange
9+
type ExchangePublisher struct {
10+
amqpURL string
11+
12+
connection *amqp.Connection
13+
}
14+
15+
// NewExchangePublisher creates a Publisher
16+
func NewExchangePublisher(amqpURL string) *ExchangePublisher {
17+
return &ExchangePublisher{
18+
amqpURL: amqpURL,
19+
}
20+
}
21+
22+
// EnsurePublisherIsReady ensures that the publisher is ready to send messages
23+
func (p *ExchangePublisher) EnsurePublisherIsReady() error {
24+
var err error
25+
p.connection, err = amqp.Dial(p.amqpURL)
26+
if err != nil {
27+
return errors.Wrap(err, "failed to connect to broker")
28+
}
29+
30+
return nil
31+
}
32+
33+
// Publish publishes a message to the given exchange
34+
func (p *ExchangePublisher) Publish(exchangeName string, headers map[string]string, body []byte) error {
35+
channel, err := p.connection.Channel()
36+
if err != nil {
37+
return errors.Wrap(err, "failed to open channel to broker")
38+
}
39+
defer channel.Close()
40+
41+
headersTable := make(amqp.Table)
42+
for k, v := range headers {
43+
headersTable[k] = v
44+
}
45+
err = channel.Publish(exchangeName, "", false, false, amqp.Publishing{
46+
Headers: headersTable,
47+
Body: body,
48+
})
49+
if err != nil {
50+
return errors.Wrap(err, "failed to publish message")
51+
}
52+
53+
return nil
54+
}

amqp/exchangePublisher_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package amqp_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/streadway/amqp"
10+
"github.com/stretchr/testify/assert"
11+
sut "github.com/syncromatics/go-kit/v2/amqp"
12+
)
13+
14+
func Test_EnsurePublisherIsReady_Successful(t *testing.T) {
15+
// Arrange
16+
conn, err := amqp.Dial(amqpURL)
17+
assert.Nil(t, err)
18+
defer conn.Close()
19+
20+
channel, err := conn.Channel()
21+
assert.Nil(t, err)
22+
defer channel.Close()
23+
24+
publisher := sut.NewExchangePublisher(amqpURL)
25+
26+
// Act
27+
err = publisher.EnsurePublisherIsReady()
28+
29+
// Assert
30+
assert.Nil(t, err)
31+
}
32+
33+
func Test_EnsurePublisherIsReady_BadURL(t *testing.T) {
34+
// Arrange
35+
publisher := sut.NewExchangePublisher("amqp://localhost:80")
36+
37+
// Act
38+
err := publisher.EnsurePublisherIsReady()
39+
40+
// Assert
41+
assert.Error(t, err)
42+
}
43+
44+
func Test_Publish_Successful(t *testing.T) {
45+
// Arrange
46+
conn, err := amqp.Dial(amqpURL)
47+
assert.Nil(t, err)
48+
defer conn.Close()
49+
50+
channel, err := conn.Channel()
51+
assert.Nil(t, err)
52+
defer channel.Close()
53+
54+
testQueueName := fmt.Sprintf("test.%v", uuid.NewString())
55+
_, err = channel.QueueDeclare(testQueueName, false, true, true, false, nil)
56+
assert.Nil(t, err)
57+
58+
err = channel.QueueBind(testQueueName, "#", EXCHANGE_NAME, false, nil)
59+
assert.Nil(t, err)
60+
61+
actualMessages, err := channel.Consume(testQueueName, uuid.NewString(), true, true, false, false, nil)
62+
assert.Nil(t, err)
63+
64+
publisher := sut.NewExchangePublisher(amqpURL)
65+
66+
err = publisher.EnsurePublisherIsReady()
67+
assert.Nil(t, err)
68+
69+
expectedHeaders := amqp.Table{
70+
"messageType": "Position",
71+
}
72+
expectedBody := []byte(`{"VehicleId":1}`)
73+
74+
// Act
75+
err = publisher.Publish(EXCHANGE_NAME, map[string]string{
76+
"messageType": "Position",
77+
}, expectedBody)
78+
79+
// Assert
80+
assert.Nil(t, err)
81+
82+
select {
83+
case actual := <-actualMessages:
84+
assert.Equal(t, expectedHeaders, actual.Headers)
85+
assert.Equal(t, expectedBody, actual.Body)
86+
case <-time.After(3 * time.Second):
87+
assert.Fail(t, "expected to receive message within a timely manner")
88+
}
89+
90+
}

amqp/exchangeSubscription.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package amqp
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
uuid "github.com/google/uuid"
8+
"github.com/pkg/errors"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promauto"
11+
"github.com/streadway/amqp"
12+
"github.com/syncromatics/go-kit/v2/log"
13+
)
14+
15+
// ExchangeSubscription is a service for subscribing to an AMQP exchange
16+
type ExchangeSubscription struct {
17+
amqpURL string
18+
queueName string
19+
exchangeName string
20+
21+
connection *amqp.Connection
22+
23+
activeConsumers prometheus.Gauge
24+
messagesConsumed prometheus.Counter
25+
messagesAcked prometheus.Counter
26+
messagesNacked prometheus.Counter
27+
messagesRejected prometheus.Counter
28+
}
29+
30+
// NewExchangeSubscription creates a new ExchangeSubscription
31+
func NewExchangeSubscription(amqpURL string, exchangeName string) *ExchangeSubscription {
32+
queueName := fmt.Sprintf("%s.%s", exchangeName, uuid.New())
33+
34+
labels := prometheus.Labels{
35+
"amqp_queue": queueName,
36+
"amqp_exchange": exchangeName,
37+
}
38+
39+
activeConsumers := promauto.NewGauge(prometheus.GaugeOpts{
40+
Name: "amqp_consumers_total",
41+
Help: "The total number of consumers connected to the queue that is subscribed to the exchange",
42+
ConstLabels: labels,
43+
})
44+
45+
messagesConsumed := promauto.NewCounter(prometheus.CounterOpts{
46+
Name: "amqp_messages_recv_total",
47+
Help: "The total number of received messages",
48+
ConstLabels: labels,
49+
})
50+
51+
messagesAcked := promauto.NewCounter(prometheus.CounterOpts{
52+
Name: "amqp_messages_ack_total",
53+
Help: "The total number of acknowledged messages",
54+
ConstLabels: labels,
55+
})
56+
57+
messagesNacked := promauto.NewCounter(prometheus.CounterOpts{
58+
Name: "amqp_messages_nack_total",
59+
Help: "The total number of negatively acknowledged messages",
60+
ConstLabels: labels,
61+
})
62+
63+
messagesRejected := promauto.NewCounter(prometheus.CounterOpts{
64+
Name: "amqp_messages_reject_total",
65+
Help: "The total number of rejected messages",
66+
ConstLabels: labels,
67+
})
68+
69+
return &ExchangeSubscription{
70+
amqpURL: amqpURL,
71+
queueName: queueName,
72+
exchangeName: exchangeName,
73+
74+
activeConsumers: activeConsumers,
75+
messagesConsumed: messagesConsumed,
76+
messagesAcked: messagesAcked,
77+
messagesNacked: messagesNacked,
78+
messagesRejected: messagesRejected,
79+
}
80+
}
81+
82+
// EnsureExchangeSubscriptionIsReady ensures that the necessary transient queue exists and is bound to the exchange
83+
func (es *ExchangeSubscription) EnsureExchangeSubscriptionIsReady() error {
84+
var err error
85+
es.connection, err = amqp.Dial(es.amqpURL)
86+
if err != nil {
87+
return errors.Wrap(err, "failed to connect to broker")
88+
}
89+
90+
channel, err := es.connection.Channel()
91+
if err != nil {
92+
return errors.Wrap(err, "failed to open channel to broker")
93+
}
94+
defer channel.Close()
95+
96+
_, err = channel.QueueDeclare(es.queueName, false, true, true, false, nil)
97+
if err != nil {
98+
return errors.Wrap(err, "failed to declare queue")
99+
}
100+
101+
err = channel.QueueBind(es.queueName, "#", es.exchangeName, false, nil)
102+
if err != nil {
103+
return errors.Wrap(err, "failed to bind queue to exchange")
104+
}
105+
106+
return nil
107+
}
108+
109+
// Message represents a message in-flight from an AMQP broker
110+
type Message struct {
111+
// Headers are the collection of metadata passed along with the Body
112+
Headers map[string]interface{}
113+
// Body is the unmodified byte array containing the message
114+
Body []byte
115+
// Ack acknowledges the successful processing of the message
116+
Ack func() error
117+
// Nack acknowledges the failed processing of the message and instructs the message to be requeued
118+
Nack func() error
119+
}
120+
121+
// Consume starts consuming messages
122+
//
123+
// Any messages that are not explicitly Acked or Nacked by this consumer before the connection is terminated will be automatically requeued.
124+
func (es *ExchangeSubscription) Consume(outerCtx context.Context) (<-chan *Message, error) {
125+
channel, err := es.connection.Channel()
126+
if err != nil {
127+
return nil, errors.Wrap(err, "failed to open channel for consumer")
128+
}
129+
130+
consumer := fmt.Sprintf("%s.consumer", es.queueName)
131+
rawMessages, err := channel.Consume(es.queueName, consumer, false, true, false, false, nil)
132+
if err != nil {
133+
return nil, errors.Wrap(err, "failed to start consuming messages from queue")
134+
}
135+
136+
ctx, cancel := context.WithCancel(outerCtx)
137+
messages := make(chan *Message)
138+
go func() {
139+
es.activeConsumers.Inc()
140+
defer es.activeConsumers.Dec()
141+
for {
142+
select {
143+
case msg, ok := <-rawMessages:
144+
if !ok {
145+
cancel()
146+
continue
147+
}
148+
149+
es.messagesConsumed.Inc()
150+
151+
message := &Message{
152+
Headers: msg.Headers,
153+
Body: msg.Body,
154+
Ack: func() error {
155+
es.messagesAcked.Inc()
156+
return msg.Ack(false)
157+
},
158+
Nack: func() error {
159+
es.messagesNacked.Inc()
160+
return msg.Nack(false, true)
161+
},
162+
}
163+
164+
select {
165+
case messages <- message:
166+
case <-ctx.Done():
167+
err = message.Nack()
168+
if err != nil {
169+
log.Warn("failed to nack in-flight message",
170+
"err", err,
171+
"consumer", consumer,
172+
)
173+
}
174+
}
175+
case <-ctx.Done():
176+
close(messages)
177+
178+
err := channel.Cancel(consumer, false)
179+
if err != nil {
180+
log.Error("failed to cancel consumer",
181+
"err", err,
182+
"consumer", consumer,
183+
)
184+
}
185+
186+
err = channel.Close()
187+
if err != nil {
188+
log.Error("failed to close channel for consumer",
189+
"err", err,
190+
"consumer", consumer,
191+
)
192+
}
193+
return
194+
}
195+
}
196+
}()
197+
198+
return messages, nil
199+
}
200+
201+
// ExchangeName is the name of the exchange to which this is subscribed
202+
func (es *ExchangeSubscription) ExchangeName() string {
203+
return es.exchangeName
204+
}

0 commit comments

Comments
 (0)