diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto index 43613b5cfdc..05c7906bf8b 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto @@ -43,6 +43,7 @@ message ControlReturn { WorkerStateResponse workerStateResponse = 50; WorkerMetricsResponse workerMetricsResponse = 51; FinalizeCheckpointResponse finalizeCheckpointResponse = 52; + FlowControlUsageResponse flowControlUsageResponse = 53; // common responses ControlError controlError = 101; @@ -138,4 +139,8 @@ message WorkerStateResponse { message WorkerMetricsResponse { worker.WorkerMetrics metrics = 1 [(scalapb.field).no_box = true]; -} \ No newline at end of file +} + +message FlowControlUsageResponse { + map channel_usage_bytes = 1; +} diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e0..6eaf7e27c95 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -39,6 +39,7 @@ service WorkerService { rpc OpenExecutor(EmptyRequest) returns (EmptyReturn); rpc PauseWorker(EmptyRequest) returns (WorkerStateResponse); rpc PrepareCheckpoint(PrepareCheckpointRequest) returns (EmptyReturn); + rpc QueryFlowControlUsage(EmptyRequest) returns (FlowControlUsageResponse); rpc QueryStatistics(EmptyRequest) returns (WorkerMetricsResponse); rpc ResumeWorker(EmptyRequest) returns (WorkerStateResponse); rpc RetrieveState(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto index 85d1fcf4aaa..679ca62b2de 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/worker/statistics.proto @@ -55,6 +55,7 @@ message WorkerStatistics { int64 data_processing_time = 3; int64 control_processing_time = 4; int64 idle_time = 5; + map channel_usage_bytes = 6; } message WorkerMetrics { diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto index e712b3adc8a..310eefb5bfe 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/common/executionruntimestate.proto @@ -84,11 +84,20 @@ message OperatorMetrics{ OperatorStatistics operator_statistics = 2 [(scalapb.field).no_box = true]; } +message EdgeStatistics{ + string from_op_id = 1; + int32 from_port_id = 2; + string to_op_id = 3; + int32 to_port_id = 4; + int64 usage_bytes = 5; +} + message ExecutionStatsStore { int64 startTimeStamp = 1; int64 endTimeStamp = 2; map operator_info = 3; repeated OperatorWorkerMapping operator_worker_mapping = 4; + repeated EdgeStatistics edge_info = 5; } diff --git a/amber/src/main/python/core/architecture/managers/statistics_manager.py b/amber/src/main/python/core/architecture/managers/statistics_manager.py index 6b36b78e577..b59b6cf4fda 100644 --- a/amber/src/main/python/core/architecture/managers/statistics_manager.py +++ b/amber/src/main/python/core/architecture/managers/statistics_manager.py @@ -16,7 +16,7 @@ # under the License. from collections import defaultdict -from typing import DefaultDict +from typing import DefaultDict, Optional from proto.org.apache.texera.amber.core import PortIdentity from proto.org.apache.texera.amber.engine.architecture.worker import ( @@ -40,7 +40,11 @@ def __init__(self) -> None: self._total_execution_time: int = 0 self._worker_start_time: int = 0 - def get_statistics(self) -> WorkerStatistics: + def get_statistics( + self, channel_usage_bytes: Optional[dict[str, int]] = None + ) -> WorkerStatistics: + if channel_usage_bytes is None: + channel_usage_bytes = {} # Compile and return worker statistics return WorkerStatistics( [ @@ -56,6 +60,7 @@ def get_statistics(self) -> WorkerStatistics: self._total_execution_time - self._data_processing_time - self._control_processing_time, + channel_usage_bytes, ) def increase_input_statistics(self, port_id: PortIdentity, size: int) -> None: diff --git a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py index 3b46e6db4d7..f0e661ffa6f 100644 --- a/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py +++ b/amber/src/main/python/core/util/customized_queue/linked_blocking_multi_queue.py @@ -29,13 +29,34 @@ T = TypeVar("T") +def _estimate_in_mem_size(item: T) -> int: + """ + Estimate in-memory bytes for queue accounting. + Prefer payload/frame byte size when available; otherwise fall back to object size. + """ + if item is None: + return 0 + + payload = getattr(item, "payload", None) + frame = getattr(payload, "frame", None) if payload is not None else None + if frame is not None: + if hasattr(frame, "nbytes"): + return int(frame.nbytes) + if hasattr(frame, "to_table"): + table = frame.to_table() + if hasattr(table, "nbytes"): + return int(table.nbytes) + + return sys.getsizeof(item) + + class LinkedBlockingMultiQueue(IKeyedQueue): @inner class Node(Generic[T]): def __init__(self, item: T): self.item = item self.next: Optional[LinkedBlockingMultiQueue.Node[T]] = None - self.in_mem_size = sys.getsizeof(item) + self.in_mem_size = _estimate_in_mem_size(item) @inner class SubQueue(Generic[T]): diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py index ea6ddc5e43f..00e2f4ea95b 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py @@ -419,6 +419,9 @@ class ControlReturn(betterproto.Message): finalize_checkpoint_response: "FinalizeCheckpointResponse" = ( betterproto.message_field(52, group="sealed_value") ) + flow_control_usage_response: "FlowControlUsageResponse" = betterproto.message_field( + 53, group="sealed_value" + ) control_error: "ControlError" = betterproto.message_field(101, group="sealed_value") """common responses""" @@ -517,6 +520,13 @@ class WorkerMetricsResponse(betterproto.Message): metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1) +@dataclass(eq=False, repr=False) +class FlowControlUsageResponse(betterproto.Message): + channel_usage_bytes: Dict[str, int] = betterproto.map_field( + 1, betterproto.TYPE_STRING, betterproto.TYPE_INT64 + ) + + class RpcTesterStub(betterproto.ServiceStub): async def send_ping( self, @@ -833,7 +843,7 @@ async def prepare_checkpoint( timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None - ) -> "EmptyReturn": + ) -> "EmptyReturn": return await self._unary_unary( "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint", prepare_checkpoint_request, @@ -843,6 +853,23 @@ async def prepare_checkpoint( metadata=metadata, ) + async def query_flow_control_usage( + self, + empty_request: "EmptyRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "FlowControlUsageResponse": + return await self._unary_unary( + "/org.apache.amber.engine.architecture.rpc.WorkerService/QueryFlowControlUsage", + empty_request, + FlowControlUsageResponse, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + async def query_statistics( self, empty_request: "EmptyRequest", diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py index 072e7c8ce65..cfde0cba130 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py @@ -4,7 +4,7 @@ # This file has been @generated from dataclasses import dataclass -from typing import List +from typing import Dict, List import betterproto @@ -39,6 +39,9 @@ class WorkerStatistics(betterproto.Message): data_processing_time: int = betterproto.int64_field(3) control_processing_time: int = betterproto.int64_field(4) idle_time: int = betterproto.int64_field(5) + channel_usage_bytes: Dict[str, int] = betterproto.map_field( + 6, betterproto.TYPE_STRING, betterproto.TYPE_INT64 + ) @dataclass(eq=False, repr=False) diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py index 55c789aa395..802a546ed2c 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py @@ -115,6 +115,15 @@ class OperatorMetrics(betterproto.Message): operator_statistics: "OperatorStatistics" = betterproto.message_field(2) +@dataclass(eq=False, repr=False) +class EdgeStatistics(betterproto.Message): + from_op_id: str = betterproto.string_field(1) + from_port_id: int = betterproto.int32_field(2) + to_op_id: str = betterproto.string_field(3) + to_port_id: int = betterproto.int32_field(4) + usage_bytes: int = betterproto.int64_field(5) + + @dataclass(eq=False, repr=False) class ExecutionStatsStore(betterproto.Message): start_time_stamp: int = betterproto.int64_field(1) @@ -125,6 +134,7 @@ class ExecutionStatsStore(betterproto.Message): operator_worker_mapping: List["OperatorWorkerMapping"] = betterproto.message_field( 4 ) + edge_info: List["EdgeStatistics"] = betterproto.message_field(5) @dataclass(eq=False, repr=False) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala index 3401e3ff639..82de35fecbf 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AkkaMessageTransferService.scala @@ -70,7 +70,7 @@ class AkkaMessageTransferService( private def checkCreditPolling(): Unit = { channelToFC.foreach { case (channel, fc) => - if (fc.isOverloaded) { + if (fc.isOverloaded || fc.getQueuedBytes > 0) { refService.askForCredit(channel) } } @@ -164,6 +164,14 @@ class AkkaMessageTransferService( handleBackpressure(backpressured) } + def getChannelUsageBytes: Map[String, Long] = { + channelToFC.map { + case (channelId, flowControl) => + val key = java.util.Base64.getEncoder.encodeToString(channelId.toByteArray) + key -> flowControl.getUsedBytes + }.toMap + } + private def checkResend(): Unit = { refService.clearQueriedActorRefs() channelToCC.foreach { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala index 1092af15e77..2c8ff1d28b3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ClientEvent.scala @@ -23,13 +23,16 @@ import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState import org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload -import org.apache.texera.amber.engine.common.executionruntimestate.OperatorMetrics +import org.apache.texera.amber.engine.common.executionruntimestate.{EdgeStatistics, OperatorMetrics} trait ClientEvent extends WorkflowFIFOMessagePayload case class ExecutionStateUpdate(state: WorkflowAggregatedState) extends ClientEvent -case class ExecutionStatsUpdate(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent +case class ExecutionStatsUpdate( + operatorMetrics: Map[String, OperatorMetrics], + edgeStatistics: Seq[EdgeStatistics] = Seq.empty +) extends ClientEvent case class RuntimeStatisticsPersist(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala index b0e4f3fdc32..cbe88c132f6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/Controller.scala @@ -244,7 +244,8 @@ class Controller( outputMessages.foreach(transferService.send) cp.asyncRPCClient.sendToClient( ExecutionStatsUpdate( - cp.workflowExecution.getAllRegionExecutionsStats + cp.workflowExecution.getAllRegionExecutionsStats, + cp.workflowExecution.getAllRegionEdgeStatistics ) ) globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = false) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index b806479b892..3947e004af8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -19,12 +19,13 @@ package org.apache.texera.amber.engine.architecture.controller.execution -import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity +import org.apache.texera.amber.core.virtualidentity.{ChannelIdentity, PhysicalOpIdentity} import org.apache.texera.amber.engine.architecture.controller.execution.ExecutionUtils.aggregateMetrics import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState._ import org.apache.texera.amber.engine.architecture.scheduling.{Region, RegionIdentity} -import org.apache.texera.amber.engine.common.executionruntimestate.OperatorMetrics +import org.apache.texera.amber.engine.common.executionruntimestate.{EdgeStatistics, OperatorMetrics} +import org.apache.texera.amber.util.VirtualIdentityUtils import scala.collection.mutable @@ -105,6 +106,68 @@ case class WorkflowExecution() { */ def getAllRegionExecutions: Iterable[RegionExecution] = regionExecutions.values + def getAllRegionEdgeStatistics: Seq[EdgeStatistics] = { + val channelUsage: mutable.HashMap[ChannelIdentity, Long] = mutable.HashMap() + + getAllRegionExecutions.foreach { regionExecution => + regionExecution.getAllOperatorExecutions.foreach { + case (_, operatorExecution) => + operatorExecution.getWorkerIds.foreach { workerId => + val stats = operatorExecution.getWorkerExecution(workerId).getStats + stats.channelUsageBytes.foreach { + case (encodedChannelId, usageBytes) => + val bytes = + java.util.Base64.getDecoder.decode(encodedChannelId) + val channelId = ChannelIdentity.parseFrom(bytes) + val value = Math.max(0L, usageBytes) + // Keep latest sampled usage for this channel. + channelUsage.update(channelId, value) + } + } + } + } + + val logicalEdgeUsage = + mutable.HashMap[(String, String), mutable.ArrayBuffer[Long]]() + + channelUsage.foreach { + case (channelId, usageBytes) => + val fromLogical = VirtualIdentityUtils.getPhysicalOpId(channelId.fromWorkerId).logicalOpId.id + val toLogical = VirtualIdentityUtils.getPhysicalOpId(channelId.toWorkerId).logicalOpId.id + logicalEdgeUsage + .getOrElseUpdate((fromLogical, toLogical), mutable.ArrayBuffer()) + .append(usageBytes) + } + + val edgeUsage = + mutable.HashMap[(String, Int, String, Int), mutable.ArrayBuffer[Long]]() + val edgeChannelCounts = mutable.HashMap[(String, Int, String, Int), Int]() + + getAllRegionExecutions.foreach { regionExecution => + regionExecution.region.resourceConfig.foreach { resourceConfig => + resourceConfig.linkConfigs.foreach { + case (link, linkConfig) => + val fromOpId = link.fromOpId.logicalOpId.id + val toOpId = link.toOpId.logicalOpId.id + val fromPortId = link.fromPortId.id + val toPortId = link.toPortId.id + val key = (fromOpId, fromPortId, toOpId, toPortId) + edgeUsage.getOrElseUpdate(key, mutable.ArrayBuffer()) + edgeChannelCounts.update(key, linkConfig.channelConfigs.size) + logicalEdgeUsage.get((fromOpId, toOpId)).foreach { usages => + edgeUsage.getOrElseUpdate(key, mutable.ArrayBuffer()).appendAll(usages) + } + } + } + } + + edgeUsage.map { + case ((fromOpId, fromPortId, toOpId, toPortId), usages) => + val avgUsage = if (usages.nonEmpty) usages.sum / usages.size else 0L + EdgeStatistics(fromOpId, fromPortId, toOpId, toPortId, avgUsage) + }.toSeq + } + /** * Retrieves the latest `OperatorExecution` associated with the specified physical operatorId. * diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala index 35a85f56ae9..3516fdb9a06 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/PauseHandler.scala @@ -85,10 +85,11 @@ trait PauseHandler { .map { _ => // update frontend workflow status and persist statistics val stats = cp.workflowExecution.getAllRegionExecutionsStats - sendToClient(ExecutionStatsUpdate(stats)) + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState)) - logger.info(s"workflow paused") + logger.info("workflow paused") } EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala index 6551579f719..dceffb5b59e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers import com.twitter.util.Future import org.apache.texera.amber.config.ApplicationConfig +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity import org.apache.texera.amber.engine.architecture.controller.{ ControllerAsyncRPCHandlerInitializer, @@ -65,18 +66,41 @@ trait QueryWorkerStatisticsHandler { // Reads the current cached stats and forwards them to the appropriate client sink(s). private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = { val stats = cp.workflowExecution.getAllRegionExecutionsStats + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics updateTarget match { case StatisticsUpdateTarget.UI_ONLY => - sendToClient(ExecutionStatsUpdate(stats)) + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) case StatisticsUpdateTarget.PERSISTENCE_ONLY => sendToClient(RuntimeStatisticsPersist(stats)) case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE | StatisticsUpdateTarget.Unrecognized(_) => - sendToClient(ExecutionStatsUpdate(stats)) + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) } } + private def withChannelUsage( + statsResp: WorkerMetricsResponse, + usage: Map[String, Long] + ): WorkerMetricsResponse = { + val mergedWorkerStats = + statsResp.metrics.workerStatistics.copy(channelUsageBytes = usage) + statsResp.copy(metrics = statsResp.metrics.copy(workerStatistics = mergedWorkerStats)) + } + + private def queryStatisticsWithFlow( + wid: ActorVirtualIdentity + ): Future[WorkerMetricsResponse] = { + Future + .join( + workerInterface.queryStatistics(EmptyRequest(), wid), + workerInterface.queryFlowControlUsage(EmptyRequest(), wid) + ) + .map { case (statsResp, flowResp) => + withChannelUsage(statsResp, flowResp.channelUsageBytes) + } + } + override def controllerInitiateQueryStatistics( msg: QueryStatisticsRequest, ctx: AsyncRPCContext @@ -156,9 +180,13 @@ trait QueryWorkerStatisticsHandler { // Send queryStatistics to each worker and update internal state on reply workerIds.map { wid => - workerInterface.queryStatistics(EmptyRequest(), wid).map { resp => + queryStatisticsWithFlow(wid).map { mergedResp => collectedResults.addOne( - (exec.getWorkerExecution(wid), resp, System.nanoTime()) + ( + exec.getWorkerExecution(wid), + mergedResp, + System.nanoTime() + ) ) } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala index c94ba91c205..87f0d2ccc25 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ResumeHandler.scala @@ -60,7 +60,8 @@ trait ResumeHandler { .map { _ => // update frontend status and persist statistics val stats = cp.workflowExecution.getAllRegionExecutionsStats - sendToClient(ExecutionStatsUpdate(stats)) + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) cp.controllerTimerService .enableStatusUpdate() //re-enabled it since it is disabled in pause diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala index 5ee98a4918d..addcdf30a0b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/WorkerStateUpdatedHandler.scala @@ -52,7 +52,8 @@ trait WorkerStateUpdatedHandler { operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state) ) val stats = cp.workflowExecution.getAllRegionExecutionsStats - sendToClient(ExecutionStatsUpdate(stats)) + val edgeStats = cp.workflowExecution.getAllRegionEdgeStatistics + sendToClient(ExecutionStatsUpdate(stats, edgeStats)) sendToClient(RuntimeStatisticsPersist(stats)) EmptyReturn() } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala index 55e1e309181..a78bdb948bd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecution.scala @@ -35,7 +35,7 @@ case class WorkerExecution() extends Serializable { private var state: WorkerState = UNINITIALIZED private var stats: WorkerStatistics = { - WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0) + WorkerStatistics(Seq.empty, Seq.empty, 0, 0, 0, Map.empty) } private var lastUpdateTimeStamp = 0L diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala index d4b24dad1d8..d61c45d3ca6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/FlowControl.scala @@ -119,4 +119,12 @@ class FlowControl { def getCredit: Long = { maxByteAllowed - inflightCredit - queuedCredit } + + def getUsedBytes: Long = { + inflightCredit + queuedCredit + } + + def getQueuedBytes: Long = { + queuedCredit + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala index 6618e857b1d..34d2e01cb41 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala @@ -219,8 +219,11 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu val schemaRoot = VectorSchemaRoot.create(ArrowUtils.fromTexeraSchema(schema), allocator) val writer = flightClient.startPut(descriptor, schemaRoot, flightListener) schemaRoot.allocateNew() + var tupleInMemBytes: Long = 0L while (tuples.nonEmpty) { - ArrowUtils.appendTexeraTuple(tuples.dequeue(), schemaRoot) + val tuple = tuples.dequeue() + tupleInMemBytes += tuple.inMemSize + ArrowUtils.appendTexeraTuple(tuple, schemaRoot) } writer.putNext() schemaRoot.clear() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala index c904e436bcd..1e4f39e3ac4 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala @@ -50,6 +50,12 @@ private class AmberProducer( outputPort: NetworkOutputGateway, promise: Promise[Int] ) extends NoOpFlightProducer { + @volatile private var queuedCreditOf: ChannelIdentity => Long = _ => 0L + + def setQueuedCreditOf(provider: ChannelIdentity => Long): Unit = { + queuedCreditOf = provider + } + var _portNumber: AtomicInteger = new AtomicInteger(0) def portNumber: AtomicInteger = _portNumber @@ -78,8 +84,8 @@ private class AmberProducer( throw new RuntimeException(s"not supported payload $payload") } - // get little-endian representation of credits - var creditVal: Long = 30L // TODO : replace with actual credit value + // Return current queued credit (bytes). + val creditVal: Long = queuedCreditOf(pythonControlMessage.tag) val creditByteArr: Array[Byte] = ByteBuffer.allocate(Longs.BYTES).order(ByteOrder.LITTLE_ENDIAN).putLong(creditVal).array @@ -112,9 +118,7 @@ private class AmberProducer( val bufferAllocator = new RootAllocator(8 * 1024) try { val arrowBuf: ArrowBuf = bufferAllocator.buffer(Longs.BYTES + 4) - arrowBuf.writeLong( - 31L - ) // TODO : replace with actual credit value + arrowBuf.writeLong(queuedCreditOf(to)) ackStream.onNext(PutResult.metadata(arrowBuf)) arrowBuf.close() } finally if (bufferAllocator != null) bufferAllocator.close() @@ -153,6 +157,16 @@ class PythonProxyServer( ) extends Runnable with AutoCloseable with AmberLogging { + @volatile private var queuedCreditProvider: ChannelIdentity => Long = _ => 0L + + def setQueuedCreditProvider(provider: ChannelIdentity => Long): Unit = { + queuedCreditProvider = provider + } + + private def queuedCreditOf(channelId: ChannelIdentity): Long = { + queuedCreditProvider(channelId) + } + private lazy val portNumber: AtomicInteger = new AtomicInteger(getFreeLocalPort) def getPortNumber: AtomicInteger = portNumber @@ -160,7 +174,9 @@ class PythonProxyServer( val allocator: BufferAllocator = new RootAllocator().newChildAllocator("flight-server", 0, Long.MaxValue) - val producer: FlightProducer = new AmberProducer(actorId, outputPort, promise) + private val producerImpl = new AmberProducer(actorId, outputPort, promise) + producerImpl.setQueuedCreditOf(queuedCreditOf) + val producer: FlightProducer = producerImpl val location: Location = (() => { Location.forGrpcInsecure("localhost", portNumber.intValue()) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala index 3aa5fa90a46..331a9544da3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala @@ -91,11 +91,19 @@ class DataProcessor( inputGateway.getChannel(channelId).getQueuedCredit } + @volatile private var channelUsageBytesProvider: () => Map[String, Long] = () => Map.empty + + def setChannelUsageBytesProvider(provider: () => Map[String, Long]): Unit = { + channelUsageBytesProvider = provider + } + + def collectFlowControlUsage(): Map[String, Long] = channelUsageBytesProvider() + /** * provide API for actor to get stats of this operator */ def collectStatistics(): WorkerStatistics = - statisticsManager.getStatistics(executor) + statisticsManager.getStatistics(executor, Map.empty) /** * process currentInputTuple through executor logic. diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 2abcdf66975..2fd4b2e228e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -41,6 +41,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with OpenExecutorHandler with PauseHandler with AddPartitioningHandler + with QueryFlowControlUsageHandler with QueryStatisticsHandler with ResumeHandler with StartHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala index d1a0a300d93..c1ebbfbb575 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/WorkflowWorker.scala @@ -95,6 +95,7 @@ class WorkflowWorker( override def initState(): Unit = { dp.initTimerService(timerService) + dp.setChannelUsageBytesProvider(() => transferService.getChannelUsageBytes) if (replayInitialization.restoreConfOpt.isDefined) { context.parent ! ReplayStatusUpdate(actorId, status = true) setupReplay( @@ -172,6 +173,7 @@ class WorkflowWorker( logger.info("output messages restored.") dp = dpState // overwrite dp state dp.outputHandler = logManager.sendCommitted + dp.setChannelUsageBytesProvider(() => transferService.getChannelUsageBytes) dp.initTimerService(timerService) logger.info("start re-initialize executor from checkpoint.") val (executor, iter) = dp.serializationManager.restoreExecutorState(chkpt) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala index 8ae0419f0a3..16f173d33df 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/StatisticsManager.scala @@ -47,7 +47,10 @@ class StatisticsManager { * @param operator the operator executor * @return a WorkerStatistics object containing the statistics */ - def getStatistics(operator: OperatorExecutor): WorkerStatistics = { + def getStatistics( + operator: OperatorExecutor, + channelUsageBytes: Map[String, Long] + ): WorkerStatistics = { WorkerStatistics( inputStatistics.map { case (portId, (tupleCount, tupleSize)) => @@ -59,7 +62,8 @@ class StatisticsManager { }.toSeq, dataProcessingTime, controlProcessingTime, - totalExecutionTime - dataProcessingTime - controlProcessingTime + totalExecutionTime - dataProcessingTime - controlProcessingTime, + channelUsageBytes ) } diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala index da072c80ea5..f408a5f00bb 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/TexeraWebSocketEvent.scala @@ -32,6 +32,7 @@ import org.apache.texera.web.model.websocket.response.{HeartBeatResponse, Modify new Type(value = classOf[WorkflowErrorEvent]), new Type(value = classOf[WorkflowStateEvent]), new Type(value = classOf[OperatorStatisticsUpdateEvent]), + new Type(value = classOf[EdgeStatisticsUpdateEvent]), new Type(value = classOf[WebResultUpdateEvent]), new Type(value = classOf[ConsoleUpdateEvent]), new Type(value = classOf[CacheStatusUpdateEvent]), diff --git a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala index 3703a2bf417..132fb074ba1 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ExecutionStatsService.scala @@ -21,6 +21,7 @@ package org.apache.texera.web.service import com.google.protobuf.timestamp.Timestamp import com.typesafe.scalalogging.LazyLogging +import org.apache.texera.amber.config.ApplicationConfig import org.apache.texera.amber.core.storage.model.BufferedItemWriter import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory} @@ -28,39 +29,16 @@ import org.apache.texera.amber.core.tuple.Tuple import org.apache.texera.amber.core.workflow.WorkflowContext import org.apache.texera.amber.core.workflowruntimestate.FatalErrorType.EXECUTION_FAILURE import org.apache.texera.amber.core.workflowruntimestate.WorkflowFatalError -import org.apache.texera.amber.engine.architecture.controller.{ - ExecutionStateUpdate, - ExecutionStatsUpdate, - FatalError, - RuntimeStatisticsPersist, - WorkerAssignmentUpdate, - WorkflowRecoveryStatus -} +import org.apache.texera.amber.engine.architecture.controller.{ExecutionStateUpdate, ExecutionStatsUpdate, FatalError, RuntimeStatisticsPersist, WorkerAssignmentUpdate, WorkflowRecoveryStatus} import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState -import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{ - COMPLETED, - FAILED, - KILLED -} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.{COMPLETED, FAILED, KILLED} import org.apache.texera.amber.engine.common.Utils import org.apache.texera.amber.engine.common.Utils.maptoStatusCode import org.apache.texera.amber.engine.common.client.AmberClient -import org.apache.texera.amber.engine.common.executionruntimestate.{ - OperatorMetrics, - OperatorStatistics, - OperatorWorkerMapping -} -import org.apache.texera.amber.error.ErrorUtils.{ - getOperatorFromActorIdOpt, - getStackTraceWithAllCauses -} +import org.apache.texera.amber.engine.common.executionruntimestate.{OperatorMetrics, OperatorStatistics, OperatorWorkerMapping} +import org.apache.texera.amber.error.ErrorUtils.{getOperatorFromActorIdOpt, getStackTraceWithAllCauses} import org.apache.texera.web.SubscriptionManager -import org.apache.texera.web.model.websocket.event.{ - ExecutionDurationUpdateEvent, - OperatorAggregatedMetrics, - OperatorStatisticsUpdateEvent, - WorkerAssignmentUpdateEvent -} +import org.apache.texera.web.model.websocket.event.{EdgeStatistics, EdgeStatisticsUpdateEvent, ExecutionDurationUpdateEvent, OperatorAggregatedMetrics, OperatorStatisticsUpdateEvent, WorkerAssignmentUpdateEvent} import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import org.apache.texera.web.storage.ExecutionStateStore import org.apache.texera.web.storage.ExecutionStateStore.updateWorkflowState @@ -135,6 +113,29 @@ class ExecutionStatsService( }) ) + addSubscription( + stateStore.statsStore.registerDiffHandler((oldState, newState) => { + if (newState.edgeInfo != oldState.edgeInfo) { + Iterable( + EdgeStatisticsUpdateEvent( + newState.edgeInfo.map { edge => + EdgeStatistics( + edge.fromOpId, + edge.fromPortId, + edge.toOpId, + edge.toPortId, + edge.usageBytes + ) + }, + ApplicationConfig.maxCreditAllowedInBytesPerChannel + ) + ) + } else { + Iterable.empty + } + }) + ) + addSubscription( stateStore.statsStore.registerDiffHandler((oldState, newState) => { // update operators' workers. @@ -187,7 +188,7 @@ class ExecutionStatsService( client .registerCallback[ExecutionStatsUpdate]((evt: ExecutionStatsUpdate) => { stateStore.statsStore.updateState { statsStore => - statsStore.withOperatorInfo(evt.operatorMetrics) + statsStore.withOperatorInfo(evt.operatorMetrics).withEdgeInfo(evt.edgeStatistics) } }) ) @@ -207,7 +208,7 @@ class ExecutionStatsService( addSubscription( client.registerCallback[ExecutionStateUpdate] { case ExecutionStateUpdate(state: WorkflowAggregatedState.Recognized) - if Set(COMPLETED, FAILED, KILLED).contains(state) => + if Set(COMPLETED, FAILED, KILLED).contains(state) => logger.info("Workflow execution terminated. Commit runtime statistics.") try { runtimeStatsWriter.close() diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index afd5ea6f04a..9bcb01344e1 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -185,6 +185,19 @@ export type RegionStateEvent = Readonly<{ state: string; }>; +export type EdgeStatistics = Readonly<{ + fromOpId: string; + fromPortId: number; + toOpId: string; + toPortId: number; + usageBytes: number; +}>; + +export type EdgeStatisticsUpdateEvent = Readonly<{ + edgeStatistics: ReadonlyArray; + maxCreditAllowedInBytesPerChannel: number; +}>; + export type ModifyLogicResponse = Readonly<{ opId: string; isValid: boolean; @@ -243,6 +256,7 @@ export type TexeraWebsocketEventTypeMap = { ClusterStatusUpdateEvent: ClusterStatusUpdateEvent; RegionUpdateEvent: RegionUpdateEvent; RegionStateEvent: RegionStateEvent; + EdgeStatisticsUpdateEvent: EdgeStatisticsUpdateEvent; }; // helper type definitions to generate the request and event types