Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { WorkflowActionService } from "../../service/workflow-graph/model/workfl
import { WorkflowStatusService } from "../../service/workflow-status/workflow-status.service";
import { ExecutionState, OperatorState } from "../../types/execute-workflow.interface";
import { LogicalPort, OperatorLink } from "../../types/workflow-common.interface";
import { EdgeStatistics } from "../../types/workflow-websocket.interface";
import { auditTime, filter, map, takeUntil } from "rxjs/operators";
import { UntilDestroy, untilDestroyed } from "@ngneat/until-destroy";
import { UndoRedoService } from "../../service/undo-redo/undo-redo.service";
Expand All @@ -41,6 +42,7 @@ import * as _ from "lodash";
import * as joint from "jointjs";
import { isDefined } from "../../../common/util/predicate";
import { GuiConfigService } from "../../../common/service/gui-config.service";
import { WorkflowWebsocketService } from "../../service/workflow-websocket/workflow-websocket.service";
import { line, curveCatmullRomClosed } from "d3-shape";
import concaveman from "concaveman";

Expand Down Expand Up @@ -103,6 +105,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
private jointUIService: JointUIService,
private workflowStatusService: WorkflowStatusService,
private executeWorkflowService: ExecuteWorkflowService,
private workflowWebsocketService: WorkflowWebsocketService,
private nzModalService: NzModalService,
private changeDetectorRef: ChangeDetectorRef,
private undoRedoService: UndoRedoService,
Expand Down Expand Up @@ -163,6 +166,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
this.handlePortHighlightEvent();
this.registerPortDisplayNameChangeHandler();
this.handleOperatorStatisticsUpdate();
this.handleEdgeStatisticsUpdate();
this.handleRegionEvents();
this.handleOperatorSuggestionHighlightEvent();
this.handleElementDelete();
Expand Down Expand Up @@ -334,6 +338,146 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
});
}

private handleEdgeStatisticsUpdate(): void {
this.workflowWebsocketService
.subscribeToEvent("EdgeStatisticsUpdateEvent")
.pipe(untilDestroyed(this))
.subscribe(event =>
this.updateEdgeStatisticsLabels(event.edgeStatistics, event.maxCreditAllowedInBytesPerChannel)
);

this.executeWorkflowService
.getExecutionStateStream()
.pipe(untilDestroyed(this))
.subscribe(event => {
if (event.current.state === ExecutionState.Uninitialized) {
this.paper.model.getLinks().forEach(link => link.labels([]));
}
});
}

private updateEdgeStatisticsLabels(
edgeStatistics: ReadonlyArray<EdgeStatistics>,
maxCreditAllowedInBytesPerChannel: number
): void {
this.paper.model.getLinks().forEach(link => {
link.labels([]);
link.attr(".connection/stroke", linkPathStrokeColor);
});

const percentageDenominator = maxCreditAllowedInBytesPerChannel > 0 ? maxCreditAllowedInBytesPerChannel : 1;
const links = this.workflowActionService.getTexeraGraph().getAllLinks();

edgeStatistics.forEach(edge => {
const link =
links.find(
candidate =>
candidate.source.operatorID === edge.fromOpId &&
candidate.target.operatorID === edge.toOpId &&
candidate.source.portID === String(edge.fromPortId) &&
candidate.target.portID === String(edge.toPortId)
) ||
links.find(
candidate =>
candidate.source.operatorID === edge.fromOpId &&
candidate.target.operatorID === edge.toOpId
);
if (!link) {
return;
}

const jointLink = this.paper.getModelById(link.linkID) as joint.dia.Link | undefined;
if (!jointLink) {
return;
}

const percentage = Math.min(100, (Math.max(0, edge.usageBytes) / percentageDenominator) * 100);
const edgeColor = this.getEdgeUsageColor(percentage);
const labelText = `${this.formatBytes(edge.usageBytes)} (${percentage.toFixed(1)}%)`;
jointLink.attr(".connection/stroke", edgeColor);
jointLink.label(0, {
position: 0.5,
attrs: {
rect: {
fill: "#FFFFFF",
stroke: "#6C757D",
"stroke-width": 1,
rx: 4,
ry: 4,
},
text: {
text: labelText,
fill: "#495057",
"font-size": 11,
"font-weight": 600,
},
},
});
});

// Fallback path: if websocket edge IDs don't match Texera graph IDs, label Joint links directly.
if (edgeStatistics.length > 0 && this.paper.model.getLinks().every(link => link.labels().length === 0)) {
edgeStatistics.forEach(edge => {
const paperLinks = this.paper.model.getLinks().filter(link => {
const source = link.get("source");
const target = link.get("target");
return source?.id === edge.fromOpId && target?.id === edge.toOpId;
});
const percentage = Math.min(100, (Math.max(0, edge.usageBytes) / percentageDenominator) * 100);
const edgeColor = this.getEdgeUsageColor(percentage);
const labelText = `${this.formatBytes(edge.usageBytes)} (${percentage.toFixed(1)}%)`;
paperLinks.forEach(link => {
link.attr(".connection/stroke", edgeColor);
link.label(0, {
position: 0.5,
attrs: {
rect: {
fill: "#FFFFFF",
stroke: "#6C757D",
"stroke-width": 1,
rx: 4,
ry: 4,
},
text: {
text: labelText,
fill: "#495057",
"font-size": 11,
"font-weight": 600,
},
},
});
});
});
}
}

private getEdgeUsageColor(percentage: number): string {
if (percentage > 80) {
return "#D62828";
}
if (percentage > 60) {
return "orange";
}
if (percentage <= 20) {
return "gray";
}
return "green";
}

private formatBytes(bytes: number): string {
const value = Math.max(0, bytes);
if (value < 1024) {
return `${value} B`;
}
if (value < 1024 * 1024) {
return `${(value / 1024).toFixed(1)} KB`;
}
if (value < 1024 * 1024 * 1024) {
return `${(value / (1024 * 1024)).toFixed(1)} MB`;
}
return `${(value / (1024 * 1024 * 1024)).toFixed(2)} GB`;
}

private handleRegionEvents(): void {
const Region = joint.dia.Element.define(
"region",
Expand Down
Loading