-
-
Notifications
You must be signed in to change notification settings - Fork 0
10. Labels
Labels are a flexible metadata system in F-Mesh, inspired by Kubernetes labels. They provide a common API for attaching label-value pairs to signals, components, and ports. Labels are useful for categorization, filtering, routing, debugging, and implementing custom logic based on metadata.
All labeled entities (signals, components, and ports) share the same label API, making it easy to work with metadata consistently throughout your mesh. Labels are particularly powerful when combined with F-Mesh's collection APIs for filtering and querying.
For complete API documentation, see the labels package.
Labels are simple string label-value pairs:
- Label: A unique identifier (e.g., "priority", "source", "type")
- Value: A string value (e.g., "high", "sensor1", "temperature")
Labels are managed through labels.Collection, which provides a rich API for adding, querying, filtering, and manipulating label sets.
Signals can carry labels to provide metadata about the data they're transporting.
// Create signal with labels using SetLabels
sig := signal.New("temperature: 25°C").SetLabels(labels.Map{
"source": "sensor1",
"type": "temperature",
"priority": "normal",
})
// Or chain AddLabel calls
sig := signal.New("temperature: 25°C").
AddLabel("source", "sensor1").
AddLabel("type", "temperature").
AddLabel("priority", "normal")
// Add labels to existing signal
sig.AddLabel("processed", "true").AddLabel("timestamp", "2024-01-01")// Check if signal has specific label
if sig.Labels().Has("priority") {
// Handle labeled signal
}
// Check label value
if sig.Labels().ValueIs("priority", "high") {
// Handle high priority signal
}
// Get label value
priority := sig.Labels().ValueOrDefault("priority", "normal")// Filter signals in a port by labels
highPriority := port.Signals().Filter(func(s *signal.Signal) bool {
return s.Labels().ValueIs("priority", "high")
})
// Filter signals from specific source
fromSensor1 := port.Signals().Filter(func(s *signal.Signal) bool {
return s.Labels().Has("source") &&
s.Labels().ValueOrDefault("source", "") == "sensor1"
})
// Filter signals with any of multiple labels
urgent := port.Signals().Filter(func(s *signal.Signal) bool {
return s.Labels().HasAny("urgent", "critical", "emergency")
})Labels are preserved when signals flow through pipes, making them reliable for tracking data throughout your mesh.
Components can be labeled to categorize and organize them within the mesh.
// Create component with labels using SetLabels
c := component.New("processor").
AddInputs("in").
AddOutputs("out").
SetLabels(labels.Map{
"layer": "business-logic",
"category": "data-processing",
"criticality": "high",
})
// Or chain AddLabel calls
c := component.New("processor").
AddInputs("in").
AddOutputs("out").
AddLabel("layer", "business-logic").
AddLabel("category", "data-processing").
AddLabel("criticality", "high")
// Add labels after creation
c.AddLabel("version", "2.0").AddLabel("owner", "team-a")// Check component labels
if c.Labels().ValueIs("criticality", "high") {
// Apply special monitoring
}
// Get label value with default
layer := c.Labels().ValueOrDefault("layer", "unknown")// Get all critical components
critical := fm.Components().Filter(func(c *component.Component) bool {
return c.Labels().ValueIs("criticality", "high")
})
// Get components by layer
businessLogic := fm.Components().Filter(func(c *component.Component) bool {
return c.Labels().ValueIs("layer", "business-logic")
})
// Get components with specific labels
monitored := fm.Components().Filter(func(c *component.Component) bool {
return c.Labels().HasAll("owner", "environment")
})Ports can also be labeled for categorization and filtering.
c := component.New("processor").
AddInputs("data", "config", "control").
AddOutputs("result", "logs")
// Label individual ports using AddLabel
c.InputByName("data").
AddLabel("type", "primary").
AddLabel("required", "true")
c.InputByName("config").
AddLabel("type", "configuration").
AddLabel("optional", "true")
c.OutputByName("logs").
AddLabel("type", "diagnostic")
// Or use SetLabels
c.InputByName("data").SetLabels(labels.Map{
"type": "primary",
"required": "true",
})// Get all required input ports
required := c.Inputs().Filter(func(p *port.Port) bool {
return p.Labels().ValueIs("required", "true")
})
// Get all diagnostic output ports
diagnostic := c.Outputs().Filter(func(p *port.Port) bool {
return p.Labels().Has("type") &&
p.Labels().ValueOrDefault("type", "") == "diagnostic"
})The labels.Collection type provides a rich set of methods for working with labels.
lc := labels.NewCollection()
// Add single label
lc.Add("status", "active")
// Add multiple labels
lc.AddMany(labels.Map{
"env": "production",
"region": "us-west",
"tier": "backend",
})
// Remove labels
lc.Without("old-status", "deprecated")
// Clear all labels
lc.Clear()
// Get label count
count := lc.Len()
// Check if empty
if lc.IsEmpty() {
// No labels
}// Check if label exists
if lc.Has("environment") {
// Label exists
}
// Check for multiple labels
if lc.HasAll("env", "region", "tier") {
// All labels present
}
if lc.HasAny("dev", "staging", "prod") {
// At least one present
}
// Check specific value
if lc.ValueIs("env", "production") {
// Environment is production
}
// Get value with error handling
value, err := lc.Value("region")
if err != nil {
// Label not found
}
// Get value with default
region := lc.ValueOrDefault("region", "unknown")// Filter labels
filtered := lc.Filter(func(label, value string) bool {
return strings.HasPrefix(label, "app.")
})
// Transform labels
transformed := lc.Map(func(label, value string) (string, string) {
return strings.ToUpper(label), strings.ToLower(value)
})
// Iterate over labels
lc.ForEach(func(label, value string) error {
fmt.Printf("%s=%s\n", label, value)
return nil
})
// Count matching labels
count := lc.CountMatch(func(label, value string) bool {
return strings.Contains(value, "prod")
})
// Check if all labels match predicate
allProd := lc.AllMatch(func(label, value string) bool {
return strings.Contains(value, "prod")
})
// Check if any label matches predicate
anyProd := lc.AnyMatch(func(label, value string) bool {
return strings.Contains(value, "prod")
})collection1 := labels.NewCollection().Add("a", "1").Add("b", "2")
collection2 := labels.NewCollection().Add("a", "1")
// Check if collection1 has all labels from collection2
if collection1.HasAllFrom(collection2) {
// collection1 contains all labels in collection2 (ignoring values)
}
// Check if collections have any common labels
if collection1.HasAnyFrom(collection2) {
// At least one label is common
}// Route signals based on priority
component.New("router").
WithActivationFunc(func(c *component.Component) error {
// Approach 1: Using All() with error handling
signals, err := c.InputByName("in").Signals().All()
if err != nil {
return err
}
for _, sig := range signals {
priority := sig.Labels().ValueOrDefault("priority", "normal")
switch priority {
case "high":
c.OutputByName("high-priority").PutSignals(sig)
case "low":
c.OutputByName("low-priority").PutSignals(sig)
default:
c.OutputByName("normal").PutSignals(sig)
}
}
return nil
})
// Approach 2: Using ForEach() (cleaner and handles errors automatically)
component.New("router").
WithActivationFunc(func(c *component.Component) error {
return c.InputByName("in").Signals().ForEach(func(sig *signal.Signal) error {
priority := sig.Labels().ValueOrDefault("priority", "normal")
switch priority {
case "high":
return c.OutputByName("high-priority").PutSignals(sig).ChainableErr()
case "low":
return c.OutputByName("low-priority").PutSignals(sig).ChainableErr()
default:
return c.OutputByName("normal").PutSignals(sig).ChainableErr()
}
})
})// Track signal origin
sig := signal.New(data).SetLabels(labels.Map{
"source": "api-gateway",
"request-id": "abc-123",
"user-id": "user-456",
})
// Later in the mesh, trace back the source
if sig.Labels().ValueIs("source", "api-gateway") {
requestID := sig.Labels().ValueOrDefault("request-id", "unknown")
// Log or process based on source
}// Organize components by layers
dataLayer := fm.Components().Filter(func(c *component.Component) bool {
return c.Labels().ValueIs("layer", "data")
})
logicLayer := fm.Components().Filter(func(c *component.Component) bool {
return c.Labels().ValueIs("layer", "business-logic")
})
// Find all components by team
teamA := fm.Components().Filter(func(c *component.Component) bool {
return c.Labels().ValueIs("team", "team-a")
})// Add trace labels for debugging
sig := signal.New(data).SetLabels(labels.Map{
"trace-id": generateTraceID(),
"timestamp": time.Now().Format(time.RFC3339),
"component": c.Name(),
})
// Use hooks to log signals with specific labels
c.SetupHooks(func(h *component.Hooks) {
h.AfterActivation(func(ctx *component.ActivationContext) error {
if ctx.Component.Labels().Has("debug") {
ctx.Component.Logger().Printf("Debug mode: processed %d signals\n",
ctx.Component.OutputByName("out").Signals().Len())
}
return nil
})
})// Enable/disable components based on labels
component.New("optional-processor").
AddLabel("enabled", "true").
WithActivationFunc(func(c *component.Component) error {
if c.Labels().ValueIs("enabled", "false") {
// Skip processing, just forward
port.ForwardSignals(c.InputByName("in"), c.OutputByName("out"))
return nil
}
// Normal processing
// ...
return nil
})// Label components by environment
c := component.New("db-connector").SetLabels(labels.Map{
"env": "production",
"region": "us-east-1",
})
// Adjust behavior based on environment
if c.Labels().ValueIs("env", "production") {
// Use production settings
} else {
// Use development settings
}Choose a consistent naming scheme for labels:
// Good: consistent prefix and naming
lc.Add("app.name", "my-mesh")
lc.Add("app.version", "2.0")
lc.Add("app.environment", "production")
// Avoid: inconsistent naming
lc.Add("appName", "my-mesh")
lc.Add("version", "2.0")
lc.Add("ENV", "production")Labels are strings - keep them simple and avoid complex data structures:
// Good: simple string values
sig.AddLabel("priority", "high").AddLabel("type", "temperature")
// Bad: trying to store complex data
sig.AddLabel("config", "{\"timeout\": 30, \"retry\": 3}") // Use payload instead// Good: separate concerns
sig := signal.New(temperatureData). // Actual data in payload
AddLabel("unit", "celsius"). // Metadata in labels
AddLabel("sensor", "room1")
// Bad: mixing data and metadata
sig := signal.New(labels.Map{ // Don't put metadata in payload
"data": temperatureData,
"unit": "celsius",
})Document what labels mean in your mesh:
// Document label usage
// Supported labels:
// - "priority": "high" | "normal" | "low" - signal processing priority
// - "source": string - originating component or system
// - "type": string - data type classificationLabels work well for concerns that span multiple components:
// Good use cases:
// - Routing and filtering
// - Monitoring and observability
// - Debugging and tracing
// - Feature flags
// - Environment configuration
// Poor use cases:
// - Component-specific business logic (use state instead)
// - Actual data transfer (use payload instead)
// - Complex nested structures (use custom types in payload)| Feature | Labels | Component State | Signal Payload |
|---|---|---|---|
| Purpose | Metadata/categorization | Persistent component data | Actual data transfer |
| Scope | Signal/Component/Port | Single component | Single signal |
| Type | String label-value | Any type | Any type |
| Persistence | Travels with signal/entity | Persists across cycles | Consumed in cycle |
| Filtering | Built-in collection APIs | Manual | Manual |
| Best for | Routing, debugging, organization | Counters, accumulators, state machines | Business data |
Labels provide a powerful and flexible way to add metadata throughout your F-Mesh without cluttering your core logic. They're particularly useful when combined with filtering and collection APIs to implement dynamic, metadata-driven behavior.