A FlowQL program is a linear pipeline:
```
source <source-spec>
| <stage>
| <stage>
| ...
| sink <sink-spec>
```
Stages are separated by `|` (pipe). A pipeline must begin with `source` and end with `sink`.
Lines starting with `#` are comments:

```
# This is a comment
source stdin # inline comment
| sink stdout
```
Read lines from a file. Each line becomes an event with a `_line` field.
Read lines from standard input.
Alias for `source file("path")`.
Parse the `_line` field of each event as a JSON object. The resulting event has one field per JSON key.
Input: `{"status": 200, "path": "/api"}` → Event with fields `status=200`, `path="/api"`
Supports: strings, integers, reals, booleans, and null. Nested objects are not expanded.
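For example, a pipeline that parses JSON log lines and keeps only server errors:

```
source file("access.log")
| parse_json
| filter status >= 500
| sink stdout
```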
Parse the `_line` field as CSV (comma-separated values).

- `parse_csv` or `parse_csv header`: the first line is treated as a header row defining field names
- `parse_csv noheader`: no header row; fields are named `c0`, `c1`, `c2`, etc.

Values are auto-detected as integer, real, boolean, or string.
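For example, reading a CSV file with a header row and keeping two of its columns:

```
source file("users.csv")
| parse_csv header
| project { name, email }
| sink stdout
```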
Parse the `_line` field as TSV (tab-separated values). Same header modes as `parse_csv`.

- `parse_tsv` or `parse_tsv header`: the first line is the header row
- `parse_tsv noheader`: fields are named `c0`, `c1`, `c2`, etc.
Parse the `_line` field as whitespace-separated `key=value` pairs. Quoted values are supported (`key="value with spaces"`). Values are auto-detected as integer, real, boolean, or string.
Input: `level=error host=web01 latency=42` → Event with fields `level="error"`, `host="web01"`, `latency=42`
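Building on the input above, a pipeline that counts error events in a `key=value` log:

```
source file("app.log")
| parse_kv
| filter level = "error"
| count
| sink stdout
```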
Rename the raw `_line` field to `line`. This is a simple transform (not a parse stage) that can coexist with parse stages. Useful for text-processing pipelines that want to filter or count raw lines.
```
source file("errors.log")
| lines
| filter line != ""
| count
| sink stdout
```
Keep only events where the expression evaluates to true.
```
filter status >= 500
filter status >= 500 and latency_ms > 100
filter not (status = 200)
```
Create a new event with the specified fields. Fields can be bare (pass-through) or computed.
```
map { path, status }                         # pass-through
map { path, slow = latency_ms > 1000 }       # computed field
map { path, latency_s = latency_ms / 1000 }  # arithmetic
```
Select specific fields from the event. Like `map`, but without computed fields.

```
project { name, email, age }
```
Count all events. Emits a single event with a `count` field when the stream ends.
Count events grouped by the value of a field. Emits a single event where each field name is a group key and its value is the count.

```
count by path
# Input:  [{path:"/a"}, {path:"/b"}, {path:"/a"}]
# Output: {"/a": 2, "/b": 1}
```
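In a full pipeline, `count by` can summarize a parsed log by one of its fields:

```
source file("access.log")
| parse_json
| count by status
| sink stdout
```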
Collect events into groups of `n`. Each batch is passed through as a group. Maximum batch size: 64.
Write each event as a JSON line to standard output.
Write each event as a JSON line to a file.
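A sketch of writing filtered events to a file. The exact sink spec is not shown above; this assumes the file sink takes a path argument the same way the `file(...)` source does:

```
source stdin
| parse_json
| filter status >= 500
| sink file("errors.json")  # assumed spec, mirroring source file("path")
```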
| Type | Examples |
|---|---|
| Integer | `42`, `0`, `-1` |
| Real | `3.14`, `0.5` |
| String | `"hello"`, `"error"` |
| Boolean | `true`, `false` |
| Null | `null` |
Bare identifiers refer to event fields: `status`, `latency_ms`, `path`.
If a field doesn't exist, it evaluates to `null`.
Comparison (produce boolean):
`=`, `!=`, `>`, `>=`, `<`, `<=`
Arithmetic (produce int or real):
`+`, `-`, `*`, `/`
String `+` performs concatenation.
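Since `+` concatenates strings, a computed `map` field can combine two string fields into one:

```
source stdin
| parse_json
| map { endpoint = method + " " + path }
| sink stdout
```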
Logic (produce boolean):
`and`, `or`, `not`
Short-circuit evaluation: `and` stops at the first false operand; `or` stops at the first true operand.
Operator precedence, from lowest to highest:

- `or`
- `and`
- `=`, `!=`, `>`, `>=`, `<`, `<=`
- `+`, `-`
- `*`, `/`
- `not`, unary `-`
- parentheses
- `null` → false
- `false` → false
- `0` / `0.0` → false
- `""` (empty string) → false
- Everything else → true
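These rules suggest a bare field reference can act as a filter condition (assuming `filter` applies the same truthiness coercion to its expression). Since a missing field evaluates to `null`, which is falsy, such a filter also drops events that lack the field:

```
source stdin
| parse_kv
| filter host  # drops events where host is null, false, 0, or ""
| sink stdout
```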