Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions src/api/useSetAtomWsData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import type {
EmaHistoryArrayKey,
EmaObjectItem,
FromWorkerMessage,
FromWorkerWsEntity,
HistoryArrayKey,
KeyedValuesWithHistory,
WsEntity,
} from "./worker/types";
import { isEmaObjectKey } from "./worker/types";
import { DateTime } from "luxon";
Expand All @@ -18,6 +18,8 @@ import {
skipRateAtom,
setSlotResponseAtom,
epochAtom,
nextEpochAtom,
currentSlotAtom,
setSlotStatusAtom,
updatePeersAtom,
removePeersAtom,
Expand All @@ -30,7 +32,6 @@ import {
deleteSkippedClusterSlotsRangeAtom,
deleteSlotResponseBoundsAtom,
deleteSlotStatusBoundsAtom,
deletePreviousEpochsAtom,
supermajorityEpochAtom,
updateSupermajorityOnlinePeersAtom,
deleteSupermajorityDeltaEntriesAtom,
Expand Down Expand Up @@ -169,6 +170,10 @@ export function useSetAtomWsData() {
[setTileTimerHistory],
);

const setEpoch = useSetAtom(epochAtom);
const setNextEpoch = useSetAtom(nextEpochAtom);
const setCurrentSlot = useSetAtom(currentSlotAtom);

const onMessage = useCallback(
(msg: FromWorkerMessage) => {
switch (msg.type) {
Expand Down Expand Up @@ -207,11 +212,21 @@ export function useSetAtomWsData() {
updateEmaHistoryObject(item);
}
break;
case "currentSlot":
setCurrentSlot(msg.slot);
break;
case "epochData":
setEpoch(msg.currentEpoch);
setNextEpoch(msg.nextEpoch);
break;
}
},
[
setSocketState,
updateAtoms,
setCurrentSlot,
setEpoch,
setNextEpoch,
updateEmaHistoryArray,
updateHistoryArray,
updateEmaHistoryObject,
Expand Down Expand Up @@ -314,7 +329,7 @@ function useUpdateAtoms() {
const setSlotResponse = useSetAtom(setSlotResponseAtom);
const setSlotRankings = useSetAtom(slotRankingsAtom);

const [epoch, setEpoch] = useAtom(epochAtom);
const epoch = useAtomValue(epochAtom);

const setSlotStatus = useSetAtom(setSlotStatusAtom);

Expand Down Expand Up @@ -515,7 +530,7 @@ function useUpdateAtoms() {
);

const updateAtoms = useCallback(
(item: WsEntity) => {
(item: FromWorkerWsEntity) => {
const { topic, key, value } = item;
switch (topic) {
case "summary":
Expand Down Expand Up @@ -661,13 +676,6 @@ function useUpdateAtoms() {
break;
}
break;
case "epoch":
switch (key) {
case "new":
setEpoch(value);
break;
}
break;
case "gossip":
switch (key) {
case "network_stats": {
Expand Down Expand Up @@ -778,7 +786,6 @@ function useUpdateAtoms() {
setDbLiveTileMetrics,
setDbLiveTxnWaterfall,
setDbTileTimer,
setEpoch,
setGossipPeersCells,
setGossipPeersRows,
setIdentityBalance,
Expand Down Expand Up @@ -814,7 +821,6 @@ function useUpdateAtoms() {
const deleteSkippedClusterSlotsRange = useSetAtom(
deleteSkippedClusterSlotsRangeAtom,
);
const deletePreviousEpochs = useSetAtom(deletePreviousEpochsAtom);

useInterval(() => {
deleteSlotStatusBounds();
Expand All @@ -832,8 +838,7 @@ function useUpdateAtoms() {
useEffect(() => {
if (!epoch) return;
deleteSkippedClusterSlotsRange(epoch.start_slot, epoch.end_slot);
deletePreviousEpochs(epoch.epoch);
}, [deleteSkippedClusterSlotsRange, deletePreviousEpochs, epoch]);
}, [deleteSkippedClusterSlotsRange, epoch]);

useEffect(() => {
if (!epoch) return;
Expand Down
76 changes: 76 additions & 0 deletions src/api/worker/cache/epochCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import type { Epoch } from "../../types";

/**
* Post on updates
*/
export function createEpochCache(
postCurrentSlot: (currentSlot: number) => void,
postEpochs: (args: {
currentEpoch: Epoch | undefined;
nextEpoch: Epoch | undefined;
}) => void,
) {
let epochs: Epoch[] = [];
let currentEpoch: Epoch | undefined;
let nextEpoch: Epoch | undefined;
let currentSlot: number | undefined;

function findCurrentEpoch() {
const slot = currentSlot;
if (!epochs.length || slot === undefined) return;

const epoch = epochs.find(
({ start_slot, end_slot }) => slot >= start_slot && slot <= end_slot,
);
if (!epoch) return;

return epoch;
}

function findNextEpoch(currentEpochNumber?: number) {
if (currentEpochNumber == null) return;
return epochs.find((epoch) => epoch.epoch === currentEpochNumber + 1);
}

function updateAndPostEpochs() {
const newCurrentEpoch = findCurrentEpoch();
const newCurrentEpochNumber = newCurrentEpoch?.epoch;

const newNextEpoch = findNextEpoch(newCurrentEpochNumber);
const changed =
newCurrentEpoch !== currentEpoch || newNextEpoch !== nextEpoch;

currentEpoch = newCurrentEpoch;
nextEpoch = newNextEpoch;

if (newCurrentEpochNumber != null) {
// delete past epochs
epochs = epochs.filter((epoch) => epoch.epoch >= newCurrentEpochNumber);
}

if (changed) {
postEpochs({ currentEpoch, nextEpoch });
}
}
Comment thread
asuzuki-jumptrading marked this conversation as resolved.

return {
setCurrentSlot(slot: number) {
const newCurrentSlot = Math.max(slot, currentSlot ?? 0);
const changed = newCurrentSlot !== currentSlot;
currentSlot = newCurrentSlot;

if (changed) {
postCurrentSlot(currentSlot);
updateAndPostEpochs();
}
},
addEpoch(epoch: Epoch) {
const isDuplicate =
epochs.findIndex((e) => e.epoch === epoch.epoch) !== -1;
if (isDuplicate) return;

epochs.push(epoch);
updateAndPostEpochs();
},
};
}
46 changes: 30 additions & 16 deletions src/api/worker/messageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import {
overviewHistoryBufferMs,
} from "./cache/consts";
import { gossipHealthEmaFields } from "../atoms";
import type {
EmaHistoryArrayKey,
FromWorkerMessage,
HistoryArrayKey,
WsEntity,
import {
isEntry,
type EmaHistoryArrayKey,
type FromWorkerMessage,
type HistoryArrayKey,
type WsEntity,
} from "./types";
import { createEpochCache } from "./cache/epochCache";

const gossipHealthEmaOptions: EmaHistoryObjectCacheOptions = {
halfLifeMs: 5_000,
Expand All @@ -46,17 +48,6 @@ const tileTimerOptions: HistoryArrayOptions = {
historyWindowMs: overviewRenderWindowMs + overviewHistoryBufferMs,
};

function isEntry<

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to types.ts

T extends WsEntity["topic"],
K extends Extract<WsEntity, { topic: T }>["key"],
>(
it: WsEntity,
topic: T,
key: K,
): it is Extract<WsEntity, { topic: T; key: K }> {
return it.topic === topic && it.key === key;
}

export function createMessageHandler(post: (msg: FromWorkerMessage) => void) {
const emaArrayCache = createEmaHistoryArrayCache<EmaHistoryArrayKey>(
(items) => post({ type: "emaHistoryArray", items }),
Expand All @@ -68,10 +59,20 @@ export function createMessageHandler(post: (msg: FromWorkerMessage) => void) {
post({ type: "historyArray", items }),
);

const epochCache = createEpochCache(
(slot) => post({ type: "currentSlot", slot }),
({ currentEpoch, nextEpoch }) =>
post({ type: "epochData", currentEpoch, nextEpoch }),
);

return {
onMessage(item: WsEntity): void {
const nowMs = performance.now();

if (isEntry(item, "epoch", "new")) {
epochCache.addEpoch(item.value);
}

if (isEntry(item, "gossip", "network_stats")) {
emaObjectCache.subscribe("gossipHealth", gossipHealthEmaOptions);
emaObjectCache.update("gossipHealth", item.value.health, nowMs);
Expand All @@ -88,6 +89,19 @@ export function createMessageHandler(post: (msg: FromWorkerMessage) => void) {
historyArrayCache.subscribe("tileTimers", tileTimerOptions);
historyArrayCache.update("tileTimers", item.value);
}

if (isEntry(item, "slot", "update")) {
if (item.value) {
const { slot, level } = item.value.publish;
if (
level === "completed" ||
level === "optimistically_confirmed" ||
level === "rooted"
) {
Comment thread
asuzuki-jumptrading marked this conversation as resolved.
epochCache.setCurrentSlot(slot + 1);
}
}
}
},
};
}
27 changes: 25 additions & 2 deletions src/api/worker/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
supermajoritySchema,
} from "../entities";
import type { GossipHealthEma } from "../atoms";
import type { Epoch } from "../types";

export const WsMessageSchema = z.discriminatedUnion("topic", [
summarySchema,
Expand Down Expand Up @@ -43,12 +44,17 @@ export type WsEntity =
| KvFrom<typeof blockEngineSchema, "block_engine">
| KvFrom<typeof supermajoritySchema, "wait_for_supermajority">;

export type FromWorkerWsEntity = Exclude<
WsEntity,
KvFrom<typeof epochSchema, "epoch">
>;

export type FromWorkerMessage =
| { type: "connecting" }
| { type: "connected" }
| { type: "disconnected" }
| { type: "kvb"; items: WsEntity[] }
| ({ type: "kv" } & WsEntity)
| { type: "kvb"; items: FromWorkerWsEntity[] }
| ({ type: "kv" } & FromWorkerWsEntity)
// batch publisher caches
| { type: "ema"; items: EmaItem[] }
| {
Expand All @@ -59,6 +65,12 @@ export type FromWorkerMessage =
| {
type: "emaHistoryObject";
items: EmaObjectItem<Record<string, number>, string>[];
}
| { type: "currentSlot"; slot: number }
| {
type: "epochData";
currentEpoch: Epoch | undefined;
nextEpoch: Epoch | undefined;
};

export interface EmaItem {
Expand Down Expand Up @@ -108,3 +120,14 @@ export function isEmaObjectKey<K extends keyof EmaHistoryObjectRegistry>(
): item is EmaObjectItem<EmaHistoryObjectRegistry[K], K> {
return item.key === key;
}

export function isEntry<
T extends WsEntity["topic"],
K extends Extract<WsEntity, { topic: T }>["key"],
>(
it: WsEntity,
topic: T,
key: K,
): it is Extract<WsEntity, { topic: T; key: K }> {
return it.topic === topic && it.key === key;
}
14 changes: 12 additions & 2 deletions src/api/worker/wsWorker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { ZstdInit, type ZstdDec } from "@oneidentity/zstd-js/decompress";
import { logDebug, logError, logWarning } from "../../logger";
import { WsMessageSchema, type WsEntity, type ToWorkerMessage } from "./types";
import {
WsMessageSchema,
type WsEntity,
type ToWorkerMessage,
type FromWorkerWsEntity,
} from "./types";
import { createMessageHandler } from "./messageHandler";

const reconnectDelayMs = 3_000;
Expand All @@ -11,13 +16,18 @@ let ws: WebSocket | null = null;
let reconnectTimer: ReturnType<typeof setTimeout>;

let scheduled = false;
const pendingBatches = new Map<string, WsEntity[]>();
const pendingBatches = new Map<string, FromWorkerWsEntity[]>();

const handler = createMessageHandler((msg) => ctx.postMessage(msg));

function enqueue(item: WsEntity) {
handler.onMessage(item);

// filter unnecessary ws items
if (item.topic === "epoch") {
return;
}
Comment thread
asuzuki-jumptrading marked this conversation as resolved.

const key = `${item.topic}:${item.key}`;
if (pendingBatches.has(key)) {
pendingBatches.get(key)?.push(item);
Expand Down
Loading