Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 77 additions & 23 deletions cmd/stellar-rpc/internal/preflight/preflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"runtime/cgo"
"sync"
"time"
"unsafe"

Expand All @@ -35,6 +36,54 @@ type snapshotSourceHandle struct {
ledgerEntryGetter ledgerentries.LedgerEntryGetter
ctx context.Context //nolint:containedctx
logger *log.Entry
cache *snapshotSourceCache
}

type cachedLedgerEntryAndTTL struct {
entry []byte
ttl int64
}

type snapshotSourceCache struct {
mu sync.Mutex
entries map[string]cachedLedgerEntryAndTTL
}
Comment on lines +47 to +50

func newSnapshotSourceHandle(ctx context.Context, params Parameters) snapshotSourceHandle {
return snapshotSourceHandle{
ledgerEntryGetter: params.LedgerEntryGetter,
ctx: ctx,
logger: params.Logger,
cache: &snapshotSourceCache{
entries: make(map[string]cachedLedgerEntryAndTTL),
},
}
}

func (c *snapshotSourceCache) get(key string) (cachedLedgerEntryAndTTL, bool) {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
return entry, ok
}

func (c *snapshotSourceCache) put(key string, entry cachedLedgerEntryAndTTL) {
c.mu.Lock()
defer c.mu.Unlock()
c.entries[key] = entry
}

func ledgerEntryAndTTLToC(entry cachedLedgerEntryAndTTL) C.ledger_entry_and_ttl_t {
if entry.entry == nil {
return C.ledger_entry_and_ttl_t{}
}
return C.ledger_entry_and_ttl_t{
entry: C.xdr_t{
xdr: (*C.uchar)(C.CBytes(entry.entry)),
len: C.size_t(len(entry.entry)),
},
ttl: C.int64_t(entry.ttl),
}
}

// Current base reserve is 0.5XLM (in stroops)
Expand All @@ -50,40 +99,53 @@ func SnapshotSourceGet(handle C.uintptr_t, cLedgerKey C.xdr_t) C.ledger_entry_an
panic("invalid handle type: expected snapshotSourceHandle")
}
ledgerKeyXDR := GoXDR(cLedgerKey)
return ledgerEntryAndTTLToC(h.getLedgerEntryAndTTL(ledgerKeyXDR))
}

