Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package org.lfdecentralizedtrust.splice.scan.automation

import org.apache.pekko.stream.Materializer
import com.daml.metrics.api.MetricsContext
import org.lfdecentralizedtrust.splice.automation.{
PollingParallelTaskExecutionTrigger,
TaskOutcome,
TaskSuccess,
TriggerContext,
}
import org.lfdecentralizedtrust.splice.scan.metrics.RewardComputationMetrics
import org.lfdecentralizedtrust.splice.scan.store.{AppActivityStore, ScanAppRewardsStore}
import org.lfdecentralizedtrust.splice.store.UpdateHistory
import com.digitalasset.canton.lifecycle.{AsyncOrSyncCloseable, SyncCloseable}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
Expand All @@ -38,6 +41,12 @@ class RewardComputationTrigger(
mat: Materializer,
) extends PollingParallelTaskExecutionTrigger[RewardComputationTrigger.Task] {

private val rewardMetrics = new RewardComputationMetrics(context.metricsFactory)(
MetricsContext(
"current_migration_id" -> updateHistory.domainMigrationInfo.currentMigrationId.toString
)
)

override def retrieveTasks()(implicit
tc: TraceContext
): Future[Seq[RewardComputationTrigger.Task]] = {
Expand All @@ -61,14 +70,27 @@ class RewardComputationTrigger(
)(implicit tc: TraceContext): Future[TaskOutcome] =
appRewardsStore
.computeAndStoreRewards(task.roundNumber)
.map(_ => TaskSuccess(s"Computed rewards for round ${task.roundNumber}"))
.map { summary =>
rewardMetrics.record(summary)
TaskSuccess(
s"Computed rewards for round ${task.roundNumber}: " +
s"${summary.activePartiesCount} active parties, " +
s"${summary.activityRecordsCount} activity records, " +
s"${summary.rewardedPartiesCount} rewarded parties, " +
s"${summary.batchesCreatedCount} batches"
)
}

override protected def isStaleTask(
task: RewardComputationTrigger.Task
)(implicit tc: TraceContext): Future[Boolean] =
appRewardsStore
.lookupLatestRoundWithRewardComputation()
.map(_.exists(_ >= task.roundNumber))

override def closeAsync(): Seq[AsyncOrSyncCloseable] =
super.closeAsync() :+
SyncCloseable("RewardComputationMetrics", rewardMetrics.close())
}

object RewardComputationTrigger {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.metrics

import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory}
import com.daml.metrics.api.MetricQualification.Traffic
import com.daml.metrics.api.{MetricInfo, MetricName, MetricsContext}
import org.lfdecentralizedtrust.splice.environment.SpliceMetrics
import org.lfdecentralizedtrust.splice.scan.store.db.DbScanAppRewardsStore.RewardComputationSummary

class RewardComputationMetrics(metricsFactory: LabeledMetricsFactory)(implicit
metricsContext: MetricsContext
) extends AutoCloseable {
private val prefix: MetricName =
SpliceMetrics.MetricsPrefix :+ "scan" :+ "reward_computation"

val activePartiesCount: Gauge[Long] = metricsFactory.gauge(
MetricInfo(
name = prefix :+ "active_parties_count",
summary = "Number of parties with activity in the latest computed round",
qualification = Traffic,
),
0L,
)(metricsContext)

val activityRecordsCount: Gauge[Long] = metricsFactory.gauge(
MetricInfo(
name = prefix :+ "activity_records_count",
summary = "Number of activity records in the latest computed round",
qualification = Traffic,
),
0L,
)(metricsContext)

val rewardedPartiesCount: Gauge[Long] = metricsFactory.gauge(
MetricInfo(
name = prefix :+ "rewarded_parties_count",
summary = "Number of parties with rewards in the latest computed round",
qualification = Traffic,
),
0L,
)(metricsContext)

val batchesCreatedCount: Gauge[Long] = metricsFactory.gauge(
MetricInfo(
name = prefix :+ "batches_created_count",
summary = "Number of reward batches created in the latest computed round",
qualification = Traffic,
),
0L,
)(metricsContext)

def record(summary: RewardComputationSummary): Unit = {
activePartiesCount.updateValue(summary.activePartiesCount)
activityRecordsCount.updateValue(summary.activityRecordsCount)
rewardedPartiesCount.updateValue(summary.rewardedPartiesCount)
batchesCreatedCount.updateValue(summary.batchesCreatedCount)
}

override def close(): Unit = {
activePartiesCount.close()
activityRecordsCount.close()
rewardedPartiesCount.close()
batchesCreatedCount.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package org.lfdecentralizedtrust.splice.scan.store

import com.digitalasset.canton.tracing.TraceContext
import org.lfdecentralizedtrust.splice.scan.store.db.DbScanAppRewardsStore.RewardComputationSummary

import scala.concurrent.Future

Expand All @@ -28,5 +29,5 @@ trait ScanAppRewardsStore {
*/
def computeAndStoreRewards(roundNumber: Long)(implicit
tc: TraceContext
): Future[Unit]
): Future[RewardComputationSummary]
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ object DbScanAppRewardsStore {
roundNumber: Long,
rootHash: ByteString,
)

/** Summary of a single round's reward computation, for metrics reporting. */
final case class RewardComputationSummary(
activePartiesCount: Long,
activityRecordsCount: Long,
rewardedPartiesCount: Long,
batchesCreatedCount: Long,
)
}

class DbScanAppRewardsStore(
Expand Down Expand Up @@ -177,6 +185,16 @@ class DbScanAppRewardsStore(
)
}

private implicit val getResultRewardComputationSummary
: GetResult[DbScanAppRewardsStore.RewardComputationSummary] = GetResult { prs =>
DbScanAppRewardsStore.RewardComputationSummary(
activePartiesCount = prs.<<[Long],
activityRecordsCount = prs.<<[Long],
rewardedPartiesCount = prs.<<[Long],
batchesCreatedCount = prs.<<[Long],
)
}

// -- app_activity_party_totals --------------------------------------------

private def batchInsertAppActivityPartyTotals(
Expand Down Expand Up @@ -496,8 +514,11 @@ class DbScanAppRewardsStore(
*/
def computeAndStoreRewards(
roundNumber: Long
)(implicit tc: TraceContext): Future[Unit] =
aggregateActivityTotals(roundNumber)
)(implicit tc: TraceContext): Future[DbScanAppRewardsStore.RewardComputationSummary] =
for {
_ <- aggregateActivityTotals(roundNumber)
summary <- readComputationSummary(roundNumber)
} yield summary

/** Aggregate per-party and per-round activity totals for the given round from
* `app_activity_record_store`.
Expand Down Expand Up @@ -634,6 +655,32 @@ class DbScanAppRewardsStore(
)
}

// -- Computation summary ----------------------------------------------------

/** Read back the summary counters for a round that was just computed,
* within the same transaction.
*/
private def readComputationSummary(roundNumber: Long)(implicit
tc: TraceContext
): Future[DbScanAppRewardsStore.RewardComputationSummary] =
runQuery(
sql"""select
(select active_app_provider_parties_count
from #${Tables.appActivityRoundTotals}
where history_id = $historyId and round_number = $roundNumber),
(select activity_records_count
from #${Tables.appActivityRoundTotals}
where history_id = $historyId and round_number = $roundNumber),
(select coalesce(rewarded_app_provider_parties_count, 0)
from #${Tables.appRewardRoundTotals}
where history_id = $historyId and round_number = $roundNumber),
(select count(*)
from #${Tables.appRewardBatchHashes}
where history_id = $historyId and round_number = $roundNumber)
Comment thread
adetokunbo marked this conversation as resolved.
""".as[DbScanAppRewardsStore.RewardComputationSummary].head,
"appRewards.readComputationSummary",
)

// -- Private helpers -------------------------------------------------------

private def runQuery[T](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,54 @@ class DbScanAppRewardsStoreTest
}
}

// -- computeAndStoreRewards summary tests ----------------------------------

"computeAndStoreRewards — returns correct summary counts" in {
for {
(store, historyId) <- newStore()
_ <- insertSentinelRecords(historyId, roundNumber)
// 3 activity records, 2 parties (alice in 2 records, bob in 2)
_ <- insertActivityRecord(
historyId,
roundNumber,
Seq("alice::provider", "bob::provider"),
Seq(3000000L, 2000000L),
)
_ <- insertActivityRecord(
historyId,
roundNumber,
Seq("alice::provider"),
Seq(1000000L),
)
_ <- insertActivityRecord(
historyId,
roundNumber,
Seq("bob::provider"),
Seq(500000L),
)
summary <- store.computeAndStoreRewards(roundNumber)
} yield {
summary.activePartiesCount shouldBe 2L
summary.activityRecordsCount shouldBe 4L // sum of per-party counts: alice=2 + bob=2
// TODO(#4382): update when full reward pipeline is wired into computeAndStoreRewards
summary.rewardedPartiesCount shouldBe 0L
summary.batchesCreatedCount shouldBe 0L
}
}

"computeAndStoreRewards — empty round returns zero counts" in {
for {
(store, historyId) <- newStore()
_ <- insertSentinelRecords(historyId, roundNumber)
summary <- store.computeAndStoreRewards(roundNumber)
} yield {
summary.activePartiesCount shouldBe 0L
summary.activityRecordsCount shouldBe 0L
summary.rewardedPartiesCount shouldBe 0L
summary.batchesCreatedCount shouldBe 0L
}
}

// -- computeRewardTotals tests -------------------------------------------

val rewardTotalsTestCases = Seq(
Expand Down
Loading