From f3b78b4213f800c31a2b9da7d9636d416da8986d Mon Sep 17 00:00:00 2001 From: Christopher Brady Date: Wed, 20 May 2026 13:55:10 -0600 Subject: [PATCH] update entitlemetns when processing partial messages --- datastream/merge.go | 82 +++++++++++ datastream/merge_test.go | 286 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 368 insertions(+) diff --git a/datastream/merge.go b/datastream/merge.go index 6a597a3..d092dd4 100644 --- a/datastream/merge.go +++ b/datastream/merge.go @@ -13,6 +13,11 @@ import ( // names a Company JSON field; the SDK dispatches per-field merge semantics: // maps shallow-merge, keyed collections upsert by their natural key, and // scalar fields replace. +// +// Partials don't carry refreshed entitlements, so when credit_balances or +// metrics change without an accompanying entitlements field, we re-derive +// entitlement.credit_remaining and entitlement.usage locally to match what +// the server computes when assembling a full company message. func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage) (*rulesengine.Company, error) { var fields map[string]json.RawMessage if err := json.Unmarshal(partialJSON, &fields); err != nil { @@ -21,6 +26,12 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage) merged := DeepCopyCompany(existing) + var ( + creditBalanceUpdates map[string]float64 + metricsUpdated bool + entitlementsInPartial bool + ) + for key, raw := range fields { switch key { case "id": @@ -54,12 +65,14 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage) merged.CreditBalances = make(map[string]float64, len(cb)) } maps.Copy(merged.CreditBalances, cb) + creditBalanceUpdates = cb case "entitlements": var ents []*rulesengine.FeatureEntitlement if err := json.Unmarshal(raw, &ents); err != nil { return nil, fmt.Errorf("unmarshal field %q: %w", key, err) } merged.Entitlements = ents + entitlementsInPartial = true case "keys": var keys map[string]string if err := json.Unmarshal(raw, &keys); err != nil { @@ -75,6 +88,7 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage) return nil, fmt.Errorf("unmarshal field %q: %w", key, err) } merged.Metrics = upsertMetrics(merged.Metrics, incoming) + metricsUpdated = true case "plan_ids": var ids []string if err := json.Unmarshal(raw, &ids); err != nil { @@ -106,9 +120,77 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage) } } + if !entitlementsInPartial && (creditBalanceUpdates != nil || metricsUpdated) && len(merged.Entitlements) > 0 { + merged.Entitlements = syncEntitlementDerivedFields(merged.Entitlements, creditBalanceUpdates, metricsUpdated, merged.Metrics) + } + return merged, nil } +// syncEntitlementDerivedFields re-derives credit_remaining and usage on +// entitlements when a partial updates credit_balances or metrics without +// sending refreshed entitlements. Returns a new slice of new pointers — the +// input entitlements are not mutated. +// +// credit_remaining is matched per-entitlement against the incoming balance +// map (entitlements pointing at credits not in the partial are untouched). +// usage is matched against the merged metrics list using the full triple +// (event_name, metric_period, month_reset), defaulting period to all_time +// and month_reset to first_of_month when the entitlement leaves them unset. +// +// credit_total and credit_used are deliberately left alone — they aggregate +// across a grant ledger the SDK doesn't see. +func syncEntitlementDerivedFields( + entitlements []*rulesengine.FeatureEntitlement, + creditBalanceUpdates map[string]float64, + metricsUpdated bool, + mergedMetrics rulesengine.CompanyMetricCollection, +) []*rulesengine.FeatureEntitlement { + var metricLookup map[metricKey]int64 + if metricsUpdated { + metricLookup = make(map[metricKey]int64, len(mergedMetrics)) + for _, m := range mergedMetrics { + if m == nil { + continue + } + metricLookup[metricKey{m.EventSubtype, m.Period, m.MonthReset}] = m.Value + } + } + + result := make([]*rulesengine.FeatureEntitlement, len(entitlements)) + for i, ent := range entitlements { + if ent == nil { + continue + } + updated := *ent + + if creditBalanceUpdates != nil && ent.CreditID != nil { + if balance, ok := creditBalanceUpdates[*ent.CreditID]; ok { + v := balance + updated.CreditRemaining = &v + } + } + + if metricLookup != nil && ent.EventName != nil { + period := rulesengine.MetricPeriodAllTime + if ent.MetricPeriod != nil { + period = *ent.MetricPeriod + } + monthReset := rulesengine.MetricPeriodMonthResetFirst + if ent.MonthReset != nil { + monthReset = *ent.MonthReset + } + if value, ok := metricLookup[metricKey{*ent.EventName, period, monthReset}]; ok { + v := value + updated.Usage = &v + } + } + + result[i] = &updated + } + return result +} + // PartialUser merges a partial JSON update into an existing User and returns // the result. The original is not mutated. func PartialUser(existing *rulesengine.User, partialJSON json.RawMessage) (*rulesengine.User, error) { diff --git a/datastream/merge_test.go b/datastream/merge_test.go index 7074bda..cf90e71 100644 --- a/datastream/merge_test.go +++ b/datastream/merge_test.go @@ -270,6 +270,292 @@ func TestPartialCompany_DoesNotMutateOriginal(t *testing.T) { assert.Equal(t, map[string]float64{"credit-1": 999.0, "credit-2": 50.0}, merged.CreditBalances) } +// Credit-balance partials don't include refreshed entitlements, so the SDK +// syncs credit_remaining locally to keep the two in step for consumers who +// read the cached entitlement object between full company refreshes. + +func makeEntitlement(featureID, featureKey string) *rulesengine.FeatureEntitlement { + return &rulesengine.FeatureEntitlement{FeatureID: featureID, FeatureKey: featureKey} +} + +func withCredit(ent *rulesengine.FeatureEntitlement, creditID string, remaining float64) *rulesengine.FeatureEntitlement { + id := creditID + r := remaining + ent.CreditID = &id + ent.CreditRemaining = &r + return ent +} + +func withEvent(ent *rulesengine.FeatureEntitlement, eventName string, period *rulesengine.MetricPeriod, monthReset *rulesengine.MetricPeriodMonthReset, usage int64) *rulesengine.FeatureEntitlement { + name := eventName + u := usage + ent.EventName = &name + ent.MetricPeriod = period + ent.MonthReset = monthReset + ent.Usage = &u + return ent +} + +func TestPartialCompany_SyncsCreditRemaining_MatchingCreditID(t *testing.T) { + existing := baseCompany() + existing.CreditBalances = map[string]float64{"credit-1": 100.0} + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withCredit(makeEntitlement("feat-1", "f1"), "credit-1", 100.0), + makeEntitlement("feat-2", "f2"), // no credit_id — must stay untouched + } + + partial := json.RawMessage(`{"credit_balances":{"credit-1":25.0}}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 2) + require.NotNil(t, merged.Entitlements[0].CreditRemaining) + assert.Equal(t, 25.0, *merged.Entitlements[0].CreditRemaining) + assert.Nil(t, merged.Entitlements[1].CreditRemaining) + + // Original not mutated. + require.NotNil(t, existing.Entitlements[0].CreditRemaining) + assert.Equal(t, 100.0, *existing.Entitlements[0].CreditRemaining) +} + +func TestPartialCompany_SyncsCreditRemaining_AcrossMultipleCreditIDs(t *testing.T) { + existing := baseCompany() + existing.CreditBalances = map[string]float64{"credit-1": 100.0, "credit-2": 50.0} + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withCredit(makeEntitlement("feat-1", "f1"), "credit-1", 100.0), + withCredit(makeEntitlement("feat-2", "f2"), "credit-2", 50.0), + } + + partial := json.RawMessage(`{"credit_balances":{"credit-1":75.0,"credit-2":10.0}}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 2) + require.NotNil(t, merged.Entitlements[0].CreditRemaining) + assert.Equal(t, 75.0, *merged.Entitlements[0].CreditRemaining) + require.NotNil(t, merged.Entitlements[1].CreditRemaining) + assert.Equal(t, 10.0, *merged.Entitlements[1].CreditRemaining) +} + +func TestPartialCompany_UnmatchedCreditID_LeftAlone(t *testing.T) { + existing := baseCompany() + existing.CreditBalances = map[string]float64{"credit-1": 100.0} + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withCredit(makeEntitlement("feat-1", "f1"), "credit-other", 999.0), + } + + // Partial only updates credit-1; entitlement points at credit-other and + // must keep its existing credit_remaining. + partial := json.RawMessage(`{"credit_balances":{"credit-1":25.0}}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].CreditRemaining) + assert.Equal(t, 999.0, *merged.Entitlements[0].CreditRemaining) +} + +func TestPartialCompany_SingleCreditFansOutToMultipleEntitlements(t *testing.T) { + // Common when one credit type funds multiple features — each gets its + // own entitlement sharing the same credit_id. + existing := baseCompany() + existing.CreditBalances = map[string]float64{"credit-shared": 500.0} + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withCredit(makeEntitlement("feat-a", "feature-a"), "credit-shared", 500.0), + withCredit(makeEntitlement("feat-b", "feature-b"), "credit-shared", 500.0), + withCredit(makeEntitlement("feat-c", "feature-c"), "credit-shared", 500.0), + makeEntitlement("feat-d", "feature-d"), // unrelated, no credit + } + + partial := json.RawMessage(`{"credit_balances":{"credit-shared":120.0}}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 4) + for i := range 3 { + require.NotNil(t, merged.Entitlements[i].CreditRemaining, "entitlement %d", i) + assert.Equal(t, 120.0, *merged.Entitlements[i].CreditRemaining, "entitlement %d", i) + } + assert.Nil(t, merged.Entitlements[3].CreditRemaining) +} + +func TestPartialCompany_SyncSkippedWhenPartialAlsoSendsEntitlements(t *testing.T) { + // If the partial carries entitlements, we trust those wholesale and don't + // re-derive credit_remaining from credit_balances. + existing := baseCompany() + existing.CreditBalances = map[string]float64{"credit-1": 100.0} + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withCredit(makeEntitlement("feat-1", "f1"), "credit-1", 100.0), + } + + partial := json.RawMessage(`{ + "credit_balances":{"credit-1":25.0}, + "entitlements":[{"feature_id":"feat-1","feature_key":"f1","value_type":"boolean","credit_id":"credit-1","credit_remaining":17.0}] + }`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].CreditRemaining) + assert.Equal(t, 17.0, *merged.Entitlements[0].CreditRemaining) +} + +func TestPartialCompany_SyncNoOpWhenNoEntitlementsExist(t *testing.T) { + existing := baseCompany() + existing.Entitlements = nil + + partial := json.RawMessage(`{"credit_balances":{"credit-1":25.0}}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + assert.Equal(t, 25.0, merged.CreditBalances["credit-1"]) + assert.Empty(t, merged.Entitlements) +} + +func TestPartialCompany_SyncsUsage_EventBasedEntitlement(t *testing.T) { + existing := baseCompany() + currentMonth := rulesengine.MetricPeriodCurrentMonth + firstOfMonth := rulesengine.MetricPeriodMonthResetFirst + existing.Metrics = rulesengine.CompanyMetricCollection{ + {EventSubtype: "credits_used", Period: currentMonth, MonthReset: firstOfMonth, Value: 10}, + } + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withEvent(makeEntitlement("feat-1", "f1"), "credits_used", ¤tMonth, &firstOfMonth, 10), + } + + partial := json.RawMessage(`{"metrics":[{"event_subtype":"credits_used","period":"current_month","month_reset":"first_of_month","value":42}]}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].Usage) + assert.EqualValues(t, 42, *merged.Entitlements[0].Usage) +} + +func TestPartialCompany_UsageMatchRequiresFullTriple(t *testing.T) { + // The server matches metrics to entitlements on the full triple + // (event_subtype, period, month_reset). A metric with a different period + // must not satisfy an entitlement's lookup. + existing := baseCompany() + allTime := rulesengine.MetricPeriodAllTime + currentMonth := rulesengine.MetricPeriodCurrentMonth + firstOfMonth := rulesengine.MetricPeriodMonthResetFirst + existing.Metrics = rulesengine.CompanyMetricCollection{ + {EventSubtype: "api_calls", Period: allTime, MonthReset: firstOfMonth, Value: 100}, + } + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withEvent(makeEntitlement("feat-1", "f1"), "api_calls", ¤tMonth, &firstOfMonth, 5), + } + + // Partial updates the all_time metric. Entitlement points at current_month + // so its usage must NOT change. + partial := json.RawMessage(`{"metrics":[{"event_subtype":"api_calls","period":"all_time","month_reset":"first_of_month","value":999}]}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].Usage) + assert.EqualValues(t, 5, *merged.Entitlements[0].Usage) +} + +func TestPartialCompany_UsageDefaultsToAllTimeFirstOfMonth(t *testing.T) { + // When entitlement period/month_reset are nil, the server's metric lookup + // defaults to all_time / first_of_month — the sync must do the same. + existing := baseCompany() + allTime := rulesengine.MetricPeriodAllTime + firstOfMonth := rulesengine.MetricPeriodMonthResetFirst + existing.Metrics = rulesengine.CompanyMetricCollection{ + {EventSubtype: "api_calls", Period: allTime, MonthReset: firstOfMonth, Value: 0}, + } + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withEvent(makeEntitlement("feat-1", "f1"), "api_calls", nil, nil, 0), + } + + partial := json.RawMessage(`{"metrics":[{"event_subtype":"api_calls","period":"all_time","month_reset":"first_of_month","value":7}]}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].Usage) + assert.EqualValues(t, 7, *merged.Entitlements[0].Usage) +} + +func TestPartialCompany_UsageUnchangedWhenNoMatchingMetricInPartial(t *testing.T) { + // If the partial's metric upsert leaves the entitlement's metric untouched + // (different event_subtype), keep the existing usage. The sync re-derives + // from the merged metrics list, which still holds event-a at value 50. + existing := baseCompany() + allTime := rulesengine.MetricPeriodAllTime + firstOfMonth := rulesengine.MetricPeriodMonthResetFirst + existing.Metrics = rulesengine.CompanyMetricCollection{ + {EventSubtype: "event-a", Period: allTime, MonthReset: firstOfMonth, Value: 50}, + } + existing.Entitlements = []*rulesengine.FeatureEntitlement{ + withEvent(makeEntitlement("feat-1", "f1"), "event-a", &allTime, &firstOfMonth, 50), + } + + partial := json.RawMessage(`{"metrics":[{"event_subtype":"event-b","period":"all_time","month_reset":"first_of_month","value":999}]}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].Usage) + assert.EqualValues(t, 50, *merged.Entitlements[0].Usage) +} + +func TestPartialCompany_SyncsCreditRemainingAndUsageTogether(t *testing.T) { + // A partial can carry both credit_balances and metrics changes; both + // derived fields must be applied in the same entitlements rebuild. + existing := baseCompany() + allTime := rulesengine.MetricPeriodAllTime + firstOfMonth := rulesengine.MetricPeriodMonthResetFirst + existing.CreditBalances = map[string]float64{"credit-1": 100.0} + existing.Metrics = rulesengine.CompanyMetricCollection{ + {EventSubtype: "event-a", Period: allTime, MonthReset: firstOfMonth, Value: 5}, + } + ent := withCredit(makeEntitlement("feat-1", "f1"), "credit-1", 100.0) + ent = withEvent(ent, "event-a", &allTime, &firstOfMonth, 5) + existing.Entitlements = []*rulesengine.FeatureEntitlement{ent} + + partial := json.RawMessage(`{ + "credit_balances":{"credit-1":25.0}, + "metrics":[{"event_subtype":"event-a","period":"all_time","month_reset":"first_of_month","value":80}] + }`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].CreditRemaining) + assert.Equal(t, 25.0, *merged.Entitlements[0].CreditRemaining) + require.NotNil(t, merged.Entitlements[0].Usage) + assert.EqualValues(t, 80, *merged.Entitlements[0].Usage) +} + +func TestPartialCompany_CreditTotalAndUsedNotDerivedFromPartial(t *testing.T) { + // credit_total and credit_used aggregate across a grant ledger the SDK + // doesn't see. They must keep their last full-message value across + // credit_balances partials; grant-lifecycle events trigger a full + // company message that refreshes them. + existing := baseCompany() + existing.CreditBalances = map[string]float64{"credit-1": 100.0} + total := 200.0 + used := 100.0 + ent := withCredit(makeEntitlement("feat-1", "f1"), "credit-1", 100.0) + ent.CreditTotal = &total + ent.CreditUsed = &used + existing.Entitlements = []*rulesengine.FeatureEntitlement{ent} + + partial := json.RawMessage(`{"credit_balances":{"credit-1":25.0}}`) + merged, err := PartialCompany(existing, partial) + require.NoError(t, err) + + require.Len(t, merged.Entitlements, 1) + require.NotNil(t, merged.Entitlements[0].CreditTotal) + assert.Equal(t, 200.0, *merged.Entitlements[0].CreditTotal) + require.NotNil(t, merged.Entitlements[0].CreditUsed) + assert.Equal(t, 100.0, *merged.Entitlements[0].CreditUsed) +} + func TestPartialUser_ReplacesTraits(t *testing.T) { existing := baseUser() partial := json.RawMessage(`{"traits":[{"value":"Free","trait_definition":{"id":"tier"}}]}`)