diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index f1532bf9a9d..f956a663a37 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -30,6 +30,7 @@ import { WorkflowActionService } from "../../service/workflow-graph/model/workfl import { WorkflowStatusService } from "../../service/workflow-status/workflow-status.service"; import { ExecutionState, OperatorState } from "../../types/execute-workflow.interface"; import { LogicalPort, OperatorLink } from "../../types/workflow-common.interface"; +import { EdgeStatistics } from "../../types/workflow-websocket.interface"; import { auditTime, filter, map, takeUntil } from "rxjs/operators"; import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy"; import { UndoRedoService } from "../../service/undo-redo/undo-redo.service"; @@ -41,6 +42,7 @@ import * as _ from "lodash"; import * as joint from "jointjs"; import { isDefined } from "../../../common/util/predicate"; import { GuiConfigService } from "../../../common/service/gui-config.service"; +import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service"; import { line, curveCatmullRomClosed } from "d3-shape"; import concaveman from "concaveman"; @@ -103,6 +105,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy private jointUIService: JointUIService, private workflowStatusService: WorkflowStatusService, private executeWorkflowService: ExecuteWorkflowService, + private workflowWebsocketService: WorkflowWebsocketService, private nzModalService: NzModalService, private changeDetectorRef: ChangeDetectorRef, private undoRedoService: UndoRedoService, @@ -163,6 +166,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy this.handlePortHighlightEvent(); this.registerPortDisplayNameChangeHandler(); this.handleOperatorStatisticsUpdate(); + this.handleEdgeStatisticsUpdate(); this.handleRegionEvents(); this.handleOperatorSuggestionHighlightEvent(); this.handleElementDelete(); @@ -334,6 +338,146 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy }); } + private handleEdgeStatisticsUpdate(): void { + this.workflowWebsocketService + .subscribeToEvent("EdgeStatisticsUpdateEvent") + .pipe(untilDestroyed(this)) + .subscribe(event => + this.updateEdgeStatisticsLabels(event.edgeStatistics, event.maxCreditAllowedInBytesPerChannel) + ); + + this.executeWorkflowService + .getExecutionStateStream() + .pipe(untilDestroyed(this)) + .subscribe(event => { + if (event.current.state === ExecutionState.Uninitialized) { + this.paper.model.getLinks().forEach(link => link.labels([])); + } + }); + } + + private updateEdgeStatisticsLabels( + edgeStatistics: ReadonlyArray, + maxCreditAllowedInBytesPerChannel: number + ): void { + this.paper.model.getLinks().forEach(link => { + link.labels([]); + link.attr(".connection/stroke", linkPathStrokeColor); + }); + + const percentageDenominator = maxCreditAllowedInBytesPerChannel > 0 ? maxCreditAllowedInBytesPerChannel : 1; + const links = this.workflowActionService.getTexeraGraph().getAllLinks(); + + edgeStatistics.forEach(edge => { + const link = + links.find( + candidate => + candidate.source.operatorID === edge.fromOpId && + candidate.target.operatorID === edge.toOpId && + candidate.source.portID === String(edge.fromPortId) && + candidate.target.portID === String(edge.toPortId) + ) || + links.find( + candidate => + candidate.source.operatorID === edge.fromOpId && + candidate.target.operatorID === edge.toOpId + ); + if (!link) { + return; + } + + const jointLink = this.paper.getModelById(link.linkID) as joint.dia.Link | undefined; + if (!jointLink) { + return; + } + + const percentage = Math.min(100, (Math.max(0, edge.usageBytes) / percentageDenominator) * 100); + const edgeColor = this.getEdgeUsageColor(percentage); + const labelText = `${this.formatBytes(edge.usageBytes)} (${percentage.toFixed(1)}%)`; + jointLink.attr(".connection/stroke", edgeColor); + jointLink.label(0, { + position: 0.5, + attrs: { + rect: { + fill: "#FFFFFF", + stroke: "#6C757D", + "stroke-width": 1, + rx: 4, + ry: 4, + }, + text: { + text: labelText, + fill: "#495057", + "font-size": 11, + "font-weight": 600, + }, + }, + }); + }); + + // Fallback path: if websocket edge IDs don't match Texera graph IDs, label Joint links directly. + if (edgeStatistics.length > 0 && this.paper.model.getLinks().every(link => link.labels().length === 0)) { + edgeStatistics.forEach(edge => { + const paperLinks = this.paper.model.getLinks().filter(link => { + const source = link.get("source"); + const target = link.get("target"); + return source?.id === edge.fromOpId && target?.id === edge.toOpId; + }); + const percentage = Math.min(100, (Math.max(0, edge.usageBytes) / percentageDenominator) * 100); + const edgeColor = this.getEdgeUsageColor(percentage); + const labelText = `${this.formatBytes(edge.usageBytes)} (${percentage.toFixed(1)}%)`; + paperLinks.forEach(link => { + link.attr(".connection/stroke", edgeColor); + link.label(0, { + position: 0.5, + attrs: { + rect: { + fill: "#FFFFFF", + stroke: "#6C757D", + "stroke-width": 1, + rx: 4, + ry: 4, + }, + text: { + text: labelText, + fill: "#495057", + "font-size": 11, + "font-weight": 600, + }, + }, + }); + }); + }); + } + } + + private getEdgeUsageColor(percentage: number): string { + if (percentage > 80) { + return "#D62828"; + } + if (percentage > 60) { + return "orange"; + } + if (percentage <= 20) { + return "gray"; + } + return "green"; + } + + private formatBytes(bytes: number): string { + const value = Math.max(0, bytes); + if (value < 1024) { + return `${value} B`; + } + if (value < 1024 * 1024) { + return `${(value / 1024).toFixed(1)} KB`; + } + if (value < 1024 * 1024 * 1024) { + return `${(value / (1024 * 1024)).toFixed(1)} MB`; + } + return `${(value / (1024 * 1024 * 1024)).toFixed(2)} GB`; + } + private handleRegionEvents(): void { const Region = joint.dia.Element.define( "region",