diff --git a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala index 4a1ef1b8c3..7cf2d75fd8 100644 --- a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala +++ b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala @@ -395,6 +395,16 @@ class SvAppBackendReference( } } + @Help.Summary( + "Archive dry-run CalculateRewardsV2 and ProcessRewardsV2 contracts for the given rounds (via admin API)" + ) + def archiveDryRunRewardAccountingContracts(rounds: Seq[Long]): Unit = + consoleEnvironment.run { + httpCommand( + HttpSvOperatorAppClient.ArchiveDryRunRewardAccountingContracts(rounds) + ) + } + @Help.Summary("Get the CometBFT node debug dump") def cometBftNodeDump(): definitions.CometBftNodeDumpResponse = consoleEnvironment.run { diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala new file mode 100644 index 0000000000..375a419b96 --- /dev/null +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsSvAppTimeBasedIntegrationTest.scala @@ -0,0 +1,259 @@ +package org.lfdecentralizedtrust.splice.integration.tests + +import com.digitalasset.canton.HasExecutionContext +import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.topology.PartyId +import java.time.Duration +import java.util.Optional +import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletconfig.{ + AmuletConfig, + RewardConfig, + RewardVersion, + USD, +} +import org.lfdecentralizedtrust.splice.config.ConfigTransforms +import org.lfdecentralizedtrust.splice.http.v0.definitions +import definitions.GetRewardAccountingBatchResponse +import definitions.GetRewardAccountingRootHashResponse +import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition +import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.{ + IntegrationTestWithIsolatedEnvironment, + SpliceTestConsoleEnvironment, +} +import org.lfdecentralizedtrust.splice.sv.automation.confirmation.{ + CalculateRewardsDryRunTrigger, + CalculateRewardsTrigger, +} +import org.lfdecentralizedtrust.splice.sv.config.InitialRewardConfig +import org.lfdecentralizedtrust.splice.util.{ + AmuletConfigSchedule, + AmuletConfigUtil, + TimeTestUtil, + TriggerTestUtil, + WalletTestUtil, +} + +// This test focuses on the SV app side triggers testing +// - Turning on/off of dry-run and minting-version in rewardConfig +// And confirming that rewards processing works. +// +// Later this test would be extended to cover unhide, expire, etc +@org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceAmulet_0_1_19 +class TrafficBasedRewardsSvAppTimeBasedIntegrationTest + extends IntegrationTestWithIsolatedEnvironment + with HasExecutionContext + with WalletTestUtil + with TriggerTestUtil + with TimeTestUtil + with AmuletConfigUtil { + + override def environmentDefinition: SpliceEnvironmentDefinition = + EnvironmentDefinition + .simpleTopology4SvsWithSimTime(this.getClass.getSimpleName) + .addConfigTransform((_, config) => + ConfigTransforms.withRewardConfig( + InitialRewardConfig( + dryRunVersion = None, + appRewardCouponThreshold = BigDecimal("0"), + ) + )(config) + ) + + "Enable, disable of dryRunVersion/mintingVersion take effect at round closure" in { + implicit env => + val aliceParty = onboardWalletUser(aliceWalletClient, aliceValidatorBackend) + val bobParty = onboardWalletUser(bobWalletClient, bobValidatorBackend) + + aliceWalletClient.tap(20000) + + grantFeaturedAppRight(aliceWalletClient) + grantFeaturedAppRight(bobWalletClient) + + for (round <- 1 to 3) { + advanceRoundsToNextRoundOpening + assertOldestOpenRound(round.toLong) + } + + // oldest=3: rounds 3,4,5 open. + // Next round to open is R6, it will have dryRun enabled + clue("vote to enable dryRunVersion") { + changeRewardConfig(enableDryRun = true) + } + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(4) + doTransfer(bobParty) + + // oldest=4: rounds 4,5,6 open. + // R7 will have the disabled config. + clue("vote to disable dryRunVersion") { + changeRewardConfig(enableDryRun = false) + } + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(5) + doTransfer(bobParty) + + // oldest=5: rounds 5,6,7 open. R8 will have + // both dryRunVersion and mintingVersion set. + clue("vote to enable dryRunVersion + mintingVersion") { + changeRewardConfig(enableDryRun = true, enableMinting = true) + } + + val svBackends = Seq(sv1Backend, sv2Backend, sv3Backend, sv4Backend) + val calculateRewardsDryRunTriggers = + svBackends.map(_.dsoAutomation.trigger[CalculateRewardsDryRunTrigger]) + val calculateRewardsTriggers = + svBackends.map(_.dsoAutomation.trigger[CalculateRewardsTrigger]) + + // Create activity for 6, 7, and 8 and confirm creation of CalculateRewardsV2 + setTriggersWithin( + triggersToPauseAtStart = calculateRewardsDryRunTriggers ++ calculateRewardsTriggers + ) { + advanceRoundsToNextRoundOpening + assertOldestOpenRound(6) + doTransfer(bobParty) + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(7) + doTransfer(bobParty) + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(8) + doTransfer(bobParty) + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(9) + doTransfer(bobParty) + + clue("CalculateRewardsV2 are created for rounds, 6 and 8") { + eventually() { + val v2s = sv1Backend.appState.dsoStore.listCalculateRewardsV2().futureValue + v2s.map(_.payload.round.number) should contain(6L) + v2s.map(_.payload.round.number) should not contain 7L + v2s + .filter(_.payload.round.number == 8L) + .map(_.payload.dryRun) + .toSet shouldBe Set(true, false) + } + } + } + + clue("Alice and Bob have minting allowances for R6") { + eventually() { + val hash = inside(sv1ScanBackend.getRewardAccountingRootHash(6L)) { + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(h) => + h.rootHash + } + val providers = walkBatch(6L, hash).map(_.provider) + providers should contain(aliceParty.toProtoPrimitive) + providers should contain(bobParty.toProtoPrimitive) + } + } + + clue("Alice and Bob have minting allowances for R8") { + eventually() { + val hash = inside(sv1ScanBackend.getRewardAccountingRootHash(8L)) { + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(h) => + h.rootHash + } + val providers = walkBatch(8L, hash).map(_.provider) + providers should contain(aliceParty.toProtoPrimitive) + providers should contain(bobParty.toProtoPrimitive) + } + } + + clue("All CalculateRewardsV2 and ProcessRewardsV2 contracts consumed") { + eventually() { + sv1Backend.appState.dsoStore.listCalculateRewardsV2().futureValue shouldBe empty + sv1Backend.appState.dsoStore.listProcessRewardsV2().futureValue shouldBe empty + } + } + + clue("Alice and Bob received RewardCouponV2 for R8") { + eventually() { + val coupons = sv1Backend.appState.dsoStore.listRewardCouponsV2().futureValue + coupons.filter(c => + c.payload.round.number == 8L && c.payload.provider == aliceParty.toProtoPrimitive + ) should not be empty + coupons.filter(c => + c.payload.round.number == 8L && c.payload.provider == bobParty.toProtoPrimitive + ) should not be empty + } + } + } + + private def doTransfer( + bobParty: PartyId + )(implicit env: SpliceTestConsoleEnvironment): Unit = { + val offerCid = aliceWalletClient.createTransferOffer( + bobParty, + BigDecimal(10.0), + "activity", + CantonTimestamp.now().plus(Duration.ofMinutes(1)), + s"transfer-${scala.util.Random.nextInt()}", + ) + bobWalletClient.acceptTransferOffer(offerCid) + } + + private def walkBatch( + round: Long, + hash: String, + )(implicit + env: SpliceTestConsoleEnvironment + ): Seq[definitions.RewardAccountingMintingAllowance] = + sv1ScanBackend.getRewardAccountingBatch(round, hash).toList.flatMap { + case GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfBatches(b) => + b.childHashes.flatMap(h => walkBatch(round, h)) + case GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfMintingAllowances(b) => + b.mintingAllowances.toSeq + } + + private def assertOldestOpenRound( + expected: Long + )(implicit env: SpliceTestConsoleEnvironment): Unit = { + clue(s"Asserting oldest open round=$expected") { + eventually() { + val (openRounds, _) = sv1ScanBackend.getOpenAndIssuingMiningRounds() + val roundNumbers = openRounds.map(_.contract.payload.round.number.toLong).sorted + roundNumbers should have size 3 + roundNumbers.head shouldBe expected + } + } + } + + private def changeRewardConfig( + enableDryRun: Boolean, + enableMinting: Boolean = false, + )(implicit env: SpliceTestConsoleEnvironment): Unit = { + val amuletRules = sv1Backend.getDsoInfo().amuletRules + val existing = AmuletConfigSchedule(amuletRules).getConfigAsOf(env.environment.clock.now) + val rc = existing.rewardConfig.get() + val newRc = new RewardConfig( + if (enableMinting) RewardVersion.REWARDVERSION_TRAFFICBASEDAPPREWARDS + else rc.mintingVersion, + if (enableDryRun) Optional.of(RewardVersion.REWARDVERSION_TRAFFICBASEDAPPREWARDS) + else Optional.empty[RewardVersion](), + rc.batchSize, + rc.rewardCouponTimeToLive, + rc.appRewardCouponThreshold, + ) + val newConfig = new AmuletConfig[USD]( + existing.transferConfig, + existing.issuanceCurve, + existing.decentralizedSynchronizer, + existing.tickDuration, + existing.packageConfig, + existing.transferPreapprovalFee, + existing.featuredAppActivityMarkerAmount, + existing.optDevelopmentFundManager, + existing.externalPartyConfigStateTickDuration, + Optional.of(newRc), + ) + setAmuletConfig(Seq((None, newConfig, existing))) + eventually() { + sv1Backend.listVoteRequests() shouldBe empty + } + } + +} diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala index bbcfbaef33..87b29808a7 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TrafficBasedRewardsTimeBasedIntegrationTest.scala @@ -5,7 +5,11 @@ import com.daml.ledger.api.v2.transaction_filter import com.digitalasset.canton.HasExecutionContext import com.digitalasset.canton.admin.api.client.commands.LedgerApiCommands.UpdateService.TransactionWrapper import com.digitalasset.canton.config.RequireTypes.PositiveInt +import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.topology.PartyId +import com.digitalasset.canton.tracing.TraceContext +import java.time.Duration +import java.util.UUID import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{ allocationrequestv1, allocationv1, @@ -14,19 +18,22 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{ import org.lfdecentralizedtrust.splice.console.WalletAppClientReference import org.lfdecentralizedtrust.splice.codegen.java.splice.testing.apps.tradingapp import org.lfdecentralizedtrust.splice.config.ConfigTransforms -import org.lfdecentralizedtrust.splice.config.ConfigTransforms.{ - ConfigurableApp, - updateAutomationConfig, -} +import ConfigTransforms.{ConfigurableApp, updateAutomationConfig} import org.lfdecentralizedtrust.splice.sv.config.InitialRewardConfig +import org.lfdecentralizedtrust.splice.validator.automation.ReceiveFaucetCouponTrigger +import org.lfdecentralizedtrust.splice.wallet.automation.CollectRewardsAndMergeAmuletsTrigger import org.lfdecentralizedtrust.splice.http.v0.definitions import definitions.DamlValueEncoding.members.CompactJson +import definitions.GetRewardAccountingBatchResponse import definitions.GetRewardAccountingActivityTotalsResponse import definitions.GetRewardAccountingRootHashResponse import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithIsolatedEnvironment import org.lfdecentralizedtrust.splice.integration.tests.TokenStandardTest.CreateAllocationRequestResult -import org.lfdecentralizedtrust.splice.scan.automation.RewardComputationTrigger +import org.lfdecentralizedtrust.splice.sv.automation.confirmation.{ + CalculateRewardsTrigger, + CalculateRewardsDryRunTrigger, +} import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.ExpiredAmuletTransferInstructionTrigger import org.lfdecentralizedtrust.splice.util.{ ChoiceContextWithDisclosures, @@ -36,8 +43,6 @@ import org.lfdecentralizedtrust.splice.util.{ } import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.SpliceTestConsoleEnvironment -import java.time.Duration -import java.util.UUID import scala.jdk.CollectionConverters.* import scala.util.Random @@ -46,7 +51,8 @@ import scala.util.Random // // DvP settlement from TokenStandardTest is used here just to confirm distribution of rewards @org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceTokenTestTradingApp_1_0_0 -class TrafficBasedRewardsTimeBasedIntegrationTest +@org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceAmulet_0_1_19 +abstract class TrafficBasedRewardsTimeBasedIntegrationTestBase extends IntegrationTestWithIsolatedEnvironment with HasExecutionContext with WalletTestUtil @@ -55,6 +61,14 @@ class TrafficBasedRewardsTimeBasedIntegrationTest with ExternallySignedPartyTestUtil with TokenStandardTest { + protected def rewardConfigMode: TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode + + private def dryRunEnabled: Boolean = + rewardConfigMode == TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode.DryRun + + private def mintingTrafficBased: Boolean = + rewardConfigMode == TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode.MintingTrafficBased + override def environmentDefinition: SpliceEnvironmentDefinition = EnvironmentDefinition .simpleTopology1SvWithSimTime(this.getClass.getSimpleName) @@ -68,22 +82,31 @@ class TrafficBasedRewardsTimeBasedIntegrationTest backend.participantClient.upload_dar_unless_exists(tokenStandardTestDarPath) } }) - .addConfigTransforms((_, config) => - updateAutomationConfig(ConfigurableApp.Scan)( - _.withPausedTrigger[RewardComputationTrigger] - )(config) - ) .addConfigTransform((_, config) => ConfigTransforms.withRewardConfig( InitialRewardConfig( - mintingVersion = TrafficBasedRewardsTimeBasedIntegrationTest.trafficBasedAppRewards, + mintingVersion = + if (mintingTrafficBased) + TrafficBasedRewardsTimeBasedIntegrationTestBase.trafficBasedAppRewards + else TrafficBasedRewardsTimeBasedIntegrationTestBase.featuredAppMarkers, + dryRunVersion = Option.when(dryRunEnabled)( + TrafficBasedRewardsTimeBasedIntegrationTestBase.trafficBasedAppRewards + ), appRewardCouponThreshold = - TrafficBasedRewardsTimeBasedIntegrationTest.appRewardCouponThreshold, + TrafficBasedRewardsTimeBasedIntegrationTestBase.appRewardCouponThreshold, ) )(config) ) + // Pause background wallet/validator automation so that we can test round with no activity, + // and even do calcs comparison for known transactions in a round + .addConfigTransform((_, config) => + updateAutomationConfig(ConfigurableApp.Validator)( + _.withPausedTrigger[ReceiveFaucetCouponTrigger] + .withPausedTrigger[CollectRewardsAndMergeAmuletsTrigger] + )(config) + ) - "App activity records are created for featured app parties" in { implicit env => + "CIP-104 reward accounting pipeline works" in { implicit env => val aliceParty = onboardWalletUser(aliceWalletClient, aliceValidatorBackend) val bobParty = onboardWalletUser(bobWalletClient, bobValidatorBackend) val venuePartyHint = s"venue-party-${Random.nextInt()}" @@ -96,12 +119,12 @@ class TrafficBasedRewardsTimeBasedIntegrationTest ), ) - aliceWalletClient.tap(1000) - bobWalletClient.tap(1000) + aliceWalletClient.tap(20000) + bobWalletClient.tap(20000) assertOldestOpenRound(0) - clue("Reward accounting endpoints report 'Undetermined' before any data is available") { + clue("Reward accounting endpoints report Undetermined before any data is available") { sv1ScanBackend.getRewardAccountingEarliestAvailableRound() shouldBe None sv1ScanBackend.getRewardAccountingActivityTotals(0L) shouldBe an[ GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined @@ -112,68 +135,190 @@ class TrafficBasedRewardsTimeBasedIntegrationTest // confirm that activity record computations does happen properly even when // the ingestion is catching up, by reading the Tcs store data for the // archived rounds. I.e., pausing is not necessary, it merely improves test coverage. - // + // The pause of CalculateRewardsTrigger is necessary to confirm contracts + // were created for each round. + val calculateRewardsTriggers = + activeSvs.map(_.dsoAutomation.trigger[CalculateRewardsTrigger]) + val calculateRewardsDryRunTriggers = + activeSvs.map(_.dsoAutomation.trigger[CalculateRewardsDryRunTrigger]) + // Sequence of actions // Open rounds | Action // ------------+-------------------------------------- // 3, 4 | settle id0, grant venue FAP // 4, 5 | settle id1, grant alice FAP // 5, 6 | settle id2, cancel venue FAP - // 6, 7 | settle id3 - // 7, 8 | settle id4 + // 6, 7 | settle id3, (total 2 DvP trades) + // 7, 8 | settle id4, (total 3 DvP trades) + // 8, 9 | no-activity + // 9, 10 | settle id5, 1 DvP + 3 direct trades + // 10, 11 | settle id6, (total 5 DvP trades) + // 11, 12 | settle id7, (round not closed) val ( updateId0, updateId1, - updateId2, updateId3, updateId4, + updateId5, + updateId6, + updateId7, aliceCreateId, svExpireId, ) = pauseScanVerdictIngestionWithin(sv1ScanBackend) { + setTriggersWithin(triggersToPauseAtStart = + calculateRewardsTriggers ++ calculateRewardsDryRunTriggers + ) { + + // 3 initial advances to get open rounds with staggered opensAt + for (round <- 1 to 3) { + advanceRoundsToNextRoundOpening + assertOldestOpenRound(round.toLong) + } + + val id0 = settleTrade(aliceParty, bobParty, venueParty) + grantFeaturedAppRight(splitwellWalletClient) - // 3 initial advances to get open rounds with staggered opensAt - for (round <- 1 to 3) { advanceRoundsToNextRoundOpening - assertOldestOpenRound(round.toLong) - } + assertOldestOpenRound(4) - val id0 = settleTrade(aliceParty, bobParty, venueParty) - grantFeaturedAppRight(splitwellWalletClient) + val id1 = settleTrade(aliceParty, bobParty, venueParty) + grantFeaturedAppRight(aliceWalletClient) - advanceRoundsToNextRoundOpening - assertOldestOpenRound(4) + advanceRoundsToNextRoundOpening + assertOldestOpenRound(5) - val id1 = settleTrade(aliceParty, bobParty, venueParty) - grantFeaturedAppRight(aliceWalletClient) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) - advanceRoundsToNextRoundOpening - assertOldestOpenRound(5) + advanceRoundsToNextRoundOpening + assertOldestOpenRound(6) - val id2 = settleTrade(aliceParty, bobParty, venueParty) - actAndCheck( - "Cancel venue's featured app right", - retryCommandSubmission(splitwellWalletClient.cancelFeaturedAppRight()), - )( - "Wait for right cancellation to be ingested", - _ => sv1ScanBackend.lookupFeaturedAppRight(venueParty) shouldBe None, - ) + val id3 = settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) - advanceRoundsToNextRoundOpening - assertOldestOpenRound(6) + advanceRoundsToNextRoundOpening + assertOldestOpenRound(7) + + val id4 = settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) - val id3 = settleTrade(aliceParty, bobParty, venueParty) + // alice creates an AmuletTransferInstruction which is archived by an SV + val (aliceCreateId, svExpireId) = + aliceCreateAndSvExpireInstruction(aliceParty, bobParty) - advanceRoundsToNextRoundOpening - assertOldestOpenRound(7) + advanceRoundsToNextRoundOpening + assertOldestOpenRound(8) - val id4 = settleTrade(aliceParty, bobParty, venueParty) + // No activity for round 8 - // alice creates an AmuletTransferInstruction which is archived by an SV - val (aliceCreateId, svExpireId) = - aliceCreateAndSvExpireInstruction(aliceParty, bobParty) + advanceRoundsToNextRoundOpening + assertOldestOpenRound(9) + + // Do only one DvP; this would not generate enough activity to reward the parties. + val id5 = settleTrade(aliceParty, bobParty, venueParty) + + // But do additional txs by alice such that only alice receives the rewards + (1 to 3).foreach { _ => + val offerCid = aliceWalletClient.createTransferOffer( + bobParty, + BigDecimal(10.0), + "round-9-alice-only", + CantonTimestamp.now().plus(Duration.ofMinutes(1)), + s"round9-transfer-${scala.util.Random.nextInt()}", + ) + bobWalletClient.acceptTransferOffer(offerCid) + } - (id0, id1, id2, id3, id4, aliceCreateId, svExpireId) + actAndCheck( + "Cancel venue's featured app right", + retryCommandSubmission(splitwellWalletClient.cancelFeaturedAppRight()), + )( + "Wait for right cancellation to be ingested", + _ => sv1ScanBackend.lookupFeaturedAppRight(venueParty) shouldBe None, + ) + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(10) + + // Do five in a round to check nested BatchOfBatches processing + val id6 = settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + + advanceRoundsToNextRoundOpening + assertOldestOpenRound(11) + + val id7 = settleTrade(aliceParty, bobParty, venueParty) + settleTrade(aliceParty, bobParty, venueParty) + + clue( + "CalculateRewardsV2 contracts should exist for each round" + ) { + eventually() { + val calculateRewardsRounds = + sv1Backend.appState.dsoStore + .listCalculateRewardsV2() + .futureValue + .map(_.payload.round.number) + .toSet + (0L to 10L).foreach { round => + calculateRewardsRounds should contain(round) withClue + s"CalculateRewardsV2 should exist for round $round" + } + } + // For rounds 0..5 scan will not be able calculate activity totals + // and root hashes, so archiving them here keep the + // CalculateRewardsTrigger's logs clean, as they expect each round + // with CalculateRewardsV2 to have a root hash. + if (dryRunEnabled) { + clue("Archive dry-run CalculateRewardsV2 for rounds 0..5 via sv admin API") { + sv1Backend.archiveDryRunRewardAccountingContracts((0L to 5L).toSeq) + } + } else { + // TODO (#5624): add support for bootstrapping + // Bootstrapping a network with mintingVersion set to trafficBasedAppRewards + // is in principle not supported, as the round 0 will never have + // activity totals/root-hash calculated, and its CalculateRewardsV2 cannot be processed. + // So the only way to handle this in test is via direct archive. + clue("Archive CalculateRewardsV2 for rounds 0..5 directly as dso") { + val cids = sv1Backend.appState.dsoStore + .listCalculateRewardsV2() + .futureValue + .filter(c => c.payload.round.number >= 0L && c.payload.round.number <= 5L) + .map(_.contractId) + if (cids.nonEmpty) { + sv1Backend.participantClientWithAdminToken.ledger_api_extensions.commands + .submitJava( + userId = sv1Backend.config.ledgerApiUser, + actAs = Seq(dsoParty), + commands = cids.flatMap(_.exerciseArchive().commands.asScala.toSeq), + ) + } + } + } + clue("CalculateRewardsV2 contracts for rounds 0..5 are gone") { + eventually() { + val remaining = sv1Backend.appState.dsoStore + .listCalculateRewardsV2() + .futureValue + .map(_.payload.round.number) + .toSet + (0L to 5L).foreach { round => + remaining should not contain round withClue + s"CalculateRewardsV2 for round $round should be archived" + } + } + } + } + + (id0, id1, id3, id4, id5, id6, id7, aliceCreateId, svExpireId) + } } def fetchEvent(updateId: String, label: String): definitions.EventHistoryItem = @@ -201,36 +346,17 @@ class TrafficBasedRewardsTimeBasedIntegrationTest assertNoAppActivity(event, "updateId1") } - // Expected featured app providers per round — used for both event-level - // activity assertions and reward pipeline provider assertions. - val expectedProvidersByRound: Map[Long, Set[PartyId]] = Map( - 5L -> Set(venueParty), - 6L -> Set(venueParty, aliceParty), - 7L -> Set(aliceParty), - ) + clue("updateId3") { + val event = fetchEvent(updateId3, "updateId3") + assertTrafficSummary(event, "updateId3") + assertAppActivity(event, "updateId3", Set(venueParty, aliceParty), expectedRound = 6) + } - // Capture per-round traffic costs for reward pipeline assertions. - // Each round has exactly one settlement in this test. - val trafficCostByRound: Map[Long, Long] = Map( - 5L -> clue("updateId2") { - val event = fetchEvent(updateId2, "updateId2") - assertTrafficSummary(event, "updateId2") - assertAppActivity(event, "updateId2", expectedProvidersByRound(5L), expectedRound = 5) - event.trafficSummary.value.totalTrafficCost - }, - 6L -> clue("updateId3") { - val event = fetchEvent(updateId3, "updateId3") - assertTrafficSummary(event, "updateId3") - assertAppActivity(event, "updateId3", expectedProvidersByRound(6L), expectedRound = 6) - event.trafficSummary.value.totalTrafficCost - }, - 7L -> clue("updateId4") { - val event = fetchEvent(updateId4, "updateId4") - assertTrafficSummary(event, "updateId4") - assertAppActivity(event, "updateId4", expectedProvidersByRound(7L), expectedRound = 7) - event.trafficSummary.value.totalTrafficCost - }, - ) + clue("updateId4") { + val event = fetchEvent(updateId4, "updateId4") + assertTrafficSummary(event, "updateId4") + assertAppActivity(event, "updateId4", Set(aliceParty, venueParty), expectedRound = 7) + } clue("Alice-submitted create TransferInstruction has app activity for alice") { val event = fetchEvent(aliceCreateId, "aliceCreateId") @@ -246,83 +372,128 @@ class TrafficBasedRewardsTimeBasedIntegrationTest assertNoAppActivity(event, "svExpireId") } - // -- Reward pipeline endpoint checks -------------------------------------- - // ScanAggregationTrigger runs unpaused throughout the test and has already - // aggregated completed rounds. Run the paused RewardComputationTrigger to - // compute rewards, then verify the reward accounting HTTP endpoints. + clue("updateId5") { + val event = fetchEvent(updateId5, "updateId5") + assertTrafficSummary(event, "updateId5") + // Round 9: one DvP — venue has activity but will be below the coupon threshold; + // alice's additional transfers push her above it. + assertAppActivity(event, "updateId5", Set(aliceParty, venueParty), expectedRound = 9) + } - clue("Run the reward computation trigger") { - sv1ScanBackend.automation - .trigger[RewardComputationTrigger] - .runOnce() - .futureValue + clue("updateId6") { + val event = fetchEvent(updateId6, "updateId6") + assertTrafficSummary(event, "updateId6") + assertAppActivity(event, "updateId6", Set(aliceParty, venueParty), expectedRound = 10) } - val earliest = clue("Verify earliest available round is returned") { - val e = sv1ScanBackend.getRewardAccountingEarliestAvailableRound() - e shouldBe defined - e.value + clue("updateId7") { + val event = fetchEvent(updateId7, "updateId7") + assertTrafficSummary(event, "updateId7") + // Round 11: venue's FAP was cancelled in round 9, so only alice is a + // featured-app provider here. + assertAppActivity(event, "updateId7", Set(aliceParty), expectedRound = 11) } - val expectedProviders = expectedProvidersByRound.getOrElse( - earliest, - fail(s"No expected providers for earliest round $earliest"), - ) + assertRewardCalcs(aliceParty, venueParty) - clue("Verify activity totals for the computed round") { - val totals = inside(sv1ScanBackend.getRewardAccountingActivityTotals(earliest)) { - case GetRewardAccountingActivityTotalsResponse.members - .RewardAccountingActivityTotalsOk(t) => - t - } - totals.roundNumber shouldBe earliest - totals.activityRecordsCount should be > 0L - totals.activePartiesCount shouldBe expectedProviders.size.toLong - totals.totalAppActivityWeight should be > 0L - // The total weight must be at least as large as the traffic cost from the - // test's known settlement, since that settlement contributes activity records - // to the round (other background transactions may also contribute). - val roundTrafficCost = trafficCostByRound(earliest) - totals.totalAppActivityWeight should be >= roundTrafficCost + // Other misc API tests + clue("404 for non-existent batch data") { + sv1ScanBackend.getRewardAccountingBatch(6L, "0" * 64) shouldBe None } + } - clue("Verify root hash is available") { - val rootHash = inside(sv1ScanBackend.getRewardAccountingRootHash(earliest)) { - case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(h) => h + private def assertRewardCalcs( + aliceParty: PartyId, + venueParty: PartyId, + )(implicit env: SpliceTestConsoleEnvironment): Unit = { + clue("Scan computes activity totals through round 10") { + eventually() { + sv1ScanBackend.getRewardAccountingActivityTotals(10L) shouldBe an[ + GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsOk + ] } - rootHash.roundNumber shouldBe earliest - rootHash.rootHash should have length 64 // hex-encoded SHA-256 } - clue("Verify batch contains expected providers with non-zero amounts") { - val rootHashHex = inside(sv1ScanBackend.getRewardAccountingRootHash(earliest)) { - case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(h) => - h.rootHash + val totalsByRound: Map[Long, definitions.RewardAccountingActivityTotalsOk] = + clue("Rounds 6..10 activity totals and root hash are computed") { + (6L to 10L).map { round => + val totalsOk = inside(sv1ScanBackend.getRewardAccountingActivityTotals(round)) { + case GetRewardAccountingActivityTotalsResponse.members + .RewardAccountingActivityTotalsOk(t) => + t + } withClue s"Round $round should have totals" + val rootHashOk = inside(sv1ScanBackend.getRewardAccountingRootHash(round)) { + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(h) => + h + } withClue s"Round $round should have a root hash" + rootHashOk.rootHash should have length 64 // hex-encoded SHA-256 + round -> totalsOk + }.toMap } - val batch = sv1ScanBackend.getRewardAccountingBatch(earliest, rootHashHex) - batch shouldBe defined - batch.value match { - case definitions.GetRewardAccountingBatchResponse.members - .RewardAccountingBatchOfMintingAllowances(allowances) => - val providers = allowances.mintingAllowances.map(_.provider).toSet - providers shouldBe expectedProviders.map(_.toProtoPrimitive) - allowances.mintingAllowances.foreach { ma => - BigDecimal(ma.amount) should be > BigDecimal(0) - } - case definitions.GetRewardAccountingBatchResponse.members - .RewardAccountingBatchOfBatches(batches) => - batches.childHashes should not be empty + + // For rounds 0..5 scan cannot do totals calcs/produce a root hash, rounds + // 0..4 had no activity records; round 5 is excluded as the earliest round + // with records, and round 11 has not closed yet. + clue( + "Minting allowances: rounds 6, 7, 10 reward both parties; round 9 only alice" + ) { + def providersFor(round: Long): Set[String] = + getMintingAllowancesForRound(round).map(_.provider).toSet + + // Log per-round, per-party minting so test failures surface concrete + // values without needing to re-run with debug. + (6L to 10L).foreach { round => + val amounts = getMintingAllowancesForRound(round) + .map(a => s"${a.provider.split("::").head}=${a.amount}") + .mkString(", ") + logger.info(s"Round $round minting: $amounts")(TraceContext.empty) + } + + // Rounds 6, 7, 10: both alice and venue did DvP trades. + Seq(6L, 7L, 10L).foreach { round => + providersFor(round) shouldBe Seq(aliceParty, venueParty) + .map(_.toProtoPrimitive) + .toSet withClue + s"Both parties should be rewarded in round $round" } + + // Round 8: no trades and background triggers are paused, so there is no + // activity records at all. + providersFor(8L) shouldBe Set() withClue + "Round 8 has no activity so no minting allowances are produced" + totalsByRound(8L).activityRecordsCount shouldBe 0L withClue + "Round 8 has no activity records" + totalsByRound(8L).totalAppActivityWeight shouldBe 0L withClue + "Round 8 has no activity weight" + + // Round 9: one DvP + alice→bob transfers; venue is below the coupon threshold + // so only alice receives minting allowances. + providersFor(9L) shouldBe Set(aliceParty.toProtoPrimitive) withClue + "Round 9: venue is below reward threshold so only alice should be rewarded" + } - clue("Verify response for non-existent data") { - sv1ScanBackend.getRewardAccountingActivityTotals(earliest + 100) shouldBe an[ - GetRewardAccountingActivityTotalsResponse.members.RewardAccountingActivityTotalsUndetermined - ] - sv1ScanBackend.getRewardAccountingRootHash(earliest + 100) shouldBe an[ - GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashUndetermined - ] - sv1ScanBackend.getRewardAccountingBatch(earliest, "0" * 64) shouldBe None + // The remaining assertions cover the handling of V2 contracts created on ledger. + def listProcessRewardsV2Rounds(): Seq[Long] = + sv1Backend.appState.dsoStore + .listProcessRewardsV2() + .futureValue + .map(_.payload.round.number) + + clue("CalculateRewards and ProcessRewards triggers consume middle-round (6..10) contracts") { + // V2 contracts for rounds 6..10 should be processed by SVs + eventually() { + val remainingCalculate = sv1Backend.appState.dsoStore + .listCalculateRewardsV2() + .futureValue + .filter(c => c.payload.round.number >= 6L && c.payload.round.number <= 10L) + remainingCalculate shouldBe empty withClue + "Middle-round CalculateRewardsV2 contracts (6..10) should be consumed" + val remainingProcess = listProcessRewardsV2Rounds() + .filter(r => r >= 6L && r <= 10L) + remainingProcess shouldBe empty withClue + "Middle-round ProcessRewardsV2 contracts (6..10) should be consumed" + } } } @@ -355,52 +526,6 @@ class TrafficBasedRewardsTimeBasedIntegrationTest } } - private def assertAppActivity( - event: definitions.EventHistoryItem, - cluePrefix: String, - expectedProviders: Set[PartyId], - expectedRound: Long, - ): Unit = { - withClue(s"$cluePrefix should have app activity") { - event.appActivityRecords shouldBe defined - } - val totalTrafficCost = event.trafficSummary.value.totalTrafficCost - event.appActivityRecords.foreach { activity => - withClue(s"$cluePrefix app activity round number") { - activity.roundNumber shouldBe expectedRound - } - withClue(s"$cluePrefix app activity provider parties") { - activity.records.map(_.party).toSet shouldBe expectedProviders.map(_.toProtoPrimitive) - } - withClue(s"$cluePrefix each app activity weight should be positive") { - activity.records.foreach { r => - r.weight should be > 0L - } - } - val weightSum = activity.records.map(_.weight).sum - val numFeaturedAppParties = expectedProviders.size.toLong - withClue( - s"$cluePrefix sum of weights should be within [totalTrafficCost - numFeaturedAppParties, totalTrafficCost]" - ) { - weightSum should be > (totalTrafficCost - numFeaturedAppParties) - weightSum should be <= totalTrafficCost - } - } - } - - private def assertOldestOpenRound( - expectedOldestRound: Long - )(implicit env: SpliceTestConsoleEnvironment): Unit = { - clue(s"Asserting oldest open round=$expectedOldestRound") { - eventually() { - val (openRounds, _) = sv1ScanBackend.getOpenAndIssuingMiningRounds() - val roundNumbers = openRounds.map(_.contract.payload.round.number.toLong).sorted - roundNumbers should have size 3 - roundNumbers.head shouldBe expectedOldestRound - } - } - } - /** Alice creates an AmuletTransferInstruction with a short deadline, then we * advance past the deadline and have the SV trigger archive it. Returns * (aliceCreateId, svExpireId). @@ -500,6 +625,71 @@ class TrafficBasedRewardsTimeBasedIntegrationTest (createUid.value, archiveUid.value) } + private def assertAppActivity( + event: definitions.EventHistoryItem, + cluePrefix: String, + expectedProviders: Set[PartyId], + expectedRound: Long, + ): Unit = { + withClue(s"$cluePrefix should have app activity") { + event.appActivityRecords shouldBe defined + } + val totalTrafficCost = event.trafficSummary.value.totalTrafficCost + event.appActivityRecords.foreach { activity => + withClue(s"$cluePrefix app activity round number") { + activity.roundNumber shouldBe expectedRound + } + withClue(s"$cluePrefix app activity provider parties") { + activity.records.map(_.party).toSet shouldBe expectedProviders.map(_.toProtoPrimitive) + } + withClue(s"$cluePrefix each app activity weight should be positive") { + activity.records.foreach { r => + r.weight should be > 0L + } + } + val weightSum = activity.records.map(_.weight).sum + val numFeaturedAppParties = expectedProviders.size.toLong + withClue( + s"$cluePrefix sum of weights should be within [totalTrafficCost - numFeaturedAppParties, totalTrafficCost]" + ) { + weightSum should be > (totalTrafficCost - numFeaturedAppParties) + weightSum should be <= totalTrafficCost + } + } + } + + private def assertOldestOpenRound( + expectedOldestRound: Long + )(implicit env: SpliceTestConsoleEnvironment): Unit = { + clue(s"Asserting oldest open round=$expectedOldestRound") { + eventually() { + val (openRounds, _) = sv1ScanBackend.getOpenAndIssuingMiningRounds() + val roundNumbers = openRounds.map(_.contract.payload.round.number.toLong).sorted + roundNumbers should have size 3 + roundNumbers.head shouldBe expectedOldestRound + } + } + } + + private def getMintingAllowancesForRound( + round: Long + )(implicit + env: SpliceTestConsoleEnvironment + ): Seq[definitions.RewardAccountingMintingAllowance] = { + val hash = inside(sv1ScanBackend.getRewardAccountingRootHash(round)) { + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(h) => + h.rootHash + } + def walk(h: String): Seq[definitions.RewardAccountingMintingAllowance] = + sv1ScanBackend.getRewardAccountingBatch(round, h).toList.flatMap { + case GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfBatches(b) => + b.childHashes.flatMap(walk) + case GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfMintingAllowances(b) => + b.mintingAllowances.toSeq + } + walk(hash) + } + private def settleTrade( aliceParty: PartyId, bobParty: PartyId, @@ -593,12 +783,39 @@ class TrafficBasedRewardsTimeBasedIntegrationTest } } -object TrafficBasedRewardsTimeBasedIntegrationTest { +object TrafficBasedRewardsTimeBasedIntegrationTestBase { + + sealed trait RewardConfigMode + object RewardConfigMode { + // dryRunVersion = TrafficBased + case object DryRun extends RewardConfigMode + // mintingVersion = TrafficBased, dryRunVersion = None + case object MintingTrafficBased extends RewardConfigMode + } - // Use traffic-based app rewards (CIP-0104), not on-ledger coupon counting. val trafficBasedAppRewards = "RewardVersion_TrafficBasedAppRewards" + val featuredAppMarkers = "RewardVersion_FeaturedAppMarkers" + + // This threshold has been chosen to keep the venue's app activity below + // threshold for the round 9. See the test for details. + // + // appRewardCouponThreshold is in USD and compared against the minting + // allowance. For this test amuletPrice = 0.005, trafficPrice = 16.67 USD/MB, + // and issuancePerCoupon is observed to be ~100, so 30 KB of activity ≈ 50 USD + // (30/1000 MB × 16.67 × 100). + val appRewardCouponThreshold = BigDecimal("50") +} + +class TrafficBasedRewardsTimeBasedIntegrationTest + extends TrafficBasedRewardsTimeBasedIntegrationTestBase { + override protected val rewardConfigMode + : TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode = + TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode.MintingTrafficBased +} - // Set to zero so no rewards are filtered out in this test. - // In production this would be a small USD amount (e.g. 0.5). - val appRewardCouponThreshold = BigDecimal(0.0) +class TrafficBasedRewardsDryRunTimeBasedIntegrationTest + extends TrafficBasedRewardsTimeBasedIntegrationTestBase { + override protected val rewardConfigMode + : TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode = + TrafficBasedRewardsTimeBasedIntegrationTestBase.RewardConfigMode.DryRun } diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/AmuletConfigUtil.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/AmuletConfigUtil.scala index 9634c73da2..12491d554c 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/AmuletConfigUtil.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/AmuletConfigUtil.scala @@ -150,10 +150,11 @@ trait AmuletConfigUtil extends TestCommon { .requiredNumVotes(dsoRules) ) { eventually() { - sv.listVoteRequests() - .filter( - _.payload.trackingCid == voteRequestCid.contractId - ) should have size 1 + sv.listVoteRequests().filter { vr => + vr.contractId == voteRequestCid.contractId || + (vr.payload.trackingCid.isPresent && + vr.payload.trackingCid.get == voteRequestCid.contractId) + } should have size 1 } actAndCheck( s"${sv.name} casts a vote", { @@ -171,7 +172,6 @@ trait AmuletConfigUtil extends TestCommon { sv.lookupVoteRequest(voteRequestCid.contractId) .payload .votes should have size voteCount - sv.listVoteRequests() shouldBe empty }, ) } diff --git a/apps/common/src/test/java/org/lfdecentralizedtrust/splice/util/scalatesttags/SpliceAmulet_0_1_19.java b/apps/common/src/test/java/org/lfdecentralizedtrust/splice/util/scalatesttags/SpliceAmulet_0_1_19.java new file mode 100644 index 0000000000..a89bc1645a --- /dev/null +++ b/apps/common/src/test/java/org/lfdecentralizedtrust/splice/util/scalatesttags/SpliceAmulet_0_1_19.java @@ -0,0 +1,14 @@ +package org.lfdecentralizedtrust.splice.util.scalatesttags; + +import org.scalatest.TagAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +// Don't run this test when testing against splice-amulet < 0.1.19 +@TagAnnotation +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface SpliceAmulet_0_1_19 {} diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/util/ScalaTestTags.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/util/ScalaTestTags.scala index 71a400af25..8f6f0f7af5 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/util/ScalaTestTags.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/util/ScalaTestTags.scala @@ -15,4 +15,7 @@ object Tags { // Don't run this test when testing against splice-amulet < 0.1.17 object SpliceAmulet_0_1_17 extends Tag("org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceAmulet_0_1_17") + // Don't run this test when testing against splice-amulet < 0.1.19 + object SpliceAmulet_0_1_19 + extends Tag("org.lfdecentralizedtrust.splice.util.scalatesttags.SpliceAmulet_0_1_19") } diff --git a/apps/metrics-docs/src/main/scala/org/lfdecentralizedtrust/splice/metrics/MetricsDocs.scala b/apps/metrics-docs/src/main/scala/org/lfdecentralizedtrust/splice/metrics/MetricsDocs.scala index b253c925b4..2da6f55c17 100644 --- a/apps/metrics-docs/src/main/scala/org/lfdecentralizedtrust/splice/metrics/MetricsDocs.scala +++ b/apps/metrics-docs/src/main/scala/org/lfdecentralizedtrust/splice/metrics/MetricsDocs.scala @@ -18,6 +18,7 @@ import org.lfdecentralizedtrust.splice.sv.automation.singlesv.SequencerPruningMe import org.lfdecentralizedtrust.splice.sv.automation.{ AmuletPriceMetricsTrigger, ReportSvStatusMetricsExportTrigger, + RewardProcessingMetrics, } import org.lfdecentralizedtrust.splice.sv.store.db.DbSvDsoStoreMetrics import org.lfdecentralizedtrust.splice.store.{DomainParamsStore, HistoryMetrics, StoreMetrics} @@ -103,6 +104,7 @@ object MetricsDocs { generator, ) new AmuletPriceMetricsTrigger.AmuletPriceMetrics(generator) + new RewardProcessingMetrics(generator) val svMetrics = generator.getAll() generator.reset() // scan diff --git a/apps/sv/src/main/openapi/sv-internal.yaml b/apps/sv/src/main/openapi/sv-internal.yaml index 285628d302..9328fb8b43 100644 --- a/apps/sv/src/main/openapi/sv-internal.yaml +++ b/apps/sv/src/main/openapi/sv-internal.yaml @@ -687,6 +687,26 @@ paths: "500": $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/500" + /v0/admin/reward-accounting-process/archive-dry-runs: + post: + tags: [ sv ] + x-jvm-package: sv_operator + operationId: "archiveDryRunRewardAccountingContracts" + description: | + Archive active `CalculateRewardsV2` and `ProcessRewardsV2` contracts + for the specified rounds if they have `dryRun` as `True`. + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/ArchiveDryRunRewardAccountingContractsRequest" + responses: + "200": + description: ok + "400": + $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/400" + components: schemas: ListOngoingValidatorOnboardingsResponse: @@ -1261,3 +1281,17 @@ components: type: array items: $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/schemas/Contract" + + ArchiveDryRunRewardAccountingContractsRequest: + type: object + required: + - rounds + properties: + rounds: + description: | + Round numbers whose dry-run `CalculateRewardsV2` and `ProcessRewardsV2` + contracts should be archived. + type: array + items: + type: integer + format: int64 diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala index b43925066c..50c6cd547d 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala @@ -995,6 +995,55 @@ object SvApp { } } + def archiveDryRunRewardAccountingContracts( + rounds: Seq[Long], + dsoStoreWithIngestion: AppStoreWithIngestion[SvDsoStore], + retryProvider: RetryProvider, + logger: TracedLogger, + )(implicit + ec: ExecutionContext, + traceContext: TraceContext, + ): Future[Unit] = { + val store = dsoStoreWithIngestion.store + store.listDryRunRewardAccountingContractsByRounds(rounds).flatMap { + case (calculateRewards, processRewards) => + if (calculateRewards.isEmpty && processRewards.isEmpty) { + Future.unit + } else { + retryProvider.retryForClientCalls( + "archiveDryRunRewardAccountingContracts", + "archiveDryRunRewardAccountingContracts", + for { + dsoRules <- store.getDsoRules() + amuletRules <- store.getAmuletRules() + choiceArg = + new splice.amuletrules.AmuletRules_ArchiveDryRunRewardAccountingV2( + calculateRewards.map(_.contractId).asJava, + processRewards.map(_.contractId).asJava, + ) + cmd = dsoRules.exercise( + _.exerciseDsoRules_ArchiveDryRunRewardAccountingV2( + amuletRules.contractId, + choiceArg, + store.key.svParty.toProtoPrimitive, + ) + ) + _ <- dsoStoreWithIngestion + .connection(SpliceLedgerConnectionPriority.Low) + .submit( + actAs = Seq(store.key.svParty), + readAs = Seq(store.key.dsoParty), + update = cmd, + ) + .noDedup + .yieldUnit() + } yield (), + logger, + ) + } + } + } + def castVote( trackingCid: splice.dsorules.VoteRequest.ContractId, isAccepted: Boolean, diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvOperatorAppClient.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvOperatorAppClient.scala index cd1181cb30..b9f300eebf 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvOperatorAppClient.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvOperatorAppClient.scala @@ -365,6 +365,28 @@ object HttpSvOperatorAppClient { } } + case class ArchiveDryRunRewardAccountingContracts(rounds: Seq[Long]) + extends BaseCommand[http.ArchiveDryRunRewardAccountingContractsResponse, Unit] { + + override def submitRequest( + client: Client, + headers: List[HttpHeader], + ): EitherT[Future, Either[ + Throwable, + HttpResponse, + ], http.ArchiveDryRunRewardAccountingContractsResponse] = + client.archiveDryRunRewardAccountingContracts( + body = definitions.ArchiveDryRunRewardAccountingContractsRequest(rounds.toVector), + headers = headers, + ) + + override def handleOk()(implicit + decoder: TemplateJsonDecoder + ) = { case http.ArchiveDryRunRewardAccountingContractsResponse.OK => + Right(()) + } + } + case class GetPartyToParticipant(partyId: String) extends BaseCommand[ http.GetPartyToParticipantResponse, diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvOperatorHandler.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvOperatorHandler.scala index 77ab3ae296..e500ef53ed 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvOperatorHandler.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvOperatorHandler.scala @@ -350,6 +350,24 @@ class HttpSvOperatorHandler( .map(r0.LookupDsoRulesVoteRequestResponse.OK) } + override def archiveDryRunRewardAccountingContracts( + respond: r0.ArchiveDryRunRewardAccountingContractsResponse.type + )( + body: definitions.ArchiveDryRunRewardAccountingContractsRequest + )(extracted: ActAsKnownUserRequest): Future[r0.ArchiveDryRunRewardAccountingContractsResponse] = { + implicit val ActAsKnownUserRequest(traceContext) = extracted + withSpan(s"$workflowId.archiveDryRunRewardAccountingContracts") { _ => _ => + SvApp + .archiveDryRunRewardAccountingContracts( + body.rounds.toSeq, + dsoStoreWithIngestion, + retryProvider, + logger, + ) + .map(_ => r0.ArchiveDryRunRewardAccountingContractsResponseOK) + } + } + override def castVote(respond: r0.CastVoteResponse.type)( body: definitions.CastVoteRequest )(extracted: ActAsKnownUserRequest): Future[r0.CastVoteResponse] = { diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/DsoDelegateBasedAutomationService.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/DsoDelegateBasedAutomationService.scala index 6b924f3bd7..0cfe95d612 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/DsoDelegateBasedAutomationService.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/DsoDelegateBasedAutomationService.scala @@ -17,11 +17,12 @@ import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, DomainUnpausedSynchronization, } +import org.lfdecentralizedtrust.splice.scan.admin.api.client.ScanConnection import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.* -import org.lfdecentralizedtrust.splice.sv.config.SvAppBackendConfig import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.ExpiredAmuletAllocationTrigger +import org.lfdecentralizedtrust.splice.sv.config.SvAppBackendConfig -import scala.concurrent.ExecutionContext +import scala.concurrent.{ExecutionContextExecutor, Future} class DsoDelegateBasedAutomationService( clock: Clock, @@ -29,10 +30,11 @@ class DsoDelegateBasedAutomationService( domainUnpausedSync: DomainUnpausedSynchronization, config: SvAppBackendConfig, svTaskContext: SvTaskBasedTrigger.Context, + scanConnectionF: Future[ScanConnection], retryProvider: RetryProvider, override protected val loggerFactory: NamedLoggerFactory, )(implicit - ec: ExecutionContext, + ec: ExecutionContextExecutor, mat: Materializer, tracer: Tracer, ) extends AutomationService( @@ -140,6 +142,13 @@ class DsoDelegateBasedAutomationService( svTaskContext, ) ) + + registerTrigger( + new ProcessRewardsTrigger(triggerContext, svTaskContext, scanConnectionF) + ) + registerTrigger( + new ProcessRewardsDryRunTrigger(triggerContext, svTaskContext, scanConnectionF) + ) } } @@ -179,5 +188,7 @@ object DsoDelegateBasedAutomationService extends AutomationServiceCompanion { aTrigger[MergeUnclaimedDevelopmentFundCouponsTrigger], aTrigger[ExpiredDevelopmentFundCouponTrigger], aTrigger[BootstrapExternalPartyConfigStateInstructionTrigger], + aTrigger[ProcessRewardsTrigger], + aTrigger[ProcessRewardsDryRunTrigger], ) } diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala new file mode 100644 index 0000000000..4f03e36a69 --- /dev/null +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/RewardProcessingMetrics.scala @@ -0,0 +1,36 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.sv.automation + +import com.daml.metrics.api.{MetricInfo, MetricName} +import com.daml.metrics.api.MetricHandle.{LabeledMetricsFactory, Timer} +import com.daml.metrics.api.MetricQualification.Latency +import org.lfdecentralizedtrust.splice.environment.SpliceMetrics + +class RewardProcessingMetrics(metricsFactory: LabeledMetricsFactory) { + + private val prefix: MetricName = SpliceMetrics.MetricsPrefix + + val calculateRewardsProcessingDelay: Timer = + metricsFactory.timer( + MetricInfo( + name = prefix :+ "calculate_rewards_v2" :+ "processing_delay", + summary = "Delay between round close and CalculateRewardsV2 confirmation creation", + description = + "This metric captures the time it took between the closing of a round, and this SV's confirmation for the CalculateRewardsV2 contract's processing. Labeled with dryRun.", + qualification = Latency, + ) + ) + + val processRewardsProcessingDelay: Timer = + metricsFactory.timer( + MetricInfo( + name = prefix :+ "process_rewards_v2" :+ "processing_delay", + summary = "Delay between round close and ProcessRewardsV2 processing", + description = + "This metric captures the time it took between the closing of a round, and this SV's processing of a ProcessRewardsV2 contract for that round. Labeled with dryRun.", + qualification = Latency, + ) + ) +} diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala index 65cd69338d..d17a311bf3 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala @@ -6,10 +6,12 @@ package org.lfdecentralizedtrust.splice.sv.automation import cats.implicits.catsSyntaxOptionId import com.daml.grpc.adapter.ExecutionSequencerFactory import com.digitalasset.canton.SynchronizerAlias -import com.digitalasset.canton.config.ClientConfig +import com.digitalasset.canton.config.{ClientConfig, NonNegativeDuration} +import com.digitalasset.canton.lifecycle.{AsyncCloseable, AsyncOrSyncCloseable} import com.digitalasset.canton.logging.NamedLoggerFactory import com.digitalasset.canton.time.{Clock, WallClock} import com.digitalasset.canton.topology.SynchronizerId +import com.digitalasset.canton.tracing.TraceContext import io.opentelemetry.api.trace.Tracer import monocle.Monocle.toAppliedFocusOps import org.apache.pekko.stream.Materializer @@ -24,11 +26,14 @@ import org.lfdecentralizedtrust.splice.automation.AutomationServiceCompanion.{ } import org.lfdecentralizedtrust.splice.config.{ EnabledFeaturesConfig, + NetworkAppClientConfig, SpliceInstanceNamesConfig, UpgradesConfig, } import org.lfdecentralizedtrust.splice.environment.* import org.lfdecentralizedtrust.splice.http.HttpClient +import org.lfdecentralizedtrust.splice.scan.admin.api.client.ScanConnection +import org.lfdecentralizedtrust.splice.scan.config.ScanAppClientConfig import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, DomainUnpausedSynchronization, @@ -59,7 +64,7 @@ import org.lfdecentralizedtrust.splice.sv.onboarding.SynchronizerNodeReconciler import org.lfdecentralizedtrust.splice.sv.store.{SvDsoStore, SvSvStore} import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder -import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.{ExecutionContextExecutor, Future} class SvDsoAutomationService( clock: Clock, @@ -87,6 +92,7 @@ class SvDsoAutomationService( httpClient: HttpClient, templateJsonDecoder: TemplateJsonDecoder, esf: ExecutionSequencerFactory, + tc: TraceContext, ) extends SpliceAppAutomationService( config.automation, clock, @@ -102,6 +108,26 @@ class SvDsoAutomationService( : org.lfdecentralizedtrust.splice.sv.automation.SvDsoAutomationService.type = SvDsoAutomationService + // Shared long-lived connection to the SV's own scan, used by the reward triggers + private val scanConnectionF: Future[ScanConnection] = ScanConnection.singleUncached( + ScanAppClientConfig(NetworkAppClientConfig(config.scan.internalUrl)), + upgradesConfig, + clock, + retryProvider, + loggerFactory, + retryConnectionOnInitialFailure = true, + ) + + override protected def closeAsync(): Seq[AsyncOrSyncCloseable] = + super.closeAsync() :+ AsyncCloseable( + "scan-connection", + scanConnectionF.transform { + case scala.util.Success(c) => scala.util.Try(c.close()) + case scala.util.Failure(_) => scala.util.Success(()) + }, + NonNegativeDuration.tryFromDuration(timeouts.shutdownNetwork.duration), + ) + private val packageVettingService = new PackageVettingLookupService( config.packageVettingCache, connection( @@ -128,6 +154,7 @@ class SvDsoAutomationService( retryProvider, packageVersionSupport, packageVettingService, + scanConnectionF, ) // required for triggers that must run in sim time as well @@ -364,6 +391,24 @@ class SvDsoAutomationService( ) ) + registerTrigger( + new CalculateRewardsTrigger( + triggerContext, + dsoStore, + connection(SpliceLedgerConnectionPriority.Medium), + scanConnectionF, + ) + ) + + registerTrigger( + new CalculateRewardsDryRunTrigger( + triggerContext, + dsoStore, + connection(SpliceLedgerConnectionPriority.Medium), + scanConnectionF, + ) + ) + registerTrigger(restartDsoDelegateBasedAutomationTrigger) registerTrigger( @@ -554,6 +599,8 @@ object SvDsoAutomationService extends AutomationServiceCompanion { aTrigger[SvOnboardingRequestTrigger], aTrigger[ReceiveSvRewardCouponTrigger], aTrigger[ArchiveClosedMiningRoundsTrigger], + aTrigger[CalculateRewardsTrigger], + aTrigger[CalculateRewardsDryRunTrigger], aTrigger[RestartDsoDelegateBasedAutomationTrigger], aTrigger[AnsSubscriptionInitialPaymentTrigger], aTrigger[SvPackageVettingTrigger], diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/CalculateRewardsTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/CalculateRewardsTrigger.scala new file mode 100644 index 0000000000..b40f7a104b --- /dev/null +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/confirmation/CalculateRewardsTrigger.scala @@ -0,0 +1,229 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.sv.automation.confirmation + +import org.apache.pekko.stream.Materializer +import org.lfdecentralizedtrust.splice.automation.{ + PollingParallelTaskExecutionTrigger, + TaskNoop, + TaskOutcome, + TaskSuccess, + TriggerContext, +} +import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.cryptohash.Hash +import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.rewardaccountingv2.CalculateRewardsV2 +import org.lfdecentralizedtrust.splice.http.v0.definitions.GetRewardAccountingRootHashResponse +import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletrules.AmuletRules_StartProcessingRewardsV2 +import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.ActionRequiringConfirmation +import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.actionrequiringconfirmation.ARC_AmuletRules +import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.amuletrules_actionrequiringconfirmation.CRARC_StartProcessingRewardsV2 +import org.lfdecentralizedtrust.splice.environment.SpliceLedgerConnection +import org.lfdecentralizedtrust.splice.scan.admin.api.client.ScanConnection +import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.QueryResult +import org.lfdecentralizedtrust.splice.sv.automation.RewardProcessingMetrics +import org.lfdecentralizedtrust.splice.sv.store.SvDsoStore +import org.lfdecentralizedtrust.splice.util.AssignedContract +import org.lfdecentralizedtrust.splice.util.PrettyInstances.* +import com.daml.metrics.api.MetricsContext +import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} +import com.digitalasset.canton.tracing.TraceContext +import io.grpc.Status +import io.opentelemetry.api.trace.Tracer + +import scala.concurrent.{ExecutionContextExecutor, Future} + +abstract class CalculateRewardsTriggerBase( + override protected val context: TriggerContext, + store: SvDsoStore, + connection: SpliceLedgerConnection, + scanConnectionF: Future[ScanConnection], + isDryRun: Boolean, +)(implicit + ec: ExecutionContextExecutor, + mat: Materializer, + tracer: Tracer, +) extends PollingParallelTaskExecutionTrigger[CalculateRewardsTriggerBase.Task] { + + import CalculateRewardsTriggerBase.* + + private val svParty = store.key.svParty + private val dsoParty = store.key.dsoParty + private val rewardMetrics = new RewardProcessingMetrics(context.metricsFactory) + + override def retrieveTasks()(implicit tc: TraceContext): Future[Seq[Task]] = for { + // These are ordered by round, so we process the oldest first + calculateRewards <- store.listCalculateRewardsV2() + confirmedCids <- listConfirmedCalculateRewardsCids() + } yield calculateRewards + .filter(_.payload.dryRun == isDryRun) + .filterNot(c => confirmedCids.contains(c.contractId)) + .map(Task(_)) + + override def completeTask( + task: Task + )(implicit tc: TraceContext): Future[TaskOutcome] = { + val round = task.calculateRewards.payload.round.number + getRootHash(round).flatMap { + case None => + throw Status.FAILED_PRECONDITION + .withDescription( + s"Scan has not yet computed the root hash for CalculateRewardsV2 round $round." + ) + .asRuntimeException() + case Some(rootHash) => + val action = startProcessingRewardsAction( + task.calculateRewards.contractId, + rootHash, + ) + for { + queryResult <- store.lookupConfirmationByActionWithOffset(svParty, action) + taskOutcome <- queryResult match { + case QueryResult(_, Some(_)) => + Future.successful(TaskNoop) + case QueryResult(offset, None) => + for { + dsoRules <- store.getDsoRules() + cmd = dsoRules.exercise( + _.exerciseDsoRules_ConfirmAction( + svParty.toProtoPrimitive, + action, + ) + ) + _ <- connection + .submit( + actAs = Seq(svParty), + readAs = Seq(dsoParty), + update = cmd, + ) + .withDedup( + commandId = SpliceLedgerConnection.CommandId( + "org.lfdecentralizedtrust.splice.sv.createStartProcessingRewardsV2Confirmation", + Seq(svParty, dsoParty), + task.calculateRewards.contractId.contractId, + ), + deduplicationOffset = offset, + ) + .yieldUnit() + delay = java.time.Duration.between( + task.calculateRewards.payload.roundClosedAt, + context.clock.now.toInstant, + ) + _ = rewardMetrics.calculateRewardsProcessingDelay.update(delay)( + MetricsContext.Empty.withExtraLabels("dryRun" -> isDryRun.toString) + ) + } yield TaskSuccess( + s"created confirmation for CalculateRewardsV2 round $round, processingDelay=$delay" + ) + } + } yield taskOutcome + } + } + + override def isStaleTask(task: Task)(implicit + tc: TraceContext + ): Future[Boolean] = + store.multiDomainAcsStore + .lookupContractById(CalculateRewardsV2.COMPANION)(task.calculateRewards.contractId) + .flatMap { + case None => Future.successful(true) + case Some(_) => + listConfirmedCalculateRewardsCids().map( + _.contains(task.calculateRewards.contractId) + ) + } + + private def listConfirmedCalculateRewardsCids()(implicit + tc: TraceContext + ): Future[Set[CalculateRewardsV2.ContractId]] = + store.listConfirmationsByConfirmer(svParty).map { confirmations => + confirmations.iterator.flatMap { c => + c.payload.action match { + case arc: ARC_AmuletRules => + arc.amuletRulesAction match { + case crarc: CRARC_StartProcessingRewardsV2 => + Some(crarc.amuletRules_StartProcessingRewardsV2Value.calculateRewardsCid) + case _ => None + } + case _ => None + } + }.toSet + } + + private def getRootHash(round: Long)(implicit tc: TraceContext): Future[Option[Hash]] = + scanConnectionF.flatMap(_.getRewardAccountingRootHash(round)).map { + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashOk(ok) => + Some(new Hash(ok.rootHash)) + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashUndetermined(_) => + None + case GetRewardAccountingRootHashResponse.members.RewardAccountingRootHashCannotProvide(_) => + // TODO (#5623) replace with BFT read + throw new RuntimeException( + s"Scan cannot provide root hash for round $round" + ) + } + + private def startProcessingRewardsAction( + calculateRewardsCid: CalculateRewardsV2.ContractId, + rootHash: Hash, + ): ActionRequiringConfirmation = + new ARC_AmuletRules( + new CRARC_StartProcessingRewardsV2( + new AmuletRules_StartProcessingRewardsV2( + calculateRewardsCid, + rootHash, + ) + ) + ) + +} + +class CalculateRewardsTrigger( + override protected val context: TriggerContext, + store: SvDsoStore, + connection: SpliceLedgerConnection, + scanConnectionF: Future[ScanConnection], +)(implicit + ec: ExecutionContextExecutor, + mat: Materializer, + tracer: Tracer, +) extends CalculateRewardsTriggerBase( + context, + store, + connection, + scanConnectionF, + isDryRun = false, + ) + +class CalculateRewardsDryRunTrigger( + override protected val context: TriggerContext, + store: SvDsoStore, + connection: SpliceLedgerConnection, + scanConnectionF: Future[ScanConnection], +)(implicit + ec: ExecutionContextExecutor, + mat: Materializer, + tracer: Tracer, +) extends CalculateRewardsTriggerBase( + context, + store, + connection, + scanConnectionF, + isDryRun = true, + ) + +object CalculateRewardsTriggerBase { + + final case class Task( + calculateRewards: AssignedContract[ + CalculateRewardsV2.ContractId, + CalculateRewardsV2, + ] + ) extends PrettyPrinting { + override def pretty: Pretty[this.type] = + prettyOfClass( + param("round", _.calculateRewards.payload.round.number), + param("dryRun", _.calculateRewards.payload.dryRun.toString.unquoted), + ) + } +} diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ExecuteConfirmedActionTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ExecuteConfirmedActionTrigger.scala index 874abe182a..dbed349496 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ExecuteConfirmedActionTrigger.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ExecuteConfirmedActionTrigger.scala @@ -29,9 +29,11 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.ansentrycont ANSRARC_CollectInitialEntryPayment, ANSRARC_RejectEntryInitialPayment, } +import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.rewardaccountingv2.CalculateRewardsV2 import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.amuletrules_actionrequiringconfirmation.{ CRARC_MiningRound_Archive, CRARC_MiningRound_StartIssuing, + CRARC_StartProcessingRewardsV2, } import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.dsorules_actionrequiringconfirmation.* import org.lfdecentralizedtrust.splice.config.Thresholds @@ -202,6 +204,15 @@ class ExecuteConfirmedActionTrigger( closedRoundCid, ) .map(_.isEmpty) + case startProcessingAction: CRARC_StartProcessingRewardsV2 => + val calculateRewardsCid = + startProcessingAction.amuletRules_StartProcessingRewardsV2Value.calculateRewardsCid + store.multiDomainAcsStore + .lookupContractByIdOnDomain(CalculateRewardsV2.COMPANION)( + confirmation.domain, + calculateRewardsCid, + ) + .map(_.isEmpty) case action => throw new UnsupportedOperationException( show"AmuletRules $action is not yet supported" diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ProcessRewardsTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ProcessRewardsTrigger.scala new file mode 100644 index 0000000000..449955cc36 --- /dev/null +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/delegatebased/ProcessRewardsTrigger.scala @@ -0,0 +1,187 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.sv.automation.delegatebased + +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Source +import org.lfdecentralizedtrust.splice.automation.{ + OnAssignedContractTrigger, + TaskOutcome, + TaskSuccess, + TriggerContext, +} +import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.cryptohash.Hash +import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.rewardaccountingv2.{ + MintingAllowance, + ProcessRewardsV2, + ProcessRewardsV2_ProcessBatch, +} +import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.rewardaccountingv2.batch.{ + BatchOfBatches, + BatchOfMintingAllowances, +} +import org.lfdecentralizedtrust.splice.http.v0.definitions.{ + GetRewardAccountingBatchResponse, + RewardAccountingMintingAllowance, +} +import org.lfdecentralizedtrust.splice.scan.admin.api.client.ScanConnection +import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority +import org.lfdecentralizedtrust.splice.sv.automation.RewardProcessingMetrics +import org.lfdecentralizedtrust.splice.util.AssignedContract +import com.daml.metrics.api.MetricsContext +import com.digitalasset.canton.tracing.TraceContext +import io.opentelemetry.api.trace.Tracer +import org.lfdecentralizedtrust.splice.codegen.java.da.set.types.{Set as DamlSet} + +import java.math.BigDecimal +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.jdk.CollectionConverters.* + +import ProcessRewardsTriggerBase.* + +private[delegatebased] abstract class ProcessRewardsTriggerBase( + override protected val context: TriggerContext, + override protected val svTaskContext: SvTaskBasedTrigger.Context, + scanConnectionF: Future[ScanConnection], + isDryRun: Boolean, +)(implicit + ec: ExecutionContextExecutor, + mat: Materializer, + tracer: Tracer, +) extends OnAssignedContractTrigger.Template[ + ProcessRewardsV2.ContractId, + ProcessRewardsV2, + ]( + svTaskContext.dsoStore, + ProcessRewardsV2.COMPANION, + ) + with SvTaskBasedTrigger[ProcessRewardsV2Contract] { + + private val store = svTaskContext.dsoStore + private val rewardMetrics = new RewardProcessingMetrics(context.metricsFactory) + + override protected def source(implicit + traceContext: TraceContext + ): Source[ProcessRewardsV2Contract, NotUsed] = + super.source.filter(_.payload.dryRun == isDryRun) + + override def completeTaskAsDsoDelegate( + task: ProcessRewardsV2Contract, + controller: String, + )(implicit tc: TraceContext): Future[TaskOutcome] = { + val round = task.payload.round.number + val batchHash = task.payload.batchHash.value + val batchF = fetchBatch(round, batchHash) + val dsoRulesF = store.getDsoRules() + for { + batch <- batchF + dsoRules <- dsoRulesF + damlBatch = convertBatch(batch) + choiceArg = new ProcessRewardsV2_ProcessBatch( + damlBatch, + // TODO (#5715) determine 'providersWithWrongVettingState' + new DamlSet(java.util.Collections.emptyMap()), + ) + cmd = dsoRules.exercise( + _.exerciseDsoRules_ProcessRewardsV2_ProcessBatch( + task.contractId, + choiceArg, + controller, + ) + ) + _ <- svTaskContext + .connection(SpliceLedgerConnectionPriority.Low) + .submit( + Seq(store.key.svParty), + Seq(store.key.dsoParty), + cmd, + ) + .noDedup + .yieldUnit() + delay = java.time.Duration + .between(task.payload.roundClosedAt, context.clock.now.toInstant) + _ = rewardMetrics.processRewardsProcessingDelay.update(delay)( + MetricsContext.Empty.withExtraLabels("dryRun" -> isDryRun.toString) + ) + } yield TaskSuccess( + s"Processed round $round, processingDelay=$delay, batchType=${batchTypeOf(batch)}" + ) + } + + private def batchTypeOf(response: GetRewardAccountingBatchResponse): String = + response match { + case _: GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfBatches => + "BatchOfBatches" + case _: GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfMintingAllowances => + "BatchOfMintingAllowances" + } + + private def convertBatch( + response: GetRewardAccountingBatchResponse + ): org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.rewardaccountingv2.Batch = + response match { + case GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfBatches(value) => + val childHashes = value.childHashes.map(h => new Hash(h)).asJava + new BatchOfBatches(childHashes) + case GetRewardAccountingBatchResponse.members.RewardAccountingBatchOfMintingAllowances( + value + ) => + val allowances = value.mintingAllowances + .map((a: RewardAccountingMintingAllowance) => + new MintingAllowance(a.provider, new BigDecimal(a.amount)) + ) + .asJava + new BatchOfMintingAllowances(allowances) + } + + private def fetchBatch(round: Long, batchHash: String)(implicit + tc: TraceContext + ): Future[GetRewardAccountingBatchResponse] = + scanConnectionF.flatMap(_.getRewardAccountingBatch(round, batchHash)).map { + case Some(response) => response + case None => + // TODO (#5623) replace with BFT read + throw new RuntimeException( + s"Batch not found from scan for round $round with hash $batchHash" + ) + } +} + +class ProcessRewardsTrigger( + override protected val context: TriggerContext, + override protected val svTaskContext: SvTaskBasedTrigger.Context, + scanConnectionF: Future[ScanConnection], +)(implicit + ec: ExecutionContextExecutor, + mat: Materializer, + tracer: Tracer, +) extends ProcessRewardsTriggerBase( + context, + svTaskContext, + scanConnectionF, + isDryRun = false, + ) + +class ProcessRewardsDryRunTrigger( + override protected val context: TriggerContext, + override protected val svTaskContext: SvTaskBasedTrigger.Context, + scanConnectionF: Future[ScanConnection], +)(implicit + ec: ExecutionContextExecutor, + mat: Materializer, + tracer: Tracer, +) extends ProcessRewardsTriggerBase( + context, + svTaskContext, + scanConnectionF, + isDryRun = true, + ) + +private[delegatebased] object ProcessRewardsTriggerBase { + type ProcessRewardsV2Contract = AssignedContract[ + ProcessRewardsV2.ContractId, + ProcessRewardsV2, + ] +} diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/RestartDsoDelegateBasedAutomationTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/RestartDsoDelegateBasedAutomationTrigger.scala index 63aaef9920..0eae3058f9 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/RestartDsoDelegateBasedAutomationTrigger.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/RestartDsoDelegateBasedAutomationTrigger.scala @@ -17,6 +17,7 @@ import org.lfdecentralizedtrust.splice.environment.{ RetryProvider, SpliceLedgerConnection, } +import org.lfdecentralizedtrust.splice.scan.admin.api.client.ScanConnection import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, DomainUnpausedSynchronization, @@ -30,7 +31,7 @@ import com.digitalasset.canton.time.Clock import com.digitalasset.canton.tracing.TraceContext import io.opentelemetry.api.trace.Tracer -import scala.concurrent.{ExecutionContext, Future, blocking} +import scala.concurrent.{ExecutionContextExecutor, Future, blocking} import com.digitalasset.canton.lifecycle.RunOnClosing import com.digitalasset.canton.lifecycle.AsyncOrSyncCloseable import com.digitalasset.canton.lifecycle.SyncCloseable @@ -50,8 +51,9 @@ class RestartDsoDelegateBasedAutomationTrigger( appLevelRetryProvider: RetryProvider, packageVersionSupport: PackageVersionSupport, packageVettingService: PackageVettingLookupService, + scanConnectionF: Future[ScanConnection], )(implicit - override val ec: ExecutionContext, + override val ec: ExecutionContextExecutor, mat: Materializer, tracer: Tracer, ) extends OnAssignedContractTrigger.Template[ @@ -129,9 +131,7 @@ class RestartDsoDelegateBasedAutomationTrigger( } } - private def restartAutomation(epoch: Long)(implicit - ec: ExecutionContext - ): TaskOutcome = { + private def restartAutomation(epoch: Long): TaskOutcome = { val svTaskContext = SvTaskBasedTrigger.Context( store, @@ -164,6 +164,7 @@ class RestartDsoDelegateBasedAutomationTrigger( domainUnpausedSync, config, svTaskContext, + scanConnectionF, retryProvider, loggerFactory, ) diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala index c56447738b..e9ed3f1f13 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala @@ -151,6 +151,7 @@ trait NodeInitializerUtil extends NamedLogging with Spanning with SynchronizerNo httpClient: HttpClient, templateJsonDecoder: TemplateJsonDecoder, esf: ExecutionSequencerFactory, + tc: TraceContext, ) = new SvDsoAutomationService( clock, diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala index 1b932b4bd2..11e5e421b4 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala @@ -240,6 +240,13 @@ trait SvDsoStore tc: TraceContext ): Future[Seq[Contract[splice.dsorules.Confirmation.ContractId, splice.dsorules.Confirmation]]] + def listConfirmationsByConfirmer( + confirmer: PartyId, + limit: Limit = defaultLimit, + )(implicit + tc: TraceContext + ): Future[Seq[Contract[splice.dsorules.Confirmation.ContractId, splice.dsorules.Confirmation]]] + def listAppRewardCouponsOnDomain( round: Long, synchronizerId: SynchronizerId, @@ -544,6 +551,45 @@ trait SvDsoStore splice.round.SummarizingMiningRound, ]]] + def listCalculateRewardsV2( + limit: Limit = defaultLimit + )(implicit tc: TraceContext): Future[Seq[AssignedContract[ + splice.amulet.rewardaccountingv2.CalculateRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.CalculateRewardsV2, + ]]] + + def listProcessRewardsV2( + limit: Limit = defaultLimit + )(implicit tc: TraceContext): Future[Seq[AssignedContract[ + splice.amulet.rewardaccountingv2.ProcessRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.ProcessRewardsV2, + ]]] + + def listRewardCouponsV2( + limit: Limit = defaultLimit + )(implicit tc: TraceContext): Future[Seq[AssignedContract[ + splice.amulet.RewardCouponV2.ContractId, + splice.amulet.RewardCouponV2, + ]]] + + /** Returns the dry-run `CalculateRewardsV2` and `ProcessRewardsV2` contracts whose + * round number is in the given set. + */ + def listDryRunRewardAccountingContractsByRounds(rounds: Seq[Long])(implicit + tc: TraceContext + ): Future[ + ( + Seq[AssignedContract[ + splice.amulet.rewardaccountingv2.CalculateRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.CalculateRewardsV2, + ]], + Seq[AssignedContract[ + splice.amulet.rewardaccountingv2.ProcessRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.ProcessRewardsV2, + ]], + ) + ] + /** All `ClosedMiningRound` contracts that should be confirmed to be archived. * * These are all `ClosedMiningRound` contracts for which @@ -1265,6 +1311,31 @@ object SvDsoStore { rewardWeight = Some(contract.payload.weight), ) }, + mkFilter(splice.amulet.RewardCouponV2.COMPANION)(co => co.payload.dso == dso) { contract => + DsoAcsStoreRowData( + contract, + rewardRound = Some(contract.payload.round.number), + rewardParty = Some(PartyId.tryFromProtoPrimitive(contract.payload.provider)), + rewardAmount = Some(contract.payload.amount), + contractExpiresAt = Some(Timestamp.assertFromInstant(contract.payload.expiresAt)), + ) + }, + mkFilter(splice.amulet.rewardaccountingv2.CalculateRewardsV2.COMPANION)(co => + co.payload.dso == dso + ) { contract => + DsoAcsStoreRowData( + contract, + miningRound = Some(contract.payload.round.number), + ) + }, + mkFilter(splice.amulet.rewardaccountingv2.ProcessRewardsV2.COMPANION)(co => + co.payload.dso == dso + ) { contract => + DsoAcsStoreRowData( + contract, + miningRound = Some(contract.payload.round.number), + ) + }, mkFilter(splice.round.OpenMiningRound.COMPANION)(co => co.payload.dso == dso) { contract => DsoAcsStoreRowData( contract, diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala index 49b14ab967..64b9711671 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala @@ -94,7 +94,7 @@ class DbSvDsoStore( // Any change in the store descriptor will lead to previously deployed applications // forgetting all persisted data once they upgrade to the new version. acsStoreDescriptor = StoreDescriptor( - version = 2, + version = 3, name = "DbSvDsoStore", party = key.dsoParty, participant = participantId, @@ -281,6 +281,29 @@ class DbSvDsoStore( } yield limited.map(contractFromRow(Confirmation.COMPANION)(_)) } + override def listConfirmationsByConfirmer( + confirmer: PartyId, + limit: Limit, + )(implicit + tc: TraceContext + ): Future[Seq[Contract[Confirmation.ContractId, Confirmation]]] = waitUntilAcsIngested { + for { + result <- storage + .query( + selectFromAcsTable( + DsoTables.acsTableName, + acsStoreId, + domainMigrationId, + Confirmation.COMPANION, + where = sql"""confirmer = $confirmer""", + orderLimit = sql"""limit ${sqlLimit(limit)}""", + ), + "listConfirmationsByConfirmer", + ) + limited = applyLimit("listConfirmationsByConfirmer", limit, result) + } yield limited.map(contractFromRow(Confirmation.COMPANION)(_)) + } + override def listAppRewardCouponsOnDomain( round: Long, synchronizerId: SynchronizerId, @@ -691,6 +714,140 @@ class DbSvDsoStore( limited = applyLimit("listOldestSummarizingMiningRounds", limit, result) } yield limited.map(assignedContractFromRow(SummarizingMiningRound.COMPANION)(_)) + override def listCalculateRewardsV2(limit: Limit = defaultLimit)(implicit + tc: TraceContext + ): Future[Seq[ + AssignedContract[ + splice.amulet.rewardaccountingv2.CalculateRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.CalculateRewardsV2, + ] + ]] = + for { + result <- storage + .query( + selectFromAcsTableWithState( + DsoTables.acsTableName, + acsStoreId, + domainMigrationId, + splice.amulet.rewardaccountingv2.CalculateRewardsV2.COMPANION, + orderLimit = sql"""order by mining_round limit ${sqlLimit(limit)}""", + ), + "listCalculateRewardsV2", + ) + limited = applyLimit("listCalculateRewardsV2", limit, result) + } yield limited.map( + assignedContractFromRow(splice.amulet.rewardaccountingv2.CalculateRewardsV2.COMPANION)(_) + ) + + override def listProcessRewardsV2(limit: Limit = defaultLimit)(implicit + tc: TraceContext + ): Future[Seq[ + AssignedContract[ + splice.amulet.rewardaccountingv2.ProcessRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.ProcessRewardsV2, + ] + ]] = + for { + result <- storage + .query( + selectFromAcsTableWithState( + DsoTables.acsTableName, + acsStoreId, + domainMigrationId, + splice.amulet.rewardaccountingv2.ProcessRewardsV2.COMPANION, + orderLimit = sql"""order by mining_round limit ${sqlLimit(limit)}""", + ), + "listProcessRewardsV2", + ) + limited = applyLimit("listProcessRewardsV2", limit, result) + } yield limited.map( + assignedContractFromRow(splice.amulet.rewardaccountingv2.ProcessRewardsV2.COMPANION)(_) + ) + + override def listRewardCouponsV2(limit: Limit = defaultLimit)(implicit + tc: TraceContext + ): Future[Seq[ + AssignedContract[ + splice.amulet.RewardCouponV2.ContractId, + splice.amulet.RewardCouponV2, + ] + ]] = + for { + result <- storage + .query( + selectFromAcsTableWithState( + DsoTables.acsTableName, + acsStoreId, + domainMigrationId, + splice.amulet.RewardCouponV2.COMPANION, + orderLimit = sql"""order by reward_round limit ${sqlLimit(limit)}""", + ), + "listRewardCouponsV2", + ) + limited = applyLimit("listRewardCouponsV2", limit, result) + } yield limited.map( + assignedContractFromRow(splice.amulet.RewardCouponV2.COMPANION)(_) + ) + + override def listDryRunRewardAccountingContractsByRounds(rounds: Seq[Long])(implicit + tc: TraceContext + ): Future[ + ( + Seq[AssignedContract[ + splice.amulet.rewardaccountingv2.CalculateRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.CalculateRewardsV2, + ]], + Seq[AssignedContract[ + splice.amulet.rewardaccountingv2.ProcessRewardsV2.ContractId, + splice.amulet.rewardaccountingv2.ProcessRewardsV2, + ]], + ) + ] = { + if (rounds.isEmpty) + Future.successful((Seq.empty, Seq.empty)) + else { + val roundsClause = inClause("mining_round", rounds) + val calculateRewardsF = storage + .query( + selectFromAcsTableWithState( + DsoTables.acsTableName, + acsStoreId, + domainMigrationId, + splice.amulet.rewardaccountingv2.CalculateRewardsV2.COMPANION, + additionalWhere = (sql" and " ++ roundsClause).toActionBuilder, + ), + "listDryRunCalculateRewardsV2ByRounds", + ) + .map( + _.map( + assignedContractFromRow(splice.amulet.rewardaccountingv2.CalculateRewardsV2.COMPANION)( + _ + ) + ).filter(_.payload.dryRun) + ) + val processRewardsF = storage + .query( + selectFromAcsTableWithState( + DsoTables.acsTableName, + acsStoreId, + domainMigrationId, + splice.amulet.rewardaccountingv2.ProcessRewardsV2.COMPANION, + additionalWhere = (sql" and " ++ roundsClause).toActionBuilder, + ), + "listDryRunProcessRewardsV2ByRounds", + ) + .map( + _.map( + assignedContractFromRow(splice.amulet.rewardaccountingv2.ProcessRewardsV2.COMPANION)(_) + ).filter(_.payload.dryRun) + ) + for { + calculateRewards <- calculateRewardsF + processRewards <- processRewardsF + } yield (calculateRewards, processRewards) + } + } + override def lookupConfirmationByActionWithOffset( confirmer: PartyId, action: ActionRequiringConfirmation, diff --git a/test-full-class-names-sim-time.log b/test-full-class-names-sim-time.log index a58e539232..e05e384aec 100644 --- a/test-full-class-names-sim-time.log +++ b/test-full-class-names-sim-time.log @@ -14,6 +14,8 @@ org.lfdecentralizedtrust.splice.integration.tests.TimeBasedTreasuryIntegrationTe org.lfdecentralizedtrust.splice.integration.tests.TimeBasedTreasuryIntegrationTestWithoutMerging org.lfdecentralizedtrust.splice.integration.tests.TokenStandardCliTestDataTimeBasedIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.TokenStandardMetadataTimeBasedIntegrationTest +org.lfdecentralizedtrust.splice.integration.tests.TrafficBasedRewardsDryRunTimeBasedIntegrationTest +org.lfdecentralizedtrust.splice.integration.tests.TrafficBasedRewardsSvAppTimeBasedIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.TrafficBasedRewardsTimeBasedIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.ValidatorFaucetCapZeroTimeBasedIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.ValidatorLicenseMetadataTimeBasedIntegrationTest