zoobz-io/herald


herald

Bidirectional bindings between capitan events and message brokers.

Emit a capitan event, herald publishes it. Herald receives a message, capitan emits it. Same types, same signals, automatic serialization.

Bidirectional Event Distribution

// capitan → broker: Publish events to external systems
pub := herald.NewPublisher(provider, signal, key, nil)
pub.Start()

// broker → capitan: Subscribe to external messages as events
sub := herald.NewSubscriber(provider, signal, key, nil)
sub.Start(ctx)

One provider, one signal, one key. Herald handles serialization, acknowledgment, and error routing.

Installation

go get github.com/zoobz-io/herald

Requires Go 1.23+.

Quick Start

package main

import (
    "context"
    "fmt"

    kafkago "github.com/segmentio/kafka-go"
    "github.com/zoobz-io/capitan"
    "github.com/zoobz-io/herald"
    "github.com/zoobz-io/herald/kafka"
)

type Order struct {
    ID    string  `json:"id"`
    Total float64 `json:"total"`
}

func main() {
    ctx := context.Background()

    // Define signal and typed key
    orderCreated := capitan.NewSignal("order.created", "New order")
    orderKey := capitan.NewKey[Order]("order", "app.Order")

    // Create Kafka provider
    writer := &kafkago.Writer{Addr: kafkago.TCP("localhost:9092"), Topic: "orders"}
    reader := kafkago.NewReader(kafkago.ReaderConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "orders",
        GroupID: "order-processor",
    })
    provider := kafka.New("orders", kafka.WithWriter(writer), kafka.WithReader(reader))
    defer provider.Close()

    // Publish: capitan events → Kafka
    pub := herald.NewPublisher(provider, orderCreated, orderKey, nil)
    pub.Start()
    defer pub.Close()

    // Subscribe: Kafka → capitan events
    sub := herald.NewSubscriber(provider, orderCreated, orderKey, nil)
    sub.Start(ctx)
    defer sub.Close()

    // Handle incoming messages with standard capitan hooks
    capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
        order, _ := orderKey.From(e)
        fmt.Printf("Received order: %s\n", order.ID)
    })

    // Emit an event — automatically published to Kafka
    capitan.Emit(ctx, orderCreated, orderKey.Field(Order{ID: "ORD-123", Total: 99.99}))

    capitan.Shutdown()
}

Capabilities

| Feature | Description | Docs |
| --- | --- | --- |
| Bidirectional Flow | Publish capitan events to brokers or subscribe to broker messages as events | Publishing, Subscribing |
| Type-Safe Generics | Compile-time checked publishers and subscribers | Overview |
| 11 Providers | Kafka, NATS, JetStream, Pub/Sub, Redis, SQS, AMQP, SNS, Bolt, Firestore, io | Providers |
| Pipeline Middleware | Validation, transformation, and side effects via processors | Reliability |
| Reliability Patterns | Retry, backoff, timeout, circuit breaker, rate limiting via pipz | Reliability |
| Auto Acknowledgment | Messages acked/nacked based on processing outcome | Subscribing |
| Custom Codecs | Pluggable serialization (JSON default, custom supported) | Codecs |
| Error Observability | All errors are emitted as capitan events | Error Handling |

Why herald?

  • Type-safe — Generic publishers and subscribers with compile-time checking
  • Bidirectional — Publish to brokers or subscribe from brokers
  • 11 providers — Kafka, NATS, JetStream, Pub/Sub, Redis, SQS, RabbitMQ, SNS, BoltDB, Firestore, io
  • Reliable — Pipeline middleware for retry, backoff, timeout, circuit breaker, rate limiting
  • Observable — Errors flow through capitan

Unified Event Flow

Herald enables a pattern: internal events become external messages, external messages become internal events.

Your application emits capitan events as usual. Herald publishes them to any broker. Other services publish to brokers. Herald subscribes and emits them as capitan events. Same signals, same keys, same hooks — the boundary between internal and external disappears.

// Service A: emit locally, publish externally
capitan.Emit(ctx, orderCreated, orderKey.Field(order))

// Service B: subscribe externally, handle locally
capitan.Hook(orderCreated, processOrder)

Two services, one event type, zero coupling. The broker is just a transport.
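
To make the "broker is just a transport" idea concrete, here is a minimal standard-library sketch of the round trip. It does not use herald or capitan: the buffered channel is a hypothetical stand-in for a broker topic, and `publish` and `consume` are illustrative names, not part of either library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order mirrors the payload type used in the Quick Start.
type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// publish plays the role of Service A: it serializes a typed value and
// hands the bytes to the transport (here, a buffered channel).
func publish(transport chan<- []byte, o Order) error {
	data, err := json.Marshal(o)
	if err != nil {
		return err
	}
	transport <- data
	return nil
}

// consume plays the role of Service B: it takes one message off the
// transport, deserializes it, and passes the typed value to a local handler.
func consume(transport <-chan []byte, handle func(Order)) error {
	var o Order
	if err := json.Unmarshal(<-transport, &o); err != nil {
		return err
	}
	handle(o)
	return nil
}

func main() {
	// Hypothetical stand-in for a broker topic.
	transport := make(chan []byte, 1)

	if err := publish(transport, Order{ID: "ORD-123", Total: 99.99}); err != nil {
		panic(err)
	}
	err := consume(transport, func(o Order) {
		fmt.Printf("received order %s (total %.2f)\n", o.ID, o.Total)
	})
	if err != nil {
		panic(err)
	}
}
```

Both sides share only the `Order` type and the wire format, which is the property the pattern above relies on.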

Providers

| Provider | Package | Use Case |
| --- | --- | --- |
| Kafka | kafka | High-throughput streaming |
| NATS | nats | Lightweight cloud messaging |
| JetStream | jetstream | NATS with persistence and headers |
| Google Pub/Sub | pubsub | GCP managed messaging |
| Redis Streams | redis | In-memory with persistence |
| AWS SQS | sqs | AWS managed queues |
| RabbitMQ/AMQP | amqp | Traditional message broker |
| AWS SNS | sns | Pub/sub fanout |
| BoltDB | bolt | Embedded local queues |
| Firestore | firestore | Firebase/GCP document store |
| io | io | Testing with io.Reader/Writer |

Processing Hooks

Add processing steps via middleware processors:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithMiddleware(
        herald.UseApply[Order]("validate", func(ctx context.Context, env *herald.Envelope[Order]) (*herald.Envelope[Order], error) {
            if env.Value.Total < 0 {
                return env, errors.New("invalid total")
            }
            return env, nil
        }),
        herald.UseEffect[Order]("log", func(ctx context.Context, env *herald.Envelope[Order]) error {
            log.Printf("order %s", env.Value.ID)
            return nil
        }),
        herald.UseTransform[Order]("enrich", func(ctx context.Context, env *herald.Envelope[Order]) *herald.Envelope[Order] {
            // Assumes Order also has a ProcessedAt time.Time field (not part of the Quick Start struct).
            env.Value.ProcessedAt = time.Now()
            return env
        }),
    ),
})

  • UseApply — Transform envelope with possible error
  • UseEffect — Side effect, envelope passes through unchanged
  • UseTransform — Pure transform, cannot fail
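
The difference between the three processor kinds is easier to see when each is adapted to one common step signature. The following is a standard-library sketch, not herald's implementation: `Envelope`, `Step`, `apply`, `effect`, `transform`, `run`, and `process` are all hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Envelope is a hypothetical stand-in for herald.Envelope[T].
type Envelope[T any] struct {
	Value T
}

// Step is the common shape every processor kind is adapted to.
type Step[T any] func(context.Context, *Envelope[T]) (*Envelope[T], error)

// apply wraps a function that may change the envelope and may fail.
func apply[T any](fn func(context.Context, *Envelope[T]) (*Envelope[T], error)) Step[T] {
	return fn
}

// effect wraps a side effect; the same envelope is passed on afterwards.
func effect[T any](fn func(context.Context, *Envelope[T]) error) Step[T] {
	return func(ctx context.Context, env *Envelope[T]) (*Envelope[T], error) {
		return env, fn(ctx, env)
	}
}

// transform wraps a change that cannot fail.
func transform[T any](fn func(context.Context, *Envelope[T]) *Envelope[T]) Step[T] {
	return func(ctx context.Context, env *Envelope[T]) (*Envelope[T], error) {
		return fn(ctx, env), nil
	}
}

// run executes the steps in order and stops at the first error.
func run[T any](ctx context.Context, env *Envelope[T], steps ...Step[T]) (*Envelope[T], error) {
	for _, step := range steps {
		var err error
		if env, err = step(ctx, env); err != nil {
			return env, err
		}
	}
	return env, nil
}

type Order struct {
	ID    string
	Total float64
}

// process pushes one order through a validate, log, enrich pipeline.
func process(o Order) (Order, error) {
	steps := []Step[Order]{
		apply[Order](func(_ context.Context, env *Envelope[Order]) (*Envelope[Order], error) {
			if env.Value.Total < 0 {
				return env, errors.New("invalid total")
			}
			return env, nil
		}),
		effect[Order](func(_ context.Context, env *Envelope[Order]) error {
			fmt.Println("order", env.Value.ID)
			return nil
		}),
		transform[Order](func(_ context.Context, env *Envelope[Order]) *Envelope[Order] {
			env.Value.ID = "processed-" + env.Value.ID
			return env
		}),
	}
	out, err := run(context.Background(), &Envelope[Order]{Value: o}, steps...)
	return out.Value, err
}

func main() {
	out, err := process(Order{ID: "ORD-1", Total: 10})
	fmt.Println(out.ID, err)

	_, err = process(Order{ID: "ORD-2", Total: -1})
	fmt.Println(err)
}
```

In this sketch a failed validation short-circuits the pipeline, so the later steps never see an invalid order.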

Pipeline Options

Add reliability features via pipz:

pub := herald.NewPublisher(provider, signal, key, []herald.Option[Order]{
    herald.WithRetry[Order](3),
    herald.WithBackoff[Order](3, 100*time.Millisecond),
    herald.WithTimeout[Order](5*time.Second),
    herald.WithCircuitBreaker[Order](5, 30*time.Second),
    herald.WithRateLimit[Order](100, 10),
})

See Reliability Guide for middleware and pipeline details.
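
For readers unfamiliar with the retry-and-backoff pattern, this is roughly what it amounts to, written with the standard library only. It is a sketch under stated assumptions, not how herald or pipz implement it: `retryWithBackoff` and `publishFlaky` are hypothetical names, and doubling the delay after each failure is an assumption of this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls op up to attempts times. After each failure except
// the last it waits, doubling the delay each time, and gives up early if the
// context is cancelled.
func retryWithBackoff(ctx context.Context, attempts int, base time.Duration, op func(context.Context) error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(ctx); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point waiting after the final attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

// publishFlaky simulates a publish operation that fails the first `failures`
// times it is called, and reports how many calls were made in total.
func publishFlaky(failures, attempts int) (calls int, err error) {
	op := func(context.Context) error {
		calls++
		if calls <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}
	err = retryWithBackoff(context.Background(), attempts, time.Millisecond, op)
	return calls, err
}

func main() {
	calls, err := publishFlaky(2, 3)
	fmt.Println("calls:", calls, "err:", err)

	calls, err = publishFlaky(5, 3)
	fmt.Println("calls:", calls, "err:", err)
}
```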

Acknowledgment

Herald handles message acknowledgment automatically:

| Outcome | Action |
| --- | --- |
| Message processed successfully | Ack(): message acknowledged |
| Deserialization fails | Nack(): message returned for redelivery |
| Provider doesn't support ack | No-op (e.g., NATS core, SNS) |
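
The decision table above can be sketched in a few lines of standard-library Go. This is an illustration of the three outcomes, not herald's code: the `Message` struct with optional `Ack` and `Nack` callbacks and the `receive` function are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

// Message is a hypothetical broker message. Ack and Nack are nil when the
// provider has no acknowledgment support.
type Message struct {
	Data []byte
	Ack  func() error
	Nack func() error
}

// receive decodes the payload and then acknowledges according to the outcome.
func receive(msg Message, handle func(Order)) error {
	var o Order
	if err := json.Unmarshal(msg.Data, &o); err != nil {
		if msg.Nack != nil {
			_ = msg.Nack() // hand the message back for redelivery
		}
		return fmt.Errorf("decode: %w", err)
	}
	handle(o)
	if msg.Ack != nil {
		return msg.Ack()
	}
	return nil // provider cannot ack: nothing to do
}

func main() {
	msg := Message{
		Data: []byte(`{"id":"ORD-123","total":99.99}`),
		Ack:  func() error { fmt.Println("acked"); return nil },
		Nack: func() error { fmt.Println("nacked"); return nil },
	}
	_ = receive(msg, func(o Order) { fmt.Println("handled", o.ID) })

	msg.Data = []byte("not json")
	if err := receive(msg, func(Order) {}); err != nil {
		fmt.Println("error:", err)
	}
}
```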

Error Handling

All errors flow through capitan:

capitan.Hook(herald.ErrorSignal, func(ctx context.Context, e *capitan.Event) {
    err, _ := herald.ErrorKey.From(e)
    log.Printf("[herald] %s: %v", err.Operation, err.Err)
})

See Error Handling Guide for details.

Documentation

Full documentation is available in the docs/ directory:

Learn

  • Overview — Architecture and philosophy
  • Publishing — Forward capitan events to brokers
  • Subscribing — Consume broker messages as capitan events
  • Providers — Available broker implementations

Guides

  • Reliability — Retry, backoff, circuit breaker, rate limiting
  • Codecs — Custom serialization formats
  • Error Handling — Centralized error management
  • Testing — Testing herald-based applications

Reference

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT License — see LICENSE for details.