func (h snapshotSourceHandle) getLedgerEntryAndTTL(ledgerKeyXDR []byte) cachedLedgerEntryAndTTL {
cacheKey := string(ledgerKeyXDR)
if h.cache != nil {
if cachedEntry, ok := h.cache.get(cacheKey); ok {
return cachedEntry
}
}
var ledgerKey xdr.LedgerKey
if err := xdr.SafeUnmarshal(ledgerKeyXDR, &ledgerKey); err != nil {
h.logger.WithError(err).Error("SnapshotSourceGet(): SafeUnmarshal() failed")
return C.ledger_entry_and_ttl_t{}
return cachedLedgerEntryAndTTL{}
}
entries, _, err := h.ledgerEntryGetter.GetLedgerEntries(h.ctx, []xdr.LedgerKey{ledgerKey})
if err != nil {
h.logger.WithError(err).Error("SnapshotSourceGet(): GetLedgerEntries() failed")
return C.ledger_entry_and_ttl_t{}
return cachedLedgerEntryAndTTL{}
}
if len(entries) > 1 {
h.logger.WithError(err).Error("SnapshotSourceGet(): GetLedgerEntries() returned more than one entry")
return C.ledger_entry_and_ttl_t{}
return cachedLedgerEntryAndTTL{}
}
Comment on lines 122 to 125
if len(entries) == 0 {
return C.ledger_entry_and_ttl_t{}
if h.cache != nil {
h.cache.put(cacheKey, cachedLedgerEntryAndTTL{})
}
return cachedLedgerEntryAndTTL{}
}
out, err := entries[0].Entry.MarshalBinary()
if err != nil {
h.logger.WithError(err).Error("SnapshotSourceGet(): MarshalBinary() failed")
return C.ledger_entry_and_ttl_t{}
return cachedLedgerEntryAndTTL{}
}

result := C.ledger_entry_and_ttl_t{
entry: C.xdr_t{
xdr: (*C.uchar)(C.CBytes(out)),
len: C.size_t(len(out)),
},
ttl: -1, // missing TTL
cachedEntry := cachedLedgerEntryAndTTL{
entry: out,
ttl: -1, // missing TTL
}
if entries[0].LiveUntilLedgerSeq != nil {
result.ttl = C.int64_t(*entries[0].LiveUntilLedgerSeq)
cachedEntry.ttl = int64(*entries[0].LiveUntilLedgerSeq)
}
return result
if h.cache != nil {
h.cache.put(cacheKey, cachedEntry)
}
return cachedEntry
}

func FreeGoXDR(xdr C.xdr_t) {
Expand Down Expand Up @@ -198,11 +260,7 @@ func getFootprintTTLPreflight(ctx context.Context, params Parameters) (Preflight
}
footprintCXDR := CXDR(footprintXDR)
defer FreeGoXDR(footprintCXDR)
ssh := snapshotSourceHandle{
ledgerEntryGetter: params.LedgerEntryGetter,
ctx: ctx,
logger: params.Logger,
}
ssh := newSnapshotSourceHandle(ctx, params)
handle := cgo.NewHandle(ssh)
defer handle.Delete()

Expand Down Expand Up @@ -232,11 +290,7 @@ func getInvokeHostFunctionPreflight(ctx context.Context, params Parameters) (Pre
}
sourceAccountCXDR := CXDR(sourceAccountXDR)
defer FreeGoXDR(sourceAccountCXDR)
ssh := snapshotSourceHandle{
ledgerEntryGetter: params.LedgerEntryGetter,
ctx: ctx,
logger: params.Logger,
}
ssh := newSnapshotSourceHandle(ctx, params)
handle := cgo.NewHandle(ssh)
defer handle.Delete()

Expand Down
62 changes: 62 additions & 0 deletions cmd/stellar-rpc/internal/preflight/preflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ type inMemoryLedgerEntryGetter struct {
latestLedgerSequence uint32
}

type countingLedgerEntryGetter struct {
inner inMemoryLedgerEntryGetter
calls int
}

func (m *countingLedgerEntryGetter) GetLedgerEntries(
ctx context.Context,
keys []xdr.LedgerKey,
) ([]ledgerentries.LedgerKeyAndEntry, uint32, error) {
m.calls++
return m.inner.GetLedgerEntries(ctx, keys)
}

func (m inMemoryLedgerEntryGetter) GetLedgerEntries(
_ context.Context,
keys []xdr.LedgerKey,
Expand Down Expand Up @@ -267,6 +280,55 @@ func TestGetPreflight(t *testing.T) {
}
}

func TestSnapshotSourceGetCachesLedgerEntryLookups(t *testing.T) {
inner, err := newInMemoryLedgerEntryGetter(mockLedgerEntries, latestSimulateTransactionLedgerSeq)
require.NoError(t, err)
getter := &countingLedgerEntryGetter{inner: inner}

key, err := mockLedgerEntriesWithoutTTLs[1].LedgerKey()
require.NoError(t, err)
keyXDR, err := key.MarshalBinary()
require.NoError(t, err)

handle := newSnapshotSourceHandle(t.Context(), Parameters{
LedgerEntryGetter: getter,
Logger: log.New(),
})
first := handle.getLedgerEntryAndTTL(keyXDR)
require.NotNil(t, first.entry)
require.Equal(t, int64(entryTTLValue), first.ttl)

second := handle.getLedgerEntryAndTTL(keyXDR)
require.Equal(t, first, second)
require.Equal(t, 1, getter.calls)
}

func TestSnapshotSourceGetCachesMissingLedgerEntries(t *testing.T) {
inner, err := newInMemoryLedgerEntryGetter(mockLedgerEntries, latestSimulateTransactionLedgerSeq)
require.NoError(t, err)
getter := &countingLedgerEntryGetter{inner: inner}

missingKey := xdr.LedgerKey{
Type: xdr.LedgerEntryTypeAccount,
Account: &xdr.LedgerKeyAccount{
AccountId: xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON"),
},
}
keyXDR, err := missingKey.MarshalBinary()
require.NoError(t, err)

handle := newSnapshotSourceHandle(t.Context(), Parameters{
LedgerEntryGetter: getter,
Logger: log.New(),
})
first := handle.getLedgerEntryAndTTL(keyXDR)
require.Nil(t, first.entry)

second := handle.getLedgerEntryAndTTL(keyXDR)
require.Nil(t, second.entry)
require.Equal(t, 1, getter.calls)
}

func TestGetPreflightDebug(t *testing.T) {
for _, protocolVersion := range supportedProtocolVersions {
t.Run(fmt.Sprintf("protocol %d", protocolVersion), func(t *testing.T) {
Expand Down