Conversation
|
Caution Review failedThe pull request is closed. WalkthroughAdds an MQTT adapter with NewClient, Connect, and async Publish (with WaitGroup for callbacks), a new publish CLI command and bench publish workload with message/count/retained options, an ErrPublishFailed error, and a logger Float helper. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor CLI/App
participant Bench as Bench
participant Adapter as MQTT Adapter
participant Client as MQTT Client
participant Broker
CLI/App->>Bench: NewBenchmark(...WithMessage..., WithRetained..., WithMessageCount...)
CLI/App->>Bench: PublishMessages()
loop per client (concurrent)
Bench->>Adapter: NewClient(cfg)
Adapter->>Client: Connect()
alt connect ok
Client-->>Adapter: connected
else connect error
Adapter-->>Bench: report failed (increment failed by messageCount)
end
loop per message (messageCount)
Bench->>Adapter: Publish(topic,qos,retained,payload,callback)
Adapter->>Client: Publish(...)
par async callback
Adapter-->>Bench: callback() (goroutine) -> atomic succeeded++
and wait token
Client-->>Adapter: token result
end
alt publish error
Adapter-->>Bench: atomic failed++
end
end
end
Bench->>Bench: wait for clients & callbacks (WaitGroup)
Bench-->>CLI/App: log summary (throughput, succeeded, failed)
sequenceDiagram
autonumber
actor CLI
participant ConnCmd as conn command
participant Bench
CLI->>ConnCmd: start
ConnCmd->>Bench: NewBenchmark(...WithCleanSession,WithKeepAlive,WithClientID...)
ConnCmd->>Bench: RunConnections() (goroutine)
ConnCmd->>ConnCmd: wait select { signal | done }
alt signal received
ConnCmd-->>CLI: log "Received shutdown signal" (interrupted)
else bench done
ConnCmd-->>CLI: log "Connection benchmark completed" (completed)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (2)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/mqtt/mqtt.go (1)
19-37: Guard against nil/invalid config and return error instead of panickingAs written, a nil cfg or empty host/invalid port will panic at runtime. Since you already have er.ErrNilConfig/ErrEmptyServerHost/ErrInvalidServerPort, surface them from NewClient. Also, build the broker URL IPv6-safely.
Apply this diff within this hunk:
-// NewClient creates a new MQTT adapter instance -func NewClient(cfg *config.Config) *Adapter { +// NewClient creates a new MQTT adapter instance +func NewClient(cfg *config.Config) (*Adapter, error) { + if cfg == nil { + return nil, er.ErrNilConfig + } + if cfg.Server.Host == "" { + return nil, er.ErrEmptyServerHost + } + if cfg.Server.Port <= 0 || cfg.Server.Port > 65535 { + return nil, er.ErrInvalidServerPort + } // Initialize MQTT client options opts := mq.NewClientOptions() - opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.Server.Host, cfg.Server.Port)) + addr := net.JoinHostPort(cfg.Server.Host, fmt.Sprintf("%d", cfg.Server.Port)) + opts.AddBroker("tcp://" + addr) opts.SetClientID(cfg.Client.ClientID) opts.SetKeepAlive(time.Duration(cfg.Client.KeepAlive) * time.Second) opts.SetCleanSession(cfg.Client.CleanSession) opts.SetUsername(cfg.Client.Username) opts.SetPassword(cfg.Client.Password) opts.SetProtocolVersion(4) // Default set to MQTT 3.1.1 // Create a new MQTT client instance client := mq.NewClient(opts) // Return the initialized MQTT adapter - return &Adapter{client: client} + return &Adapter{client: client}, nil }Add required imports:
// add to imports "net"Follow-up: callers of NewClient must handle the new (adapter, error) return.
I can update the call sites in this PR if you want.
🧹 Nitpick comments (3)
pkg/er/er.go (1)
8-22: Nit: keep error message prefixes consistentYou use "mqtt connection failed" (no prefix) vs "mqtt: failed to publish" (prefixed). Consider standardizing (e.g., "mqtt: connection failed") for easier grep/Is checks and consistent logs.
internal/mqtt/mqtt.go (2)
19-37: Optional: set sane connection defaults (autoreconnect, timeout, order)Recommend enabling AutoReconnect and a bounded ConnectTimeout; for a bench tool, disabling in-order delivery can increase throughput.
Apply within this hunk:
opts := mq.NewClientOptions() +opts.SetAutoReconnect(true) +opts.SetConnectTimeout(10 * time.Second) // consider making this configurable +opts.SetOrderMatters(false) // higher throughput if order is not required
53-74: Avoid indefinite blocking on token.WaitFor robustness under broker/network issues, prefer a bounded wait (WaitTimeout) and return a timeout-wrapped ErrPublishFailed. Consider a configurable publish timeout.
Example:
const defaultPublishTimeout = 30 * time.Second if !token.WaitTimeout(defaultPublishTimeout) { return &er.Error{Package: "MQTT", Func: "Publish", Message: er.ErrPublishFailed, Raw: fmt.Errorf("timeout after %s", defaultPublishTimeout)} }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
internal/mqtt/mqtt.go(3 hunks)pkg/er/er.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/mqtt/mqtt.go (2)
pkg/config/config.go (1)
Client(27-33)pkg/er/er.go (2)
Error(24-29)ErrPublishFailed(21-21)
🔇 Additional comments (4)
pkg/er/er.go (1)
21-21: ErrPublishFailed addition — LGTMGood, this gives a clear, reusable sentinel for publish failures.
internal/mqtt/mqtt.go (3)
13-17: Adapter struct — LGTMMinimal, clear ownership of client + WaitGroup.
39-51: Connect error wrapping — LGTMClear wrapping via er.Error; good place for upstream handling with errors.Is(err, er.ErrMqttConnectionFailed).
76-80: Verify ordering: wait callbacks before disconnectingIf callbacks might interact with the client, wait for them first to avoid races; otherwise current order is fine. Pick one based on intended callback semantics.
Suggested change:
-func (a *Adapter) Disconnect() { - a.client.Disconnect(200) - a.wg.Wait() -} +func (a *Adapter) Disconnect() { + a.wg.Wait() + a.client.Disconnect(200) +}
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (13)
pkg/logger/logger.go (1)
297-300: Make ErrorAttr nil-safe and preserve typed errorsCalling ErrorAttr(nil) will panic. Prefer logging the error value as-is and handling nil.
Example (outside this hunk):
// Keep key consistent with slog conventions. func ErrorAttr(err error) slog.Attr { if err == nil { return slog.Any("err", nil) } return slog.Any("err", err) }internal/bench/bench.go (3)
224-228: Optional: normalize message inputIf empty/whitespace messages aren’t meaningful in your use case, trim to reduce accidental empty publishes.
Apply inside this hunk:
func WithMessage(message string) Option { return func(b *Bench) { - b.message = message + // Optional: trim accidental whitespace-only payloads. + b.message = message } }If you decide to trim, add:
import "strings" // ... b.message = strings.TrimSpace(message)
230-234: Guard against non-positive message countsNegative/zero counts can yield no-op runs or downstream invariants. Clamp to a sane minimum.
func WithMessageCount(count int) Option { return func(b *Bench) { - b.messageCount = count + if count < 1 { + count = DefaultMessageCount + } + b.messageCount = count } }
94-156: Optional: validate messageCount in Bench.validate()A centralized check helps even when Bench is constructed programmatically without options.
Example (outside this hunk’s ranges):
if b.messageCount < 1 { // either clamp or return an error via er package b.messageCount = DefaultMessageCount }cmd/conn.go (2)
42-47: Graceful shutdown: consider cancellation wiringRunConnections can’t be signaled to stop. Consider passing a context to enable early exit on SIGINT/SIGTERM.
Sketch (requires adapting RunConnections(ctx)):
ctx, cancel := context.WithCancel(cmd.Context()) done := make(chan struct{}) go func() { b.RunConnections(ctx) close(done) }() defer cancel()And on signal case: call cancel() then wait on done briefly (with timeout) for a graceful drain.
49-55: Stop signal notifications before returningMinor hygiene to release resources and avoid stray deliveries.
select { case <-sigs: - logger.Info("Received shutdown signal", logger.State("terminated")) + signal.Stop(sigs) + logger.Info("Received shutdown signal", logger.State("terminated")) return case <-done: logger.Info("Connection benchmark completed", logger.State("completed")) }cmd/pub.go (2)
35-46: Handle flag parsing errors (defensive)Cobra’s Get* can fail (typos/alias collisions). Log and exit on error to avoid surprising defaults.
Example:
get := func[T any](getter func() (T, error), name string) T { v, err := getter(); if err != nil { logger.Error("invalid flag", logger.String("flag", name), logger.ErrorAttr(err)); os.Exit(2) } return v } clientID := get(func() (string, error) { return cmd.Flags().GetString("clientID") }, "clientID") // repeat for others
1-3: Replace Cobra template header placeholdersKeep repo metadata clean.
Apply:
-/* -Copyright © 2025 NAME HERE <EMAIL ADDRESS> -*/ +// Copyright © 2025 benchmq authorsinternal/bench/pub.go (5)
13-15: Use consistent key name with conn benchmark (“time” vs “startTime”)Keeps logs uniform and easier to query.
Apply:
- b.logger.Info("Started publish benchmark", logger.Int("startTime", int(start.UnixNano()))) + b.logger.Info("Started publish benchmark", logger.Int("time", int(start.UnixNano())))
53-56: Guard against divide-by-zero when run completes “instantly”Rare, but safe.
Apply:
- elapsed := time.Since(start).Seconds() - total := b.clients * b.messageCount - throughput := float64(total) / elapsed + elapsed := time.Since(start).Seconds() + total := b.clients * b.messageCount + var throughput float64 + if elapsed > 0 { + throughput = float64(total) / elapsed + } else { + throughput = 0 + }
34-37: Optional: move delay to post-first publish to avoid initial idleCurrent sleep delays the very first message. If you want immediate send then pacing, gate on j > 0.
Example:
if j > 0 && b.delay > 0 { time.Sleep(time.Duration(b.delay) * time.Millisecond) }
57-65: Consider throughput based on delivered messagesReporting based on total scheduled can mask failures. Consider using succeeded for throughput and add success ratio.
Example attrs:
- throughputDelivered = float64(succeeded)/elapsed
- successRatio = float64(succeeded)/float64(total)
16-18: Potential counter overflow on large runsint32 may overflow with very large message volumes. Use int64 atomics if you expect high ranges.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
cmd/conn.go(2 hunks)cmd/pub.go(1 hunks)internal/bench/bench.go(4 hunks)internal/bench/pub.go(1 hunks)pkg/logger/logger.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
cmd/pub.go (4)
internal/bench/bench.go (10)
NewBenchmark(54-92)WithClientID(170-174)WithClients(164-168)WithTopic(176-180)WithQoS(191-195)WithMessageCount(230-234)WithDelay(158-162)WithCleanSession(182-189)WithKeepAlive(197-204)WithMessage(224-228)cmd/root.go (1)
Cfg(12-12)pkg/logger/logger.go (3)
ErrorAttr(298-300)Info(179-181)State(258-260)internal/bench/conn.go (1)
b(11-39)
cmd/conn.go (2)
pkg/logger/logger.go (4)
Error(189-191)ErrorAttr(298-300)Info(179-181)State(258-260)internal/bench/conn.go (1)
b(11-39)
internal/bench/pub.go (4)
internal/bench/bench.go (1)
Bench(14-30)pkg/logger/logger.go (4)
Info(179-181)Int(278-280)ErrorAttr(298-300)Float(288-290)internal/mqtt/mqtt.go (1)
NewClient(20-37)internal/bench/conn.go (1)
b(11-39)
🔇 Additional comments (7)
pkg/logger/logger.go (1)
287-290: Add Float helper — LGTMWrapper aligns with slog.Float64 and keeps callsites consistent.
internal/bench/bench.go (4)
19-22: New fields on Bench — LGTMmessage/messageCount/retained integrate cleanly with existing options/state.
48-51: Reasonable defaults — LGTMDefaults look sensible for publish benchmarks.
69-72: Default initialization in constructor — LGTMMatches new defaults and avoids zero-value surprises.
236-240: Retained flag option — LGTMMatches MQTT semantics; no further comments.
cmd/conn.go (2)
22-26: Flag reads — LGTMIgnoring GetX errors is acceptable here since flags are predeclared.
38-39: Error path messaging — LGTMConcise and uses structured error attribute.
There was a problem hiding this comment.
Actionable comments posted: 0
♻️ Duplicate comments (3)
cmd/pub.go (3)
20-31: Fix help text: include missing flags and tighten phrasing- Long: `Publish messages to a topic with specified parameters. + Long: `Publish messages to a topic with specified parameters. Parameters: + - clientID: Base client ID prefix (each client appends "-<n>") - clients: Number of concurrent clients + - delay: Delay between messages in milliseconds - count: Number of messages to publish per client - - qos: Quality of service level (0, 1, 2) - - message: The message payload + - qos: QoS level (0, 1, 2) + - message: Message payload - topic: Topic to publish to - - retain: Whether to retain the last message - - clean: Whether to use a clean session + - retain: Retain the last message + - clean: Use a clean session - keepalive: Keepalive interval in seconds`,
32-34: Stop the signal notifierPrevents leaking the subscription.
sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + defer signal.Stop(sigs)
65-69: Don’t call os.Exit on SIGINT; prefer graceful cancellationos.Exit skips defers and may drop in-flight MQTT ops. Prefer context cancellation; minimally, return instead of exiting.
Minimal:
- logger.Info("Received shutdown signal", logger.State("interrupted")) - os.Exit(0) + logger.Info("Received shutdown signal", logger.State("interrupted")) + returnPreferred (requires bench.PublishMessages to accept context and honor ctx.Done()):
// at top: import "context" ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { <-sigs logger.Info("Received shutdown signal", logger.State("interrupted")) cancel() }() b.PublishMessages(ctx)
🧹 Nitpick comments (7)
cmd/conn.go (3)
22-26: Handle flag parsing errors; don’t silently fall back to defaultsIgnoring Get* errors can hide misspelled flags or invalid types.
- clients, _ := cmd.Flags().GetInt("clients") - delay, _ := cmd.Flags().GetInt("delay") - clean, _ := cmd.Flags().GetBool("clean") - keepalive, _ := cmd.Flags().GetUint16("keepalive") - clientID, _ := cmd.Flags().GetString("clientID") + clients, err := cmd.Flags().GetInt("clients") + if err != nil { logger.Error("Invalid --clients flag", logger.ErrorAttr(err)); return } + delay, err := cmd.Flags().GetInt("delay") + if err != nil { logger.Error("Invalid --delay flag", logger.ErrorAttr(err)); return } + clean, err := cmd.Flags().GetBool("clean") + if err != nil { logger.Error("Invalid --clean flag", logger.ErrorAttr(err)); return } + keepalive, err := cmd.Flags().GetUint16("keepalive") + if err != nil { logger.Error("Invalid --keepalive flag", logger.ErrorAttr(err)); return } + clientID, err := cmd.Flags().GetString("clientID") + if err != nil { logger.Error("Invalid --clientID flag", logger.ErrorAttr(err)); return }
38-39: Add state to error log for consistencyTag failures uniformly.
- logger.Error("Failed to create benchmark", logger.ErrorAttr(err)) + logger.Error("Failed to create benchmark", logger.State("failed"), logger.ErrorAttr(err))
49-55: Improve shutdown: stop signal notifications; consider graceful wait/cancel
- Stop the notifier to avoid leaking the subscription.
- Optional: wait briefly for the worker to finish, or introduce context cancellation in Bench.
Outside this range, add after signal.Notify:
defer signal.Stop(sigs)Optional (requires adding import "time"):
case <-sigs: - logger.Info("Received shutdown signal", logger.State("interrupted")) - return + logger.Info("Received shutdown signal", logger.State("interrupted")) + // Best-effort grace period + select { + case <-done: + case <-time.After(2 * time.Second): + } + returncmd/pub.go (4)
1-3: Replace placeholder copyright headerUse project-appropriate header or remove it.
-/* -Copyright © 2025 NAME HERE <EMAIL ADDRESS> -*/ +/* benchmq — MQTT benchmarking tool (c) 2025 benchmq authors */
35-46: Validate flag parsing instead of ignoring errorsAvoid surprising defaults on invalid flags.
- clientID, _ := cmd.Flags().GetString("clientID") - clients, _ := cmd.Flags().GetInt("clients") - delay, _ := cmd.Flags().GetInt("delay") - count, _ := cmd.Flags().GetInt("count") - retain, _ := cmd.Flags().GetBool("retain") - message, _ := cmd.Flags().GetString("message") - topic, _ := cmd.Flags().GetString("topic") - qos, _ := cmd.Flags().GetUint16("qos") - cleanSession, _ := cmd.Flags().GetBool("clean") - keepalive, _ := cmd.Flags().GetUint16("keepalive") + clientID, err := cmd.Flags().GetString("clientID") + if err != nil { logger.Error("Invalid --clientID flag", logger.ErrorAttr(err)); return } + clients, err := cmd.Flags().GetInt("clients") + if err != nil { logger.Error("Invalid --clients flag", logger.ErrorAttr(err)); return } + delay, err := cmd.Flags().GetInt("delay") + if err != nil { logger.Error("Invalid --delay flag", logger.ErrorAttr(err)); return } + count, err := cmd.Flags().GetInt("count") + if err != nil { logger.Error("Invalid --count flag", logger.ErrorAttr(err)); return } + retain, err := cmd.Flags().GetBool("retain") + if err != nil { logger.Error("Invalid --retain flag", logger.ErrorAttr(err)); return } + message, err := cmd.Flags().GetString("message") + if err != nil { logger.Error("Invalid --message flag", logger.ErrorAttr(err)); return } + topic, err := cmd.Flags().GetString("topic") + if err != nil { logger.Error("Invalid --topic flag", logger.ErrorAttr(err)); return } + qos, err := cmd.Flags().GetUint16("qos") + if err != nil { logger.Error("Invalid --qos flag", logger.ErrorAttr(err)); return } + cleanSession, err := cmd.Flags().GetBool("clean") + if err != nil { logger.Error("Invalid --clean flag", logger.ErrorAttr(err)); return } + keepalive, err := cmd.Flags().GetUint16("keepalive") + if err != nil { logger.Error("Invalid --keepalive flag", logger.ErrorAttr(err)); return }
60-63: Add state to error logConsistent failure tagging.
- logger.Error("Failed to create benchmark", logger.ErrorAttr(err)) + logger.Error("Failed to create benchmark", logger.State("failed"), logger.ErrorAttr(err))
71-71: Make publish workload cancellableWire a context into PublishMessages so SIGINT can stop loops promptly.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
cmd/conn.go(2 hunks)cmd/pub.go(1 hunks)internal/bench/pub.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/bench/pub.go
🧰 Additional context used
🧬 Code graph analysis (2)
cmd/pub.go (4)
internal/bench/bench.go (13)
NewBenchmark(54-92)WithClientID(170-174)WithClients(164-168)WithTopic(176-180)WithQoS(191-195)WithMessageCount(230-234)WithDelay(158-162)WithRetained(236-240)WithCleanSession(182-189)WithKeepAlive(197-204)WithMessage(224-228)delay(14-27)b(86-147)cmd/root.go (1)
Cfg(12-12)pkg/logger/logger.go (3)
ErrorAttr(298-300)Info(179-181)State(258-260)internal/bench/conn.go (1)
b(11-39)
cmd/conn.go (3)
pkg/logger/logger.go (4)
Error(189-191)ErrorAttr(298-300)Info(179-181)State(258-260)internal/bench/conn.go (1)
b(11-39)internal/bench/bench.go (5)
delay(14-27)b(86-147)WithClients(155-159)WithKeepAlive(188-195)NewBenchmark(48-83)
🔇 Additional comments (2)
cmd/conn.go (2)
28-28: No action neededComment-only change; safe to ignore.
42-47: Concurrent run + done signaling looks good
Summary by CodeRabbit
New Features
Bug Fixes
Chores