Skip to content

Conversation

@protonjhow
Copy link
Contributor

In reference to #749, here is a PR for review that adds NATS Jetstream WorkQueue support.

Before anyone asks, yes, claude was involved, but with significant care.

Summary

Adds support for NATS JetStream workqueue retention pattern to gnmic's JetStream input and output implementations, enabling exactly-once message processing for task distribution scenarios.

Output Changes

  • Add use-existing-stream configuration option to use pre-existing streams
  • Add retention-policy configuration option with support for limits (default) and workqueue retention policies
  • Add stream existence verification with detailed logging
  • Maintain full backward compatibility with existing configurations

Input Changes

  • Add consumer-mode configuration option with single (default) and multi modes
  • Add filter-subjects configuration for multi-consumer workqueue scenarios
  • Enable multiple consumers on workqueue streams with non-overlapping subject filters
  • Automatically detect workqueue streams and apply required consumer configuration (AckExplicitPolicy, DeliverAllPolicy)
  • Maintain full backward compatibility with existing configurations

Testing

  • 53 unit tests covering all new validation logic and configuration options
  • All existing tests continue to pass
  • Test coverage added for both output and input components
  • Successfully tested in lab environment at 2000+ messages/second

Documentation

  • Updated user guide for JetStream output with workqueue pattern examples
  • Updated user guide for JetStream input with consumer mode examples
  • Practical configuration examples and decision guides included

Test Plan

  • Unit tests pass (53 tests, 100% passing)
  • Existing tests continue to pass
  • Code builds successfully
  • Documentation updated
  • Integration tested with NATS server (2k+ msg/sec throughput)

Backward Compatibility

All changes are fully backward compatible:

  • Default retention policy is limits (existing behavior)
  • Default consumer mode is single (existing behavior)
  • Existing configurations work without modification
  • New fields are optional with sensible defaults

Implementation Details

The implementation automatically detects stream retention policies and adapts consumer behavior accordingly:

  • Workqueue streams: Uses AckExplicitPolicy + DeliverAllPolicy (required by NATS)
  • Limits-based streams: Uses configured policies (backward compatible)

- Add UseExistingStream bool to config struct
- Add Retention string to createStreamConfig struct
- Prepare for workqueue pattern support
- Add isValidRetentionPolicy validation function
- Add retentionPolicy conversion function
- Validate mutual exclusivity of use-existing-stream and create-stream
- Validate retention-policy values (limits or workqueue)
- Set retention-policy default to 'limits' for backward compatibility"
- Add verifyExistingStream to query and log stream info
- Return error if stream doesn't exist when use-existing-stream is true
- Log all relevant stream configuration details"
- Add specific check for nats.ErrStreamNotFound
- Remove unreachable stream == nil check
- Improve error message clarity for stream not found case"
- Update createStream to check use-existing-stream mode first
- Add Retention field to StreamConfig with retentionPolicy conversion
- Support both workqueue and limits retention policies
- Maintain backward compatibility with existing behavior
- Add consumerMode type with single and multi constants
- Add ConsumerMode field to config struct
- Add FilterSubjects field to config struct
- Prepare for workqueue consumer pattern support
- Set consumer-mode default to 'single'
- Validate consumer-mode values (single or multi)
- Require filter-subjects when consumer-mode is multi
- Ensure backward compatibility with default single mode
- Add filter subject determination based on consumer mode
- Single mode: use configured subjects as filter
- Multi mode: use explicitly configured filter-subjects
- Apply filter subjects to ConsumerConfig.FilterSubjects
- Support workqueue pattern with proper subject filtering
- Add output tests for retention policy validation and conversion
- Add output tests for setDefaults validation logic
- Add input tests for consumer mode validation
- Add input tests for filter-subjects requirements
- All 47 tests passing (26 output + 21 input)
- Document use-existing-stream and retention-policy for output
- Document consumer-mode and filter-subjects for input
- Add JetStream Queue Patterns section to output docs
- Add JetStream Consumer Modes section to input docs
- Include practical examples and decision guides
- Query stream info to determine retention policy
- Use AckExplicitPolicy for workqueue streams (required by NATS)
- Use AckAllPolicy for limits-based streams (backward compatible)
- Fixes 400 error: 'workqueue stream requires explicit ack'
- Query stream info to determine retention policy
- Workqueue streams require AckExplicitPolicy (explicit ack)
- Workqueue streams require DeliverAllPolicy (deliver all)
- Limits-based streams use configured policies (backward compatible)
- Add tests for toJSDeliverPolicy conversion function
- Fixes NATS errors: 'workqueue stream requires explicit ack' and 'consumer must be deliver all'
- Use fmt.Sprintf with loggingPrefix format string correctly
- Fixes log output showing literal '%s' instead of worker name
}

declare -a modules=("." "pkg/api" "pkg/cache")
declare -a modules=("." "pkg/api" "pkg/cache", "pkg/inputs/jetstream_input", "pkg/outputs/nats_outputs/jetstream")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed, the inputs and outputs should be included in module .

Subject string `mapstructure:"subject,omitempty" json:"subject,omitempty"`
SubjectFormat subjectFormat `mapstructure:"subject-format,omitempty" json:"subject-format,omitempty"`
CreateStream *createStreamConfig `mapstructure:"create-stream,omitempty" json:"create-stream,omitempty"`
UseExistingStream bool `mapstructure:"use-existing-stream,omitempty" json:"use-existing-stream,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we need a separate knob for this. We can replace it with CreateStream == nil

Stream string `mapstructure:"stream,omitempty"`
Subjects []string `mapstructure:"subjects,omitempty"`
SubjectFormat subjectFormat `mapstructure:"subject-format,omitempty" json:"subject-format,omitempty"`
ConsumerMode consumerMode `mapstructure:"consumer-mode,omitempty" json:"consumer-mode,omitempty"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@protonjhow I think we don’t actually need consumerMode or a separate filterSubjects field here because this input always creates a single durable consumer and runs multiple workers on that same consumer. The case of multiple (different) consumers happens when running multiple gNMIc instances, a single instance config does not need to be aware of other instances configuration.

Comment on lines +202 to +218
// Get stream info to determine retention policy
streamInfo, err := s.Info(ctx)
if err != nil {
return fmt.Errorf("failed to get stream info: %v", err)
}

// Determine ack policy and deliver policy based on stream retention
// Workqueue streams have specific requirements
ackPolicy := jetstream.AckAllPolicy
deliverPolicy := toJSDeliverPolicy(n.Cfg.DeliverPolicy)

if streamInfo.Config.Retention == jetstream.WorkQueuePolicy {
// Workqueue streams require explicit ack
ackPolicy = jetstream.AckExplicitPolicy
// Workqueue streams require deliver all policy
deliverPolicy = jetstream.DeliverAllPolicy
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the bit that solves your use case (worker-queue retention).
I would change a small bit: if the retention policy is WorkQueue we should allow deliverPolicy = deliverNewPolicy as well as DeliverAllPolicy. It's not your use case, but it allows users to say: "pick up only new jobs"

Comment on lines +220 to +230
// Determine filter subjects based on consumer mode
var filterSubjects []string
switch n.Cfg.ConsumerMode {
case consumerModeSingle:
// Use configured subjects as filter
filterSubjects = n.Cfg.Subjects
case consumerModeMulti:
// Use explicitly configured filter-subjects
filterSubjects = n.Cfg.FilterSubjects
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When ConsumerMode and FilterSubjects are removed we should just use Subjects in CreateOrUpdateConsumer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants