Type-safe, composable data pipelines for Go.
Build processing pipelines from simple parts, compose them into complex flows, and get rich error context when things fail.
Every primitive in pipz implements Chainable[T]:
```go
type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}
```

Processors wrap your functions — the callback signature is the only difference:
```go
// Transform: pure function, no errors
enrich := pipz.Transform(EnrichID, func(ctx context.Context, o Order) Order {
    o.ProcessedAt = time.Now()
    return o
})

// Apply: fallible function
validate := pipz.Apply(ValidateID, func(ctx context.Context, o Order) (Order, error) {
    if o.Total <= 0 {
        return o, errors.New("invalid total")
    }
    return o, nil
})

// Effect: side effect, data passes through unchanged
notify := pipz.Effect(NotifyID, func(ctx context.Context, o Order) error {
    return sendNotification(o.ID)
})
```

Connectors compose processors — and each other:
```go
// Compose processors into a sequence
flow := pipz.NewSequence(FlowID, validate, enrich, notify)

// Wrap with resilience patterns
resilient := pipz.NewRetry(RetryID, flow, 3)
protected := pipz.NewTimeout(TimeoutID, resilient, 5*time.Second)

// Connectors nest freely — it's Chainable[T] all the way down
pipeline := pipz.NewCircuitBreaker(BreakerID, protected, 5, 30*time.Second)
```

```shell
go get github.com/zoobz-io/pipz
```

Requires Go 1.24+.
```go
package main

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"

    "github.com/zoobz-io/pipz"
)

// Identities for debugging and observability
var (
    ValidateID = pipz.NewIdentity("validate", "Validates order totals")
    EnrichID   = pipz.NewIdentity("enrich", "Adds processing timestamp")
    FormatID   = pipz.NewIdentity("format", "Formats order ID")
    PipelineID = pipz.NewIdentity("order-flow", "Main order pipeline")
)

type Order struct {
    ID          string
    Total       float64
    ProcessedAt time.Time
}

func main() {
    ctx := context.Background()

    // Processors wrap functions
    validate := pipz.Apply(ValidateID, func(_ context.Context, o Order) (Order, error) {
        if o.Total <= 0 {
            return o, errors.New("invalid total")
        }
        return o, nil
    })
    enrich := pipz.Transform(EnrichID, func(_ context.Context, o Order) Order {
        o.ProcessedAt = time.Now()
        return o
    })
    format := pipz.Transform(FormatID, func(_ context.Context, o Order) Order {
        o.ID = strings.ToUpper(o.ID)
        return o
    })

    // Connectors compose processors
    pipeline := pipz.NewSequence(PipelineID, validate, enrich, format)

    // Process
    result, err := pipeline.Process(ctx, Order{ID: "order-123", Total: 99.99})
    if err != nil {
        var pipeErr *pipz.Error[Order]
        if errors.As(err, &pipeErr) {
            fmt.Printf("Failed at %s: %v\n", strings.Join(pipeErr.Path, "->"), pipeErr.Err)
        }
        return
    }
    fmt.Printf("Processed: %s at %v\n", result.ID, result.ProcessedAt)
}
```

| Feature | Description | Docs |
|---|---|---|
| Uniform Interface | Everything implements Chainable[T] for seamless composition | Core Concepts |
| Type-Safe Generics | Full compile-time checking with zero reflection | Architecture |
| Rich Error Context | Path tracking, timestamps, and input capture on failure | Safety & Reliability |
| Panic Recovery | Automatic recovery with security-conscious sanitization | Safety & Reliability |
| Signal Observability | State change events via capitan integration | Hooks |
| Pipeline Schemas | Schema() exports structure for visualization and debugging | Cheatsheet |
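The schema export in the table above is easiest to picture as a tree walk. A stdlib-only sketch with a hypothetical node shape (only `Identity` and `Type` appear elsewhere in this README; the `Name` and `Children` fields here are assumptions, not pipz's actual `Node`):

```go
package main

import (
	"fmt"
	"strings"
)

// Node is a hypothetical schema node; pipz's real Node shape may differ.
type Node struct {
	Name     string
	Type     string
	Children []Node
}

// render prints the tree with two-space indentation per depth level.
func render(n Node, depth int) string {
	var b strings.Builder
	fmt.Fprintf(&b, "%s%s (%s)\n", strings.Repeat("  ", depth), n.Name, n.Type)
	for _, c := range n.Children {
		b.WriteString(render(c, depth+1))
	}
	return b.String()
}

func main() {
	schema := Node{Name: "order-flow", Type: "sequence", Children: []Node{
		{Name: "validate", Type: "processor"},
		{Name: "retry", Type: "connector", Children: []Node{
			{Name: "enrich", Type: "processor"},
		}},
	}}
	fmt.Print(render(schema, 0))
}
```

Because connectors nest, whatever the real structure type looks like, a recursive walk like this is enough to drive visualization or debugging output.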
- Type-safe — Full compile-time checking with generics
- Composable — Complex pipelines from simple parts
- Minimal dependencies — Standard library plus clockz
- Observable — Typed signals for state changes via capitan
- Rich errors — Full path tracking shows exactly where failures occur
- Panic-safe — Automatic recovery with security sanitization
Use pipz directly to build secure, observable reliability patterns over your types:

```go
// Your domain type
type Order struct { ... }

// Wrap one operation in nested resilience layers
fetch := pipz.Apply(FetchID, fetchOrder)

reliable := pipz.NewSequence(ReliableID,
    pipz.NewRateLimiter[Order](LimiterID, 100, 10), // throttle
    pipz.NewCircuitBreaker(BreakerID,               // prevent cascade
        pipz.NewTimeout(TimeoutID,                  // enforce deadline
            pipz.NewRetry(RetryID, fetch, 3),       // retry on failure
            5*time.Second),
        5, 30*time.Second),
)

// Full error context when things fail
result, err := reliable.Process(ctx, order)
```

Every connector emits capitan signals — circuit breaker state changes, retry attempts, rate limit hits — observable without instrumentation code.
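capitan's actual API is not shown in this README, so the following is only a stdlib sketch of the idea behind typed state-change signals; the event shape and the pub/sub names are assumptions, not capitan's interface:

```go
package main

import "fmt"

// BreakerEvent is a hypothetical typed signal describing a circuit
// breaker state transition; capitan's real event types may differ.
type BreakerEvent struct {
	Breaker string // identity of the circuit breaker
	From    string // previous state, e.g. "closed"
	To      string // new state, e.g. "open"
}

// Bus is a minimal typed pub/sub: every listener receives every event.
type Bus[E any] struct {
	listeners []func(E)
}

func (b *Bus[E]) Subscribe(fn func(E)) { b.listeners = append(b.listeners, fn) }

func (b *Bus[E]) Publish(e E) {
	for _, fn := range b.listeners {
		fn(e)
	}
}

func main() {
	var bus Bus[BreakerEvent]
	bus.Subscribe(func(e BreakerEvent) {
		fmt.Printf("%s: %s -> %s\n", e.Breaker, e.From, e.To)
	})
	// A connector would publish this when its state changes.
	bus.Publish(BreakerEvent{Breaker: "order-breaker", From: "closed", To: "open"})
}
```

The point is that the events are typed per connector, so observers pattern-match on concrete structs instead of parsing log lines.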
Fix T to a domain type and Chainable[T] becomes your API surface:

```go
// Library fixes T to a domain type
type File struct {
    Name     string
    Size     int64
    Data     []byte
    Metadata map[string]string
}

// Library provides domain-specific primitives
func Scan(scanner VirusScanner) pipz.Chainable[*File] { ... }
func Thumbnail(width, height int) pipz.Chainable[*File] { ... }
func Compress(quality int) pipz.Chainable[*File] { ... }
func Upload(storage Storage) pipz.Chainable[*File] { ... }

// Users extend with their own — same interface, first-class citizen
type Watermark struct {
    identity pipz.Identity
    logo     []byte
}

func (w *Watermark) Process(ctx context.Context, f *File) (*File, error) {
    f.Data = applyWatermark(f.Data, w.logo)
    return f, nil
}

func (w *Watermark) Identity() pipz.Identity { return w.identity }
func (w *Watermark) Schema() pipz.Node       { return pipz.Node{Identity: w.identity, Type: "processor"} }
func (w *Watermark) Close() error            { return nil }

// Everything composes — library primitives and user code, indistinguishable
pipeline := pipz.NewSequence(PipelineID,
    Scan(clamav),
    Thumbnail(800, 600),
    &Watermark{logo: logo}, // user's primitive slots right in
    Compress(85),
    Upload(s3),
)
```

The built-in primitives are the base vocabulary. Users add their own words following the same grammar. The interface IS the API — implement it and express whatever you want.
- Overview — Design philosophy and architecture
- Quickstart — Build your first pipeline
- Introduction — What pipz is and why it exists
- Core Concepts — Processors, connectors, identity
- Architecture — Internal design and components
- Hooks — Signal-based observability
- Connector Selection — Choosing the right connector
- Cloning — Data isolation for parallel processing
- Best Practices — Patterns and recommendations
- Testing — Testing pipelines
- Performance — Optimization and benchmarking
- Safety & Reliability — Error handling, panics, timeouts
- Troubleshooting — Common issues and solutions
- Building Pipelines — Complete pipeline with validation, resilience, observability
- Library Resilience — Expose resilience patterns via functional options
- Extensible Vocabulary — Domain-specific APIs with composable primitives
- Cheatsheet — Quick reference for all primitives
- Types — Error, Identity, Node, Schema
| Processor | Purpose |
|---|---|
| Transform | Pure transformation (no errors) |
| Apply | Transformation that may fail |
| Effect | Side effect, passes data through |
| Mutate | Conditional modification |
| Enrich | Best-effort enhancement (errors ignored) |
| Connector | Purpose |
|---|---|
| Sequence | Run in order |
| Concurrent | Run in parallel, collect all results |
| WorkerPool | Bounded parallelism with fixed worker count |
| Scaffold | Fire-and-forget parallel execution |
| Fallback | Try primary, fall back on error |
| Race | First success wins |
| Contest | First result meeting condition wins |
| Switch | Route based on conditions |
| Filter | Conditional execution |
| Retry | Retry on failure |
| Backoff | Retry with exponential delays |
| Timeout | Enforce time limits |
| Handle | Error recovery pipeline |
| RateLimiter | Token bucket rate limiting |
| CircuitBreaker | Prevent cascading failures |
| Pipeline | Execution context for tracing |
See CONTRIBUTING.md for guidelines. Run make help for available commands.
MIT License — see LICENSE for details.