diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java index 367db740c..f5e85f275 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java @@ -32,6 +32,7 @@ import com.linkedin.datastream.common.DatastreamRuntimeException; import com.linkedin.datastream.common.zk.ZkClient; import com.linkedin.datastream.metrics.BrooklinGaugeInfo; +import com.linkedin.datastream.metrics.BrooklinMeterInfo; import com.linkedin.datastream.metrics.BrooklinMetricInfo; import com.linkedin.datastream.metrics.DynamicMetricsManager; import com.linkedin.datastream.metrics.MetricsAware; @@ -77,6 +78,8 @@ public class StickyPartitionAssignmentStrategy extends StickyMulticastStrategy i private static final String ACTUAL_PARTITIONS_PER_TASK = "actualPartitionsPerTask"; @VisibleForTesting static final String ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT = "elasticTaskParametersNeedAdjustment"; + @VisibleForTesting + static final String NUM_TASKS_CAPPED_BY_MAX_TASKS = "numTasksCappedByMaxTasks"; private static final Logger LOG = LoggerFactory.getLogger(StickyPartitionAssignmentStrategy.class.getName()); private static final DynamicMetricsManager DYNAMIC_METRICS_MANAGER = DynamicMetricsManager.getInstance(); @@ -533,6 +536,7 @@ public List getMetricInfos() { metrics.add(new BrooklinGaugeInfo(prefix + NUM_TASKS)); metrics.add(new BrooklinGaugeInfo(prefix + ACTUAL_PARTITIONS_PER_TASK)); metrics.add(new BrooklinGaugeInfo(prefix + ELASTIC_TASK_PARAMETERS_NEED_ADJUSTMENT)); + metrics.add(new BrooklinMeterInfo(prefix + NUM_TASKS_CAPPED_BY_MAX_TASKS)); return Collections.unmodifiableList(metrics); } @@ -631,6 +635,9 @@ protected int validateNumTasksAgainstMaxTasks(DatastreamGroupPartitionsMetadata // Only have the maxTasks override kick in if it's present as part of the datastream metadata. LOG.warn("The number of tasks needed {} is higher than maxTasks {}. Setting numTasks to maxTasks", numTasks, maxTasks); + DYNAMIC_METRICS_MANAGER.createOrUpdateMeter(CLASS_NAME, + datastreamPartitions.getDatastreamGroup().getTaskPrefix(), + NUM_TASKS_CAPPED_BY_MAX_TASKS, 1); return maxTasks; } return numTasks;