diff --git a/src/api/useSetAtomWsData.ts b/src/api/useSetAtomWsData.ts index ac731925..ab647620 100644 --- a/src/api/useSetAtomWsData.ts +++ b/src/api/useSetAtomWsData.ts @@ -5,9 +5,9 @@ import type { EmaHistoryArrayKey, EmaObjectItem, FromWorkerMessage, + FromWorkerWsEntity, HistoryArrayKey, KeyedValuesWithHistory, - WsEntity, } from "./worker/types"; import { isEmaObjectKey } from "./worker/types"; import { DateTime } from "luxon"; @@ -18,6 +18,8 @@ import { skipRateAtom, setSlotResponseAtom, epochAtom, + nextEpochAtom, + currentSlotAtom, setSlotStatusAtom, updatePeersAtom, removePeersAtom, @@ -30,7 +32,6 @@ import { deleteSkippedClusterSlotsRangeAtom, deleteSlotResponseBoundsAtom, deleteSlotStatusBoundsAtom, - deletePreviousEpochsAtom, supermajorityEpochAtom, updateSupermajorityOnlinePeersAtom, deleteSupermajorityDeltaEntriesAtom, @@ -169,6 +170,10 @@ export function useSetAtomWsData() { [setTileTimerHistory], ); + const setEpoch = useSetAtom(epochAtom); + const setNextEpoch = useSetAtom(nextEpochAtom); + const setCurrentSlot = useSetAtom(currentSlotAtom); + const onMessage = useCallback( (msg: FromWorkerMessage) => { switch (msg.type) { @@ -207,11 +212,21 @@ export function useSetAtomWsData() { updateEmaHistoryObject(item); } break; + case "currentSlot": + setCurrentSlot(msg.slot); + break; + case "epochData": + setEpoch(msg.currentEpoch); + setNextEpoch(msg.nextEpoch); + break; } }, [ setSocketState, updateAtoms, + setCurrentSlot, + setEpoch, + setNextEpoch, updateEmaHistoryArray, updateHistoryArray, updateEmaHistoryObject, @@ -314,7 +329,7 @@ function useUpdateAtoms() { const setSlotResponse = useSetAtom(setSlotResponseAtom); const setSlotRankings = useSetAtom(slotRankingsAtom); - const [epoch, setEpoch] = useAtom(epochAtom); + const epoch = useAtomValue(epochAtom); const setSlotStatus = useSetAtom(setSlotStatusAtom); @@ -515,7 +530,7 @@ function useUpdateAtoms() { ); const updateAtoms = useCallback( - (item: WsEntity) => { + (item: FromWorkerWsEntity) => { const { topic, key, value } = item; switch (topic) { case "summary": @@ -661,13 +676,6 @@ function useUpdateAtoms() { break; } break; - case "epoch": - switch (key) { - case "new": - setEpoch(value); - break; - } - break; case "gossip": switch (key) { case "network_stats": { @@ -778,7 +786,6 @@ function useUpdateAtoms() { setDbLiveTileMetrics, setDbLiveTxnWaterfall, setDbTileTimer, - setEpoch, setGossipPeersCells, setGossipPeersRows, setIdentityBalance, @@ -814,7 +821,6 @@ function useUpdateAtoms() { const deleteSkippedClusterSlotsRange = useSetAtom( deleteSkippedClusterSlotsRangeAtom, ); - const deletePreviousEpochs = useSetAtom(deletePreviousEpochsAtom); useInterval(() => { deleteSlotStatusBounds(); @@ -832,8 +838,7 @@ function useUpdateAtoms() { useEffect(() => { if (!epoch) return; deleteSkippedClusterSlotsRange(epoch.start_slot, epoch.end_slot); - deletePreviousEpochs(epoch.epoch); - }, [deleteSkippedClusterSlotsRange, deletePreviousEpochs, epoch]); + }, [deleteSkippedClusterSlotsRange, epoch]); useEffect(() => { if (!epoch) return; diff --git a/src/api/worker/cache/epochCache.ts b/src/api/worker/cache/epochCache.ts new file mode 100644 index 00000000..75544774 --- /dev/null +++ b/src/api/worker/cache/epochCache.ts @@ -0,0 +1,76 @@ +import type { Epoch } from "../../types"; + +/** + * Post on updates + */ +export function createEpochCache( + postCurrentSlot: (currentSlot: number) => void, + postEpochs: (args: { + currentEpoch: Epoch | undefined; + nextEpoch: Epoch | undefined; + }) => void, +) { + let epochs: Epoch[] = []; + let currentEpoch: Epoch | undefined; + let nextEpoch: Epoch | undefined; + let currentSlot: number | undefined; + + function findCurrentEpoch() { + const slot = currentSlot; + if (!epochs.length || slot === undefined) return; + + const epoch = epochs.find( + ({ start_slot, end_slot }) => slot >= start_slot && slot <= end_slot, + ); + if (!epoch) return; + + return epoch; + } + + function findNextEpoch(currentEpochNumber?: number) { + if (currentEpochNumber == null) return; + return epochs.find((epoch) => epoch.epoch === currentEpochNumber + 1); + } + + function updateAndPostEpochs() { + const newCurrentEpoch = findCurrentEpoch(); + const newCurrentEpochNumber = newCurrentEpoch?.epoch; + + const newNextEpoch = findNextEpoch(newCurrentEpochNumber); + const changed = + newCurrentEpoch !== currentEpoch || newNextEpoch !== nextEpoch; + + currentEpoch = newCurrentEpoch; + nextEpoch = newNextEpoch; + + if (newCurrentEpochNumber != null) { + // delete past epochs + epochs = epochs.filter((epoch) => epoch.epoch >= newCurrentEpochNumber); + } + + if (changed) { + postEpochs({ currentEpoch, nextEpoch }); + } + } + + return { + setCurrentSlot(slot: number) { + const newCurrentSlot = Math.max(slot, currentSlot ?? 0); + const changed = newCurrentSlot !== currentSlot; + currentSlot = newCurrentSlot; + + if (changed) { + postCurrentSlot(currentSlot); + updateAndPostEpochs(); + } + }, + addEpoch(epoch: Epoch) { + const isDuplicate = + epochs.findIndex((e) => e.epoch === epoch.epoch) !== -1; + if (isDuplicate) return; + + epochs.push(epoch); + updateAndPostEpochs(); + }, + }; +} diff --git a/src/api/worker/messageHandler.ts b/src/api/worker/messageHandler.ts index 485fc891..4f8726dd 100644 --- a/src/api/worker/messageHandler.ts +++ b/src/api/worker/messageHandler.ts @@ -17,12 +17,14 @@ import { overviewHistoryBufferMs, } from "./cache/consts"; import { gossipHealthEmaFields } from "../atoms"; -import type { - EmaHistoryArrayKey, - FromWorkerMessage, - HistoryArrayKey, - WsEntity, +import { + isEntry, + type EmaHistoryArrayKey, + type FromWorkerMessage, + type HistoryArrayKey, + type WsEntity, } from "./types"; +import { createEpochCache } from "./cache/epochCache"; const gossipHealthEmaOptions: EmaHistoryObjectCacheOptions = { halfLifeMs: 5_000, @@ -46,17 +48,6 @@ const tileTimerOptions: HistoryArrayOptions = { historyWindowMs: overviewRenderWindowMs + overviewHistoryBufferMs, }; -function isEntry< - T extends WsEntity["topic"], - K extends Extract["key"], ->( - it: WsEntity, - topic: T, - key: K, -): it is Extract { - return it.topic === topic && it.key === key; -} - export function createMessageHandler(post: (msg: FromWorkerMessage) => void) { const emaArrayCache = createEmaHistoryArrayCache( (items) => post({ type: "emaHistoryArray", items }), @@ -68,10 +59,20 @@ export function createMessageHandler(post: (msg: FromWorkerMessage) => void) { post({ type: "historyArray", items }), ); + const epochCache = createEpochCache( + (slot) => post({ type: "currentSlot", slot }), + ({ currentEpoch, nextEpoch }) => + post({ type: "epochData", currentEpoch, nextEpoch }), + ); + return { onMessage(item: WsEntity): void { const nowMs = performance.now(); + if (isEntry(item, "epoch", "new")) { + epochCache.addEpoch(item.value); + } + if (isEntry(item, "gossip", "network_stats")) { emaObjectCache.subscribe("gossipHealth", gossipHealthEmaOptions); emaObjectCache.update("gossipHealth", item.value.health, nowMs); @@ -88,6 +89,19 @@ export function createMessageHandler(post: (msg: FromWorkerMessage) => void) { historyArrayCache.subscribe("tileTimers", tileTimerOptions); historyArrayCache.update("tileTimers", item.value); } + + if (isEntry(item, "slot", "update")) { + if (item.value) { + const { slot, level } = item.value.publish; + if ( + level === "completed" || + level === "optimistically_confirmed" || + level === "rooted" + ) { + epochCache.setCurrentSlot(slot + 1); + } + } + } }, }; } diff --git a/src/api/worker/types.ts b/src/api/worker/types.ts index e39bfa92..6c08ed8e 100644 --- a/src/api/worker/types.ts +++ b/src/api/worker/types.ts @@ -9,6 +9,7 @@ import { supermajoritySchema, } from "../entities"; import type { GossipHealthEma } from "../atoms"; +import type { Epoch } from "../types"; export const WsMessageSchema = z.discriminatedUnion("topic", [ summarySchema, @@ -43,12 +44,17 @@ export type WsEntity = | KvFrom | KvFrom; +export type FromWorkerWsEntity = Exclude< + WsEntity, + KvFrom +>; + export type FromWorkerMessage = | { type: "connecting" } | { type: "connected" } | { type: "disconnected" } - | { type: "kvb"; items: WsEntity[] } - | ({ type: "kv" } & WsEntity) + | { type: "kvb"; items: FromWorkerWsEntity[] } + | ({ type: "kv" } & FromWorkerWsEntity) // batch publisher caches | { type: "ema"; items: EmaItem[] } | { @@ -59,6 +65,12 @@ export type FromWorkerMessage = | { type: "emaHistoryObject"; items: EmaObjectItem, string>[]; + } + | { type: "currentSlot"; slot: number } + | { + type: "epochData"; + currentEpoch: Epoch | undefined; + nextEpoch: Epoch | undefined; }; export interface EmaItem { @@ -108,3 +120,14 @@ export function isEmaObjectKey( ): item is EmaObjectItem { return item.key === key; } + +export function isEntry< + T extends WsEntity["topic"], + K extends Extract["key"], +>( + it: WsEntity, + topic: T, + key: K, +): it is Extract { + return it.topic === topic && it.key === key; +} diff --git a/src/api/worker/wsWorker.ts b/src/api/worker/wsWorker.ts index 15f042ad..c80f28bb 100644 --- a/src/api/worker/wsWorker.ts +++ b/src/api/worker/wsWorker.ts @@ -1,6 +1,11 @@ import { ZstdInit, type ZstdDec } from "@oneidentity/zstd-js/decompress"; import { logDebug, logError, logWarning } from "../../logger"; -import { WsMessageSchema, type WsEntity, type ToWorkerMessage } from "./types"; +import { + WsMessageSchema, + type WsEntity, + type ToWorkerMessage, + type FromWorkerWsEntity, +} from "./types"; import { createMessageHandler } from "./messageHandler"; const reconnectDelayMs = 3_000; @@ -11,13 +16,18 @@ let ws: WebSocket | null = null; let reconnectTimer: ReturnType; let scheduled = false; -const pendingBatches = new Map(); +const pendingBatches = new Map(); const handler = createMessageHandler((msg) => ctx.postMessage(msg)); function enqueue(item: WsEntity) { handler.onMessage(item); + // filter unnecessary ws items + if (item.topic === "epoch") { + return; + } + const key = `${item.topic}:${item.key}`; if (pendingBatches.has(key)) { pendingBatches.get(key)?.push(item); diff --git a/src/atoms.ts b/src/atoms.ts index cf869c88..6436d137 100644 --- a/src/atoms.ts +++ b/src/atoms.ts @@ -39,51 +39,8 @@ export const _isNavCollapsedAtom = atom(false); export const bootProgressContainerElAtom = atom(); -const _epochsAtom = atomWithImmer([]); -export const epochAtom = atom( - (get) => { - const currentSlot = get(currentSlotAtom); - const epochs = get(_epochsAtom); - if (!epochs.length || currentSlot === undefined) return; - - const epoch = epochs.find( - ({ start_slot, end_slot }) => - currentSlot >= start_slot && currentSlot <= end_slot, - ); - if (!epoch) return; - - return epoch; - }, - (_get, set, epoch: Epoch) => { - set(_epochsAtom, (draft) => { - const isDuplicate = - draft.findIndex((e) => e.epoch === epoch.epoch) !== -1; - if (isDuplicate) return; - - draft.push(epoch); - }); - }, -); - -export const deletePreviousEpochsAtom = atom( - null, - (_get, set, currentEpoch: number) => { - set(_epochsAtom, (draft) => - draft.filter(({ epoch }) => epoch >= currentEpoch), - ); - }, -); - -export const nextEpochAtom = atom((get) => { - const currentEpoch = get(epochAtom); - if (!currentEpoch) return; - - const nextEpoch = get(_epochsAtom).find( - (epoch) => epoch.epoch === currentEpoch?.epoch + 1, - ); - - return nextEpoch; -}); +export const epochAtom = atom(); +export const nextEpochAtom = atom(); export const [slotOverrideAtom, autoScrollAtom] = (function getSlotOverrideAtom() { @@ -145,13 +102,6 @@ export const slotNavFilterAtom = (function getSlotNavFilterAtom() { export const setSlotStatusAtom = atom( null, (_, set, slot: number, level: SlotLevel) => { - if ( - level === "completed" || - level === "optimistically_confirmed" || - level === "rooted" - ) { - set(currentSlotAtom, slot + 1); - } set(slotStatusAtom, (draft) => { draft[slot] = level; }); @@ -341,18 +291,21 @@ export const lastProcessedLeaderAtom = atom((get) => { : undefined; }); -const _currentSlotAtom = atom(undefined); -export const currentSlotAtom = atom( - (get) => get(_currentSlotAtom), - (get, set, slot: number) => { - const nextLeaderSlot = get(nextLeaderSlotAtom); - if (nextLeaderSlot === undefined || slot >= nextLeaderSlot) { - set(nextLeaderSlotAtom, slot); - } +export const currentSlotAtom = (function getCurrentSlotAtom() { + const _currentSlotAtom = atom(); - set(_currentSlotAtom, (prev) => Math.max(slot, prev ?? 0)); - }, -); + return atom( + (get) => get(_currentSlotAtom), + (get, set, slot: number) => { + const nextLeaderSlot = get(nextLeaderSlotAtom); + if (nextLeaderSlot === undefined || slot >= nextLeaderSlot) { + set(nextLeaderSlotAtom, slot); + } + + set(_currentSlotAtom, slot); + }, + ); +})(); /** In order array of your leader slots (only first slot in group of 4) */ export const leaderSlotsAtom = atom((get) => {