diff --git a/ocp/common/mint.go b/ocp/common/mint.go index 964d1ab..cb1c44a 100644 --- a/ocp/common/mint.go +++ b/ocp/common/mint.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "errors" + "sync" + "time" commonpb "github.com/code-payments/ocp-protobuf-api/generated/go/common/v1" "github.com/code-payments/ocp-server/ocp/config" @@ -22,22 +24,16 @@ var ( ErrUnsupportedMint = errors.New("unsupported mint") - badBoysMintAccount, _ = NewAccountFromPublicKeyString(config.BadBoysMintPublicKey) - bluebucksMintAccount, _ = NewAccountFromPublicKeyString(config.BluebucksMintPublicKey) - bitsMintAccount, _ = NewAccountFromPublicKeyString(config.BitsMintPublicKey) - bogeyMintAccount, _ = NewAccountFromPublicKeyString(config.BogeyMintPublicKey) - floatMintAccount, _ = NewAccountFromPublicKeyString(config.FloatMintPublicKey) - jeffyMintAccount, _ = NewAccountFromPublicKeyString(config.JeffyMintPublicKey) - lightspeedMintAccount, _ = NewAccountFromPublicKeyString(config.LightspeedMintPublicKey) - linksMintAccount, _ = NewAccountFromPublicKeyString(config.LinksMintPublicKey) - marketCoinMintAccount, _ = NewAccountFromPublicKeyString(config.MarketCoinMintPublicKey) - moonyMintAccount, _ = NewAccountFromPublicKeyString(config.MoonyMintPublicKey) - teddiesMintAccount, _ = NewAccountFromPublicKeyString(config.TeddiesMintPublicKey) - testMintAccount, _ = NewAccountFromPublicKeyString(config.TestMintPublicKey) - toshiMintAccount, _ = NewAccountFromPublicKeyString(config.ToshiMintPublicKey) - xpMintAccount, _ = NewAccountFromPublicKeyString(config.XpMintPublicKey) + supportedMintCacheMu sync.RWMutex + supportedMintCache = make(map[string]supportedMintCacheEntry) + supportedMintCacheTTL = 1 * time.Minute ) +type supportedMintCacheEntry struct { + isSupported bool + cachedAt time.Time +} + func GetBackwardsCompatMint(protoMint *commonpb.SolanaAccountId) (*Account, error) { if protoMint == nil { return CoreMintAccount, nil @@ -75,6 +71,31 @@ func GetMintQuarksPerUnit(mint *Account) uint64 { } func IsSupportedMint(ctx context.Context, data ocp_data.Provider, mintAccount *Account) (bool, error) { + mintAddress := mintAccount.PublicKey().ToBase58() + + supportedMintCacheMu.RLock() + entry, ok := supportedMintCache[mintAddress] + supportedMintCacheMu.RUnlock() + if ok && time.Since(entry.cachedAt) < supportedMintCacheTTL { + return entry.isSupported, nil + } + + isSupported, err := isSupportedMint(ctx, data, mintAccount) + if err != nil { + return false, err + } + + supportedMintCacheMu.Lock() + supportedMintCache[mintAddress] = supportedMintCacheEntry{ + isSupported: isSupported, + cachedAt: time.Now(), + } + supportedMintCacheMu.Unlock() + + return isSupported, nil +} + +func isSupportedMint(ctx context.Context, data ocp_data.Provider, mintAccount *Account) (bool, error) { if !IsCoreMint(mintAccount) && !IsCoreMintUsdStableCoin() { return false, nil } diff --git a/ocp/common/vm.go b/ocp/common/vm.go index 05b23bf..7baef16 100644 --- a/ocp/common/vm.go +++ b/ocp/common/vm.go @@ -2,72 +2,19 @@ package common import ( "context" + "sync" "github.com/code-payments/ocp-server/ocp/config" ocp_data "github.com/code-payments/ocp-server/ocp/data" + vm_metadata "github.com/code-payments/ocp-server/ocp/data/vm/metadata" ) var ( CoreMintVmAccount, _ = NewAccountFromPublicKeyString(config.CoreMintVmAccountPublicKey) CoreMintVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.CoreMintVmOmnibusPublicKey) - // todo: DB store to track VM per mint - - badBoysAuthority, _ = NewAccountFromPublicKeyString(config.BadBoysAuthorityPublicKey) - badBoysVmAccount, _ = NewAccountFromPublicKeyString(config.BadBoysVmAccountPublicKey) - badBoysVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.BadBoysVmOmnibusPublicKey) - - bluebucksAuthority, _ = NewAccountFromPublicKeyString(config.BluebucksAuthorityPublicKey) - bluebucksVmAccount, _ = NewAccountFromPublicKeyString(config.BluebucksVmAccountPublicKey) - bluebucksVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.BluebucksVmOmnibusPublicKey) - - bitsAuthority, _ = NewAccountFromPublicKeyString(config.BitsAuthorityPublicKey) - bitsVmAccount, _ = NewAccountFromPublicKeyString(config.BitsVmAccountPublicKey) - bitsVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.BitsVmOmnibusPublicKey) - - bogeyAuthority, _ = NewAccountFromPublicKeyString(config.BogeyAuthorityPublicKey) - bogeyVmAccount, _ = NewAccountFromPublicKeyString(config.BogeyVmAccountPublicKey) - bogeyVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.BogeyVmOmnibusPublicKey) - - floatAuthority, _ = NewAccountFromPublicKeyString(config.FloatAuthorityPublicKey) - floatVmAccount, _ = NewAccountFromPublicKeyString(config.FloatVmAccountPublicKey) - floatVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.FloatVmOmnibusPublicKey) - - jeffyAuthority, _ = NewAccountFromPublicKeyString(config.JeffyAuthorityPublicKey) - jeffyVmAccount, _ = NewAccountFromPublicKeyString(config.JeffyVmAccountPublicKey) - jeffyVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.JeffyVmOmnibusPublicKey) - - lightspeedAuthority, _ = NewAccountFromPublicKeyString(config.LightspeedAuthorityPublicKey) - lightspeedVmAccount, _ = NewAccountFromPublicKeyString(config.LightspeedVmAccountPublicKey) - lightspeedVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.LightspeedVmOmnibusPublicKey) - - linksAuthority, _ = NewAccountFromPublicKeyString(config.LinksAuthorityPublicKey) - linksVmAccount, _ = NewAccountFromPublicKeyString(config.LinksVmAccountPublicKey) - linksVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.LinksVmOmnibusPublicKey) - - marketCoinAuthority, _ = NewAccountFromPublicKeyString(config.MarketCoinAuthorityPublicKey) - marketCoinVmAccount, _ = NewAccountFromPublicKeyString(config.MarketCoinVmAccountPublicKey) - marketCoinVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.MarketCoinVmOmnibusPublicKey) - - moonyAuthority, _ = NewAccountFromPublicKeyString(config.MoonyAuthorityPublicKey) - moonyVmAccount, _ = NewAccountFromPublicKeyString(config.MoonyVmAccountPublicKey) - moonyVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.MoonyVmOmnibusPublicKey) - - teddiesAuthority, _ = NewAccountFromPublicKeyString(config.TeddiesAuthorityPublicKey) - teddiesVmAccount, _ = NewAccountFromPublicKeyString(config.TeddiesVmAccountPublicKey) - teddiesVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.TeddiesVmOmnibusPublicKey) - - testAuthority, _ = NewAccountFromPublicKeyString(config.TestAuthorityPublicKey) - testVmAccount, _ = NewAccountFromPublicKeyString(config.TestVmAccountPublicKey) - testVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.TestVmOmnibusPublicKey) - - toshiAuthority, _ = NewAccountFromPublicKeyString(config.ToshiAuthorityPublicKey) - toshiVmAccount, _ = NewAccountFromPublicKeyString(config.ToshiVmAccountPublicKey) - toshiVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.ToshiVmOmnibusPublicKey) - - xpAuthority, _ = NewAccountFromPublicKeyString(config.XpAuthorityPublicKey) - xpVmAccount, _ = NewAccountFromPublicKeyString(config.XpVmAccountPublicKey) - xpVmOmnibusAccount, _ = NewAccountFromPublicKeyString(config.XpVmOmnibusPublicKey) + vmConfigCacheMu sync.RWMutex + vmConfigCache = make(map[string]*VmConfig) ) type VmConfig struct { @@ -78,285 +25,61 @@ type VmConfig struct { } func GetVmConfigForMint(ctx context.Context, data ocp_data.Provider, mintAccount *Account) (*VmConfig, error) { - if !IsCoreMint(mintAccount) && !IsCoreMintUsdStableCoin() { - return nil, ErrUnsupportedMint - } - - switch mintAccount.PublicKey().ToBase58() { - case CoreMintAccount.PublicKey().ToBase58(): + if IsCoreMint(mintAccount) { return &VmConfig{ Authority: GetSubsidizer(), Vm: CoreMintVmAccount, Omnibus: CoreMintVmOmnibusAccount, Mint: CoreMintAccount, }, nil - case badBoysMintAccount.PublicKey().ToBase58(): - if badBoysAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, badBoysAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - badBoysAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: badBoysAuthority, - Vm: badBoysVmAccount, - Omnibus: badBoysVmOmnibusAccount, - Mint: mintAccount, - }, nil - case bluebucksMintAccount.PublicKey().ToBase58(): - if bluebucksAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, bluebucksAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - bluebucksAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: bluebucksAuthority, - Vm: bluebucksVmAccount, - Omnibus: bluebucksVmOmnibusAccount, - Mint: mintAccount, - }, nil - case bitsMintAccount.PublicKey().ToBase58(): - if bitsAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, bitsAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - bitsAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: bitsAuthority, - Vm: bitsVmAccount, - Omnibus: bitsVmOmnibusAccount, - Mint: mintAccount, - }, nil - case bogeyMintAccount.PublicKey().ToBase58(): - if bogeyAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, bogeyAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - bogeyAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: bogeyAuthority, - Vm: bogeyVmAccount, - Omnibus: bogeyVmOmnibusAccount, - Mint: mintAccount, - }, nil - case floatMintAccount.PublicKey().ToBase58(): - if floatAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, floatAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - floatAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: floatAuthority, - Vm: floatVmAccount, - Omnibus: floatVmOmnibusAccount, - Mint: mintAccount, - }, nil - case jeffyMintAccount.PublicKey().ToBase58(): - if jeffyAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, jeffyAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - jeffyAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: jeffyAuthority, - Vm: jeffyVmAccount, - Omnibus: jeffyVmOmnibusAccount, - Mint: mintAccount, - }, nil - case lightspeedMintAccount.PublicKey().ToBase58(): - if lightspeedAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, lightspeedAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - lightspeedAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: lightspeedAuthority, - Vm: lightspeedVmAccount, - Omnibus: lightspeedVmOmnibusAccount, - Mint: mintAccount, - }, nil - case linksMintAccount.PublicKey().ToBase58(): - if linksAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, linksAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - linksAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: linksAuthority, - Vm: linksVmAccount, - Omnibus: linksVmOmnibusAccount, - Mint: mintAccount, - }, nil - case marketCoinMintAccount.PublicKey().ToBase58(): - if marketCoinAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, marketCoinAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } - - marketCoinAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } - - return &VmConfig{ - Authority: marketCoinAuthority, - Vm: marketCoinVmAccount, - Omnibus: marketCoinVmOmnibusAccount, - Mint: mintAccount, - }, nil - case moonyMintAccount.PublicKey().ToBase58(): - if moonyAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, moonyAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } + } - moonyAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } + mintAddress := mintAccount.PublicKey().ToBase58() - return &VmConfig{ - Authority: moonyAuthority, - Vm: moonyVmAccount, - Omnibus: moonyVmOmnibusAccount, - Mint: mintAccount, - }, nil - case teddiesMintAccount.PublicKey().ToBase58(): - if teddiesAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, teddiesAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } + vmConfigCacheMu.RLock() + cached, ok := vmConfigCache[mintAddress] + vmConfigCacheMu.RUnlock() + if ok { + return cached, nil + } - teddiesAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } + record, err := data.GetVmMetadataByMint(ctx, mintAddress) + if err == vm_metadata.ErrNotFound { + return nil, ErrUnsupportedMint + } else if err != nil { + return nil, err + } - return &VmConfig{ - Authority: teddiesAuthority, - Vm: teddiesVmAccount, - Omnibus: teddiesVmOmnibusAccount, - Mint: mintAccount, - }, nil - case testMintAccount.PublicKey().ToBase58(): - if testAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, testAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } + vaultRecord, err := data.GetKey(ctx, record.Authority) + if err != nil { + return nil, err + } - testAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } + authority, err := NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) + if err != nil { + return nil, err + } - return &VmConfig{ - Authority: testAuthority, - Vm: testVmAccount, - Omnibus: testVmOmnibusAccount, - Mint: mintAccount, - }, nil - case toshiMintAccount.PublicKey().ToBase58(): - if toshiAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, toshiAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } + vmAccount, err := NewAccountFromPublicKeyString(record.Vm) + if err != nil { + return nil, err + } - toshiAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } + omnibusAccount, err := NewAccountFromPublicKeyString(record.Omnibus) + if err != nil { + return nil, err + } - return &VmConfig{ - Authority: toshiAuthority, - Vm: toshiVmAccount, - Omnibus: toshiVmOmnibusAccount, - Mint: mintAccount, - }, nil - case xpMintAccount.PublicKey().ToBase58(): - if xpAuthority.PrivateKey() == nil { - vaultRecord, err := data.GetKey(ctx, xpAuthority.PublicKey().ToBase58()) - if err != nil { - return nil, err - } + vmConfig := &VmConfig{ + Authority: authority, + Vm: vmAccount, + Omnibus: omnibusAccount, + Mint: mintAccount, + } - xpAuthority, err = NewAccountFromPrivateKeyString(vaultRecord.PrivateKey) - if err != nil { - return nil, err - } - } + vmConfigCacheMu.Lock() + vmConfigCache[mintAddress] = vmConfig + vmConfigCacheMu.Unlock() - return &VmConfig{ - Authority: xpAuthority, - Vm: xpVmAccount, - Omnibus: xpVmOmnibusAccount, - Mint: mintAccount, - }, nil - default: - return nil, ErrUnsupportedMint - } + return vmConfig, nil } diff --git a/ocp/config/config.go b/ocp/config/config.go index f7ed72c..7456153 100644 --- a/ocp/config/config.go +++ b/ocp/config/config.go @@ -22,78 +22,6 @@ const ( CoreMintVmAccountPublicKey = "JACkaKsm2Rd6TNJwH4UB7G6tHrWUATJPTgNNnRVsg4ip" CoreMintVmOmnibusPublicKey = "D8oUTXRvarxhx9cjYdFJqWAVj2rmzry58bS6JSTiQsv5" - - // todo: DB store to track VM per mint - - BadBoysMintPublicKey = "64dkhPKhdjc2xg3NLyDjC14wiXHLnGXHHUxJnqZVugJt" - BadBoysAuthorityPublicKey = "bbVCosXYRMBTbcmLsRXqHSzaUDJpSStYRqTstcpVxwx" - BadBoysVmAccountPublicKey = "9EpU4RGoe7zfJdLjLyXWLpza6RZHqN6QV73z1gfNKCJa" - BadBoysVmOmnibusPublicKey = "7pQkiRaXdMW3bLJK6D1huP175AF5wD6gXNPBsvQ8bJZ3" - - BluebucksMintPublicKey = "4MrV36dgdC9kjkC3ibbe1LMUijXde27iJAjJByB7naSN" - BluebucksAuthorityPublicKey = "b1ue4ZAdQeQXw6fi8Gkn9vtmFsGmgFgARiAEwprrkBY" - BluebucksVmAccountPublicKey = "ED1AYFh6bfYCynvfYCiRHKrPSWvw4PfzprM7o3R954Bu" - BluebucksVmOmnibusPublicKey = "7EPGFrgYPNbVtHg5ovnhThSjiUST39R8jRegriadgehk" - - BitsMintPublicKey = "A3e8dzb1y4gqGP2cnCS3UU8dm5YNrFpZBpjjdoZdtfnB" - BitsAuthorityPublicKey = "bit8rCyAcstm1ZiwLq22FHdz8wAigKU46hmtrigrGub" - BitsVmAccountPublicKey = "5zDzL3CHb3wFxs7xnkxmWMGMR1gjNtYgV46PTBgSHmsJ" - BitsVmOmnibusPublicKey = "872F9FFXnSMPahkYjB8uF1pz1CrgMrmbfKQMGNCdVBvc" - - BogeyMintPublicKey = "3AhBb1fpDTp1F9hPkZjRPDejXBM9S5vfpVdvn66vLYnT" - BogeyAuthorityPublicKey = "bgy6SZdBCjVbftagjp4Vv5rvajTHmmh4ccsZbD9F3m3" - BogeyVmAccountPublicKey = "2KWN2ame8kZKfFc6iyNAWNZsTvByfo3Mrzue6NasNC5B" - BogeyVmOmnibusPublicKey = "H52LuBfwtBNZ3qrX8gqteF2mLx3EscdeUiryUC4fFFfR" - - FloatMintPublicKey = "5APqK9YUZupKt7rRUrpYy6WV3RPuxA71ZtKJffDUMdPP" - FloatAuthorityPublicKey = "f1t57oRwzBvtcqfAkjGKsEAbXymDSbL7pp3iRmDw9iz" - FloatVmAccountPublicKey = "5PKurz63VfSozHbkzwdudpDeA7mUuif8XTfNP932jRYy" - FloatVmOmnibusPublicKey = "98W5qFv7jiFvNA5c72kj7ku2hhFgaAEUfBXcLw8SK1yY" - - JeffyMintPublicKey = "54ggcQ23uen5b9QXMAns99MQNTKn7iyzq4wvCW6e8r25" - JeffyAuthorityPublicKey = "jfy1btcfsjSn2WCqLVaxiEjp4zgmemGyRsdCPbPwnZV" - JeffyVmAccountPublicKey = "8rwgUXsLSq1Pn51UJs4NGVA1cTkemwBgKyWPfFYgnm3B" - JeffyVmOmnibusPublicKey = "9XiqBPYSG2cBwpb8MqJeuFmLaQaAAr6gwikyBrPZDQ8R" - - LightspeedMintPublicKey = "D4Srn1tRU69JWvPeNGre1AiF2swFhw6bpUNyH18Dfag6" - LightspeedAuthorityPublicKey = "1spdvA8EGbo56cnGUDzt2FjWCwEWF5hY3jw6yz8BcWw" - LightspeedVmAccountPublicKey = "GkYnW2md9TdWV7ZmSnShaFGg3S1e62UmJMDtnYXG4gpe" - LightspeedVmOmnibusPublicKey = "5sBqTvFrfu6hCJPrxEcQ2PXvzLTfy4UCJjYxDoJcaP19" - - LinksMintPublicKey = "CPBcprSXXYerwh6LE6o3yc6NJqeZvm87GRSySgjTicE5" - LinksAuthorityPublicKey = "1nkSmZ9VCLBiQqSuMdmv9SXQ2y5wTJVnBW5ZsESjvzd" - LinksVmAccountPublicKey = "2s18j1EyBjRTcie48Hn6wyiZVCD9NrmkqH1JAKchoKT2" - LinksVmOmnibusPublicKey = "uWfyTHepnjF5KeJSTSWG7nUYsYrcFxV2URUh7Hh8htb" - - MarketCoinMintPublicKey = "311m6Sb1814PfAxkEcqq6MNdBiVZLr8VWuAWDSC72euW" - MarketCoinAuthorityPublicKey = "mrkthxqn4rKCZw1pDxhJTpCrvUBtK9m6aLvCeY3PyWR" - MarketCoinVmAccountPublicKey = "AeJ6x6mtwdjUM2AqppQ7zG6m89sx5c8qegfFPzjmD2x6" - MarketCoinVmOmnibusPublicKey = "CQMdG8AKtLP9JCU4GAzKunKnsfpXVFdMcSwQd3Hd5oZg" - - MoonyMintPublicKey = "4muAfB6m1P7C3Znad1VUsoqYFwvGQRkWGpJ3A4vupxz6" - MoonyAuthorityPublicKey = "mnyXRKy3pz8WXG4XQC6inaKNbzrqq2xRJfGv5Lq9qgR" - MoonyVmAccountPublicKey = "FCZitKZWuMfrZ3Krcp67egUwPzFoWM9Xo913S17qdjWV" - MoonyVmOmnibusPublicKey = "FvhtPHhARVkRJKfkM96Wvo2WEwjtiweyLAdpAroqT646" - - TeddiesMintPublicKey = "55TKRUaaxXoSNFMzchMYUwC5CQmbMko5ZQECpH5Uqajm" - TeddiesAuthorityPublicKey = "tedA5B3L5b6tL88ZyWdpxSbJV6PMxTg6ArtaKJcQ14J" - TeddiesVmAccountPublicKey = "2y5sWz6GfqqnvpRLJNbchg7hLmzC8khvjw1SvUBsuYKP" - TeddiesVmOmnibusPublicKey = "E1R3TWPqFFWHMxMURprpAXSeVzF93tJn6Pn8x165pcVe" - - TestMintPublicKey = "2psDP3LAvbNzfvBYNMs9ieMpsD8PVzyQsKNfZrjEKoDN" - TestAuthorityPublicKey = "tstBCPtzNDycsM7rAd2CxzdgKh1gWrMULzrDHuGVXAW" - TestVmAccountPublicKey = "CYtzE732LQ7YP9sngSPUQhCCTQ8GY14dmjWX2hXP16Np" - TestVmOmnibusPublicKey = "65Ghp2FFPmdB3FLkC2NCKkBQ3RuXgmjZ56DDGZLVAZZa" - - ToshiMintPublicKey = "EfBCnuhon2uWSRB3WFqmWRQumiHvhZXYbUZXUcHDHNyz" - ToshiAuthorityPublicKey = "tshUiGXuyb4ujawe2fByjissauSdgDjuysSeGoV9Aga" - ToshiVmAccountPublicKey = "9VpRW29UXcyAZkXJpk4Pqcdz2HyprKPft6pyBKH6LGC1" - ToshiVmOmnibusPublicKey = "AmYSBrPT5ozvxjYT7Gq3B3imRHGoEoNpZ7ZDzcxJ1xKp" - - XpMintPublicKey = "6oZnhB1FPrUaDfhRCVZnbVWNKVx9wgj84vKGH7eMpzXL" - XpAuthorityPublicKey = "xpTXV7BNXwsdvCaFKfeT4h6rSnKck2Bv5iBAFFS5Uwk" - XpVmAccountPublicKey = "4qnCaQkGxCnr66cmPfpNfM2rxaaTMFSSfbY9gXZgCYdS" - XpVmOmnibusPublicKey = "5FCroyNvCXpLFkNuNQ4fw8Lk8xNGwGpwoTjzhRMbaBUo" ) var ( diff --git a/ocp/data/currency/memory/store.go b/ocp/data/currency/memory/store.go index a67d7c6..7b59de1 100644 --- a/ocp/data/currency/memory/store.go +++ b/ocp/data/currency/memory/store.go @@ -193,6 +193,22 @@ func (s *store) GetMetadata(ctx context.Context, mint string) (*currency.Metadat return nil, currency.ErrNotFound } +func (s *store) GetAllMints(ctx context.Context) ([]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.metadataRecords) == 0 { + return nil, currency.ErrNotFound + } + + var mints []string + for _, item := range s.metadataRecords { + mints = append(mints, item.Mint) + } + + return mints, nil +} + func (s *store) PutReserveRecord(ctx context.Context, data *currency.ReserveRecord) error { if err := data.Validate(); err != nil { return err diff --git a/ocp/data/currency/postgres/model.go b/ocp/data/currency/postgres/model.go index a04366c..9ab4344 100644 --- a/ocp/data/currency/postgres/model.go +++ b/ocp/data/currency/postgres/model.go @@ -383,6 +383,22 @@ func dbGetMetadataByMint(ctx context.Context, db *sqlx.DB, mint string) (*metada return res, pgutil.CheckNoRows(err, currency.ErrNotFound) } +func dbGetAllMints(ctx context.Context, db *sqlx.DB) ([]string, error) { + var res []string + err := db.SelectContext(ctx, &res, + `SELECT mint FROM `+metadataTableName, + ) + + if err != nil { + return nil, pgutil.CheckNoRows(err, currency.ErrNotFound) + } + if len(res) == 0 { + return nil, currency.ErrNotFound + } + + return res, nil +} + func dbGetReserveByMintAndTime(ctx context.Context, db *sqlx.DB, mint string, t time.Time, ordering q.Ordering) (*reserveModel, error) { res := &reserveModel{} err := db.GetContext(ctx, res, diff --git a/ocp/data/currency/postgres/store.go b/ocp/data/currency/postgres/store.go index d5d03eb..d6823bc 100644 --- a/ocp/data/currency/postgres/store.go +++ b/ocp/data/currency/postgres/store.go @@ -124,6 +124,10 @@ func (s *store) GetMetadata(ctx context.Context, mint string) (*currency.Metadat return fromMetadataModel(model), nil } +func (s *store) GetAllMints(ctx context.Context) ([]string, error) { + return dbGetAllMints(ctx, s.db) +} + func (s *store) PutReserveRecord(ctx context.Context, record *currency.ReserveRecord) error { model, err := toReserveModel(record) if err != nil { diff --git a/ocp/data/currency/store.go b/ocp/data/currency/store.go index 964ede2..3b8e1c9 100644 --- a/ocp/data/currency/store.go +++ b/ocp/data/currency/store.go @@ -49,6 +49,11 @@ type Store interface { // GetMetadata gets currency creator mint metadata by the mint address GetMetadata(ctx context.Context, mint string) (*MetadataRecord, error) + // GetAllMints returns the public keys of all currency creator mints + // + // ErrNotFound is returned if no mints exist + GetAllMints(ctx context.Context) ([]string, error) + // PutReserveRecord puts a currency creator mint reserve records into the store. PutReserveRecord(ctx context.Context, record *ReserveRecord) error diff --git a/ocp/data/currency/tests/tests.go b/ocp/data/currency/tests/tests.go index 342a547..1e2c53a 100644 --- a/ocp/data/currency/tests/tests.go +++ b/ocp/data/currency/tests/tests.go @@ -18,6 +18,7 @@ func RunTests(t *testing.T, s currency.Store, teardown func()) { testExchangeRateRoundTrip, testGetExchangeRatesInRange, testMetadataRoundTrip, + testGetAllMints, testReserveRoundTrip, testGetReservesInRange, } { @@ -188,6 +189,70 @@ func testMetadataRoundTrip(t *testing.T, s currency.Store) { assertEquivalentMetadataRecords(t, cloned, actual) } +func testGetAllMints(t *testing.T, s currency.Store) { + // No mints should exist initially + mints, err := s.GetAllMints(context.Background()) + assert.Nil(t, mints) + assert.Equal(t, currency.ErrNotFound, err) + + // Insert two metadata records with different mints + record1 := ¤cy.MetadataRecord{ + Name: "Currency1", + Symbol: "C1", + Description: "First test currency", + ImageUrl: "https://example.com/c1.png", + BillColors: []string{"#000000"}, + SocialLinks: []currency.SocialLink{{Type: currency.SocialLinkTypeWebsite, Value: "https://example.com"}}, + + Seed: "seed1", + Authority: "auth1", + + Mint: "mint1111111111111111111111111111111111111111111", + MintBump: 255, + Decimals: currencycreator.DefaultMintDecimals, + + CurrencyConfig: "config1111111111111111111111111111111111111111", + CurrencyConfigBump: 255, + + LiquidityPool: "pool111111111111111111111111111111111111111111", + LiquidityPoolBump: 255, + + VaultMint: "vmint11111111111111111111111111111111111111111", + VaultMintBump: 255, + + VaultCore: "vcore11111111111111111111111111111111111111111", + VaultCoreBump: 255, + + SellFeeBps: currencycreator.DefaultSellFeeBps, + + Alt: "alt111111111111111111111111111111111111111111111", + + CreatedBy: "creator1", + CreatedAt: time.Now(), + } + + record2 := record1.Clone() + record2.Name = "Currency2" + record2.Symbol = "C2" + record2.Description = "Second test currency" + record2.Seed = "seed2" + record2.Mint = "mint2222222222222222222222222222222222222222222" + record2.CurrencyConfig = "config2222222222222222222222222222222222222222" + record2.LiquidityPool = "pool222222222222222222222222222222222222222222" + record2.VaultMint = "vmint22222222222222222222222222222222222222222" + record2.VaultCore = "vcore22222222222222222222222222222222222222222" + record2.Alt = "alt222222222222222222222222222222222222222222222" + + require.NoError(t, s.PutMetadata(context.Background(), record1)) + require.NoError(t, s.PutMetadata(context.Background(), record2)) + + mints, err = s.GetAllMints(context.Background()) + require.NoError(t, err) + assert.Len(t, mints, 2) + assert.Contains(t, mints, record1.Mint) + assert.Contains(t, mints, record2.Mint) +} + func testReserveRoundTrip(t *testing.T, s currency.Store) { now := time.Date(2021, 01, 29, 13, 0, 5, 0, time.UTC) diff --git a/ocp/data/internal.go b/ocp/data/internal.go index 0851afb..ea80438 100644 --- a/ocp/data/internal.go +++ b/ocp/data/internal.go @@ -137,6 +137,7 @@ type DatabaseData interface { ImportExchangeRates(ctx context.Context, record *currency.MultiRateRecord) error PutCurrencyMetadata(ctx context.Context, record *currency.MetadataRecord) error GetCurrencyMetadata(ctx context.Context, mint string) (*currency.MetadataRecord, error) + GetAllCurrencyMints(ctx context.Context) ([]string, error) PutCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error GetCurrencyReserveAtTime(ctx context.Context, mint string, t time.Time) (*currency.ReserveRecord, error) GetCurrencyReserveHistory(ctx context.Context, mint string, opts ...query.Option) ([]*currency.ReserveRecord, error) @@ -190,7 +191,8 @@ type DatabaseData interface { GetNonceCount(ctx context.Context, env nonce.Environment, instance string) (uint64, error) GetNonceCountByState(ctx context.Context, env nonce.Environment, instance string, state nonce.State) (uint64, error) GetNonceCountByStateAndPurpose(ctx context.Context, env nonce.Environment, instance string, state nonce.State, purpose nonce.Purpose) (uint64, error) - GetAllNonceByState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, opts ...query.Option) ([]*nonce.Record, error) + GetAllNoncesByEnvironmentInstanceAndState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, opts ...query.Option) ([]*nonce.Record, error) + GetAllNoncesByEnvironmentAndState(ctx context.Context, env nonce.Environment, state nonce.State, opts ...query.Option) ([]*nonce.Record, error) BatchClaimAvailableNoncesByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose, limit int, nodeID string, minExpireAt, maxExpireAt time.Time) ([]*nonce.Record, error) SaveNonce(ctx context.Context, record *nonce.Record) error @@ -522,6 +524,9 @@ func (dp *DatabaseProvider) PutCurrencyMetadata(ctx context.Context, record *cur func (dp *DatabaseProvider) GetCurrencyMetadata(ctx context.Context, mint string) (*currency.MetadataRecord, error) { return dp.currencies.GetMetadata(ctx, mint) } +func (dp *DatabaseProvider) GetAllCurrencyMints(ctx context.Context) ([]string, error) { + return dp.currencies.GetAllMints(ctx) +} func (dp *DatabaseProvider) PutCurrencyReserve(ctx context.Context, record *currency.ReserveRecord) error { return dp.currencies.PutReserveRecord(ctx, record) } @@ -681,13 +686,21 @@ func (dp *DatabaseProvider) GetNonceCountByState(ctx context.Context, env nonce. func (dp *DatabaseProvider) GetNonceCountByStateAndPurpose(ctx context.Context, env nonce.Environment, instance string, state nonce.State, purpose nonce.Purpose) (uint64, error) { return dp.nonces.CountByStateAndPurpose(ctx, env, instance, state, purpose) } -func (dp *DatabaseProvider) GetAllNonceByState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, opts ...query.Option) ([]*nonce.Record, error) { +func (dp *DatabaseProvider) GetAllNoncesByEnvironmentInstanceAndState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, opts ...query.Option) ([]*nonce.Record, error) { + req, err := query.DefaultPaginationHandler(opts...) + if err != nil { + return nil, err + } + + return dp.nonces.GetAllByEnvironmentInstanceAndState(ctx, env, instance, state, req.Cursor, req.Limit, req.SortBy) +} +func (dp *DatabaseProvider) GetAllNoncesByEnvironmentAndState(ctx context.Context, env nonce.Environment, state nonce.State, opts ...query.Option) ([]*nonce.Record, error) { req, err := query.DefaultPaginationHandler(opts...) if err != nil { return nil, err } - return dp.nonces.GetAllByState(ctx, env, instance, state, req.Cursor, req.Limit, req.SortBy) + return dp.nonces.GetAllByEnvironmentAndState(ctx, env, state, req.Cursor, req.Limit, req.SortBy) } func (dp *DatabaseProvider) BatchClaimAvailableNoncesByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose, limit int, nodeID string, minExpireAt, maxExpireAt time.Time) ([]*nonce.Record, error) { return dp.nonces.BatchClaimAvailableByPurpose(ctx, env, instance, purpose, limit, nodeID, minExpireAt, maxExpireAt) diff --git a/ocp/data/nonce/memory/store.go b/ocp/data/nonce/memory/store.go index 7ad5406..62d38ff 100644 --- a/ocp/data/nonce/memory/store.go +++ b/ocp/data/nonce/memory/store.go @@ -85,6 +85,16 @@ func (s *store) findByStateAndPurpose(env nonce.Environment, instance string, st return res } +func (s *store) findByEnvironmentAndState(env nonce.Environment, state nonce.State) []*nonce.Record { + res := make([]*nonce.Record, 0) + for _, item := range s.records { + if item.Environment == env && item.State == state { + res = append(res, item) + } + } + return res +} + func (s *store) findByEnvironmentInstance(env nonce.Environment, instance string) []*nonce.Record { res := make([]*nonce.Record, 0) for _, item := range s.records { @@ -215,7 +225,7 @@ func (s *store) Get(ctx context.Context, address string) (*nonce.Record, error) return nil, nonce.ErrNonceNotFound } -func (s *store) GetAllByState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*nonce.Record, error) { +func (s *store) GetAllByEnvironmentInstanceAndState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*nonce.Record, error) { s.mu.Lock() defer s.mu.Unlock() @@ -232,6 +242,23 @@ func (s *store) GetAllByState(ctx context.Context, env nonce.Environment, instan return nil, nonce.ErrNonceNotFound } +func (s *store) GetAllByEnvironmentAndState(ctx context.Context, env nonce.Environment, state nonce.State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*nonce.Record, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if items := s.findByEnvironmentAndState(env, state); len(items) > 0 { + res := s.filter(items, cursor, limit, direction) + + if len(res) == 0 { + return nil, nonce.ErrNonceNotFound + } + + return clonedRecords(res), nil + } + + return nil, nonce.ErrNonceNotFound +} + func (s *store) BatchClaimAvailableByPurpose(ctx context.Context, env nonce.Environment, instance string, purpose nonce.Purpose, limit int, nodeID string, minExpireAt, maxExpireAt time.Time) ([]*nonce.Record, error) { s.mu.Lock() defer s.mu.Unlock() diff --git a/ocp/data/nonce/postgres/model.go b/ocp/data/nonce/postgres/model.go index 042e157..cd1c344 100644 --- a/ocp/data/nonce/postgres/model.go +++ b/ocp/data/nonce/postgres/model.go @@ -172,7 +172,7 @@ func dbGetNonce(ctx context.Context, db *sqlx.DB, address string) (*nonceModel, return res, nil } -func dbGetAllByState(ctx context.Context, db *sqlx.DB, env nonce.Environment, instance string, state nonce.State, cursor q.Cursor, limit uint64, direction q.Ordering) ([]*nonceModel, error) { +func dbGetAllByEnvironmentInstanceAndState(ctx context.Context, db *sqlx.DB, env nonce.Environment, instance string, state nonce.State, cursor q.Cursor, limit uint64, direction q.Ordering) ([]*nonceModel, error) { res := []*nonceModel{} // Signature null check is required because some legacy records didn't have this @@ -201,6 +201,30 @@ func dbGetAllByState(ctx context.Context, db *sqlx.DB, env nonce.Environment, in return res, nil } +func dbGetAllByEnvironmentAndState(ctx context.Context, db *sqlx.DB, env nonce.Environment, state nonce.State, cursor q.Cursor, limit uint64, direction q.Ordering) ([]*nonceModel, error) { + res := []*nonceModel{} + + query := `SELECT + id, address, authority, blockhash, environment, environment_instance, purpose, state, signature, claim_node_id, claim_expires_at, version + FROM ` + nonceTableName + ` + WHERE environment = $1 AND state = $2 AND signature IS NOT NULL + ` + + opts := []interface{}{env, state} + query, opts = q.PaginateQuery(query, opts, cursor, limit, direction) + + err := db.SelectContext(ctx, &res, query, opts...) + if err != nil { + return nil, pgutil.CheckNoRows(err, nonce.ErrNonceNotFound) + } + + if len(res) == 0 { + return nil, nonce.ErrNonceNotFound + } + + return res, nil +} + func dbBatchClaimAvailableByPurpose( ctx context.Context, db *sqlx.DB, diff --git a/ocp/data/nonce/postgres/store.go b/ocp/data/nonce/postgres/store.go index 0055e46..5420ab6 100644 --- a/ocp/data/nonce/postgres/store.go +++ b/ocp/data/nonce/postgres/store.go @@ -5,8 +5,8 @@ import ( "database/sql" "time" - "github.com/code-payments/ocp-server/ocp/data/nonce" "github.com/code-payments/ocp-server/database/query" + "github.com/code-payments/ocp-server/ocp/data/nonce" "github.com/jmoiron/sqlx" ) @@ -58,8 +58,22 @@ func (s *store) Get(ctx context.Context, address string) (*nonce.Record, error) return fromNonceModel(obj), nil } -func (s *store) GetAllByState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*nonce.Record, error) { - models, err := dbGetAllByState(ctx, s.db, env, instance, state, cursor, limit, direction) +func (s *store) GetAllByEnvironmentInstanceAndState(ctx context.Context, env nonce.Environment, instance string, state nonce.State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*nonce.Record, error) { + models, err := dbGetAllByEnvironmentInstanceAndState(ctx, s.db, env, instance, state, cursor, limit, direction) + if err != nil { + return nil, err + } + + nonces := make([]*nonce.Record, len(models)) + for i, model := range models { + nonces[i] = fromNonceModel(model) + } + + return nonces, nil +} + +func (s *store) GetAllByEnvironmentAndState(ctx context.Context, env nonce.Environment, state nonce.State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*nonce.Record, error) { + models, err := dbGetAllByEnvironmentAndState(ctx, s.db, env, state, cursor, limit, direction) if err != nil { return nil, err } diff --git a/ocp/data/nonce/store.go b/ocp/data/nonce/store.go index 380ca7b..a0b0d17 100644 --- a/ocp/data/nonce/store.go +++ b/ocp/data/nonce/store.go @@ -33,11 +33,17 @@ type Store interface { // Returns ErrNotFound if no record is found. Get(ctx context.Context, address string) (*Record, error) - // GetAllByState returns nonce records in the store for a given confirmation state - // within an environment intance. + // GetAllByEnvironmentInstanceAndState returns nonce records in the store for a + // given confirmation state within an environment instance. // // Returns ErrNotFound if no records are found. - GetAllByState(ctx context.Context, env Environment, instance string, state State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*Record, error) + GetAllByEnvironmentInstanceAndState(ctx context.Context, env Environment, instance string, state State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*Record, error) + + // GetAllByEnvironmentAndState returns nonce records in the store for a given + // environment and state across all instances. + // + // Returns ErrNotFound if no records are found. + GetAllByEnvironmentAndState(ctx context.Context, env Environment, state State, cursor query.Cursor, limit uint64, direction query.Ordering) ([]*Record, error) // BatchClaimAvailableByPurpose batch claims up to the specified limit. // diff --git a/ocp/data/nonce/tests/tests.go b/ocp/data/nonce/tests/tests.go index 9b79fe1..7f1f4e0 100644 --- a/ocp/data/nonce/tests/tests.go +++ b/ocp/data/nonce/tests/tests.go @@ -22,7 +22,8 @@ func RunTests(t *testing.T, s nonce.Store, teardown func()) { testRoundTrip, testUpdateHappyPath, testUpdateStaleRecord, - testGetAllByState, + testGetAllByEnvironmentInstanceAndState, + testGetAllByEnvironmentAndState, testGetCount, testBatchClaimAvailableByPurpose, testBatchClaimAvailableByPurposeExpirationRandomness, @@ -156,8 +157,8 @@ func testUpdateStaleRecord(t *testing.T, s nonce.Store) { }) } -func testGetAllByState(t *testing.T, s nonce.Store) { - t.Run("testGetAllByState", func(t *testing.T) { +func testGetAllByEnvironmentInstanceAndState(t *testing.T, s nonce.Store) { + t.Run("testGetAllByEnvironmentInstanceAndState", func(t *testing.T) { ctx := context.Background() expected := []nonce.Record{ @@ -179,33 +180,33 @@ func testGetAllByState(t *testing.T, s nonce.Store) { } // Simple get all by state - actual, err := s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Ascending) + actual, err := s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Ascending) require.NoError(t, err) assert.Equal(t, 3, len(actual)) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateUnknown, query.EmptyCursor, 5, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateUnknown, query.EmptyCursor, 5, query.Ascending) require.NoError(t, err) assert.Equal(t, 1, len(actual)) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateInvalid, query.EmptyCursor, 5, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateInvalid, query.EmptyCursor, 5, query.Ascending) require.NoError(t, err) assert.Equal(t, 2, len(actual)) // Simple get all by state (reverse) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Descending) require.NoError(t, err) assert.Equal(t, 3, len(actual)) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateUnknown, query.EmptyCursor, 5, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateUnknown, query.EmptyCursor, 5, query.Descending) require.NoError(t, err) assert.Equal(t, 1, len(actual)) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateInvalid, query.EmptyCursor, 5, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateInvalid, query.EmptyCursor, 5, query.Descending) require.NoError(t, err) assert.Equal(t, 2, len(actual)) // Check items (asc) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Ascending) require.NoError(t, err) assert.Equal(t, 3, len(actual)) assert.Equal(t, "t3", actual[0].Address) @@ -213,7 +214,7 @@ func testGetAllByState(t *testing.T, s nonce.Store) { assert.Equal(t, "t5", actual[2].Address) // Check items (desc) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 5, query.Descending) require.NoError(t, err) assert.Equal(t, 3, len(actual)) assert.Equal(t, "t5", actual[0].Address) @@ -221,21 +222,21 @@ func testGetAllByState(t *testing.T, s nonce.Store) { assert.Equal(t, "t3", actual[2].Address) // Check items (asc + limit) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 2, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 2, query.Ascending) require.NoError(t, err) assert.Equal(t, 2, len(actual)) assert.Equal(t, "t3", actual[0].Address) assert.Equal(t, "t4", actual[1].Address) // Check items (desc + limit) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 2, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.EmptyCursor, 2, query.Descending) require.NoError(t, err) assert.Equal(t, 2, len(actual)) assert.Equal(t, "t5", actual[0].Address) assert.Equal(t, "t4", actual[1].Address) // Check items (asc + cursor) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(1), 5, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(1), 5, query.Ascending) require.NoError(t, err) assert.Equal(t, 3, len(actual)) assert.Equal(t, "t3", actual[0].Address) @@ -243,7 +244,7 @@ func testGetAllByState(t *testing.T, s nonce.Store) { assert.Equal(t, "t5", actual[2].Address) // Check items (desc + cursor) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(6), 5, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(6), 5, query.Descending) require.NoError(t, err) assert.Equal(t, 3, len(actual)) assert.Equal(t, "t5", actual[0].Address) @@ -251,26 +252,98 @@ func testGetAllByState(t *testing.T, s nonce.Store) { assert.Equal(t, "t3", actual[2].Address) // Check items (asc + cursor) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(3), 5, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(3), 5, query.Ascending) require.NoError(t, err) assert.Equal(t, 2, len(actual)) assert.Equal(t, "t4", actual[0].Address) assert.Equal(t, "t5", actual[1].Address) // Check items (desc + cursor) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(4), 5, query.Descending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(4), 5, query.Descending) require.NoError(t, err) assert.Equal(t, 1, len(actual)) assert.Equal(t, "t3", actual[0].Address) // Check items (asc + cursor + limit) - actual, err = s.GetAllByState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(3), 1, query.Ascending) + actual, err = s.GetAllByEnvironmentInstanceAndState(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, nonce.StateReserved, query.ToCursor(3), 1, query.Ascending) require.NoError(t, err) assert.Equal(t, 1, len(actual)) assert.Equal(t, "t4", actual[0].Address) }) } +func testGetAllByEnvironmentAndState(t *testing.T, s nonce.Store) { + t.Run("testGetAllByEnvironmentAndState", func(t *testing.T) { + ctx := context.Background() + + records := []nonce.Record{ + {Address: "vm1", Authority: "a1", Blockhash: "b1", Environment: nonce.EnvironmentVm, EnvironmentInstance: "vm_pubkey_1", Purpose: nonce.PurposeOnDemandTransaction, State: nonce.StateReleased, Signature: "s1"}, + {Address: "vm2", Authority: "a2", Blockhash: "b2", Environment: nonce.EnvironmentVm, EnvironmentInstance: "vm_pubkey_2", Purpose: nonce.PurposeOnDemandTransaction, State: nonce.StateReleased, Signature: "s2"}, + {Address: "vm3", Authority: "a3", Blockhash: "b3", Environment: nonce.EnvironmentVm, EnvironmentInstance: "vm_pubkey_1", Purpose: nonce.PurposeOnDemandTransaction, State: nonce.StateAvailable, Signature: "s3"}, + {Address: "sol1", Authority: "a4", Blockhash: "b4", Environment: nonce.EnvironmentSolana, EnvironmentInstance: nonce.EnvironmentInstanceSolanaMainnet, Purpose: nonce.PurposeOnDemandTransaction, State: nonce.StateReleased, Signature: "s4"}, + } + + for _, item := range records { + err := s.Save(ctx, &item) + require.NoError(t, err) + } + + // Should return released VM nonces across all instances (asc) + actual, err := s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateReleased, query.EmptyCursor, 10, query.Ascending) + require.NoError(t, err) + assert.Equal(t, 2, len(actual)) + assert.Equal(t, "vm1", actual[0].Address) + assert.Equal(t, "vm2", actual[1].Address) + + // Should return released VM nonces across all instances (desc) + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateReleased, query.EmptyCursor, 10, query.Descending) + require.NoError(t, err) + assert.Equal(t, 2, len(actual)) + assert.Equal(t, "vm2", actual[0].Address) + assert.Equal(t, "vm1", actual[1].Address) + + // Should return available VM nonces across all instances + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateAvailable, query.EmptyCursor, 10, query.Ascending) + require.NoError(t, err) + assert.Equal(t, 1, len(actual)) + assert.Equal(t, "vm3", actual[0].Address) + + // Should return released Solana nonces across all instances + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentSolana, nonce.StateReleased, query.EmptyCursor, 10, query.Ascending) + require.NoError(t, err) + assert.Equal(t, 1, len(actual)) + assert.Equal(t, "sol1", actual[0].Address) + + // Should return not found for non-existent combination + _, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentSolana, nonce.StateAvailable, query.EmptyCursor, 10, query.Ascending) + assert.Equal(t, nonce.ErrNonceNotFound, err) + + // Test pagination with limit (asc) + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateReleased, query.EmptyCursor, 1, query.Ascending) + require.NoError(t, err) + assert.Equal(t, 1, len(actual)) + assert.Equal(t, "vm1", actual[0].Address) + + // Test pagination with cursor (asc) + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateReleased, query.ToCursor(actual[0].Id), 10, query.Ascending) + require.NoError(t, err) + assert.Equal(t, 1, len(actual)) + assert.Equal(t, "vm2", actual[0].Address) + + // Test pagination with limit (desc) + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateReleased, query.EmptyCursor, 1, query.Descending) + require.NoError(t, err) + assert.Equal(t, 1, len(actual)) + assert.Equal(t, "vm2", actual[0].Address) + + // Test pagination with cursor (desc) + actual, err = s.GetAllByEnvironmentAndState(ctx, nonce.EnvironmentVm, nonce.StateReleased, query.ToCursor(actual[0].Id), 10, query.Descending) + require.NoError(t, err) + assert.Equal(t, 1, len(actual)) + assert.Equal(t, "vm1", actual[0].Address) + }) +} + func testGetCount(t *testing.T, s nonce.Store) { t.Run("testGetCount", func(t *testing.T) { ctx := context.Background() diff --git a/ocp/rpc/currency/server.go b/ocp/rpc/currency/server.go index 0ee8b84..0d044bb 100644 --- a/ocp/rpc/currency/server.go +++ b/ocp/rpc/currency/server.go @@ -85,6 +85,23 @@ func (s *currencyServer) GetMints(ctx context.Context, req *currencypb.GetMintsR log := s.log.With(zap.String("method", "GetMints")) log = client.InjectLoggingMetadata(ctx, log) + // Track all requested mints so the worker polls their reserve state + var requestedMints []*common.Account + for _, protoMintAddress := range req.Addresses { + mintAccount, err := common.NewAccountFromProto(protoMintAddress) + if err != nil { + continue + } + requestedMints = append(requestedMints, mintAccount) + } + if err := s.liveMintStateWorker.trackMints(ctx, requestedMints); err != nil { + if err == errMintNotSupported { + return ¤cypb.GetMintsResponse{Result: currencypb.GetMintsResponse_NOT_FOUND}, nil + } + log.With(zap.Error(err)).Warn("failed to track requested mints") + return nil, status.Error(codes.Internal, "") + } + resp := ¤cypb.GetMintsResponse{ MetadataByAddress: make(map[string]*currencypb.Mint), } @@ -189,9 +206,9 @@ func (s *currencyServer) GetMints(ctx context.Context, req *currencypb.GetMintsR return nil, status.Error(codes.Internal, "") } - err = s.liveMintStateWorker.waitForData(ctx) + err = s.liveMintStateWorker.waitForReserveState(ctx, mintAccount) if err != nil { - log.With(zap.Error(err)).Warn("failed to wait for live mint data") + log.With(zap.Error(err)).Warn("failed to wait for live mint reserve state") return nil, status.Error(codes.Internal, "") } @@ -634,6 +651,12 @@ func (s *currencyServer) StreamLiveMintData( requestedMints = append(requestedMints, mint) } + // Track requested mints so the worker polls their reserve state + if err := s.liveMintStateWorker.trackMints(ctx, requestedMints); err != nil { + log.With(zap.Error(err)).Warn("failed to track requested mints") + return status.Error(codes.Internal, "") + } + // Generate unique stream ID streamID := uuid.New().String() log = log.With(zap.String("stream_id", streamID)) @@ -644,14 +667,13 @@ func (s *currencyServer) StreamLiveMintData( log.Debug("stream registered") - // Wait for initial data to be available - if err := s.liveMintStateWorker.waitForData(ctx); err != nil { - log.With(zap.Error(err)).Debug("context cancelled while waiting for data") - return status.Error(codes.Canceled, "") - } - - // Initial flush: send current exchange rates if the stream wants them + // Wait for and flush initial exchange rates if the stream wants them if stream.wantsExchangeRates() { + if err := s.liveMintStateWorker.waitForExchangeRates(ctx); err != nil { + log.With(zap.Error(err)).Debug("context cancelled while waiting for exchange rates") + return status.Error(codes.Canceled, "") + } + exchangeRates := s.liveMintStateWorker.getExchangeRates() if exchangeRates != nil && exchangeRates.SignedResponse != nil { if err := streamer.Send(exchangeRates.SignedResponse); err != nil { @@ -661,32 +683,42 @@ func (s *currencyServer) StreamLiveMintData( } } - // Initial flush: send current reserve states - reserveStates := s.liveMintStateWorker.getReserveStates() - if len(reserveStates) > 0 { - // Filter based on requested mints and build batch response - var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState - for _, state := range reserveStates { - if stream.wantsMint(state.Mint.PublicKey().ToBase58()) && state.SignedState != nil { - filtered = append(filtered, state.SignedState) - } + // Wait for and flush initial reserve states for each requested mint + var filtered []*currencypb.VerifiedLaunchpadCurrencyReserveState + for _, mint := range requestedMints { + if common.IsCoreMint(mint) { + continue } - if len(filtered) > 0 { - resp := ¤cypb.StreamLiveMintDataResponse{ - Type: ¤cypb.StreamLiveMintDataResponse_Data{ - Data: ¤cypb.StreamLiveMintDataResponse_LiveData{ - Type: ¤cypb.StreamLiveMintDataResponse_LiveData_LaunchpadCurrencyReserveStates{ - LaunchpadCurrencyReserveStates: ¤cypb.VerifiedLaunchapdCurrencyReserveStateBatch{ - ReserveStates: filtered, - }, + + err := s.liveMintStateWorker.waitForReserveState(ctx, mint) + if err != nil { + log.With(zap.Error(err)).Debug("context cancelled while waiting for reserve state") + return status.Error(codes.Canceled, "") + } + + state, err := s.liveMintStateWorker.getReserveState(mint) + if err != nil { + continue + } + if state.SignedState != nil { + filtered = append(filtered, state.SignedState) + } + } + if len(filtered) > 0 { + resp := ¤cypb.StreamLiveMintDataResponse{ + Type: ¤cypb.StreamLiveMintDataResponse_Data{ + Data: ¤cypb.StreamLiveMintDataResponse_LiveData{ + Type: ¤cypb.StreamLiveMintDataResponse_LiveData_LaunchpadCurrencyReserveStates{ + LaunchpadCurrencyReserveStates: ¤cypb.VerifiedLaunchapdCurrencyReserveStateBatch{ + ReserveStates: filtered, }, }, }, - } - if err := streamer.Send(resp); err != nil { - log.With(zap.Error(err)).Debug("failed to send initial reserve states") - return err - } + }, + } + if err := streamer.Send(resp); err != nil { + log.With(zap.Error(err)).Debug("failed to send initial reserve states") + return err } } diff --git a/ocp/rpc/currency/worker.go b/ocp/rpc/currency/worker.go index 470963f..44ac25e 100644 --- a/ocp/rpc/currency/worker.go +++ b/ocp/rpc/currency/worker.go @@ -16,48 +16,16 @@ import ( "github.com/code-payments/ocp-server/ocp/auth" "github.com/code-payments/ocp-server/ocp/common" - "github.com/code-payments/ocp-server/ocp/config" currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" ) var ( - badBoysMintAccount, _ = common.NewAccountFromPublicKeyString(config.BadBoysMintPublicKey) - bluebucksMintAccount, _ = common.NewAccountFromPublicKeyString(config.BluebucksMintPublicKey) - bitsMintAccount, _ = common.NewAccountFromPublicKeyString(config.BitsMintPublicKey) - bogeyMintAccount, _ = common.NewAccountFromPublicKeyString(config.BogeyMintPublicKey) - floatMintAccount, _ = common.NewAccountFromPublicKeyString(config.FloatMintPublicKey) - jeffyMintAccount, _ = common.NewAccountFromPublicKeyString(config.JeffyMintPublicKey) - lightspeedMintAccount, _ = common.NewAccountFromPublicKeyString(config.LightspeedMintPublicKey) - linksMintAccount, _ = common.NewAccountFromPublicKeyString(config.LinksMintPublicKey) - marketCoinMintAccount, _ = common.NewAccountFromPublicKeyString(config.MarketCoinMintPublicKey) - moonyMintAccount, _ = common.NewAccountFromPublicKeyString(config.MoonyMintPublicKey) - teddiesMintAccount, _ = common.NewAccountFromPublicKeyString(config.TeddiesMintPublicKey) - testMintAccount, _ = common.NewAccountFromPublicKeyString(config.TestMintPublicKey) - toshiMintAccount, _ = common.NewAccountFromPublicKeyString(config.ToshiMintPublicKey) - xpMintAccount, _ = common.NewAccountFromPublicKeyString(config.XpMintPublicKey) + errMintNotTracked = errors.New("mint is not being tracked") + errMintNotSupported = errors.New("mint is not supported") ) -// trackedLaunchpadMints is the hardcoded set of launchpad mints to track -// (excludes core mint as it only has exchange rate data) -var trackedLaunchpadMints = []*common.Account{ - badBoysMintAccount, - bluebucksMintAccount, - bitsMintAccount, - bogeyMintAccount, - floatMintAccount, - jeffyMintAccount, - lightspeedMintAccount, - linksMintAccount, - marketCoinMintAccount, - moonyMintAccount, - teddiesMintAccount, - testMintAccount, - toshiMintAccount, - xpMintAccount, -} - // liveExchangeRateData represents live exchange rate data with its pre-signed response type liveExchangeRateData struct { Rates map[string]float64 @@ -78,6 +46,9 @@ type liveMintStateWorker struct { conf *conf data ocp_data.Provider + mintsMu sync.RWMutex + trackedMints map[string]*common.Account + stateMu sync.RWMutex exchangeRates *liveExchangeRateData launchpadReserves map[string]*liveReserveStateData @@ -85,13 +56,11 @@ type liveMintStateWorker struct { streamsMu sync.RWMutex streams map[string]*liveMintDataStream - dataReady chan struct{} // closed when initial data is loaded - dataReadyOnce sync.Once + exchangeRatesReady chan struct{} + exchangeRatesReadyOnce sync.Once - // Track initial load completion - initMu sync.Mutex - exchangeRatesLoaded bool - reserveStatesLoadedMints map[string]struct{} + reserveReadyMu sync.Mutex + reserveReadyChans map[string]chan struct{} ctx context.Context cancel context.CancelFunc @@ -100,15 +69,16 @@ type liveMintStateWorker struct { func newLiveMintStateWorker(log *zap.Logger, data ocp_data.Provider, conf *conf) *liveMintStateWorker { ctx, cancel := context.WithCancel(context.Background()) return &liveMintStateWorker{ - log: log, - conf: conf, - data: data, - launchpadReserves: make(map[string]*liveReserveStateData), - streams: make(map[string]*liveMintDataStream), - dataReady: make(chan struct{}), - reserveStatesLoadedMints: make(map[string]struct{}), - ctx: ctx, - cancel: cancel, + log: log, + conf: conf, + data: data, + trackedMints: make(map[string]*common.Account), + launchpadReserves: make(map[string]*liveReserveStateData), + streams: make(map[string]*liveMintDataStream), + exchangeRatesReady: make(chan struct{}), + reserveReadyChans: make(map[string]chan struct{}), + ctx: ctx, + cancel: cancel, } } @@ -132,6 +102,56 @@ func (m *liveMintStateWorker) stop() { m.streams = make(map[string]*liveMintDataStream) } +// getTrackedMints returns the current set of dynamically tracked mints +func (m *liveMintStateWorker) getTrackedMints() map[string]*common.Account { + m.mintsMu.RLock() + defer m.mintsMu.RUnlock() + + result := make(map[string]*common.Account, len(m.trackedMints)) + for k, v := range m.trackedMints { + result[k] = v + } + return result +} + +// trackMints validates and adds mints to the tracked set. Only mints that +// pass IsSupportedMint validation are added. Core mint is excluded. Returns +// an error if any non-core mint is unsupported or cannot be validated. +func (m *liveMintStateWorker) trackMints(ctx context.Context, mints []*common.Account) error { + for _, mint := range mints { + if common.IsCoreMint(mint) { + continue + } + + mintAddr := mint.PublicKey().ToBase58() + + m.mintsMu.RLock() + _, alreadyTracked := m.trackedMints[mintAddr] + m.mintsMu.RUnlock() + + if alreadyTracked { + continue + } + + isSupported, err := common.IsSupportedMint(ctx, m.data, mint) + if err != nil { + return errors.Wrapf(err, "failed to validate mint %s", mintAddr) + } + if !isSupported { + return errMintNotSupported + } + + m.mintsMu.Lock() + m.trackedMints[mintAddr] = mint + m.mintsMu.Unlock() + + m.log.With(zap.String("mint", mintAddr)).Debug("tracking new mint from client request") + + go m.fetchAndUpdateReserveState(ctx, mint) + } + return nil +} + // registerStream creates and registers a new stream for the given mints func (m *liveMintStateWorker) registerStream(id string, mints []*common.Account) *liveMintDataStream { stream := newLiveMintDataStream(id, mints, streamBufferSize) @@ -157,34 +177,59 @@ func (m *liveMintStateWorker) unregisterStream(id string) { } } -// waitForData blocks until initial data is loaded or context is cancelled -func (m *liveMintStateWorker) waitForData(ctx context.Context) error { +// waitForExchangeRates blocks until exchange rate data is available or context is cancelled +func (m *liveMintStateWorker) waitForExchangeRates(ctx context.Context) error { select { - case <-m.dataReady: + case <-m.exchangeRatesReady: return nil case <-ctx.Done(): return ctx.Err() } } -// getExchangeRates returns the current pre-signed exchange rate data -func (m *liveMintStateWorker) getExchangeRates() *liveExchangeRateData { - m.stateMu.RLock() - defer m.stateMu.RUnlock() +// waitForReserveState blocks until reserve state data for a specific mint is +// available or context is cancelled. Returns ErrMintNotTracked immediately if +// the mint is not in the tracked set. +func (m *liveMintStateWorker) waitForReserveState(ctx context.Context, mint *common.Account) error { + if !m.isTrackedMint(mint) { + return errMintNotTracked + } - return m.exchangeRates + ch := m.getOrCreateReserveReadyChan(mint) + select { + case <-ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (m *liveMintStateWorker) isTrackedMint(mint *common.Account) bool { + m.mintsMu.RLock() + defer m.mintsMu.RUnlock() + + _, ok := m.trackedMints[mint.PublicKey().ToBase58()] + return ok +} + +func (m *liveMintStateWorker) getOrCreateReserveReadyChan(mint *common.Account) chan struct{} { + m.reserveReadyMu.Lock() + defer m.reserveReadyMu.Unlock() + + ch, ok := m.reserveReadyChans[mint.PublicKey().ToBase58()] + if !ok { + ch = make(chan struct{}) + m.reserveReadyChans[mint.PublicKey().ToBase58()] = ch + } + return ch } -// getReserveStates returns all current pre-signed launchpad currency reserve states -func (m *liveMintStateWorker) getReserveStates() []*liveReserveStateData { +// getExchangeRates returns the current pre-signed exchange rate data +func (m *liveMintStateWorker) getExchangeRates() *liveExchangeRateData { m.stateMu.RLock() defer m.stateMu.RUnlock() - result := make([]*liveReserveStateData, 0, len(m.launchpadReserves)) - for _, data := range m.launchpadReserves { - result = append(result, data) - } - return result + return m.exchangeRates } // getReserveState returns a current pre-signed launchpad currency reserve state for a mint @@ -200,34 +245,28 @@ func (m *liveMintStateWorker) getReserveState(mint *common.Account) (*liveReserv return nil, errors.New("not found") } -func (m *liveMintStateWorker) tryMarkDataReady() { - m.initMu.Lock() - defer m.initMu.Unlock() - - // Check if all data has been loaded - if !m.exchangeRatesLoaded { - return - } - if len(m.reserveStatesLoadedMints) < len(trackedLaunchpadMints) { - return - } - - m.dataReadyOnce.Do(func() { - close(m.dataReady) +func (m *liveMintStateWorker) markExchangeRatesReady() { + m.exchangeRatesReadyOnce.Do(func() { + close(m.exchangeRatesReady) }) } -func (m *liveMintStateWorker) markExchangeRatesLoaded() { - m.initMu.Lock() - m.exchangeRatesLoaded = true - m.initMu.Unlock() - m.tryMarkDataReady() -} +func (m *liveMintStateWorker) markReserveStateReady(mint *common.Account) { + m.reserveReadyMu.Lock() + defer m.reserveReadyMu.Unlock() + + ch, ok := m.reserveReadyChans[mint.PublicKey().ToBase58()] + if !ok { + ch = make(chan struct{}) + m.reserveReadyChans[mint.PublicKey().ToBase58()] = ch + } -func (m *liveMintStateWorker) markReserveStateLoaded(mint string) { - m.initMu.Lock() - m.reserveStatesLoadedMints[mint] = struct{}{} - m.initMu.Unlock() + select { + case <-ch: + // Already closed + default: + close(ch) + } } func (m *liveMintStateWorker) pollExchangeRates(ctx context.Context) { @@ -274,14 +313,12 @@ func (m *liveMintStateWorker) fetchAndUpdateExchangeRates(ctx context.Context, l m.stateMu.Unlock() m.notifyExchangeRates() - m.markExchangeRatesLoaded() + m.markExchangeRatesReady() } func (m *liveMintStateWorker) pollReserveState(ctx context.Context) { - log := m.log.With(zap.String("poller", "reserve_state")) - // Initial poll immediately - m.fetchAndUpdateReserveState(ctx, log) + m.fetchAndUpdateReserveStates(ctx) ticker := time.NewTicker(m.conf.reserveStatePollInterval.Get(ctx)) defer ticker.Stop() @@ -293,59 +330,74 @@ func (m *liveMintStateWorker) pollReserveState(ctx context.Context) { case <-m.ctx.Done(): return case <-ticker.C: - m.fetchAndUpdateReserveState(ctx, log) + m.fetchAndUpdateReserveStates(ctx) } } } -func (m *liveMintStateWorker) fetchAndUpdateReserveState(ctx context.Context, log *zap.Logger) { - var updatedStates []*liveReserveStateData +// fetchAndUpdateReserveState fetches and updates the reserve state for a single mint. +// Returns the updated state data, or nil if the fetch failed. +func (m *liveMintStateWorker) fetchAndUpdateReserveState(ctx context.Context, mint *common.Account) *liveReserveStateData { + mintAddr := mint.PublicKey().ToBase58() - for _, mint := range trackedLaunchpadMints { - mintAddr := mint.PublicKey().ToBase58() + supply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, m.data, mint) + if err != nil { + m.log.With( + zap.Error(err), + zap.String("mint", mintAddr), + ).Warn("failed to fetch launchpad currency circulating supply") + return nil + } - supply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, m.data, mint) - if err != nil { - log.With( - zap.Error(err), - zap.String("mint", mintAddr), - ).Warn("failed to fetch launchpad currency circulating supply") - continue - } + signedState, err := m.signReserveState(mint, supply, ts) + if err != nil { + m.log.With( + zap.Error(err), + zap.String("mint", mintAddr), + ).Warn("failed to sign reserve state") + return nil + } - // Sign the reserve state once when fetched - signedState, err := m.signReserveState(mint, supply, ts) - if err != nil { - log.With( - zap.Error(err), - zap.String("mint", mintAddr), - ).Warn("failed to sign reserve state") - continue - } + m.markReserveStateReady(mint) - // Track that this mint was loaded (before updating launchpadReserves) - m.markReserveStateLoaded(mintAddr) + stateData := &liveReserveStateData{ + Mint: mint, + SupplyFromBonding: supply, + Timestamp: ts, + SignedState: signedState, + } - stateData := &liveReserveStateData{ - Mint: mint, - SupplyFromBonding: supply, - Timestamp: ts, - SignedState: signedState, - } + m.stateMu.Lock() + m.launchpadReserves[mintAddr] = stateData + m.stateMu.Unlock() + + return stateData +} - m.stateMu.Lock() - m.launchpadReserves[mintAddr] = stateData - m.stateMu.Unlock() +func (m *liveMintStateWorker) fetchAndUpdateReserveStates(ctx context.Context) { + trackedMints := m.getTrackedMints() - updatedStates = append(updatedStates, stateData) + var mu sync.Mutex + var updatedStates []*liveReserveStateData + + var wg sync.WaitGroup + wg.Add(len(trackedMints)) + for _, mint := range trackedMints { + go func(mint *common.Account) { + defer wg.Done() + + if stateData := m.fetchAndUpdateReserveState(ctx, mint); stateData != nil { + mu.Lock() + updatedStates = append(updatedStates, stateData) + mu.Unlock() + } + }(mint) } + wg.Wait() if len(updatedStates) > 0 { m.notifyReserveStates(updatedStates) } - - // Try to mark data ready after all reserve states are processed - m.tryMarkDataReady() } func (m *liveMintStateWorker) notifyExchangeRates() { diff --git a/ocp/worker/currency/reserve.go b/ocp/worker/currency/reserve.go index 1a7c8b2..fa786f0 100644 --- a/ocp/worker/currency/reserve.go +++ b/ocp/worker/currency/reserve.go @@ -2,13 +2,13 @@ package currency import ( "context" + "sync" "time" "go.uber.org/zap" "github.com/code-payments/ocp-server/metrics" "github.com/code-payments/ocp-server/ocp/common" - "github.com/code-payments/ocp-server/ocp/config" currency_util "github.com/code-payments/ocp-server/ocp/currency" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/currency" @@ -20,6 +20,9 @@ import ( type reserveRuntime struct { log *zap.Logger data ocp_data.Provider + + mintsMu sync.RWMutex + mints []*common.Account } func NewReserveRuntime(log *zap.Logger, data ocp_data.Provider) worker.Runtime { @@ -30,6 +33,8 @@ func NewReserveRuntime(log *zap.Logger, data ocp_data.Provider) worker.Runtime { } func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duration) error { + go p.pollMints(runtimeCtx, interval/3) + for { _, err := retry.Retry( func() error { @@ -68,260 +73,83 @@ func (p *reserveRuntime) Start(runtimeCtx context.Context, interval time.Duratio } } -// todo: Don't hardcode Jeffy and other Flipcash currencies -func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) error { - err1 := func() error { - bitsMintAccount, _ := common.NewAccountFromPublicKeyString(config.BitsMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, bitsMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: bitsMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err2 := func() error { - bogeyMintAccount, _ := common.NewAccountFromPublicKeyString(config.BogeyMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, bogeyMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: bogeyMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err3 := func() error { - floatMintAccount, _ := common.NewAccountFromPublicKeyString(config.FloatMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, floatMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: floatMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err4 := func() error { - jeffyMintAccount, _ := common.NewAccountFromPublicKeyString(config.JeffyMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, jeffyMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: jeffyMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err5 := func() error { - marketCoinMintAccount, _ := common.NewAccountFromPublicKeyString(config.MarketCoinMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, marketCoinMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: marketCoinMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err6 := func() error { - xpMintAccount, _ := common.NewAccountFromPublicKeyString(config.XpMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, xpMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: xpMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err7 := func() error { - badBoysMintAccount, _ := common.NewAccountFromPublicKeyString(config.BadBoysMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, badBoysMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: badBoysMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err8 := func() error { - testMintAccount, _ := common.NewAccountFromPublicKeyString(config.TestMintPublicKey) +func (p *reserveRuntime) pollMints(ctx context.Context, interval time.Duration) { + // Initial fetch before the first reserve update + p.refreshMints(ctx) - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, testMintAccount) - if err != nil { - return err - } - - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: testMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err9 := func() error { - moonyMintAccount, _ := common.NewAccountFromPublicKeyString(config.MoonyMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, moonyMintAccount) - if err != nil { - return err + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + p.refreshMints(ctx) } + } +} - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: moonyMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err10 := func() error { - bluebucksMintAccount, _ := common.NewAccountFromPublicKeyString(config.BluebucksMintPublicKey) +func (p *reserveRuntime) refreshMints(ctx context.Context) { + mintStrings, err := p.data.GetAllCurrencyMints(ctx) + if err != nil { + p.log.With(zap.Error(err)).Warn("failed to refresh currency mints") + return + } - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, bluebucksMintAccount) + var mints []*common.Account + for _, mint := range mintStrings { + account, err := common.NewAccountFromPublicKeyString(mint) if err != nil { - return err + p.log.With(zap.Error(err), zap.String("mint", mint)).Warn("invalid mint public key") + continue } - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: bluebucksMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err11 := func() error { - linksMintAccount, _ := common.NewAccountFromPublicKeyString(config.LinksMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, linksMintAccount) - if err != nil { - return err + if common.IsCoreMint(account) { + continue } - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: linksMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() - - err12 := func() error { - lightspeedMintAccount, _ := common.NewAccountFromPublicKeyString(config.LightspeedMintPublicKey) - - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, lightspeedMintAccount) - if err != nil { - return err - } + mints = append(mints, account) + } - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: lightspeedMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() + p.mintsMu.Lock() + p.mints = mints + p.mintsMu.Unlock() +} - err13 := func() error { - toshiMintAccount, _ := common.NewAccountFromPublicKeyString(config.ToshiMintPublicKey) +func (p *reserveRuntime) getMints() []*common.Account { + p.mintsMu.RLock() + defer p.mintsMu.RUnlock() - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, toshiMintAccount) - if err != nil { - return err - } + return p.mints +} - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: toshiMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() +func (p *reserveRuntime) UpdateAllLaunchpadCurrencyReserves(ctx context.Context) error { + mints := p.getMints() - err14 := func() error { - teddiesMintAccount, _ := common.NewAccountFromPublicKeyString(config.TeddiesMintPublicKey) + var wg sync.WaitGroup + wg.Add(len(mints)) + for _, mint := range mints { + go func(mint *common.Account) { + defer wg.Done() - ciculatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, teddiesMintAccount) - if err != nil { - return err - } + log := p.log.With(zap.String("mint", mint.PublicKey().ToBase58())) - return p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ - Mint: teddiesMintAccount.PublicKey().ToBase58(), - SupplyFromBonding: ciculatingSupply, - Time: ts, - }) - }() + circulatingSupply, ts, err := currency_util.GetLaunchpadCurrencyCirculatingSupply(ctx, p.data, mint) + if err != nil { + log.With(zap.Error(err)).Warn("failed to get circulating supply") + return + } - if err1 != nil { - return err1 - } - if err2 != nil { - return err2 - } - if err3 != nil { - return err3 - } - if err4 != nil { - return err4 - } - if err5 != nil { - return err5 - } - if err6 != nil { - return err6 - } - if err7 != nil { - return err7 - } - if err8 != nil { - return err8 - } - if err9 != nil { - return err9 - } - if err10 != nil { - return err10 - } - if err11 != nil { - return err11 - } - if err12 != nil { - return err12 - } - if err13 != nil { - return err13 - } - if err14 != nil { - return err14 + err = p.data.PutCurrencyReserve(ctx, ¤cy.ReserveRecord{ + Mint: mint.PublicKey().ToBase58(), + SupplyFromBonding: circulatingSupply, + Time: ts, + }) + if err != nil { + log.With(zap.Error(err)).Warn("failed to put currency reserve") + return + } + }(mint) } + wg.Wait() return nil } diff --git a/ocp/worker/geyser/handler.go b/ocp/worker/geyser/handler.go index 6fb7cd0..1c77fed 100644 --- a/ocp/worker/geyser/handler.go +++ b/ocp/worker/geyser/handler.go @@ -7,7 +7,6 @@ import ( "github.com/mr-tron/base58" "github.com/pkg/errors" - "github.com/code-payments/ocp-server/ocp/config" geyserpb "github.com/code-payments/ocp-server/ocp/worker/geyser/api/gen" "github.com/code-payments/ocp-server/ocp/common" @@ -76,57 +75,44 @@ func (h *TokenProgramAccountHandler) Handle(ctx context.Context, update *geyserp return errors.Wrap(err, "invalid owner account") } + // Not an ATA, so filter it out. It cannot be a VM deposit ATA + if bytes.Equal(tokenAccount.PublicKey().ToBytes(), ownerAccount.PublicKey().ToBytes()) { + return nil + } + mintAccount, err := common.NewAccountFromPublicKeyBytes(unmarshalled.Mint) if err != nil { return errors.Wrap(err, "invalid mint account") } - switch mintAccount.PublicKey().ToBase58() { - - // todo: Don't hardcode Jeffy and other Flipcash currencies - case common.CoreMintAccount.PublicKey().ToBase58(), - config.BadBoysMintPublicKey, - config.BluebucksMintPublicKey, - config.BitsMintPublicKey, - config.BogeyMintPublicKey, - config.FloatMintPublicKey, - config.JeffyMintPublicKey, - config.LightspeedMintPublicKey, - config.LinksMintPublicKey, - config.MarketCoinMintPublicKey, - config.MoonyMintPublicKey, - config.TeddiesMintPublicKey, - config.TestMintPublicKey, - config.ToshiMintPublicKey, - config.XpMintPublicKey: - // Not an ATA, so filter it out. It cannot be a VM deposit ATA - if bytes.Equal(tokenAccount.PublicKey().ToBytes(), ownerAccount.PublicKey().ToBytes()) { - return nil - } + isSupportedMint, err := common.IsSupportedMint(ctx, h.data, mintAccount) + if err != nil { + return errors.Wrap(err, "error checking if mint is supported") + } + if !isSupportedMint { + return nil + } - exists, userAuthorityAccount, err := testForKnownUserAuthorityFromDepositPda(ctx, h.data, ownerAccount) - if err != nil { - return errors.Wrap(err, "error testing for user authority from deposit pda") - } else if !exists { - return nil - } + exists, userAuthorityAccount, err := testForKnownUserAuthorityFromDepositPda(ctx, h.data, ownerAccount) + if err != nil { + return errors.Wrap(err, "error testing for user authority from deposit pda") + } else if !exists { + return nil + } - err = processPotentialExternalDepositIntoVm(ctx, h.data, h.integration, signature, userAuthorityAccount, mintAccount) - if err != nil { - return errors.Wrap(err, "error processing signature for external deposit into vm") - } + err = processPotentialExternalDepositIntoVm(ctx, h.data, h.integration, signature, userAuthorityAccount, mintAccount) + if err != nil { + return errors.Wrap(err, "error processing signature for external deposit into vm") + } - if unmarshalled.Amount > 0 { - err = initiateExternalDepositIntoVm(ctx, h.data, userAuthorityAccount, mintAccount, unmarshalled.Amount) - if err != nil { - return errors.Wrap(err, "error depositing into the vm") - } + if unmarshalled.Amount > 0 { + err = initiateExternalDepositIntoVm(ctx, h.data, userAuthorityAccount, mintAccount, unmarshalled.Amount) + if err != nil { + return errors.Wrap(err, "error depositing into the vm") } - - return nil - default: - return nil } + + return nil } func initializeProgramAccountUpdateHandlers(conf *conf, data ocp_data.Provider, integration Integration) map[string]ProgramAccountUpdateHandler { diff --git a/ocp/worker/nonce/metrics.go b/ocp/worker/nonce/metrics.go index 8baeefa..455189e 100644 --- a/ocp/worker/nonce/metrics.go +++ b/ocp/worker/nonce/metrics.go @@ -2,12 +2,7 @@ package nonce import ( "context" - "fmt" "time" - - "github.com/code-payments/ocp-server/metrics" - "github.com/code-payments/ocp-server/ocp/common" - "github.com/code-payments/ocp-server/ocp/data/nonce" ) const ( @@ -22,52 +17,7 @@ func (p *runtime) metricsGaugeWorker(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-time.After(delay): - start := time.Now() - - // todo: optimize number of queries needed per polling check - for _, state := range []nonce.State{ - nonce.StateUnknown, - nonce.StateReleased, - nonce.StateAvailable, - nonce.StateReserved, - nonce.StateInvalid, - nonce.StateClaimed, - } { - count, err := p.data.GetNonceCountByStateAndPurpose(ctx, nonce.EnvironmentVm, common.CoreMintVmAccount.PublicKey().ToBase58(), state, nonce.PurposeClientIntent) - if err != nil { - continue - } - recordNonceCountEvent(ctx, nonce.EnvironmentVm, common.CoreMintVmAccount.PublicKey().ToBase58(), state, nonce.PurposeClientIntent, count) - - count, err = p.data.GetNonceCountByStateAndPurpose(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, nonce.PurposeOnDemandTransaction) - if err != nil { - continue - } - recordNonceCountEvent(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, nonce.PurposeOnDemandTransaction, count) - - count, err = p.data.GetNonceCountByStateAndPurpose(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, nonce.PurposeInternalServerProcess) - if err != nil { - continue - } - recordNonceCountEvent(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, nonce.PurposeInternalServerProcess, count) - - count, err = p.data.GetNonceCountByStateAndPurpose(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, nonce.PurposeClientSwap) - if err != nil { - continue - } - recordNonceCountEvent(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, nonce.PurposeClientSwap, count) - } - - delay = time.Second - time.Since(start) + // todo: define valuable metrics } } } - -func recordNonceCountEvent(ctx context.Context, env nonce.Environment, instance string, state nonce.State, useCase nonce.Purpose, count uint64) { - metrics.RecordEvent(ctx, nonceCountCheckEventName, map[string]interface{}{ - "pool": fmt.Sprintf("%s:%s", env.String(), instance), - "use_case": useCase.String(), - "state": state.String(), - "count": count, - }) -} diff --git a/ocp/worker/nonce/pool.go b/ocp/worker/nonce/pool.go index b00ba02..1caf702 100644 --- a/ocp/worker/nonce/pool.go +++ b/ocp/worker/nonce/pool.go @@ -22,7 +22,7 @@ const ( nonceBatchSize = 100 ) -func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, instance string, state nonce.State, interval time.Duration) error { +func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, state nonce.State, interval time.Duration) error { var cursor query.Cursor delay := interval @@ -35,11 +35,9 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, inst defer trace.End() tracedCtx := metrics.NewContext(runtimeCtx, trace) - // Get a batch of nonce records in similar state (e.g. newly created, released, reserved, etc...) - items, err := p.data.GetAllNonceByState( + items, err := p.data.GetAllNoncesByEnvironmentAndState( tracedCtx, env, - instance, state, query.WithLimit(nonceBatchSize), query.WithCursor(cursor), @@ -49,7 +47,6 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, inst return err } - // Process the batch of nonce accounts in parallel var wg sync.WaitGroup for _, item := range items { wg.Add(1) @@ -65,7 +62,6 @@ func (p *runtime) worker(runtimeCtx context.Context, env nonce.Environment, inst } wg.Wait() - // Update cursor to point to the next set of nonce accounts if len(items) > 0 { cursor = query.ToCursor(items[len(items)-1].Id) } else { @@ -173,7 +169,7 @@ func (p *runtime) handleUnknown(ctx context.Context, record *nonce.Record) error // Check the signature for a potential timeout (e.g. if the nonce account // was never created because the blockchain never saw the init/create // transaction) - err := p.checkForMissingTx(ctx, record) + err := p.checkForMissingTx(record) if err != nil { return p.markInvalid(ctx, record) } diff --git a/ocp/worker/nonce/runtime.go b/ocp/worker/nonce/runtime.go index 79bd6a8..702cc03 100644 --- a/ocp/worker/nonce/runtime.go +++ b/ocp/worker/nonce/runtime.go @@ -10,8 +10,6 @@ import ( indexerpb "github.com/code-payments/code-vm-indexer/generated/indexer/v1" - "github.com/code-payments/ocp-server/ocp/common" - "github.com/code-payments/ocp-server/ocp/config" ocp_data "github.com/code-payments/ocp-server/ocp/data" "github.com/code-payments/ocp-server/ocp/data/nonce" "github.com/code-payments/ocp-server/ocp/worker" @@ -57,46 +55,26 @@ func (p *runtime) Start(ctx context.Context, interval time.Duration) error { } { go func(state nonce.State) { - err := p.worker(ctx, nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state, interval) + err := p.worker(ctx, nonce.EnvironmentSolana, state, interval) if err != nil && err != context.Canceled { - p.log.With(zap.Error(err)).Warn(fmt.Sprintf("nonce processing loop terminated unexpectedly for env %s, instance %s, state %d", nonce.EnvironmentSolana, nonce.EnvironmentInstanceSolanaMainnet, state)) + p.log.With(zap.Error(err)).Warn(fmt.Sprintf("nonce processing loop terminated unexpectedly for env %s, state %d", nonce.EnvironmentSolana, state)) } }(state) } // Setup workers to watch for nonce state changes on the VM side - // - // todo: Dynamically detect VMs - for _, vm := range []string{ - common.CoreMintVmAccount.PublicKey().ToBase58(), - config.BadBoysVmAccountPublicKey, - config.BluebucksVmAccountPublicKey, - config.BitsVmAccountPublicKey, - config.BogeyVmAccountPublicKey, - config.FloatVmAccountPublicKey, - config.JeffyVmAccountPublicKey, - config.LightspeedVmAccountPublicKey, - config.LinksVmAccountPublicKey, - config.MarketCoinVmAccountPublicKey, - config.MoonyVmAccountPublicKey, - config.TeddiesVmAccountPublicKey, - config.TestVmAccountPublicKey, - config.ToshiVmAccountPublicKey, - config.XpVmAccountPublicKey, + for _, state := range []nonce.State{ + nonce.StateReleased, } { - for _, state := range []nonce.State{ - nonce.StateReleased, - } { - go func(vm string, state nonce.State) { + go func(state nonce.State) { - err := p.worker(ctx, nonce.EnvironmentVm, vm, state, interval) - if err != nil && err != context.Canceled { - p.log.With(zap.Error(err)).Warn(fmt.Sprintf("nonce processing loop terminated unexpectedly for env %s, instance %s, state %d", nonce.EnvironmentVm, vm, state)) - } + err := p.worker(ctx, nonce.EnvironmentVm, state, interval) + if err != nil && err != context.Canceled { + p.log.With(zap.Error(err)).Warn(fmt.Sprintf("nonce processing loop terminated unexpectedly for env %s, state %d", nonce.EnvironmentVm, state)) + } - }(vm, state) - } + }(state) } go func() { diff --git a/ocp/worker/nonce/util.go b/ocp/worker/nonce/util.go index f6bf648..99ceadf 100644 --- a/ocp/worker/nonce/util.go +++ b/ocp/worker/nonce/util.go @@ -176,7 +176,7 @@ func (p *runtime) createNonceAccountTx(ctx context.Context, nonce *nonce.Record) return &tx, nil } -func (p *runtime) checkForMissingTx(ctx context.Context, nonce *nonce.Record) error { +func (p *runtime) checkForMissingTx(nonce *nonce.Record) error { sigCacheMu.Lock() defer sigCacheMu.Unlock() @@ -224,6 +224,10 @@ func (p *runtime) getBlockhashFromSolanaNonce(ctx context.Context, record *nonce return "", errors.Errorf("nonce environment is not %s", nonce.EnvironmentSolana.String()) } + if record.EnvironmentInstance != nonce.EnvironmentInstanceSolanaMainnet { + return "", errors.New("only support solana nonces on mainnet") + } + // Always get the account's state after the transaction's block to avoid // having RPC nodes that are behind provide stale finalized data. rawData, _, err := p.data.GetBlockchainAccountDataAfterBlock(ctx, record.Address, slot)