From ccbe89374ed7daf4ddf9754ecb05f791cd1d2b0d Mon Sep 17 00:00:00 2001 From: Eren Atas Date: Tue, 14 Apr 2026 19:24:21 +0200 Subject: [PATCH] fix(store): deduplicate flags across flagSetIds in incremental update mode Add a membership table to track which flagSetIds reference which flags, enabling per-flagSetId queries without storing duplicate flag entries. Flags are stored once per (key, source) in the flags table while the membership table records (flagSetId, key, source) triples. Key behaviors: - Get iterates all membership entries for a (flagSetId, key) and selects the highest-priority flag, consistent with GetAll/collectViaMembership. - updateIncremental upserts flag content when a (key, source) already exists at equal or lower priority, so content changes (targeting rules, variants) are not silently dropped. - Metadata always includes flagSetId even when the original metadata is nil, ensuring consistent serialization for clients. - Watch registers scoped per-(key, source) watch channels from memdb instead of watching the entire flags table, avoiding O(N_watchers x N_updates) spurious wake-ups. Signed-off-by: Eren Atas --- core/pkg/store/query.go | 13 ++ core/pkg/store/store.go | 383 +++++++++++++++++++++++++++++++---- core/pkg/store/store_test.go | 68 +++++++ 3 files changed, 424 insertions(+), 40 deletions(-) diff --git a/core/pkg/store/query.go b/core/pkg/store/query.go index 478440a8b..e16de7dad 100644 --- a/core/pkg/store/query.go +++ b/core/pkg/store/query.go @@ -24,6 +24,10 @@ const flagSetIdSourceCompoundIndex = flagSetIdIndex + "+" + sourceIndex const keySourceCompoundIndex = keyIndex + "+" + sourceIndex const flagSetIdKeySourceCompoundIndex = flagSetIdIndex + "+" + keyIndex + "+" + sourceIndex +// membership table for incremental update deduplication +const membershipTable = "membership" +const membershipFlagSetIdKeyIndex = flagSetIdIndex + "+" + keyIndex + // flagSetId defaults to a UUID generated at startup to make our queries consistent // any flag without a "flagSetId" is assigned this one; it's never exposed externally var nilFlagSetId = uuid.New().String() @@ -83,6 +87,15 @@ func (s *Selector) IsEmpty() bool { return s == nil || len(s.indexMap) == 0 } +// HasFlagSetId returns the flagSetId value and true if the selector includes a flagSetId constraint. +func (s *Selector) HasFlagSetId() (string, bool) { + if s == nil || s.indexMap == nil { + return "", false + } + v, ok := s.indexMap[flagSetIdIndex] + return v, ok && v != "" +} + // ToQuery converts the Selector map to an indexId and constraints for querying the Store. // For a given index, a specific order and number of constraints are required. // Both the indexId and constraints are generated based on the keys present in the selector's internal map. diff --git a/core/pkg/store/store.go b/core/pkg/store/store.go index 48b80b107..e67b8f274 100644 --- a/core/pkg/store/store.go +++ b/core/pkg/store/store.go @@ -6,6 +6,7 @@ import ( "fmt" "slices" "sort" + "sync/atomic" "github.com/hashicorp/go-memdb" "github.com/open-feature/flagd/core/pkg/logger" @@ -21,6 +22,14 @@ type FlagQueryResult struct { Flags []model.Flag } +// flagSetMembership tracks which (flagSetId, source) combinations include a given flag key. +// Used by the membership table to deduplicate flags across flagSetIds during incremental updates. +type flagSetMembership struct { + FlagSetId string + Key string + Source string +} + type IStore interface { Get(ctx context.Context, key string, selector *Selector) (model.Flag, model.Metadata, error) GetAll(ctx context.Context, selector *Selector) ([]model.Flag, model.Metadata, error) @@ -36,6 +45,10 @@ type Store struct { sources []string // deprecated: has no effect and will be removed soon. FlagSources []string + // hasMembership is set to true after the first incremental update. + // When false, Get/GetAll/Watch skip membership resolution entirely, + // ensuring zero behavioral change for non-incremental callers. + hasMembership atomic.Bool } // NewStore creates a new in-memory store with the given sources. @@ -117,6 +130,62 @@ func NewStore(logger *logger.Logger, sources []string) (*Store, error) { }, }, }, + membershipTable: { + Name: membershipTable, + Indexes: map[string]*memdb.IndexSchema{ + idIndex: { + Name: idIndex, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "FlagSetId", Lowercase: false}, + &memdb.StringFieldIndex{Field: "Key", Lowercase: false}, + &memdb.StringFieldIndex{Field: "Source", Lowercase: false}, + }, + }, + }, + flagSetIdIndex: { + Name: flagSetIdIndex, + Unique: false, + Indexer: &memdb.StringFieldIndex{Field: "FlagSetId", Lowercase: false}, + }, + membershipFlagSetIdKeyIndex: { + Name: membershipFlagSetIdKeyIndex, + Unique: false, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "FlagSetId", Lowercase: false}, + &memdb.StringFieldIndex{Field: "Key", Lowercase: false}, + }, + }, + }, + flagSetIdSourceCompoundIndex: { + Name: flagSetIdSourceCompoundIndex, + Unique: false, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "FlagSetId", Lowercase: false}, + &memdb.StringFieldIndex{Field: "Source", Lowercase: false}, + }, + }, + }, + keySourceCompoundIndex: { + Name: keySourceCompoundIndex, + Unique: false, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "Key", Lowercase: false}, + &memdb.StringFieldIndex{Field: "Source", Lowercase: false}, + }, + }, + }, + sourceIndex: { + Name: sourceIndex, + Unique: false, + Indexer: &memdb.StringFieldIndex{Field: "Source", Lowercase: false}, + }, + }, + }, }, } @@ -157,14 +226,55 @@ func (s *Store) Get(_ context.Context, key string, selector *Selector) (model.Fl indexId, constraints := selector.ToQuery() s.logger.Debug(fmt.Sprintf("getting flag with query: %s, %v", indexId, constraints)) raw, err := txn.First(flagsTable, indexId, constraints...) - flag, ok := raw.(model.Flag) if err != nil { return model.Flag{}, queryMeta, fmt.Errorf("flag %s not found: %w", key, err) } - if !ok { - return model.Flag{}, queryMeta, fmt.Errorf("flag %s is not a valid flag", key) + flag, ok := raw.(model.Flag) + if ok { + return flag, queryMeta, nil + } + + // Flag not found directly — try membership resolution for flagSetId queries. + // With dedup, flags may be stored under a different flagSetId. + if s.hasMembership.Load() { + if flagSetId, hasFSI := selector.HasFlagSetId(); hasFSI { + memberIt, mErr := txn.Get(membershipTable, membershipFlagSetIdKeyIndex, flagSetId, key) + if mErr == nil { + var best model.Flag + found := false + for mRaw := memberIt.Next(); mRaw != nil; mRaw = memberIt.Next() { + m := mRaw.(flagSetMembership) + flagIt, fErr := txn.Get(flagsTable, keySourceCompoundIndex, m.Key, m.Source) + if fErr != nil { + continue + } + if fRaw := flagIt.Next(); fRaw != nil { + candidate := fRaw.(model.Flag) + if !found || candidate.Priority >= best.Priority { + best = candidate + found = true + } + } + } + if found { + best.FlagSetId = flagSetId + if best.Metadata == nil { + best.Metadata = make(model.Metadata) + } else { + patched := make(model.Metadata, len(best.Metadata)) + for k, v := range best.Metadata { + patched[k] = v + } + best.Metadata = patched + } + best.Metadata["flagSetId"] = flagSetId + return best, queryMeta, nil + } + } + } } - return flag, queryMeta, nil + + return model.Flag{}, queryMeta, fmt.Errorf("flag %s is not a valid flag", key) } // otherwise, get all flags with the given key, and keep the last one with the highest priority @@ -198,6 +308,18 @@ func (s *Store) Get(_ context.Context, key string, selector *Selector) (model.Fl func (s *Store) GetAll(ctx context.Context, selector *Selector) ([]model.Flag, model.Metadata, error) { var flags []model.Flag queryMeta := selector.ToMetadata() + + // For flagSetId selectors, try membership resolution first + if s.hasMembership.Load() { + if flagSetId, hasFSI := selector.HasFlagSetId(); hasFSI { + txn := s.db.Txn(false) + if flags := s.collectViaMembership(txn, flagSetId, nil); flags != nil { + return flags, queryMeta, nil + } + } + } + + // Fall back to direct flags table query it, err := s.selectOrAll(selector) if err != nil { @@ -216,8 +338,9 @@ type flagIdentifier struct { // Update the flag state with the provided flags. // When incrementalUpdate is true, deletion is scoped to only the flagSetIds present in // this payload (from metadata and flag-level overrides), allowing flags from other -// flagSetIds to accumulate across updates. When false, all flags for the source are -// replaced (the default full-snapshot behavior). +// flagSetIds to accumulate across updates. Flags are deduplicated by (key, source) so +// that identical flags shared across flagSetIds are stored only once. +// When false, all flags for the source are replaced (the default full-snapshot behavior). // EXPERIMENTAL: incrementalUpdate support may change or be removed in a future release. func (s *Store) Update( source string, @@ -260,40 +383,139 @@ func (s *Store) Update( txn := s.db.Txn(true) defer txn.Abort() - // When incrementalUpdate is enabled, scope deletion to only the flagSetIds touched - // by this payload (metadata-level + flag-level overrides). This allows per-flagSetId - // updates (e.g., from per-project stream messages) to accumulate without deleting - // flags from unrelated flagSetIds. Otherwise, replace all flags for the source. - var oldFlags []model.Flag if incrementalUpdate { - seenFlagSetIds := make(map[string]struct{}) - if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" { - seenFlagSetIds[fsi] = struct{}{} + s.updateIncremental(txn, source, newFlags, metadata) + } else { + s.updateFullSnapshot(txn, source, newFlags) + } + + txn.Commit() +} + +// updateIncremental handles membership-aware incremental updates with deduplication. +// Flags are stored once per (key, source) in the flags table. A lightweight membership table +// tracks which flagSetIds include which keys, enabling per-flagSetId queries without duplication. +func (s *Store) updateIncremental(txn *memdb.Txn, source string, newFlags map[flagIdentifier]model.Flag, metadata model.Metadata) { + s.hasMembership.Store(true) + + // Step 1: Determine flagSetIds touched by this payload (from metadata + flag-level overrides) + seenFlagSetIds := make(map[string]struct{}) + if fsi, ok := metadata["flagSetId"].(string); ok && fsi != "" { + seenFlagSetIds[fsi] = struct{}{} + } + for id := range newFlags { + seenFlagSetIds[id.flagSetId] = struct{}{} + } + + // Step 2: Collect old membership entries for each touched flagSetId+source + oldMembership := make(map[flagIdentifier]struct{}) + for fsi := range seenFlagSetIds { + it, err := txn.Get(membershipTable, flagSetIdSourceCompoundIndex, fsi, source) + if err != nil { + s.logger.Error(fmt.Sprintf("unable to query membership for flagSetId %s: %v", fsi, err)) + continue + } + for raw := it.Next(); raw != nil; raw = it.Next() { + m := raw.(flagSetMembership) + oldMembership[flagIdentifier{flagSetId: m.FlagSetId, key: m.Key}] = struct{}{} + } + } + + // Step 3: Build new membership set + newMembership := make(map[flagIdentifier]struct{}, len(newFlags)) + for id := range newFlags { + newMembership[id] = struct{}{} + } + + // Step 4: Delete stale membership entries and orphaned flags + for oldId := range oldMembership { + if _, ok := newMembership[oldId]; ok { + continue // still present } - for id := range newFlags { - seenFlagSetIds[id.flagSetId] = struct{}{} + // Remove stale membership entry + if _, err := txn.DeleteAll(membershipTable, idIndex, oldId.flagSetId, oldId.key, source); err != nil { + s.logger.Error(fmt.Sprintf("error deleting membership: flagSetId=%s key=%s: %v", oldId.flagSetId, oldId.key, err)) } - for fsi := range seenFlagSetIds { - sel := NewSelector(flagSetIdIndex+"="+fsi).WithIndex(sourceIndex, source) - indexId, constraints := sel.ToQuery() - it, err := txn.Get(flagsTable, indexId, constraints...) + // Check if any other flagSetId still references this key+source + refIt, err := txn.Get(membershipTable, keySourceCompoundIndex, oldId.key, source) + if err != nil { + s.logger.Error(fmt.Sprintf("error checking membership refs for key %s: %v", oldId.key, err)) + continue + } + if refIt.Next() == nil { + // No more references — delete the flag from the flags table + count, err := txn.DeleteAll(flagsTable, keySourceCompoundIndex, oldId.key, source) if err != nil { - s.logger.Error(fmt.Sprintf("unable to query flags for flagSetId %s: %v", fsi, err)) - return + s.logger.Error(fmt.Sprintf("error deleting orphaned flag %s: %v", oldId.key, err)) + } else { + s.logger.Debug(fmt.Sprintf("deleted %d orphaned flag(s) with key '%s' from source '%s'", count, oldId.key, source)) } - oldFlags = append(oldFlags, s.collect(it)...) } - } else { - sel := NewSelector(sourceIndex + "=" + source) - indexId, constraints := sel.ToQuery() - it, err := txn.Get(flagsTable, indexId, constraints...) + } + + // Step 5: Insert/update membership entries and deduplicate flags + for id, newFlag := range newFlags { + // Upsert membership entry + if err := txn.Insert(membershipTable, flagSetMembership{ + FlagSetId: id.flagSetId, + Key: id.key, + Source: source, + }); err != nil { + s.logger.Error(fmt.Sprintf("unable to insert membership for flagSetId=%s key=%s: %v", id.flagSetId, id.key, err)) + continue + } + + // Dedup: check if a flag with the same key+source already exists (from any flagSetId) + existingIt, err := txn.Get(flagsTable, keySourceCompoundIndex, newFlag.Key, source) if err != nil { - s.logger.Error(fmt.Sprintf("unable to query flags for source %s: %v", source, err)) - return + s.logger.Error(fmt.Sprintf("unable to check existing flag %s: %v", newFlag.Key, err)) + continue + } + existing := existingIt.Next() + if existing != nil { + existingFlag := existing.(model.Flag) + if existingFlag.Priority > newFlag.Priority { + // Higher priority source already owns this flag, skip + s.logger.Debug(fmt.Sprintf("flag '%s' owned by higher priority source, skipping", newFlag.Key)) + continue + } + // Flag already exists at same or lower priority — update in place to pick up content changes. + // Preserve the existing entry's FlagSetId so the upsert overwrites the canonical row. + newFlag.FlagSetId = existingFlag.FlagSetId + s.logger.Debug(fmt.Sprintf("updating existing flag '%s' (canonical flagSetId: %s) for flagSetId '%s'", newFlag.Key, existingFlag.FlagSetId, id.flagSetId)) + if err := txn.Insert(flagsTable, newFlag); err != nil { + s.logger.Error(fmt.Sprintf("unable to update existing flag %s: %v", newFlag.Key, err)) + } + continue } + + // New flag — insert into flags table + s.logger.Debug(fmt.Sprintf("storing flag: %s (flagSetId: %s)", newFlag.Key, id.flagSetId)) + if err := txn.Insert(flagsTable, newFlag); err != nil { + s.logger.Error(fmt.Sprintf("unable to insert flag %s: %v", newFlag.Key, err)) + } + } +} + +// updateFullSnapshot replaces all flags for the source (non-incremental mode). +func (s *Store) updateFullSnapshot(txn *memdb.Txn, source string, newFlags map[flagIdentifier]model.Flag) { + // Clean up any membership entries for this source (in case a previous incremental update left them) + if _, err := txn.DeleteAll(membershipTable, sourceIndex, source); err != nil { + s.logger.Error(fmt.Sprintf("error cleaning membership for source %s: %v", source, err)) + } + + // Get all existing flags for this source + sel := NewSelector(sourceIndex + "=" + source) + indexId, constraints := sel.ToQuery() + it, err := txn.Get(flagsTable, indexId, constraints...) + var oldFlags []model.Flag + if err != nil { + s.logger.Error(fmt.Sprintf("unable to query flags for source %s: %v", source, err)) + } else { oldFlags = s.collect(it) } + // Delete flags not in the new set for _, oldFlag := range oldFlags { if _, ok := newFlags[flagIdentifier{flagSetId: oldFlag.FlagSetId, key: oldFlag.Key}]; !ok { // flag has been deleted @@ -326,7 +548,7 @@ func (s *Store) Update( if ok { if oldFlag.Priority > newFlag.Priority { // if the old flag has a higher prio, we should not try to write it - s.logger.Error(fmt.Sprintf("unable to delete flags with key %s and flagSetId %s: %v", oldFlag.Key, oldFlag.FlagSetId, err)) + s.logger.Error(fmt.Sprintf("unable to update flags with key %s and flagSetId %s: higher priority exists", oldFlag.Key, oldFlag.FlagSetId)) continue } } @@ -340,8 +562,6 @@ func (s *Store) Update( continue } } - - txn.Commit() } // Watch the result-set of a selector for changes, sending updates to the watcher channel. @@ -349,21 +569,41 @@ func (s *Store) Watch(ctx context.Context, selector *Selector, watcher chan<- Fl go func() { for { ws := memdb.NewWatchSet() - it, err := s.selectOrAll(selector) - if err != nil { - s.logger.WithFields(zap.String("selector", selector.ToLogString()), zap.Error(err)).Error("error getting flags") - close(watcher) - return + txn := s.db.Txn(false) + + var flags []model.Flag + + // For flagSetId selectors, always watch the membership table so we + // detect new membership additions even when starting with zero entries. + if s.hasMembership.Load() { + if flagSetId, hasFSI := selector.HasFlagSetId(); hasFSI { + memberIt, err := txn.Get(membershipTable, flagSetIdIndex, flagSetId) + if err == nil { + ws.Add(memberIt.WatchCh()) + } + if membershipFlags := s.collectViaMembership(txn, flagSetId, &ws); membershipFlags != nil { + flags = membershipFlags + } + } } - ws.Add(it.WatchCh()) - flags := s.collect(it) + // Fall back to direct flags table query if membership didn't produce results + if flags == nil { + it, err := s.selectOrAllWithTxn(txn, selector) + if err != nil { + s.logger.WithFields(zap.String("selector", selector.ToLogString()), zap.Error(err)).Error("error getting flags") + close(watcher) + return + } + ws.Add(it.WatchCh()) + flags = s.collect(it) + } watcher <- FlagQueryResult{ Flags: flags, } - if err = ws.WatchCtx(ctx); err != nil { + if err := ws.WatchCtx(ctx); err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { s.logger.WithFields(zap.String("selector", selector.ToLogString()), zap.Error(err)).Debug("context cancellation while watching flags") } else { @@ -379,6 +619,10 @@ func (s *Store) Watch(ctx context.Context, selector *Selector, watcher chan<- Fl // returns an iterator for the given selector, or all flags if the selector is nil or empty func (s *Store) selectOrAll(selector *Selector) (it memdb.ResultIterator, err error) { txn := s.db.Txn(false) + return s.selectOrAllWithTxn(txn, selector) +} + +func (s *Store) selectOrAllWithTxn(txn *memdb.Txn, selector *Selector) (it memdb.ResultIterator, err error) { if !selector.IsEmpty() { indexId, constraints := selector.ToQuery() s.logger.Debug(fmt.Sprintf("getting all flags with query: %s, %v", indexId, constraints)) @@ -389,6 +633,65 @@ func (s *Store) selectOrAll(selector *Selector) (it memdb.ResultIterator, err er } } +// collectViaMembership resolves flags for a flagSetId through the membership table. +// Each membership entry maps to a canonical flag stored in the flags table by key+source. +// If ws is non-nil, scoped watches are added for each (key, source) lookup so that only +// changes to flags relevant to this flagSetId trigger the watch. +func (s *Store) collectViaMembership(txn *memdb.Txn, flagSetId string, ws *memdb.WatchSet) []model.Flag { + memberIt, err := txn.Get(membershipTable, flagSetIdIndex, flagSetId) + if err != nil { + s.logger.Error(fmt.Sprintf("error querying membership for flagSetId %s: %v", flagSetId, err)) + return nil + } + + flags := make(map[string]model.Flag) // key -> flag (dedup by key, keep highest priority) + hasMembership := false + for raw := memberIt.Next(); raw != nil; raw = memberIt.Next() { + hasMembership = true + m := raw.(flagSetMembership) + flagIt, fErr := txn.Get(flagsTable, keySourceCompoundIndex, m.Key, m.Source) + if fErr != nil { + continue + } + if ws != nil { + ws.Add(flagIt.WatchCh()) + } + for fRaw := flagIt.Next(); fRaw != nil; fRaw = flagIt.Next() { + flag := fRaw.(model.Flag) + flag.FlagSetId = flagSetId // patch to match the queried flagSetId + if flag.Metadata == nil { + flag.Metadata = make(model.Metadata) + } else { + patched := make(model.Metadata, len(flag.Metadata)) + for k, v := range flag.Metadata { + patched[k] = v + } + flag.Metadata = patched + } + flag.Metadata["flagSetId"] = flagSetId + if existing, ok := flags[flag.Key]; ok { + if flag.Priority < existing.Priority { + continue + } + } + flags[flag.Key] = flag + } + } + + if !hasMembership { + return nil // signal to caller that no membership exists (fall back to direct query) + } + + result := make([]model.Flag, 0, len(flags)) + for _, f := range flags { + result = append(result, f) + } + sort.Slice(result, func(i, j int) bool { + return result[i].Key < result[j].Key + }) + return result +} + // collects flags from an iterator, ensuring that only the highest priority flag is kept when there are duplicates func (s *Store) collect(it memdb.ResultIterator) []model.Flag { flags := make(map[flagIdentifier]model.Flag) diff --git a/core/pkg/store/store_test.go b/core/pkg/store/store_test.go index a729c4e49..1cc034e84 100644 --- a/core/pkg/store/store_test.go +++ b/core/pkg/store/store_test.go @@ -721,6 +721,74 @@ func TestUpdateFlagSetIdScoping(t *testing.T) { } } +func TestGetMembershipResolvesHighestPriority(t *testing.T) { + t.Parallel() + + // Two sources: srcLow (priority 0) and srcHigh (priority 1). + // Both register a membership entry for the same (flagSetId, key). + // Get should return the flag from srcHigh (higher priority index). + srcLow := "srcLow" + srcHigh := "srcHigh" + sources := []string{srcLow, srcHigh} + + s, err := NewStore(logger.NewLogger(nil, false), sources) + require.NoError(t, err) + + // srcLow provides flag "shared" under flagSetId "A" + s.Update(srcLow, []model.Flag{ + {Key: "shared", DefaultVariant: "low"}, + }, model.Metadata{"flagSetId": "A"}, true) + + // srcHigh provides the same flag "shared" under flagSetId "A" + s.Update(srcHigh, []model.Flag{ + {Key: "shared", DefaultVariant: "high"}, + }, model.Metadata{"flagSetId": "A"}, true) + + // Get via flagSetId selector should resolve through membership + sel := NewSelector("flagSetId=A") + sel = sel.WithIndex("key", "shared") + got, _, err := s.Get(context.Background(), "shared", &sel) + require.NoError(t, err) + assert.Equal(t, "high", got.DefaultVariant, "Get should return the flag from the highest priority source") + + // GetAll should also return the high-priority flag + selAll := NewSelector("flagSetId=A") + allFlags, _, err := s.GetAll(context.Background(), &selAll) + require.NoError(t, err) + require.Len(t, allFlags, 1) + assert.Equal(t, "high", allFlags[0].DefaultVariant, "GetAll should return the flag from the highest priority source") +} + +func TestIncrementalUpdateRefreshesFlagContent(t *testing.T) { + t.Parallel() + + const src = "src1" + sources := []string{src} + + s, err := NewStore(logger.NewLogger(nil, false), sources) + require.NoError(t, err) + + // Initial delivery: flag "toggle" with defaultVariant "off" + s.Update(src, []model.Flag{ + {Key: "toggle", DefaultVariant: "off"}, + }, model.Metadata{"flagSetId": "A"}, true) + + sel := NewSelector("flagSetId=A") + sel = sel.WithIndex("key", "toggle") + got, _, err := s.Get(context.Background(), "toggle", &sel) + require.NoError(t, err) + assert.Equal(t, "off", got.DefaultVariant) + + // Second delivery: same key, updated content + s.Update(src, []model.Flag{ + {Key: "toggle", DefaultVariant: "on"}, + }, model.Metadata{"flagSetId": "A"}, true) + + got, _, err = s.Get(context.Background(), "toggle", &sel) + require.NoError(t, err) + assert.Equal(t, "on", got.DefaultVariant, "incremental update should refresh flag content") +} + func TestToLogStringCompound(t *testing.T) { t.Parallel()