Conversation
|
Caution Review failedThe pull request is closed. WalkthroughAdds Subscribe, Unsubscribe, and Validate to the MQTT adapter; enforces non-nil callback on Publish and validates topic/QoS for publish/subscribe/unsubscribe; runs callbacks asynchronously with panic recovery and logging; introduces three new public errors, a CLI Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant C as Caller
participant A as MQTT Adapter
participant M as MQTT Client
Note over A: Publish (validate + async callback)
C->>A: Publish(topic,qos,retained,payload,cb)
A->>A: Validate(topic,qos)
alt validation or cb error
A-->>C: error (ErrNilCallback / validation error)
else
A->>M: Publish(topic,payload,qos,retained)
M-->>A: Token (wait)
alt publish error
A-->>C: er.ErrPublishFailed (Raw=...)
else
A->>A: spawn goroutine -> cb(string(payload))
A-->>C: nil
end
end
sequenceDiagram
autonumber
participant C as Caller
participant A as MQTT Adapter
participant M as MQTT Client
participant H as Handler Goroutine
Note over A: Subscribe (validate, register handler, panic-safe callback)
C->>A: Subscribe(topic,qos,retained,cb)
A->>A: Validate(topic,qos)
alt validation or cb error
A-->>C: error (ErrNilCallback / validation error)
else
A->>M: Subscribe(topic,qos,handler)
M-->>A: Token (wait)
alt subscribe error
A-->>C: er.ErrSubscribeFailed
else
A-->>C: nil
M-->>A: Message(topic,payload)
A->>H: spawn goroutine -> try { cb(string(payload)) } catch { log recover }
end
end
Note over A: Unsubscribe
C->>A: Unsubscribe(topic)
A->>M: Unsubscribe(topic)
M-->>A: Token (wait)
alt error
A-->>C: er.ErrUnsubscribeFailed
else
A-->>C: nil
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. 📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (2)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/mqtt/mqtt.go (1)
42-51: Use WaitTimeout on MQTT tokens to avoid indefinite hangs.Blocking Wait() can hang forever on broker/network issues. Apply timeouts and surface a timeout as Raw (e.g., context.DeadlineExceeded). Consider making the duration configurable later.
@@ -import ( - "fmt" +import ( + "context" + "fmt" "sync" "time" @@ func (a *Adapter) Connect() error { - if token := a.client.Connect(); token.Wait() && token.Error() != nil { + const opTimeout = 10 * time.Second + if token := a.client.Connect(); !token.WaitTimeout(opTimeout) { + return &er.Error{Package: "MQTT", Func: "Connect", Message: er.ErrMqttConnectionFailed, Raw: context.DeadlineExceeded} + } else if token.Error() != nil { tErr := token.Error() return &er.Error{ Package: "MQTT", Func: "Connect", Message: er.ErrMqttConnectionFailed, Raw: tErr, } } @@ - token := a.client.Publish(topic, qos, retained, payload) - token.Wait() + token := a.client.Publish(topic, qos, retained, payload) + if !token.WaitTimeout(opTimeout) { + return &er.Error{Package: "MQTT", Func: "Publish", Message: er.ErrPublishFailed, Raw: context.DeadlineExceeded} + } @@ - token := a.client.Unsubscribe(topic) - token.Wait() + token := a.client.Unsubscribe(topic) + if !token.WaitTimeout(opTimeout) { + return &er.Error{Package: "MQTT", Func: "Unsubscribe", Message: er.ErrUnsubscribeFailed, Raw: context.DeadlineExceeded} + } @@ - token.Wait() + if !token.WaitTimeout(opTimeout) { + return &er.Error{Package: "MQTT", Func: "Subscribe", Message: er.ErrSubscribeFailed, Raw: context.DeadlineExceeded} + }Also applies to: 69-70, 96-97, 140-141
🧹 Nitpick comments (3)
internal/mqtt/mqtt.go (3)
56-62: Publish: avoid hard-requiring a callback; default to no-op.For Publish, making the callback mandatory adds friction without safety gains. Default to a no-op instead of returning ErrNilCallback.
func (a *Adapter) Publish(topic string, qos byte, retained bool, payload any, callback func()) error { - if callback == nil { - return &er.Error{ - Package: "MQTT", - Func: "Publish", - Message: er.ErrNilCallback, - } - } + if callback == nil { + callback = func() {} + }
130-134: Improve panic log context with actual message topic.Log the message’s topic too; the subscription pattern may differ (wildcards).
- logger.Error("panic in subscription callback", - logger.Any("recover", r), - logger.String("topic", topic), - ) + logger.Error("panic in subscription callback", + logger.Any("recover", r), + logger.String("sub_topic", topic), + logger.String("msg_topic", msg.Topic()), + )
80-85: Verify WaitGroup usage can’t race with Disconnect.Additions to the WaitGroup occur inside async handlers. Ensure Disconnect (which calls wg.Wait()) can’t run concurrently with new wg.Add(1) calls. If this can occur in your lifecycle, gate callback spawning during shutdown (e.g., atomic “closing” flag set before Disconnect and checked here) or Unsubscribe before Disconnect to quiesce handlers.
I can provide a guarded pattern (atomic closing + gating Add) if needed.
Also applies to: 126-137
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
internal/mqtt/mqtt.go(2 hunks)pkg/er/er.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/mqtt/mqtt.go (2)
pkg/er/er.go (7)
Error(27-32)ErrNilCallback(24-24)ErrPublishFailed(21-21)ErrUnsubscribeFailed(23-23)ErrSubscribeFailed(22-22)ErrEmptyTopic(19-19)ErrInvalidQoS(14-14)pkg/logger/logger.go (2)
Any(293-295)String(273-275)
🔇 Additional comments (3)
pkg/er/er.go (1)
22-24: New error vars are appropriate and consistent.Good additions; they align with the adapter’s new behaviors.
internal/mqtt/mqtt.go (2)
120-123: Minor: early-return propagation is good.Input validation via Validate before side effects is clean and consistent.
110-112: Fix Subscribe doc; remove misleading “retained” reference// Subscribe subscribes to the specified topic at the given QoS.
// Note: MQTT “retain” is a publish‐time option; subscribers automatically receive any retained message for a topic.-// Subscribe subscribes to the specified topic with the given QoS level and retention flag +// Subscribe subscribes to the specified topic at the given QoS. +// Note: MQTT “retain” is a publish‐time option; subscribers automatically receive any retained message for a topic. func (a *Adapter) Subscribe(topic string, qos byte, retained bool, callback func(payload string)) error {If you’d like to drop the unused retained parameter now, I can prepare a follow-up diff and call-site update script.
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (1)
internal/mqtt/mqtt.go (1)
80-92: Nice: publish callback now has panic recovery; add context to the log.
Include topic/qos/retained to aid triage.Apply this diff:
defer func() { if r := recover(); r != nil { logger.Error("panic in publish callback", logger.Any("recover", r), + logger.String("topic", topic), + logger.Any("qos", qos), + logger.Bool("retained", retained), ) } }()
🧹 Nitpick comments (2)
internal/mqtt/mqtt.go (2)
131-146: Log the actual message topic (not just the subscription filter) and note retained messages.
Improves usefulness when subscribing with wildcards.Apply this diff:
defer func() { if r := recover(); r != nil { logger.Error("panic in subscription callback", logger.Any("recover", r), - logger.String("topic", topic), + logger.String("topic", msg.Topic()), + logger.Bool("retained", msg.Retained()), ) } }()
117-119: Remove unusedretainedparameter fromAdapter.Subscribe
retainedis never referenced in the method body and is misleading for MQTT v3.1.1 subscriptions. Apply:-file: internal/mqtt/mqtt.go -// Subscribe subscribes to the specified topic with the given QoS level and retention flag -func (a *Adapter) Subscribe(topic string, qos byte, retained bool, callback func(payload string)) error { +// Subscribe subscribes to the specified topic with the given QoS level. +func (a *Adapter) Subscribe(topic string, qos byte, callback func(payload string)) error {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/mqtt/mqtt.go(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/mqtt/mqtt.go (2)
pkg/er/er.go (7)
Error(27-32)ErrNilCallback(24-24)ErrPublishFailed(21-21)ErrUnsubscribeFailed(23-23)ErrSubscribeFailed(22-22)ErrEmptyTopic(19-19)ErrInvalidQoS(14-14)pkg/logger/logger.go (2)
Any(293-295)String(273-275)
🔇 Additional comments (3)
internal/mqtt/mqtt.go (3)
11-11: Import of logger is appropriate and used correctly.
56-66: Good: enforce non-nil callback and validate inputs up front.
This prevents NPEs and bad publish requests early.
162-176: Validate looks correct.
Covers empty topic and QoS range 0–2.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (8)
pkg/logger/logger.go (1)
252-263: Separate “subscribe event” vs “message received” logging; minor attr cleanup
- The method name and message (“Subscribed”) suggest a one-time subscription event, not per-message receipt. In usage (internal/bench/sub.go), it’s invoked for each payload, which can be misleading and noisy. Consider keeping LogSubscribe for the successful SUBSCRIBE event and adding a dedicated LogMessageReceived to log inbound messages (topic, qos, payload_len, maybe retained).
- Small consistency nit: reuse the existing ClientID helper.
Apply this diff for the attr nit:
func (l *Logger) LogSubscribe(clientID, topic string, qos int, attrs ...slog.Attr) { - baseAttrs := []slog.Attr{ - slog.String("client_id", clientID), - slog.String("topic", topic), - slog.Int("qos", qos), - } + baseAttrs := []slog.Attr{ + ClientID(clientID), + slog.String("topic", topic), + slog.Int("qos", qos), + } baseAttrs = append(baseAttrs, attrs...) l.LogAttrs(context.Background(), slog.LevelInfo, "Subscribed", baseAttrs...) }Optionally add:
// LogMessageReceived logs an inbound message event func (l *Logger) LogMessageReceived(clientID, topic string, qos int, payloadLen int, attrs ...slog.Attr) { base := []slog.Attr{ ClientID(clientID), slog.String("topic", topic), slog.Int("qos", qos), slog.Int("payload_len", payloadLen), } base = append(base, attrs...) l.LogAttrs(context.Background(), slog.LevelInfo, "MessageReceived", base...) }cmd/sub.go (2)
27-31: Prefer signal.NotifyContext over os.Exit in a goroutine; enable graceful shutdownUsing os.Exit inside a goroutine skips defers and can drop final summaries. Create a context with signal.NotifyContext and plumb it into the benchmark so b.Subscribe can exit early and perform cleanup.
Apply this diff here:
import ( + "context" "os" "os/signal" "syscall" @@ - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - defer signal.Stop(sigs) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() @@ - go func() { - <-sigs - logger.Info("Received shutdown signal", logger.State("interrupted")) - os.Exit(0) - }() - - b.Subscribe() + // Requires: change signature to Subscribe(ctx context.Context) + b.Subscribe(ctx)If changing Subscribe’s signature is out of scope now, at least replace os.Exit with a coordinated shutdown (e.g., set a shared flag or cancel a context) so clients disconnect cleanly and the final log is emitted. I can provide a follow-up patch across bench.Subscribe to support context.
Also applies to: 97-101, 3-11
110-119: Minor UX nits on flags (optional)
- “delay” help says “lifetime checks”; actual behavior in Subscribe uses delay*count to sleep. Consider clarifying text to “Total runtime ≈ delay(ms) × count per client”.
- Consider adding an example to the Long help showing a typical sub command.
internal/bench/sub.go (5)
14-15: Avoid casting UnixNano to int; log a readable timestampCasting int64 nanoseconds to int is unnecessary and can overflow on 32-bit. Log RFC3339 instead.
- b.logger.Info("Started subscribe benchmark", logger.Int("startTime", int(start.UnixNano()))) + b.logger.Info("Started subscribe benchmark", logger.String("start", start.Format(time.RFC3339Nano)))
16-18: Use int64 counters and add throughput; log counts without narrowingLarge runs can overflow int32; also compute throughput for parity with Publish.
- var received int32 - var failed int32 + var received int64 + var failed int64 @@ - atomic.AddInt32(&received, 1) - b.logger.LogSubscribe(id, b.topic, int(b.qos), logger.String("payload", payload)) + atomic.AddInt64(&received, 1) + // Prefer lightweight per-message logging to avoid skewing results + b.logger.Info("Message received", + logger.ClientID(id), + logger.String("topic", b.topic), + logger.Int("qos", int(b.qos)), + logger.Int("payload_len", len(payload)), + ) @@ - atomic.AddInt32(&failed, 1) + atomic.AddInt64(&failed, int64(b.messageCount)) b.logger.Error("Failed to subscribe", logger.ClientID(id), logger.ErrorAttr(err)) return } - elapsed := time.Since(start).Seconds() - b.logger.Info("Finished subscribe benchmark", - logger.Int("clients", b.clients), - logger.Int("expectedMessages", b.clients*b.messageCount), - logger.Int("received", int(received)), - logger.Int("failed", int(failed)), - logger.Float("elapsedSec", elapsed), - ) + elapsed := time.Since(start).Seconds() + expected := int64(b.clients) * int64(b.messageCount) + throughput := float64(received) / elapsed + b.logger.Info("Finished subscribe benchmark", + logger.Int("clients", b.clients), + logger.Any("expectedMessages", expected), + logger.Any("received", received), + logger.Any("failed", failed), + logger.Float("elapsedSec", elapsed), + logger.Float("throughputMsgPerSec", throughput), + )Also applies to: 41-42, 45-46, 60-67
33-37: Align “failed” metric with Publish on connection errorIn Publish, a connect failure increments failed by messageCount. Here it increments by 1, skewing summary metrics. Prefer the same semantics.
- atomic.AddInt32(&failed, 1) + atomic.AddInt64(&failed, int64(b.messageCount))Note: this assumes the int64 change from the previous comment. If you keep int32, cast accordingly.
40-47: Log subscription event once; keep per-message logs separateLogSubscribe should reflect the successful SUBSCRIBE operation, not each payload. Move LogSubscribe after a successful Subscribe() and keep the callback lightweight.
- err := client.Subscribe(b.topic, byte(b.qos), b.retained, func(payload string) { - atomic.AddInt32(&received, 1) - b.logger.LogSubscribe(id, b.topic, int(b.qos), logger.String("payload", payload)) - }) + err := client.Subscribe(b.topic, byte(b.qos), b.retained, func(payload string) { + atomic.AddInt64(&received, 1) + b.logger.Info("Message received", + logger.ClientID(id), + logger.String("topic", b.topic), + logger.Int("qos", int(b.qos)), + logger.Int("payload_len", len(payload)), + ) + }) if err != nil { - atomic.AddInt32(&failed, 1) + atomic.AddInt64(&failed, int64(b.messageCount)) b.logger.Error("Failed to subscribe", logger.ClientID(id), logger.ErrorAttr(err)) return } + b.logger.LogSubscribe(id, b.topic, int(b.qos))
50-55: Make runtime bounded by signal/context rather than fixed sleepSleeping for delay×count (or 5s) makes shutdown laggy and may under/over-run the intended window. Prefer waiting on a context with a timer (or target “received >= expected”) so Ctrl-C ends promptly and you still emit the summary. This dovetails with the CLI suggestion to switch to signal.NotifyContext.
I can wire Subscribe(ctx context.Context), replace the sleep with a select on ctx.Done() or timer, and ensure clean unsubscribe/disconnect.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
cmd/sub.go(1 hunks)internal/bench/sub.go(1 hunks)pkg/logger/logger.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/bench/sub.go (5)
internal/bench/bench.go (2)
Bench(14-30)b(95-156)pkg/logger/logger.go (7)
Info(179-181)Int(290-292)ClientID(265-267)State(270-272)ErrorAttr(310-312)String(285-287)Float(300-302)pkg/config/config.go (1)
Client(27-33)internal/mqtt/mqtt.go (3)
NewClient(21-38)a(54-91)client(14-17)internal/bench/pub.go (1)
b(12-72)
cmd/sub.go (4)
pkg/logger/logger.go (3)
ErrorAttr(310-312)State(270-272)Info(179-181)internal/bench/bench.go (11)
NewBenchmark(54-92)WithClientID(170-174)WithClients(164-168)WithTopic(176-180)WithQoS(191-195)WithMessageCount(230-234)WithDelay(158-162)WithCleanSession(182-189)WithKeepAlive(197-204)b(95-156)delay(14-30)cmd/root.go (1)
Cfg(12-12)cmd/pub.go (1)
init(124-138)
🔇 Additional comments (1)
cmd/sub.go (1)
81-91: Benchmark construction looks solidOptions are applied clearly and rely on bench.validate for guardrails. LGTM.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor