Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/rfl/datalog.rfl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
; ── 10. sym-name: readable output ───────────────────────
(println "--- sym-name: convert intern IDs ---")
(set p (pull db 1))
(set name-id (get p 1))
(set name-id (get p 'name))
(println name-id)
(println (sym-name name-id))

Expand Down
10 changes: 5 additions & 5 deletions examples/rfl/flips.rfl
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

(set flips (except (.csv.read
[DATE I64 I64 SYMBOL I64 SYMBOL TIME F64 F64 I64
I64 I64 SYMBOL GUID SYMBOL C8 C8 I64 I64 F64 I64 SYMBOL
SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL C8 SYMBOL SYMBOL F64
C8 C8 C8 C8 C8 C8 SYMBOL GUID F64 C8 I64 C8
SYMBOL SYMBOL I64 C8 C8 SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL
I64 I64 SYMBOL GUID SYMBOL STR STR I64 I64 F64 I64 SYMBOL
SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL STR SYMBOL SYMBOL F64
STR STR STR STR STR STR SYMBOL GUID F64 STR I64 STR
SYMBOL SYMBOL I64 STR STR SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL
SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL
SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL SYMBOL
SYMBOL SYMBOL F64 C8 I64 I64 I64 F64 F64 F64 F64 I64 I64 I64 I64
SYMBOL SYMBOL F64 STR I64 I64 I64 F64 F64 F64 F64 I64 I64 I64 I64
SYMBOL I64 I64 SYMBOL SYMBOL SYMBOL TIME SYMBOL I64]
csvpath) 'date))

Expand Down
44 changes: 30 additions & 14 deletions examples/rfl/journal.rfl
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
;; Journaling example
(set f (fn [x y] (println "RES: %" (+ x y))))

;; Write journal
(set h (.ipc.open "/tmp/jou.log"))
(write h (list 'f 1 2))
(write h (list 'f 2 3))
(write h (list 'f 3 4))
(.ipc.close h)

;; Replay journal
(set h (.ipc.open "/tmp/jou.log"))
(read h)
(.ipc.close h)
;; Transaction-log journaling example (the `.log.*` API).
;;
;; In production you start the server with `-l <base>` (async) or `-L <base>`
;; (sync, fsync per write): every mutation that arrives over IPC is appended to
;; <base>.log and replayed automatically on restart, so user state survives a
;; crash. The `.log.*` verbs expose that machinery directly.

(set base "/tmp/jou_ex")
(.sys.exec "rm -f /tmp/jou_ex*")

;; Open a journal in async mode (writes buffered; use 'sync for fsync-per-write).
(.log.open 'async base)

;; Append entries. `.log.write` serializes its (already-evaluated) argument
;; into the log; replay re-evaluates each payload in order.
(.log.write 100)
(.log.write [1 2 3])
(.log.write {sym: 'AAPL px: 191.5})

;; Flush buffered writes to disk and close.
(.log.sync)
(.log.close)

;; Inspect the log: (.log.validate path) -> [chunks valid_bytes].
(println "validate %.log -> %" base (.log.validate (format "%.log" base)))

;; Replay the journal — returns the number of entries replayed.
(println "replayed % entries" (.log.replay (format "%.log" base)))

(.sys.exec "rm -f /tmp/jou_ex*")
(exit 0)
26 changes: 23 additions & 3 deletions examples/rfl/window.rfl
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
;; Window join example.
;;
;; A window join aggregates, for each trade, the quotes whose timestamps fall
;; in a per-trade time window. The `intervals` argument is two parallel
;; vectors — (lo_vec hi_vec) — with one entry per trade row: the window for
;; trade r is [lo_vec[r], hi_vec[r]].
;;
;; window-join (wj) seeds each window with the PREVAILING quote — the last
;; quote at/before lo — then extends through hi.
;; window-join1 (wj1) is strict: only quotes whose time is inside [lo, hi].

(set trades (table [Sym Time Price] (list [x x x] [12:00:01 12:00:04 12:00:06] [89.17 70.5 80.54])))
(set quotes (table [Sym Time Size] (list [x x x x x x x x x x] [12:00:00 12:00:01 12:00:02 12:00:03 12:00:04 12:00:05 12:00:06 12:00:07 12:00:08 12:00:09] [928 528 648 914 918 626 577 817 620 698])))

(set intervals (list [11:59:59 12:00:02 12:00:04] [12:00:03 12:00:06 12:00:08]))

;; (window-join [Sym Time] intervals trades quotes {a: (sum Bid) b: (sum Ask)})
;; Per-trade windows: two parallel vectors (lo_vec hi_vec) built with map-left,
;; here ±1.5s around each trade time. The 1.5s offset makes each window open
;; *between* quotes, so wj's prevailing-quote seed differs visibly from wj1.
(set intervals (map-left + [-1500 1500] (at trades 'Time)))

(println "window-join (prevailing quote seeded):")
(println "%" (window-join [Sym Time] intervals trades quotes {n: (count Size) lo: (min Size) hi: (max Size) total: (sum Size)}))

(println "window-join1 (strict window):")
(println "%" (window-join1 [Sym Time] intervals trades quotes {n: (count Size) lo: (min Size) hi: (max Size) total: (sum Size)}))

(exit 0)
2 changes: 1 addition & 1 deletion src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -2728,7 +2728,7 @@ static void ray_register_builtins(void) {
register_vary("inner-join", RAY_FN_NONE, ray_inner_join_fn);
register_vary("anti-join", RAY_FN_NONE, ray_anti_join_fn);
register_vary("window-join", RAY_FN_SPECIAL_FORM, ray_window_join_fn);
register_vary("window-join1", RAY_FN_SPECIAL_FORM, ray_window_join_fn);
register_vary("window-join1", RAY_FN_SPECIAL_FORM, ray_window_join1_fn);
register_vary("asof-join", RAY_FN_NONE, ray_asof_join_fn);

/* I/O builtins */
Expand Down
1 change: 1 addition & 0 deletions src/lang/eval.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ ray_t* ray_xbar_fn(ray_t* col, ray_t* bucket);
ray_t* ray_left_join_fn(ray_t** args, int64_t n);
ray_t* ray_inner_join_fn(ray_t** args, int64_t n);
ray_t* ray_window_join_fn(ray_t** args, int64_t n);
ray_t* ray_window_join1_fn(ray_t** args, int64_t n);

/* I/O */
ray_t* ray_println_fn(ray_t** args, int64_t n);
Expand Down
1 change: 1 addition & 0 deletions src/lang/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ ray_t* ray_left_join_fn(ray_t** args, int64_t n);
ray_t* ray_inner_join_fn(ray_t** args, int64_t n);
ray_t* ray_anti_join_fn(ray_t** args, int64_t n);
ray_t* ray_window_join_fn(ray_t** args, int64_t n);
ray_t* ray_window_join1_fn(ray_t** args, int64_t n);
ray_t* ray_asof_join_fn(ray_t** args, int64_t n);

/* Graph builtins (.graph.* family). Implemented in src/ops/graph_builtin.c —
Expand Down
5 changes: 4 additions & 1 deletion src/ops/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,10 @@ ray_t* ray_timeit_fn(ray_t** args, int64_t n) {
int64_t t0 = ray_profile_now_ns();
ray_t* result = ray_eval(args[0]);
int64_t t1 = ray_profile_now_ns();
if (result && !RAY_IS_ERR(result)) ray_release(result);
/* Propagate errors instead of swallowing them — otherwise a failing
* expression silently reports a timing as if it had succeeded. */
if (result && RAY_IS_ERR(result)) return result;
if (result) ray_release(result);
double ms = (double)(t1 - t0) / 1e6;
return make_f64(ms);
}
Expand Down
142 changes: 102 additions & 40 deletions src/ops/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -11153,6 +11153,7 @@ typedef struct {
int64_t right_nrows;
int64_t n_eq;
int64_t n_agg;
int mode; /* 0 = wj (prevailing-quote seeded), 1 = wj1 (strict) */

/* Left-row metadata — pre-extracted to int64 so workers can read
* without touching any ray_t objects (no locking, no allocation). */
Expand Down Expand Up @@ -11183,6 +11184,18 @@ typedef struct {
uint8_t* result_null[WJ_MAX_AGG]; /* 1 byte per row: 1 = null */
} wj_scan_ctx_t;

/* Compare the equality tuple of sorted-right row `ri` against `target_eq`.
* Returns <0, 0, >0. Used to bracket a left row's eq partition. */
static inline int wj_eq_cmp(const wj_scan_ctx_t* c, int64_t ri,
const int64_t* target_eq, int64_t n_eq) {
for (int64_t e = 0; e < n_eq; e++) {
int64_t rv = read_col_i64(c->eq_data[e], ri, c->eq_type[e], c->eq_attrs[e]);
if (rv < target_eq[e]) return -1;
if (rv > target_eq[e]) return 1;
}
return 0;
}

static void wj_scan_fn(void* ctx_, uint32_t worker_id, int64_t start, int64_t end) {
(void)worker_id;
wj_scan_ctx_t* c = (wj_scan_ctx_t*)ctx_;
Expand All @@ -11201,33 +11214,49 @@ static void wj_scan_fn(void* ctx_, uint32_t worker_id, int64_t start, int64_t en
for (int64_t e = 0; e < n_eq; e++)
target_eq[e] = c->left_eq_arr[e][lr];

/* lower_bound: first rank with (eq, time) >= (target_eq, lo) */
int64_t lb = 0, lb_hi = rn;
while (lb < lb_hi) {
int64_t m = (lb + lb_hi) >> 1;
int64_t ri = right_sort[m];
int cmp = 0;
for (int64_t e = 0; e < n_eq && cmp == 0; e++) {
int64_t rv = read_col_i64(c->eq_data[e], ri, c->eq_type[e], c->eq_attrs[e]);
if (rv < target_eq[e]) cmp = -1;
else if (rv > target_eq[e]) cmp = 1;
}
if (cmp == 0 && rt_time_i[ri] < lo) cmp = -1;
if (cmp < 0) lb = m + 1; else lb_hi = m;
}
int64_t ub = lb, ub_hi = rn;
/* Locate the eq partition [ps, pe) in the sorted right table — the
* contiguous run of ranks whose equality tuple == target_eq. Compare
* eq keys only (the sort is (eq..., time), so the run is contiguous). */
int64_t ps = 0, ps_hi = rn;
while (ps < ps_hi) {
int64_t m = (ps + ps_hi) >> 1;
if (wj_eq_cmp(c, right_sort[m], target_eq, n_eq) < 0) ps = m + 1;
else ps_hi = m;
}
int64_t pe = ps, pe_hi = rn;
while (pe < pe_hi) {
int64_t m = (pe + pe_hi) >> 1;
if (wj_eq_cmp(c, right_sort[m], target_eq, n_eq) <= 0) pe = m + 1;
else pe_hi = m;
}

/* Upper bound (both modes): first rank in [ps, pe) with time > hi. */
int64_t ub = ps, ub_hi = pe;
while (ub < ub_hi) {
int64_t m = (ub + ub_hi) >> 1;
int64_t ri = right_sort[m];
int cmp = 0;
for (int64_t e = 0; e < n_eq && cmp == 0; e++) {
int64_t rv = read_col_i64(c->eq_data[e], ri, c->eq_type[e], c->eq_attrs[e]);
if (rv < target_eq[e]) cmp = -1;
else if (rv > target_eq[e]) cmp = 1;
if (rt_time_i[right_sort[m]] <= hi) ub = m + 1; else ub_hi = m;
}

/* Lower bound differs by mode:
* wj1 (mode 1): first rank in [ps, pe) with time >= lo (strict window).
* wj (mode 0): the prevailing quote — rightmost rank with time <= lo,
* i.e. (first rank with time > lo) - 1, clamped to ps. */
int64_t lb;
if (c->mode == 1) {
lb = ps; int64_t lbh = pe;
while (lb < lbh) {
int64_t m = (lb + lbh) >> 1;
if (rt_time_i[right_sort[m]] < lo) lb = m + 1; else lbh = m;
}
} else {
int64_t r = ps, rh = pe;
while (r < rh) {
int64_t m = (r + rh) >> 1;
if (rt_time_i[right_sort[m]] <= lo) r = m + 1; else rh = m;
}
if (cmp == 0 && rt_time_i[ri] <= hi) cmp = -1;
if (cmp < 0) ub = m + 1; else ub_hi = m;
lb = (r > ps) ? r - 1 : ps;
}
if (lb > ub) lb = ub; /* empty window / no partition → null result */

memset(acc, 0, sizeof(acc));
for (int64_t a = 0; a < n_agg; a++) {
Expand Down Expand Up @@ -11477,10 +11506,21 @@ static void wj_scan_fn(void* ctx_, uint32_t worker_id, int64_t start, int64_t en
}
}

/* (window-join t1 t2 [eq-keys] time-col)
* ASOF join: for each left row, find closest right row with time <= left.time
* within the same equality partition. */
ray_t* ray_window_join_fn(ray_t** args, int64_t n) {
/* Window join (Rayforce convention):
* (window-join [eq-keys.. timeKey] intervals left right {agg})
* (window-join1 [eq-keys.. timeKey] intervals left right {agg})
* `intervals` is a 2-element list (lo_vec hi_vec) of parallel vectors, one
* entry per left row: the window for left row r is [lo_vec[r], hi_vec[r]].
*
* mode 0 (window-join / wj): each window is seeded with the PREVAILING quote
* — the rightmost quote with time <= lo[r] within the eq partition — then
* extended through the last quote with time <= hi[r].
* mode 1 (window-join1 / wj1): strict window — only quotes whose time falls in
* [lo[r], hi[r]].
*
* The legacy asof fall-through ((window-join L R [keys] time)) is mode-agnostic.
*/
static ray_t* window_join_impl(ray_t** args, int64_t n, int mode) {
if (n < 4) return ray_error("domain", NULL);

/* Special form: evaluate first 4 args, keep agg dict (args[4]) unevaluated */
Expand Down Expand Up @@ -11812,26 +11852,37 @@ ray_t* ray_window_join_fn(ray_t** args, int64_t n) {
for (int i = 0; i < 4; i++) ray_release(eargs[i]);
return ray_error("oom", NULL);
}
for (int64_t lr = 0; lr < left_nrows; lr++) {
int alloc_iv = 0;
ray_t* iv = collection_elem(intervals, lr, &alloc_iv);
if (!iv || RAY_IS_ERR(iv) || ray_len(iv) < 2) {
if (alloc_iv && iv) ray_release(iv);
/* `intervals` is the v1 two-parallel-vector form: a 2-element list
* (lo_vec hi_vec), each a vector with one entry per left row. The
* window for left row r is [lo_vec[r], hi_vec[r]]. */
{
int alloc_lo_v = 0, alloc_hi_v = 0;
ray_t* lo_vec = (ray_len(intervals) >= 2)
? collection_elem(intervals, 0, &alloc_lo_v) : NULL;
ray_t* hi_vec = (ray_len(intervals) >= 2)
? collection_elem(intervals, 1, &alloc_hi_v) : NULL;
if (!lo_vec || !hi_vec || RAY_IS_ERR(lo_vec) || RAY_IS_ERR(hi_vec) ||
!ray_is_vec(lo_vec) || !ray_is_vec(hi_vec) ||
ray_len(lo_vec) != left_nrows || ray_len(hi_vec) != left_nrows) {
if (alloc_lo_v && lo_vec) ray_release(lo_vec);
if (alloc_hi_v && hi_vec) ray_release(hi_vec);
if (lo_hdr) scratch_free(lo_hdr);
if (hi_hdr) scratch_free(hi_hdr);
WJ_CLEANUP_TEMP();
for (int64_t a = 0; a < n_agg; a++) ray_release(agg_result_vecs[a]);
for (int i = 0; i < 4; i++) ray_release(eargs[i]);
return ray_error("domain", NULL);
}
int alloc_lo = 0, alloc_hi = 0;
ray_t* lo_atom = collection_elem(iv, 0, &alloc_lo);
ray_t* hi_atom = collection_elem(iv, 1, &alloc_hi);
lo_arr[lr] = as_i64(lo_atom);
hi_arr[lr] = as_i64(hi_atom);
if (alloc_lo) ray_release(lo_atom);
if (alloc_hi) ray_release(hi_atom);
if (alloc_iv) ray_release(iv);
const void* lo_d = ray_data(lo_vec);
const void* hi_d = ray_data(hi_vec);
int8_t lo_t = lo_vec->type, hi_t = hi_vec->type;
uint8_t lo_a = lo_vec->attrs, hi_a = hi_vec->attrs;
for (int64_t lr = 0; lr < left_nrows; lr++) {
lo_arr[lr] = read_col_i64(lo_d, lr, lo_t, lo_a);
hi_arr[lr] = read_col_i64(hi_d, lr, hi_t, hi_a);
}
if (alloc_lo_v) ray_release(lo_vec);
if (alloc_hi_v) ray_release(hi_vec);
}

ray_t* left_eq_hdr[WJ_MAX_AGG] = {0};
Expand Down Expand Up @@ -11883,6 +11934,7 @@ ray_t* ray_window_join_fn(ray_t** args, int64_t n) {
wctx.right_nrows = right_nrows;
wctx.n_eq = n_eq;
wctx.n_agg = n_agg;
wctx.mode = mode;
wctx.lo_arr = lo_arr;
wctx.hi_arr = hi_arr;
wctx.right_sort = right_sort;
Expand Down Expand Up @@ -12005,6 +12057,16 @@ ray_t* ray_window_join_fn(ray_t** args, int64_t n) {
return result;
}

/* window-join (wj): prevailing-quote-seeded window aggregation. */
ray_t* ray_window_join_fn(ray_t** args, int64_t n) {
return window_join_impl(args, n, 0);
}

/* window-join1 (wj1): strict-window aggregation (no prevailing quote). */
ray_t* ray_window_join1_fn(ray_t** args, int64_t n) {
return window_join_impl(args, n, 1);
}

/* (asof-join [key1 key2 ... timeKey] leftTable rightTable)
* Last key is the time/asof column, rest are equality keys. The equality
* keys are OPTIONAL: a lone time key (asof-join [timeKey] L R) performs an
Expand Down
Loading
Loading