Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions datastream/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import (
// names a Company JSON field; the SDK dispatches per-field merge semantics:
// maps shallow-merge, keyed collections upsert by their natural key, and
// scalar fields replace.
//
// Partials don't carry refreshed entitlements, so when credit_balances or
// metrics change without an accompanying entitlements field, we re-derive
// entitlement.credit_remaining and entitlement.usage locally to match what
// the server computes when assembling a full company message.
func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage) (*rulesengine.Company, error) {
var fields map[string]json.RawMessage
if err := json.Unmarshal(partialJSON, &fields); err != nil {
Expand All @@ -21,6 +26,12 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage)

merged := DeepCopyCompany(existing)

var (
creditBalanceUpdates map[string]float64
metricsUpdated bool
entitlementsInPartial bool
)

for key, raw := range fields {
switch key {
case "id":
Expand Down Expand Up @@ -54,12 +65,14 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage)
merged.CreditBalances = make(map[string]float64, len(cb))
}
maps.Copy(merged.CreditBalances, cb)
creditBalanceUpdates = cb
case "entitlements":
var ents []*rulesengine.FeatureEntitlement
if err := json.Unmarshal(raw, &ents); err != nil {
return nil, fmt.Errorf("unmarshal field %q: %w", key, err)
}
merged.Entitlements = ents
entitlementsInPartial = true
case "keys":
var keys map[string]string
if err := json.Unmarshal(raw, &keys); err != nil {
Expand All @@ -75,6 +88,7 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage)
return nil, fmt.Errorf("unmarshal field %q: %w", key, err)
}
merged.Metrics = upsertMetrics(merged.Metrics, incoming)
metricsUpdated = true
case "plan_ids":
var ids []string
if err := json.Unmarshal(raw, &ids); err != nil {
Expand Down Expand Up @@ -106,9 +120,77 @@ func PartialCompany(existing *rulesengine.Company, partialJSON json.RawMessage)
}
}

if !entitlementsInPartial && (creditBalanceUpdates != nil || metricsUpdated) && len(merged.Entitlements) > 0 {
merged.Entitlements = syncEntitlementDerivedFields(merged.Entitlements, creditBalanceUpdates, metricsUpdated, merged.Metrics)
}

return merged, nil
}

// syncEntitlementDerivedFields re-derives credit_remaining and usage on
// entitlements when a partial updates credit_balances or metrics without
// sending refreshed entitlements. Returns a new slice of new pointers — the
// input entitlements are not mutated.
//
// credit_remaining is matched per-entitlement against the incoming balance
// map (entitlements pointing at credits not in the partial are untouched).
// usage is matched against the merged metrics list using the full triple
// (event_name, metric_period, month_reset), defaulting period to all_time
// and month_reset to first_of_month when the entitlement leaves them unset.
//
// credit_total and credit_used are deliberately left alone — they aggregate
// across a grant ledger the SDK doesn't see.
func syncEntitlementDerivedFields(
entitlements []*rulesengine.FeatureEntitlement,
creditBalanceUpdates map[string]float64,
metricsUpdated bool,
mergedMetrics rulesengine.CompanyMetricCollection,
) []*rulesengine.FeatureEntitlement {
var metricLookup map[metricKey]int64
if metricsUpdated {
metricLookup = make(map[metricKey]int64, len(mergedMetrics))
for _, m := range mergedMetrics {
if m == nil {
continue
}
metricLookup[metricKey{m.EventSubtype, m.Period, m.MonthReset}] = m.Value
}
}

result := make([]*rulesengine.FeatureEntitlement, len(entitlements))
for i, ent := range entitlements {
if ent == nil {
continue
}
updated := *ent

if creditBalanceUpdates != nil && ent.CreditID != nil {
if balance, ok := creditBalanceUpdates[*ent.CreditID]; ok {
v := balance
updated.CreditRemaining = &v
}
}

if metricLookup != nil && ent.EventName != nil {
period := rulesengine.MetricPeriodAllTime
if ent.MetricPeriod != nil {
period = *ent.MetricPeriod
}
monthReset := rulesengine.MetricPeriodMonthResetFirst
if ent.MonthReset != nil {
monthReset = *ent.MonthReset
}
if value, ok := metricLookup[metricKey{*ent.EventName, period, monthReset}]; ok {
v := value
updated.Usage = &v
}
}

result[i] = &updated
}
return result
}

// PartialUser merges a partial JSON update into an existing User and returns
// the result. The original is not mutated.
func PartialUser(existing *rulesengine.User, partialJSON json.RawMessage) (*rulesengine.User, error) {
Expand Down
Loading
Loading