diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/RewardComputationTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/RewardComputationTrigger.scala index 8acdc04e63..d0428a1905 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/RewardComputationTrigger.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/RewardComputationTrigger.scala @@ -4,14 +4,17 @@ package org.lfdecentralizedtrust.splice.scan.automation import org.apache.pekko.stream.Materializer +import com.daml.metrics.api.MetricsContext import org.lfdecentralizedtrust.splice.automation.{ PollingParallelTaskExecutionTrigger, TaskOutcome, TaskSuccess, TriggerContext, } +import org.lfdecentralizedtrust.splice.scan.metrics.RewardComputationMetrics import org.lfdecentralizedtrust.splice.scan.store.{AppActivityStore, ScanAppRewardsStore} import org.lfdecentralizedtrust.splice.store.UpdateHistory +import com.digitalasset.canton.lifecycle.{AsyncOrSyncCloseable, SyncCloseable} import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} import com.digitalasset.canton.tracing.TraceContext import io.opentelemetry.api.trace.Tracer @@ -38,6 +41,12 @@ class RewardComputationTrigger( mat: Materializer, ) extends PollingParallelTaskExecutionTrigger[RewardComputationTrigger.Task] { + private val rewardMetrics = new RewardComputationMetrics(context.metricsFactory)( + MetricsContext( + "current_migration_id" -> updateHistory.domainMigrationInfo.currentMigrationId.toString + ) + ) + override def retrieveTasks()(implicit tc: TraceContext ): Future[Seq[RewardComputationTrigger.Task]] = { @@ -61,7 +70,16 @@ class RewardComputationTrigger( )(implicit tc: TraceContext): Future[TaskOutcome] = appRewardsStore .computeAndStoreRewards(task.roundNumber) - .map(_ => TaskSuccess(s"Computed rewards for round ${task.roundNumber}")) + .map { summary => + rewardMetrics.record(summary) + TaskSuccess( + s"Computed rewards for round ${task.roundNumber}: " + + s"${summary.activePartiesCount} active parties, " + + s"${summary.activityRecordsCount} activity records, " + + s"${summary.rewardedPartiesCount} rewarded parties, " + + s"${summary.batchesCreatedCount} batches" + ) + } override protected def isStaleTask( task: RewardComputationTrigger.Task @@ -69,6 +87,10 @@ class RewardComputationTrigger( appRewardsStore .lookupLatestRoundWithRewardComputation() .map(_.exists(_ >= task.roundNumber)) + + override def closeAsync(): Seq[AsyncOrSyncCloseable] = + super.closeAsync() :+ + SyncCloseable("RewardComputationMetrics", rewardMetrics.close()) } object RewardComputationTrigger { diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/metrics/RewardComputationMetrics.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/metrics/RewardComputationMetrics.scala new file mode 100644 index 0000000000..113a3bd983 --- /dev/null +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/metrics/RewardComputationMetrics.scala @@ -0,0 +1,67 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.scan.metrics + +import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory} +import com.daml.metrics.api.MetricQualification.Traffic +import com.daml.metrics.api.{MetricInfo, MetricName, MetricsContext} +import org.lfdecentralizedtrust.splice.environment.SpliceMetrics +import org.lfdecentralizedtrust.splice.scan.store.db.DbScanAppRewardsStore.RewardComputationSummary + +class RewardComputationMetrics(metricsFactory: LabeledMetricsFactory)(implicit + metricsContext: MetricsContext +) extends AutoCloseable { + private val prefix: MetricName = + SpliceMetrics.MetricsPrefix :+ "scan" :+ "reward_computation" + + val activePartiesCount: Gauge[Long] = metricsFactory.gauge( + MetricInfo( + name = prefix :+ "active_parties_count", + summary = "Number of parties with activity in the latest computed round", + qualification = Traffic, + ), + 0L, + )(metricsContext) + + val activityRecordsCount: Gauge[Long] = metricsFactory.gauge( + MetricInfo( + name = prefix :+ "activity_records_count", + summary = "Number of activity records in the latest computed round", + qualification = Traffic, + ), + 0L, + )(metricsContext) + + val rewardedPartiesCount: Gauge[Long] = metricsFactory.gauge( + MetricInfo( + name = prefix :+ "rewarded_parties_count", + summary = "Number of parties with rewards in the latest computed round", + qualification = Traffic, + ), + 0L, + )(metricsContext) + + val batchesCreatedCount: Gauge[Long] = metricsFactory.gauge( + MetricInfo( + name = prefix :+ "batches_created_count", + summary = "Number of reward batches created in the latest computed round", + qualification = Traffic, + ), + 0L, + )(metricsContext) + + def record(summary: RewardComputationSummary): Unit = { + activePartiesCount.updateValue(summary.activePartiesCount) + activityRecordsCount.updateValue(summary.activityRecordsCount) + rewardedPartiesCount.updateValue(summary.rewardedPartiesCount) + batchesCreatedCount.updateValue(summary.batchesCreatedCount) + } + + override def close(): Unit = { + activePartiesCount.close() + activityRecordsCount.close() + rewardedPartiesCount.close() + batchesCreatedCount.close() + } +} diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanAppRewardsStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanAppRewardsStore.scala index 46af735fa9..ad08436e88 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanAppRewardsStore.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanAppRewardsStore.scala @@ -4,6 +4,7 @@ package org.lfdecentralizedtrust.splice.scan.store import com.digitalasset.canton.tracing.TraceContext +import org.lfdecentralizedtrust.splice.scan.store.db.DbScanAppRewardsStore.RewardComputationSummary import scala.concurrent.Future @@ -28,5 +29,5 @@ trait ScanAppRewardsStore { */ def computeAndStoreRewards(roundNumber: Long)(implicit tc: TraceContext - ): Future[Unit] + ): Future[RewardComputationSummary] } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanAppRewardsStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanAppRewardsStore.scala index 6f1bcbfd94..34299f7faa 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanAppRewardsStore.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanAppRewardsStore.scala @@ -79,6 +79,14 @@ object DbScanAppRewardsStore { roundNumber: Long, rootHash: ByteString, ) + + /** Summary of a single round's reward computation, for metrics reporting. */ + final case class RewardComputationSummary( + activePartiesCount: Long, + activityRecordsCount: Long, + rewardedPartiesCount: Long, + batchesCreatedCount: Long, + ) } class DbScanAppRewardsStore( @@ -177,6 +185,16 @@ class DbScanAppRewardsStore( ) } + private implicit val getResultRewardComputationSummary + : GetResult[DbScanAppRewardsStore.RewardComputationSummary] = GetResult { prs => + DbScanAppRewardsStore.RewardComputationSummary( + activePartiesCount = prs.<<[Long], + activityRecordsCount = prs.<<[Long], + rewardedPartiesCount = prs.<<[Long], + batchesCreatedCount = prs.<<[Long], + ) + } + // -- app_activity_party_totals -------------------------------------------- private def batchInsertAppActivityPartyTotals( @@ -496,8 +514,11 @@ class DbScanAppRewardsStore( */ def computeAndStoreRewards( roundNumber: Long - )(implicit tc: TraceContext): Future[Unit] = - aggregateActivityTotals(roundNumber) + )(implicit tc: TraceContext): Future[DbScanAppRewardsStore.RewardComputationSummary] = + for { + _ <- aggregateActivityTotals(roundNumber) + summary <- readComputationSummary(roundNumber) + } yield summary /** Aggregate per-party and per-round activity totals for the given round from * `app_activity_record_store`. @@ -634,6 +655,32 @@ class DbScanAppRewardsStore( ) } + // -- Computation summary ---------------------------------------------------- + + /** Read back the summary counters for a round that was just computed, + * within the same transaction. + */ + private def readComputationSummary(roundNumber: Long)(implicit + tc: TraceContext + ): Future[DbScanAppRewardsStore.RewardComputationSummary] = + runQuery( + sql"""select + (select active_app_provider_parties_count + from #${Tables.appActivityRoundTotals} + where history_id = $historyId and round_number = $roundNumber), + (select activity_records_count + from #${Tables.appActivityRoundTotals} + where history_id = $historyId and round_number = $roundNumber), + (select coalesce(rewarded_app_provider_parties_count, 0) + from #${Tables.appRewardRoundTotals} + where history_id = $historyId and round_number = $roundNumber), + (select count(*) + from #${Tables.appRewardBatchHashes} + where history_id = $historyId and round_number = $roundNumber) + """.as[DbScanAppRewardsStore.RewardComputationSummary].head, + "appRewards.readComputationSummary", + ) + // -- Private helpers ------------------------------------------------------- private def runQuery[T]( diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/DbScanAppRewardsStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/DbScanAppRewardsStoreTest.scala index 30c8e49525..1aabe2db2a 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/DbScanAppRewardsStoreTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/DbScanAppRewardsStoreTest.scala @@ -500,6 +500,54 @@ class DbScanAppRewardsStoreTest } } + // -- computeAndStoreRewards summary tests ---------------------------------- + + "computeAndStoreRewards — returns correct summary counts" in { + for { + (store, historyId) <- newStore() + _ <- insertSentinelRecords(historyId, roundNumber) + // 3 activity records, 2 parties (alice in 2 records, bob in 2) + _ <- insertActivityRecord( + historyId, + roundNumber, + Seq("alice::provider", "bob::provider"), + Seq(3000000L, 2000000L), + ) + _ <- insertActivityRecord( + historyId, + roundNumber, + Seq("alice::provider"), + Seq(1000000L), + ) + _ <- insertActivityRecord( + historyId, + roundNumber, + Seq("bob::provider"), + Seq(500000L), + ) + summary <- store.computeAndStoreRewards(roundNumber) + } yield { + summary.activePartiesCount shouldBe 2L + summary.activityRecordsCount shouldBe 4L // sum of per-party counts: alice=2 + bob=2 + // TODO(#4382): update when full reward pipeline is wired into computeAndStoreRewards + summary.rewardedPartiesCount shouldBe 0L + summary.batchesCreatedCount shouldBe 0L + } + } + + "computeAndStoreRewards — empty round returns zero counts" in { + for { + (store, historyId) <- newStore() + _ <- insertSentinelRecords(historyId, roundNumber) + summary <- store.computeAndStoreRewards(roundNumber) + } yield { + summary.activePartiesCount shouldBe 0L + summary.activityRecordsCount shouldBe 0L + summary.rewardedPartiesCount shouldBe 0L + summary.batchesCreatedCount shouldBe 0L + } + } + // -- computeRewardTotals tests ------------------------------------------- val rewardTotalsTestCases = Seq(