diff --git a/apps/hash-api/src/ai/infer-entities-websocket.ts b/apps/hash-api/src/ai/infer-entities-websocket.ts index fa37b5322f2..678f0abdb5b 100644 --- a/apps/hash-api/src/ai/infer-entities-websocket.ts +++ b/apps/hash-api/src/ai/infer-entities-websocket.ts @@ -74,7 +74,7 @@ const inferEntitiesMessageHandler = async ({ temporalClient, }); - for (const flowRun of openFlowRuns) { + for (const flowRun of openFlowRuns.flowRuns) { for (const inputRequest of flowRun.inputRequests) { if (!inputRequest.resolvedAt) { const requestMessage: ExternalInputWebsocketRequestMessage = { diff --git a/apps/hash-api/src/ai/infer-entities-websocket/handle-infer-entities-request.ts b/apps/hash-api/src/ai/infer-entities-websocket/handle-infer-entities-request.ts index 16c7a5ab846..a6859170a4f 100644 --- a/apps/hash-api/src/ai/infer-entities-websocket/handle-infer-entities-request.ts +++ b/apps/hash-api/src/ai/infer-entities-websocket/handle-infer-entities-request.ts @@ -79,7 +79,7 @@ export const handleInferEntitiesRequest = async ({ temporalClient, }); - for (const flowRun of openFlowRuns) { + for (const flowRun of openFlowRuns.flowRuns) { const flowIsAlreadyRunningOnPage = ( flowRun.inputs[0].flowTrigger.outputs as StepOutput< AutomaticInferenceTriggerInputs[AutomaticInferenceTriggerInputName] diff --git a/apps/hash-api/src/graphql/resolvers/flows/get-flow-runs.ts b/apps/hash-api/src/graphql/resolvers/flows/get-flow-runs.ts index acb48745c01..8d2d55cc628 100644 --- a/apps/hash-api/src/graphql/resolvers/flows/get-flow-runs.ts +++ b/apps/hash-api/src/graphql/resolvers/flows/get-flow-runs.ts @@ -10,7 +10,7 @@ import type { GraphQLContext } from "../../context"; import { wereDetailedFieldsRequested } from "./shared/were-detailed-fields-requested"; export const getFlowRunsResolver: ResolverFn< - FlowRun[] | SparseFlowRun[], + { flowRuns: FlowRun[] | SparseFlowRun[]; totalCount: number }, Record, Pick, QueryGetFlowRunsArgs @@ -19,13 +19,15 @@ export const getFlowRunsResolver: ResolverFn< const { authentication, dataSources, temporal } = context; - const { flowDefinitionIds, executionStatus } = args; + const { flowDefinitionIds, executionStatus, offset, limit } = args; return await getFlowRuns({ authentication, filters: { flowDefinitionIds, executionStatus, + offset, + limit, }, graphApiClient: dataSources.graphApi, includeDetails, diff --git a/apps/hash-api/src/graphql/resolvers/flows/shared/were-detailed-fields-requested.ts b/apps/hash-api/src/graphql/resolvers/flows/shared/were-detailed-fields-requested.ts index e7be84fddca..233bd87f3c6 100644 --- a/apps/hash-api/src/graphql/resolvers/flows/shared/were-detailed-fields-requested.ts +++ b/apps/hash-api/src/graphql/resolvers/flows/shared/were-detailed-fields-requested.ts @@ -1,15 +1,26 @@ import type { DetailedFlowField } from "@local/hash-isomorphic-utils/flows/types"; import { detailedFlowFields } from "@local/hash-isomorphic-utils/flows/types"; import type { GraphQLResolveInfo } from "graphql"; +import type { ResolveTree } from "graphql-parse-resolve-info"; import { parseResolveInfo } from "graphql-parse-resolve-info"; +/** + * Works for both `getFlowRuns` (returns `PaginatedFlowRuns` wrapping `FlowRun`) + * and `getFlowRunById` (returns `FlowRun` directly). + */ export const wereDetailedFieldsRequested = ( info: GraphQLResolveInfo, ): boolean => { const parsedResolveInfoFragment = parseResolveInfo(info); - const requestedFieldsTree = - parsedResolveInfoFragment?.fieldsByTypeName.FlowRun; + let requestedFieldsTree = parsedResolveInfoFragment?.fieldsByTypeName.FlowRun; + + if (!requestedFieldsTree) { + const paginatedFields = parsedResolveInfoFragment?.fieldsByTypeName + .PaginatedFlowRuns as Record | undefined; + + requestedFieldsTree = paginatedFields?.flowRuns?.fieldsByTypeName.FlowRun; + } if (!requestedFieldsTree) { throw new Error("Expected FlowRun to be requested in query"); diff --git a/apps/hash-frontend/src/pages/shared/flow-runs-context.tsx b/apps/hash-frontend/src/pages/shared/flow-runs-context.tsx index 405a47a55f2..698889208ec 100644 --- a/apps/hash-frontend/src/pages/shared/flow-runs-context.tsx +++ b/apps/hash-frontend/src/pages/shared/flow-runs-context.tsx @@ -17,9 +17,18 @@ import type { } from "../../graphql/api-types.gen"; import { FlowRunStatus, FlowStepStatus } from "../../graphql/api-types.gen"; +export type FlowRunsPaginationState = { + page: number; + rowsPerPage: number; + onPageChange: (newPage: number) => void; + onRowsPerPageChange: (newRowsPerPage: number) => void; +}; + export type FlowRunsContextType = { - flowRuns: GetFlowRunsQuery["getFlowRuns"]; + flowRuns: GetFlowRunsQuery["getFlowRuns"]["flowRuns"]; + totalCount: number; loading: boolean; + pagination: FlowRunsPaginationState | null; selectedFlowRun: FlowRun | null; selectedFlowRunId: string | null; }; @@ -28,13 +37,25 @@ export const FlowRunsContext = createContext(null); export const FlowRunsContextProvider = ({ children, + pagination, selectedFlowRunId, -}: PropsWithChildren<{ selectedFlowRunId: string | null }>) => { +}: PropsWithChildren<{ + pagination?: FlowRunsPaginationState; + selectedFlowRunId: string | null; +}>) => { + const variables: GetFlowRunsQueryVariables = pagination + ? { + offset: pagination.page * pagination.rowsPerPage, + limit: pagination.rowsPerPage, + } + : { offset: 0, limit: 50 }; + const { data: flowRunsData, loading: flowRunsLoading } = useQuery< GetFlowRunsQuery, GetFlowRunsQueryVariables >(getFlowRunsQuery, { pollInterval: 3_000, + variables, }); const { data: selectedFlowRunData, loading: selectedFlowRunLoading } = @@ -51,11 +72,13 @@ export const FlowRunsContextProvider = ({ const flowRuns = useMemo(() => { if (flowRunsData) { - return flowRunsData.getFlowRuns; + return flowRunsData.getFlowRuns.flowRuns; } return []; }, [flowRunsData]); + const totalCount = flowRunsData?.getFlowRuns.totalCount ?? 0; + const selectedFlowRun = useMemo(() => { if (selectedFlowRunData) { return selectedFlowRunData.getFlowRunById; @@ -66,13 +89,17 @@ export const FlowRunsContextProvider = ({ const context = useMemo( () => ({ flowRuns, + totalCount, loading: selectedFlowRunLoading || flowRunsLoading, + pagination: pagination ?? null, selectedFlowRun, selectedFlowRunId, }), [ flowRuns, + totalCount, flowRunsLoading, + pagination, selectedFlowRunLoading, selectedFlowRun, selectedFlowRunId, diff --git a/apps/hash-frontend/src/pages/workers.page.tsx b/apps/hash-frontend/src/pages/workers.page.tsx index c0a97eb1f76..953761c88f8 100644 --- a/apps/hash-frontend/src/pages/workers.page.tsx +++ b/apps/hash-frontend/src/pages/workers.page.tsx @@ -2,7 +2,7 @@ import { TerminalLightIcon } from "@hashintel/design-system"; import { workerFlowFilterParam } from "@local/hash-isomorphic-utils/flows/frontend-paths"; import { Box, Container, Stack, Typography } from "@mui/material"; import { useRouter } from "next/router"; -import { useMemo } from "react"; +import { useCallback, useMemo, useState } from "react"; import type { NextPageWithLayout } from "../shared/layout"; import { getLayoutWithSidebar } from "../shared/layout"; @@ -12,6 +12,8 @@ import { FlowRunsContextProvider } from "./shared/flow-runs-context"; import { FlowRunTable } from "./workers.page/flow-run-table"; import { FlowSchedulesTable } from "./workers.page/flow-schedules-table"; +const defaultRowsPerPage = 20; + const WorkersPageContent = () => { const { query: { [workerFlowFilterParam]: definitionFilterQueryParam }, @@ -61,9 +63,31 @@ const WorkersPageContent = () => { }; const WorkersPage: NextPageWithLayout = () => { + const [page, setPage] = useState(0); + const [rowsPerPage, setRowsPerPage] = useState(defaultRowsPerPage); + + const handlePageChange = useCallback((newPage: number) => { + setPage(newPage); + }, []); + + const handleRowsPerPageChange = useCallback((newRowsPerPage: number) => { + setRowsPerPage(newRowsPerPage); + setPage(0); + }, []); + + const pagination = useMemo( + () => ({ + page, + rowsPerPage, + onPageChange: handlePageChange, + onRowsPerPageChange: handleRowsPerPageChange, + }), + [page, rowsPerPage, handlePageChange, handleRowsPerPageChange], + ); + return ( - + diff --git a/apps/hash-frontend/src/pages/workers.page/flow-run-table.tsx b/apps/hash-frontend/src/pages/workers.page/flow-run-table.tsx index 923d352b380..1a35256be05 100644 --- a/apps/hash-frontend/src/pages/workers.page/flow-run-table.tsx +++ b/apps/hash-frontend/src/pages/workers.page/flow-run-table.tsx @@ -280,7 +280,12 @@ export const FlowRunTable = ({ flowDefinitionIdFilter }: FlowRunTableProps) => { const { flowDefinitions } = useFlowDefinitionsContext(); - const { flowRuns: unfilteredFlowRuns, loading } = useFlowRunsContext(); + const { + flowRuns: unfilteredFlowRuns, + totalCount, + loading, + pagination, + } = useFlowRunsContext(); const { authenticatedUser } = useAuthenticatedUser(); @@ -431,15 +436,81 @@ export const FlowRunTable = ({ flowDefinitionIdFilter }: FlowRunTableProps) => { ); return ( - - + + + + + {pagination && totalCount > 0 && ( + + palette.gray[70] }} + > + Showing{" "} + {pagination.page * pagination.rowsPerPage + 1} to{" "} + + {Math.min( + (pagination.page + 1) * pagination.rowsPerPage, + totalCount, + )} + {" "} + of {totalCount} + + + {pagination.page > 0 && ( + pagination.onPageChange(pagination.page - 1)} + sx={{ + fontSize: 13, + fontWeight: 600, + color: ({ palette }) => palette.blue[70], + cursor: "pointer", + background: "none", + border: "none", + padding: 0, + "&:hover": { + textDecoration: "underline", + }, + }} + > + Previous page + + )} + {(pagination.page + 1) * pagination.rowsPerPage < totalCount && ( + pagination.onPageChange(pagination.page + 1)} + sx={{ + fontSize: 13, + fontWeight: 600, + color: ({ palette }) => palette.blue[70], + cursor: "pointer", + background: "none", + border: "none", + padding: 0, + "&:hover": { + textDecoration: "underline", + }, + }} + > + Next page + + )} + + + )} ); }; diff --git a/apps/plugin-browser/src/graphql/queries/flow.queries.ts b/apps/plugin-browser/src/graphql/queries/flow.queries.ts index 2bb4ed86ee3..aa2c8e68866 100644 --- a/apps/plugin-browser/src/graphql/queries/flow.queries.ts +++ b/apps/plugin-browser/src/graphql/queries/flow.queries.ts @@ -1,16 +1,19 @@ export const getMinimalFlowRunsQuery = /* GraphQL */ ` query getMinimalFlowRuns { getFlowRuns { - name - flowDefinitionId - flowRunId - webId - status - executedAt - closedAt - inputs - inputRequests - outputs + totalCount + flowRuns { + name + flowDefinitionId + flowRunId + webId + status + executedAt + closedAt + inputs + inputRequests + outputs + } } } `; diff --git a/apps/plugin-browser/src/pages/popup/popup-contents/action-center/shared/use-flow-runs.ts b/apps/plugin-browser/src/pages/popup/popup-contents/action-center/shared/use-flow-runs.ts index edfc75e6402..873dd8a4bed 100644 --- a/apps/plugin-browser/src/pages/popup/popup-contents/action-center/shared/use-flow-runs.ts +++ b/apps/plugin-browser/src/pages/popup/popup-contents/action-center/shared/use-flow-runs.ts @@ -28,7 +28,7 @@ import { useStorageSync } from "../../../../shared/use-storage-sync"; import { useUserContext } from "../../shared/user-context"; const mapFlowRunToMinimalFlowRun = ( - flowRun: GetMinimalFlowRunsQuery["getFlowRuns"][number], + flowRun: GetMinimalFlowRunsQuery["getFlowRuns"]["flowRuns"][number], ): MinimalFlowRun => { const persistedEntities = (flowRun.outputs ?? []).flatMap((output) => (output.contents[0]?.outputs ?? []).flatMap(({ outputName, payload }) => { @@ -75,7 +75,7 @@ const getFlowRuns = async ({ getMinimalFlowRunsQuery, ) .then(({ data }) => - data.getFlowRuns.sort((a, b) => { + data.getFlowRuns.flowRuns.sort((a, b) => { if (!a.executedAt) { return b.executedAt ? 1 : 0; } diff --git a/libs/@local/hash-backend-utils/src/flows.ts b/libs/@local/hash-backend-utils/src/flows.ts index b355d36546a..47719c8b4ad 100644 --- a/libs/@local/hash-backend-utils/src/flows.ts +++ b/libs/@local/hash-backend-utils/src/flows.ts @@ -134,6 +134,8 @@ const convertScreamingSnakeToPascalCase = (str: string) => type GetFlowRunsFilters = { executionStatus?: FlowRunStatus | null; flowDefinitionIds?: string[] | null; + offset?: number | null; + limit?: number | null; }; type GetFlowRunsFnArgs = { @@ -152,17 +154,24 @@ type MinimalFlowMetadata = { webId: WebId; }; +type PaginatedFlowRuns = { + flowRuns: T[]; + totalCount: number; +}; + export async function getFlowRuns( args: GetFlowRunsFnArgs, -): Promise; +): Promise>; export async function getFlowRuns( args: GetFlowRunsFnArgs, -): Promise; +): Promise>; export async function getFlowRuns( args: GetFlowRunsFnArgs, -): Promise; +): Promise< + PaginatedFlowRuns +>; export async function getFlowRuns({ authentication, @@ -171,7 +180,9 @@ export async function getFlowRuns({ includeDetails, storageProvider, temporalClient, -}: GetFlowRunsFnArgs): Promise { +}: GetFlowRunsFnArgs): Promise< + PaginatedFlowRuns +> { const temporalWorkflowIdToFlowDetails = await queryEntities( { graphApi: graphApiClient }, authentication, @@ -237,7 +248,7 @@ export async function getFlowRuns({ const temporalWorkflowIds = typedKeys(temporalWorkflowIdToFlowDetails); if (!temporalWorkflowIds.length) { - return []; + return { flowRuns: [], totalCount: 0 }; } /** @see https://docs.temporal.io/develop/typescript/observability#search-attributes */ @@ -255,33 +266,48 @@ export async function getFlowRuns({ const workflowIdToLatestRunTime: Record = {}; + const deduplicatedWorkflowIds: string[] = []; + + for await (const workflow of workflowIterable) { + const temporalWorkflowId = workflow.workflowId; + + const startTime = workflow.startTime.toISOString(); + workflowIdToLatestRunTime[temporalWorkflowId] ??= startTime; + + if (startTime < workflowIdToLatestRunTime[temporalWorkflowId]) { + /** + * This is an earlier run of the same workflow – it is a flow run that has been reset and started from a specific point. + * + * It could also theoretically be: + * 1. a workflow that has been 'continued as new', but we do not yet use that Temporal feature. + * 2. a workflowId that has been re-used, but we do not do that in our business logic – we generate a new workflowId for each flow run. + * workflowIds are only re-used by Temporal automatically in the 'reset' or 'continue as new' cases. + */ + continue; + } + + if (!temporalWorkflowIdToFlowDetails[temporalWorkflowId]) { + throw new Error( + `Could not find details for workflowId ${workflow.workflowId}`, + ); + } + + deduplicatedWorkflowIds.push(temporalWorkflowId); + } + + const totalCount = deduplicatedWorkflowIds.length; + + const { offset, limit } = filters; + const paginatedIds = + offset != null && limit != null + ? deduplicatedWorkflowIds.slice(offset, offset + limit) + : deduplicatedWorkflowIds; + if (includeDetails) { const workflows: FlowRun[] = []; - for await (const workflow of workflowIterable) { - const temporalWorkflowId = workflow.workflowId; - const flowDetails = temporalWorkflowIdToFlowDetails[temporalWorkflowId]; - - const startTime = workflow.startTime.toISOString(); - workflowIdToLatestRunTime[temporalWorkflowId] ??= startTime; - - if (startTime < workflowIdToLatestRunTime[temporalWorkflowId]) { - /** - * This is an earlier run of the same workflow – it is a flow run that has been reset and started from a specific point. - * - * It could also theoretically be: - * 1. a workflow that has been 'continued as new', but we do not yet use that Temporal feature. - * 2. a workflowId that has been re-used, but we do not do that in our business logic – we generate a new workflowId for each flow run. - * workflowIds are only re-used by Temporal automatically in the 'reset' or 'continue as new' cases. - */ - continue; - } - - if (!flowDetails) { - throw new Error( - `Could not find details for workflowId ${workflow.workflowId}`, - ); - } + for (const temporalWorkflowId of paginatedIds) { + const flowDetails = temporalWorkflowIdToFlowDetails[temporalWorkflowId]!; const runInfo = await getFlowRunFromTemporalWorkflowId({ flowRunId: flowDetails.flowRunId, @@ -294,35 +320,12 @@ export async function getFlowRuns({ workflows.push(runInfo); } - return workflows; + return { flowRuns: workflows, totalCount }; } else { const workflows: SparseFlowRun[] = []; - for await (const workflow of workflowIterable) { - const temporalWorkflowId = workflow.workflowId; - - const startTime = workflow.startTime.toISOString(); - workflowIdToLatestRunTime[temporalWorkflowId] ??= startTime; - - if (startTime < workflowIdToLatestRunTime[temporalWorkflowId]) { - /** - * This is an earlier run of the same workflow – it is a flow run that has been reset and started from a specific point. - * - * It could also theoretically be: - * 1. a workflow that has been 'continued as new', but we do not yet use that Temporal feature. - * 2. a workflowId that has been re-used, but we do not do that in our business logic – we generate a new workflowId for each flow run. - * workflowIds are only re-used by Temporal automatically in the 'reset' or 'continue as new' cases. - */ - continue; - } - - const flowDetails = temporalWorkflowIdToFlowDetails[temporalWorkflowId]; - - if (!flowDetails) { - throw new Error( - `Could not find details for workflowId ${workflow.workflowId}`, - ); - } + for (const temporalWorkflowId of paginatedIds) { + const flowDetails = temporalWorkflowIdToFlowDetails[temporalWorkflowId]!; const runInfo = await getSparseFlowRunFromTemporalWorkflowId({ flowRunId: flowDetails.flowRunId, @@ -334,6 +337,6 @@ export async function getFlowRuns({ workflows.push(runInfo); } - return workflows; + return { flowRuns: workflows, totalCount }; } } diff --git a/libs/@local/hash-isomorphic-utils/src/graphql/queries/flow.queries.ts b/libs/@local/hash-isomorphic-utils/src/graphql/queries/flow.queries.ts index 54f79ef4d88..a0259a6455b 100644 --- a/libs/@local/hash-isomorphic-utils/src/graphql/queries/flow.queries.ts +++ b/libs/@local/hash-isomorphic-utils/src/graphql/queries/flow.queries.ts @@ -1,29 +1,32 @@ import { gql } from "@apollo/client"; export const getFlowRunsQuery = gql` - query getFlowRuns { - getFlowRuns { - name - flowDefinitionId - flowRunId - flowScheduleId - webId - status - startedAt - executedAt - closedAt - # Requesting 'inputRequests' requires the API going through the event history for each flow run, - # which ideally we would not have to do when requesting all flow runs. - # This field is required to indicate goals which are pending input on the /goals page - # @todo consider some way of caching a 'input requested' status to avoid this, e.g. on the Flow entity - inputRequests - # We need 'steps' to be able to populate the 'last event occurred at' field in a goals list, - # Similarly to 'inputRequests', this involves going through the event history for each flow run - steps { - scheduledAt + query getFlowRuns($offset: Int, $limit: Int) { + getFlowRuns(offset: $offset, limit: $limit) { + totalCount + flowRuns { + name + flowDefinitionId + flowRunId + flowScheduleId + webId + status startedAt + executedAt closedAt - logs + # Requesting 'inputRequests' requires the API going through the event history for each flow run, + # which ideally we would not have to do when requesting all flow runs. + # This field is required to indicate goals which are pending input on the /goals page + # @todo consider some way of caching a 'input requested' status to avoid this, e.g. on the Flow entity + inputRequests + # We need 'steps' to be able to populate the 'last event occurred at' field in a goals list, + # Similarly to 'inputRequests', this involves going through the event history for each flow run + steps { + scheduledAt + startedAt + closedAt + logs + } } } } diff --git a/libs/@local/hash-isomorphic-utils/src/graphql/type-defs/knowledge/flow.typedef.ts b/libs/@local/hash-isomorphic-utils/src/graphql/type-defs/knowledge/flow.typedef.ts index 1947e402e17..ecc2b8a670d 100644 --- a/libs/@local/hash-isomorphic-utils/src/graphql/type-defs/knowledge/flow.typedef.ts +++ b/libs/@local/hash-isomorphic-utils/src/graphql/type-defs/knowledge/flow.typedef.ts @@ -187,7 +187,16 @@ export const flowTypedef = gql` steps: [StepRun!]! } - + type PaginatedFlowRuns { + """ + The flow runs for the requested page + """ + flowRuns: [FlowRun!]! + """ + The total number of flow runs matching the filters (before pagination) + """ + totalCount: Int! + } extend type Query { getFlowRuns( @@ -199,7 +208,17 @@ export const flowTypedef = gql` Return only flows that match the given status """ executionStatus: FlowRunStatus - ): [FlowRun!]! + """ + Number of flow runs to skip (for offset-based pagination). + When omitted, all matching flow runs are returned. + """ + offset: Int + """ + Maximum number of flow runs to return (for offset-based pagination). + When omitted, all matching flow runs are returned. + """ + limit: Int + ): PaginatedFlowRuns! getFlowRunById(flowRunId: String!): FlowRun! }