-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpubsub.go
More file actions
106 lines (93 loc) · 2.96 KB
/
pubsub.go
File metadata and controls
106 lines (93 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package pubsub
import (
"context"
"io"
"time"
)
// Message is a single pub/sub message: an identifier together with its
// payload.
type Message interface {
	// ID returns the identifier of the message.
	ID() string
	// Data returns the payload bytes of the message.
	Data() []byte
}
// basicMessage is the Message implementation returned by NewMessage. It simply
// holds an id and a payload.
type basicMessage struct {
	id   string // reported by ID
	data []byte // reported by Data
}

// ID returns the stored message id.
func (m basicMessage) ID() string { return m.id }

// Data returns the stored payload slice as-is; no copy is made.
func (m basicMessage) Data() []byte { return m.data }
// NewMessage builds a Message carrying data, with its ID taken from NewID.
// The data slice is stored as given; it is not copied.
func NewMessage(data []byte) Message {
	msg := basicMessage{
		id:   NewID(),
		data: data,
	}
	return msg
}
// Queue is a pub/sub queue. A queue contains topics and can be closed.
type Queue interface {
	// Close is supplied by the embedded io.Closer.
	io.Closer

	// Topic returns the Topic for the given name, or an error.
	Topic(ctx context.Context, name string) (Topic, error)
}
// Topic is a named, logical channel to which messages can be published.
type Topic interface {
	// Publish publishes msg to the topic.
	Publish(ctx context.Context, msg Message) error

	// Subscribe creates the subscription called name, or resumes it. Every
	// message published to the topic is sent to all of its subscribers. The
	// given opts customize the subscription's SubscribeConfig.
	Subscribe(ctx context.Context, name string, opts ...SubscribeOption) (Subscription, error)
}
// SubscribeConfig customizes how a subscription works.
type SubscribeConfig struct {
	// RetryPolicy is the retry policy of the subscription. Messages that are
	// Nack'd are retried according to it.
	RetryPolicy RetryPolicy
}

// SubscribeOption is a function that customizes a SubscribeConfig.
type SubscribeOption = func(config *SubscribeConfig)

// SubscribeHandler is the function invoked for pub/sub messages sent to a
// topic.
type SubscribeHandler = func(ctx context.Context, msg SubscriberMessage)

// Subscription is a single subscription to a topic.
type Subscription interface {
	// Receive receives the messages sent to the subscription. The handler is
	// invoked for each message in a goroutine. Cancel ctx to stop Receive.
	Receive(ctx context.Context, handler SubscribeHandler) error
}

// SubscriberMessage is a pub/sub Message that additionally has Ack and Nack
// methods.
type SubscriberMessage interface {
	Message

	// Ack signals that the message was processed; it will then be removed
	// from the pub/sub topic.
	Ack()

	// Nack signals that processing the message failed. The message will be
	// retried according to the retry policy of the subscription.
	Nack()
}
// WithMaxAttempts returns a SubscribeOption that sets the retry policy's
// MaxAttempts to maxAttempts.
func WithMaxAttempts(maxAttempts int) SubscribeOption {
	return func(c *SubscribeConfig) {
		policy := &c.RetryPolicy
		policy.MaxAttempts = maxAttempts
	}
}
// WithMinBackoff returns a SubscribeOption that sets the retry policy's
// MinBackoff to minBackoff.
func WithMinBackoff(minBackoff time.Duration) SubscribeOption {
	return func(c *SubscribeConfig) {
		policy := &c.RetryPolicy
		policy.MinBackoff = minBackoff
	}
}
// WithMaxBackoff returns a SubscribeOption that sets the retry policy's
// MaxBackoff to maxBackoff.
func WithMaxBackoff(maxBackoff time.Duration) SubscribeOption {
	return func(c *SubscribeConfig) {
		policy := &c.RetryPolicy
		policy.MaxBackoff = maxBackoff
	}
}
// NewSubscribeConfig creates a new SubscribeConfig from the given options.
//
// The config starts from these defaults: 5 max attempts, a 10 second min
// backoff and a 600 second max backoff. Each option in opts is then applied
// in order, so an option overrides the default (and any earlier option) for
// the field it sets.
//
// Nil entries in opts are skipped instead of being invoked, because calling a
// nil function value panics.
func NewSubscribeConfig(opts ...SubscribeOption) *SubscribeConfig {
	cfg := new(SubscribeConfig)

	// Seed the defaults through the same option helpers callers use.
	WithMaxAttempts(5)(cfg)
	WithMinBackoff(10 * time.Second)(cfg)
	WithMaxBackoff(600 * time.Second)(cfg)

	for _, opt := range opts {
		if opt == nil {
			// A nil option carries no customization; ignore it.
			continue
		}
		opt(cfg)
	}
	return cfg
}