Skip to content
35 changes: 34 additions & 1 deletion frontend/apps/desktop/src/app-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,34 @@ const DISCOVERY_POLL_INTERVAL_MS = 14_000
const ACTIVITY_POLL_INTERVAL_MS = 1_000
const DELETED_POLL_INTERVAL_MS = 60_000 // Slower polling for deleted/redirected resources

// Cap concurrent in-flight discovery RPCs. The backend has its own worker pool
// (config.MaxWorkers, default 6), but capping on this side keeps grpc-client
// and tRPC overhead bounded when many subs reach a polling tick at once.
const MAX_CONCURRENT_DISCOVERY = 4
let discoveryInFlight = 0
const discoveryQueue: Array<() => void> = []

/** Runs `fn` immediately if a slot is free, otherwise queues it FIFO. */
function withDiscoverySlot<T>(fn: () => Promise<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
const run = () => {
discoveryInFlight++
fn()
.then(resolve, reject)
.finally(() => {
discoveryInFlight--
const next = discoveryQueue.shift()
if (next) next()
})
}
if (discoveryInFlight < MAX_CONCURRENT_DISCOVERY) {
run()
} else {
discoveryQueue.push(run)
}
})
}

/** Returns 1 when app is focused, 10 when backgrounded. */
function getPollingMultiplier(): number {
return isAnyWindowFocused() ? 1 : 10
Expand Down Expand Up @@ -825,7 +853,12 @@ function createSubscription(sub: ResourceSubscription): SubscriptionState {
return
}

runDiscovery(sub)
withDiscoverySlot(async () => {
// Re-check cancellation before consuming a slot — the loop may have been
// unsubscribed while waiting in the queue.
if (cancelled) return null
return runDiscovery(sub)
})
.then((result) => {
if (cancelled) return

Expand Down
6 changes: 5 additions & 1 deletion frontend/apps/desktop/src/components/doc-navigation.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ export function DocNavigation({showCollapsed}: {showCollapsed: boolean}) {
const route = useNavRoute()
if (route.key !== 'document') throw new Error('DocNavigation only supports document route')
const {id} = route
const entity = useResource(id, {subscribed: true, recursive: true}) // recursive subscriptions to make sure children get loaded
// The active document route already opens a high-priority recursive
// subscription on this id, so the navigation pane just needs cached metadata.
// Subscribing again here forks the dedup key (no priority vs `!high`) into
// a second renderer-side entry without adding any daemon coverage.
const entity = useResource(id)
const navigate = useNavigate('replace')
const document =
// @ts-ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ export function EditNavHeaderPane({homeId}: {homeId: UnpackedHypermediaId}) {
const {canEdit, beginEditIfNeeded} = useEditorGate()
const send = useDocumentSend()
const machineNavigation = useDocumentNavigationOptional()
const homeResource = useResource(homeId, {subscribed: true})
// The home doc is already subscribed by the active resource page; this pane
// only reads cached navigation metadata and does not need its own discovery.
const homeResource = useResource(homeId)
const homeDocument = homeResource.data?.type === 'document' ? homeResource.data.document : null

if (!canEdit) return null
Expand Down
6 changes: 4 additions & 2 deletions frontend/apps/desktop/src/components/import-doc-button.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,10 @@ export function useImporting(parentId: UnpackedHypermediaId) {
mutationFn: (url: string) => client.webImporting.checkWebUrl.mutate(url),
})

// Private documents require a site URL and must be at the home doc level
const siteHomeResource = useResource(hmId(parentId.uid), {subscribed: true})
// Private documents require a site URL and must be at the home doc level.
// The home doc's metadata is already cached by the active resource page or
// joined-site recursive sub; this button only reads the site URL field.
const siteHomeResource = useResource(hmId(parentId.uid))
const siteUrl =
siteHomeResource.data?.type === 'document' ? siteHomeResource.data.document?.metadata?.siteUrl : undefined
const isHomeDoc = !parentId.path?.length
Expand Down
12 changes: 8 additions & 4 deletions frontend/apps/desktop/src/components/sidebar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,11 @@ function SubscriptionsSection() {
)
: undefined

// Fetch site resources for all joined sites to ensure metadata is available
// Fetch site resources for all joined sites to ensure metadata is available.
// Joined sites are already covered by derived-subscriptions' root recursive
// subscription, so this listing only needs cached metadata for display.
const siteIds = siteSubscribed?.map((contact) => hmId(contact.subject)) || []
const siteResources = useResources(siteIds, {subscribed: true})
const siteResources = useResources(siteIds)

// Sort by activity using the backend's account order (already sorted by activity desc)
const accounts = accountList.data?.accounts || []
Expand Down Expand Up @@ -482,9 +484,11 @@ function FollowingSection() {
)
: undefined

// Fetch profile resources for all followed contacts to ensure metadata is available
// Fetch profile resources for all followed contacts to ensure metadata is
// available. Followed accounts are already covered by derived-subscriptions'
// root recursive subscription, so this listing only needs cached metadata.
const profileIds = profileSubscribed?.map((contact) => hmId(contact.subject)) || []
const profileResources = useResources(profileIds, {subscribed: true})
const profileResources = useResources(profileIds)

// Sort by activity using the backend's account order
const accounts = accountList.data?.accounts || []
Expand Down
5 changes: 4 additions & 1 deletion frontend/apps/desktop/src/editor/query-block.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,13 @@ function Render(block: Block<HMBlockSchema>, editor: BlockNoteEditor<HMBlockSche
return null
})
const mode = queryIncludes[0]?.mode || 'Children'
// Always subscribe recursively so that toggling the display mode does not
// fork the renderer dedup key (id vs id/*) for the same query target. The
// daemon already collapses overlapping recursive coverage.
const entity = useResource(queryId, {
enabled: !!queryId,
subscribed: true,
recursive: mode === 'AllDescendants',
recursive: true,
})
const directoryItems = useDirectory(queryId, {
mode,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import {beforeEach, describe, expect, it, vi} from 'vitest'
import {hmId} from '@shm/shared/utils/entity-id-url'

// vi.mock is hoisted, so its factory is evaluated before the rest of this file
// runs. Declare the mock fns via vi.hoisted so they are available to the
// factory while still being stable references the test cases can reset.
const {subscribeMock, discoveryStateSubscribeMock, focusHandlers} = vi.hoisted(() => ({
subscribeMock: vi.fn(),
discoveryStateSubscribeMock: vi.fn(),
focusHandlers: [] as Array<(focused: boolean) => void>,
}))

vi.mock('@/grpc-client', () => ({grpcClient: {}}))

vi.mock('@/trpc', () => ({
client: {
sync: {
subscribe: {subscribe: subscribeMock},
discoveryState: {subscribe: discoveryStateSubscribeMock},
},
},
}))

vi.mock('../documents', () => ({usePushResource: () => vi.fn()}))

vi.mock('../window-focus', () => ({
isThisWindowFocused: () => true,
onThisWindowFocusChange: (handler: (focused: boolean) => void) => {
focusHandlers.push(handler)
return () => {
const idx = focusHandlers.indexOf(handler)
if (idx >= 0) focusHandlers.splice(idx, 1)
}
},
}))

import {addSubscribedEntity, cleanupAllEntitySubscriptions, removeSubscribedEntity} from '../entities'

type Handle = {unsubscribe: ReturnType<typeof vi.fn>}

describe('entity subscription dedup', () => {
let daemonHandles: Handle[]
let discoveryHandles: Handle[]

beforeEach(() => {
cleanupAllEntitySubscriptions()
subscribeMock.mockReset()
discoveryStateSubscribeMock.mockReset()
daemonHandles = []
discoveryHandles = []
subscribeMock.mockImplementation(() => {
const handle: Handle = {unsubscribe: vi.fn()}
daemonHandles.push(handle)
return handle
})
discoveryStateSubscribeMock.mockImplementation(() => {
const handle: Handle = {unsubscribe: vi.fn()}
discoveryHandles.push(handle)
return handle
})
})

it('opens a single daemon sub for two callers on the same entity with different priorities', () => {
const id = hmId('alice', {path: ['doc']})
addSubscribedEntity({id, recursive: true, priority: 'high'})
addSubscribedEntity({id, recursive: true, priority: 'normal'})

expect(subscribeMock).toHaveBeenCalledTimes(1)
expect(subscribeMock).toHaveBeenCalledWith({id, recursive: true}, expect.anything())
expect(discoveryStateSubscribeMock).toHaveBeenCalledTimes(1)
})

it('merges options up: a recursive caller arriving later upgrades the daemon sub', () => {
const id = hmId('alice', {path: ['doc']})
addSubscribedEntity({id, recursive: false})
expect(subscribeMock).toHaveBeenLastCalledWith({id, recursive: false}, expect.anything())

addSubscribedEntity({id, recursive: true})
expect(subscribeMock).toHaveBeenCalledTimes(2)
expect(subscribeMock).toHaveBeenLastCalledWith({id, recursive: true}, expect.anything())
// Old non-recursive sub torn down when re-issued.
expect(daemonHandles[0]!.unsubscribe).toHaveBeenCalledTimes(1)
expect(daemonHandles[1]!.unsubscribe).not.toHaveBeenCalled()
})

it('does not churn the daemon sub on rapid unmount/remount of a single caller', async () => {
const id = hmId('alice', {path: ['doc']})
const oldSub = {id}
const newSub = {id}

addSubscribedEntity(oldSub)
removeSubscribedEntity(oldSub)
addSubscribedEntity(newSub)
// Flush the queueMicrotask that defers the removal sync.
await Promise.resolve()
await Promise.resolve()

expect(subscribeMock).toHaveBeenCalledTimes(1)
expect(daemonHandles[0]!.unsubscribe).not.toHaveBeenCalled()
})

it('keeps the shared discovery-state sub alive until the last caller leaves', async () => {
const id = hmId('alice', {path: ['doc']})
const subA = {id, priority: 'high' as const}
const subB = {id, priority: 'normal' as const}

addSubscribedEntity(subA)
addSubscribedEntity(subB)
expect(discoveryStateSubscribeMock).toHaveBeenCalledTimes(1)

removeSubscribedEntity(subA)
await Promise.resolve()
await Promise.resolve()
// B is still active, the shared discovery sub must NOT have been torn down.
expect(discoveryHandles[0]!.unsubscribe).not.toHaveBeenCalled()

removeSubscribedEntity(subB)
await Promise.resolve()
await Promise.resolve()
expect(discoveryHandles[0]!.unsubscribe).toHaveBeenCalledTimes(1)
expect(daemonHandles[0]!.unsubscribe).toHaveBeenCalledTimes(1)
})

it('downgrades the merged options when the recursive caller leaves', async () => {
const id = hmId('alice', {path: ['doc']})
const recSub = {id, recursive: true}
const nonRecSub = {id, recursive: false}

addSubscribedEntity(nonRecSub)
addSubscribedEntity(recSub)
expect(subscribeMock).toHaveBeenCalledTimes(2)
// After the recursive caller arrives, current sub is recursive: true.

removeSubscribedEntity(recSub)
await Promise.resolve()
await Promise.resolve()

// Should have re-issued back to recursive: false now that only non-rec caller remains.
expect(subscribeMock).toHaveBeenCalledTimes(3)
expect(subscribeMock).toHaveBeenLastCalledWith({id, recursive: false}, expect.anything())
})

it('tracks separate entity ids independently', () => {
const aliceDoc = hmId('alice', {path: ['doc']})
const bobDoc = hmId('bob', {path: ['doc']})
addSubscribedEntity({id: aliceDoc})
addSubscribedEntity({id: bobDoc})

expect(subscribeMock).toHaveBeenCalledTimes(2)
expect(discoveryStateSubscribeMock).toHaveBeenCalledTimes(2)
})
})

describe('window blur pause', () => {
beforeEach(() => {
cleanupAllEntitySubscriptions()
subscribeMock.mockReset()
discoveryStateSubscribeMock.mockReset()
subscribeMock.mockImplementation(() => ({unsubscribe: vi.fn()}))
discoveryStateSubscribeMock.mockImplementation(() => ({unsubscribe: vi.fn()}))
})

it('tears down daemon subs after the grace period and re-issues on focus', () => {
vi.useFakeTimers()
try {
const id = hmId('alice', {path: ['doc']})
addSubscribedEntity({id})
expect(subscribeMock).toHaveBeenCalledTimes(1)
const initialDaemonHandle = subscribeMock.mock.results[0]!.value as {unsubscribe: ReturnType<typeof vi.fn>}

// Simulate window blur. Grace timer should be scheduled but not yet fired.
focusHandlers.forEach((h) => h(false))
vi.advanceTimersByTime(29_000)
expect(initialDaemonHandle.unsubscribe).not.toHaveBeenCalled()

// Past the grace window the daemon sub should be torn down.
vi.advanceTimersByTime(2_000)
expect(initialDaemonHandle.unsubscribe).toHaveBeenCalledTimes(1)

// Regaining focus re-issues the daemon sub.
focusHandlers.forEach((h) => h(true))
expect(subscribeMock).toHaveBeenCalledTimes(2)
} finally {
vi.useRealTimers()
}
})

it('cancels the grace timer when focus returns before it fires', () => {
vi.useFakeTimers()
try {
const id = hmId('alice', {path: ['doc']})
addSubscribedEntity({id})
const handle = subscribeMock.mock.results[0]!.value as {unsubscribe: ReturnType<typeof vi.fn>}

focusHandlers.forEach((h) => h(false))
vi.advanceTimersByTime(10_000)
focusHandlers.forEach((h) => h(true))
// Advance past where the timer would have fired without the cancel.
vi.advanceTimersByTime(60_000)

expect(handle.unsubscribe).not.toHaveBeenCalled()
// No unnecessary re-issue either, since we never actually paused.
expect(subscribeMock).toHaveBeenCalledTimes(1)
} finally {
vi.useRealTimers()
}
})
})
Loading
Loading