
source-hubspot-native: improve memory usage & other misc. improvements #4202

Draft
Alex-Bair wants to merge 5 commits into main from bair/source-hubspot-native-delayed-improvements

Conversation

@Alex-Bair

Description:

We've observed source-hubspot-native consistently OOM when there are millions of delayed changes to catch up on in a single fetch_delayed_changes invocation. This PR attempts to avoid those OOMs by:

  • Refactoring ID fetching from a callback-based pagination pattern to async generators, allowing fetch_changes_with_associations to process IDs in bounded chunks instead of buffering all records in memory.
  • Stopping legacy API pagination once results are at or before the `since` cursor, since these APIs return results newest-first.
  • Replacing the global deduplication set in fetch_search_objects with a small set tracking only IDs at the current max_updated timestamp, used for deduplication at the 10k search boundary.
  • Switching delayed streams from should_yield (which inserted into the emitted changes cache as a side effect) to a pure has_as_recent_as check, limiting cache growth.

Additionally, realtime streams now tolerate out-of-order search results by logging a warning and skipping the record instead of raising an exception.

There are quite a few changes here, and much of the diff results from the switch from callbacks to async generators and the resulting interface changes. I recommend reading commit-by-commit.
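The core refactor above, replacing callback-based pagination with async generators, can be sketched as follows. This is a minimal illustration with hypothetical names and stand-in data, not the connector's actual code:

```python
import asyncio
from datetime import datetime
from typing import AsyncGenerator

# Stand-in for paginated API responses; each inner list is one "page".
PAGES = [
    [("2024-01-01T00:00:00+00:00", "1"), ("2024-01-01T00:00:01+00:00", "2")],
    [("2024-01-01T00:00:02+00:00", "3")],
]

async def fetch_ids() -> AsyncGenerator[tuple[datetime, str], None]:
    # Instead of a callback returning (results, next_page_cursor) that
    # forces the caller to buffer everything, the fetcher is an async
    # generator that yields (modified_time, id) tuples as pages arrive.
    for page in PAGES:
        for ts, id_ in page:
            yield (datetime.fromisoformat(ts), id_)

async def main() -> list[str]:
    # The caller can now consume IDs incrementally, keeping memory bounded
    # no matter how many total records the generator produces.
    ids = []
    async for _, id_ in fetch_ids():
        ids.append(id_)
    return ids

print(asyncio.run(main()))  # → ['1', '2', '3']
```

The key property is that nothing upstream of the consumer ever holds more than one page of results at a time.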

Workflow steps:

(How does one use this feature, and how has it changed)

Documentation links affected:

(list any documentation links that you created, or existing ones that you've identified as needing updates, along with a brief description)

Notes for reviewers:

(anything that might help someone review this PR)

…stream incrementally to prevent OOM

The incremental capture path (`fetch_changes_with_associations`) previously
buffered all changed object IDs in memory before fetching any full objects.
For large HubSpot instances with millions of objects modified in a short
window, this caused OOM crashes. Two functions contributed most of the
memory pressure behind those OOMs:

1. `fetch_search_objects` accumulated every (datetime, id) into a set,
sorted it, and returned the entire collection at once.
2. `fetch_changes_with_associations` called the ID fetcher in a loop,
appending all results to a list, sorting, then batch-reading.

This commit converts both functions to a streaming async generator model:

- `fetch_search_objects` now yields `(modified_time, id)` tuples as each
200-item search API page arrives, removing the set accumulator and
final sort. Deduplication at the 10k search boundary is deferred to
the downstream `emitted_changes_cache` rather than handled in-place.

- `fetch_changes_with_associations` consumes the `fetch_search_objects`
generator incrementally, filling a chunk buffer of
`batch_size * concurrent_batches` IDs. Each chunk is split into batches
and fetched concurrently via `buffer_ordered`.

- The `_FetchIdsFn` callable type `(page, count) -> (Iterable, PageCursor)`
  is replaced by `FetchIdsFn`, an `AsyncGenerator` yielding `(datetime, str)`
  tuples.

- Legacy API entities (`companies`, `contacts`, `deals`, `engagements`)
convert their `do_fetch` callbacks to self-contained async generators
that handle pagination internally.

- Search-only entities (`products`, `goals`, `orders`, `line_items`,
`feedback_submissions`, `custom_objects`) drop the `do_fetch` wrapper
entirely. `fetch_search_objects` is now itself an async generator.
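The chunked consumption described above might look roughly like this sketch. All names here (`fetch_in_chunks`, `fetch_batch`, `id_generator`) are hypothetical, and `buffer_ordered` is approximated with `asyncio.gather`:

```python
import asyncio
from typing import AsyncGenerator

BATCH_SIZE = 2
CONCURRENT_BATCHES = 2

async def id_generator(n: int) -> AsyncGenerator[str, None]:
    # Stand-in for the streaming ID fetcher.
    for i in range(n):
        yield str(i)

async def fetch_batch(batch: list[str]) -> list[str]:
    # Stand-in for a bulk object read against the API.
    return [f"obj-{id_}" for id_ in batch]

async def fetch_in_chunks(n: int) -> list[str]:
    results: list[str] = []
    chunk: list[str] = []
    # Fill a buffer of batch_size * concurrent_batches IDs, then drain it,
    # so memory stays bounded regardless of how many IDs arrive.
    chunk_size = BATCH_SIZE * CONCURRENT_BATCHES

    async def flush() -> None:
        # Split the chunk into batches and fetch them concurrently.
        batches = [chunk[i:i + BATCH_SIZE] for i in range(0, len(chunk), BATCH_SIZE)]
        for fetched in await asyncio.gather(*(fetch_batch(b) for b in batches)):
            results.extend(fetched)
        chunk.clear()

    async for id_ in id_generator(n):
        chunk.append(id_)
        if len(chunk) >= chunk_size:
            await flush()
    if chunk:
        await flush()
    return results

print(len(asyncio.run(fetch_in_chunks(10))))  # → 10
```

At no point does the consumer hold more than `batch_size * concurrent_batches` IDs plus one chunk's worth of fetched objects.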

The legacy "recently modified" APIs (`companies`, `contacts`, `deals`,
`engagements`) return results newest-first. Previously,
`fetch_changes_with_associations` handled the early-stop by setting
`next_page = None` when it saw a record at or before `since`. Now that
ID fetching is done via self-contained async generators, the generators
need to handle this themselves.

Without this, the generators would keep paginating through all available
results (up to the 9,900 cap) even after passing the `since` boundary,
wasting API calls on IDs that will be filtered out downstream. This is
safe because the newest-first ordering guarantees that once we see a
record at or before `since`, all subsequent records are also at or
before `since`.
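A minimal sketch of that early-stop behavior, with integers standing in for timestamps and all names hypothetical:

```python
import asyncio
from typing import AsyncGenerator

# Stand-in for the legacy API's pages: results arrive newest-first.
PAGES = [[9, 8, 7], [6, 5, 4], [3, 2, 1]]

async def fetch_recent_ids(since: int) -> AsyncGenerator[int, None]:
    for page in PAGES:  # stand-in for "request the next page"
        for ts in page:
            if ts <= since:
                # Newest-first ordering guarantees every later record is
                # also <= since, so stop paginating entirely.
                return
            yield ts

async def main() -> list[int]:
    return [ts async for ts in fetch_recent_ids(since=5)]

print(asyncio.run(main()))  # → [9, 8, 7, 6]
```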
…me streams

The HubSpot Search API occasionally returns results slightly out of
ascending order by `lastmodifieddate`. Previously, `fetch_search_objects`
raised an exception whenever it detected this, crashing the connector.

This is acceptable for the delayed stream, which must capture every
change within its time window — an ordering violation there could indicate
data integrity issues that should surface as errors. But for the realtime
stream, missing a single change is harmless: the delayed stream runs
every few minutes and will pick up anything the realtime stream missed.
Crashing the realtime stream over a minor ordering inconsistency is
worse than skipping the out-of-order record, since the crash halts all
progress until the connector restarts.

Add an `ignore_out_of_order_results` parameter to `fetch_search_objects`.
When `True`, out-of-order results are logged as a warning and skipped.
When `True`, out-of-order results are logged as a warning and skipped.
All `fetch_recent_*` call sites pass `True`; all `fetch_delayed_*`
call sites keep the default of `False`, preserving the existing exception.

Also give `fetch_delayed_feedback_submissions` its own `fetch_search_objects`
call instead of delegating to `fetch_recent_feedback_submissions`, so the
two paths can use different `ignore_out_of_order_results` options.
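The flag's behavior could be sketched like this; `check_order` is a hypothetical helper, whereas in the connector the check lives inside `fetch_search_objects`:

```python
import logging

log = logging.getLogger(__name__)

def check_order(results, ignore_out_of_order_results: bool = False):
    # Results are expected in ascending order by modified time. An
    # out-of-order record either raises (delayed stream, where a missed
    # change is a data-integrity problem) or is logged and skipped
    # (realtime stream, where the delayed stream provides a backstop).
    last = None
    for ts, id_ in results:
        if last is not None and ts < last:
            if ignore_out_of_order_results:
                log.warning("skipping out-of-order result %s", id_)
                continue
            raise RuntimeError(f"out-of-order result {id_}")
        last = ts
        yield (ts, id_)

results = [(1, "a"), (3, "b"), (2, "c"), (4, "d")]
print([id_ for _, id_ in check_order(results, ignore_out_of_order_results=True)])
# → ['a', 'b', 'd']
```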
…dary

When the search API hits its 10k offset limit and a cycle is detected,
`fetch_search_objects_modified_at` re-fetches all IDs at the current
timestamp, including ones already yielded by the main loop. Previously,
it was the caller's responsibility to deduplicate records when the 10k
offset limit was reached. That required the caller to track _all_ IDs
yielded from `fetch_search_objects` to determine which were duplicates,
when really only the first 10k need to be tracked to deduplicate
against.

This commit moves that deduplication against the first 10k IDs into the
`fetch_search_objects` function itself.
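A toy sketch of the boundary deduplication; `dedupe_at_boundary` is a hypothetical name, and in the connector the equivalent logic lives inside `fetch_search_objects`:

```python
def dedupe_at_boundary(first_pass, second_pass, max_ts):
    # Only records at the max timestamp can reappear when the 10k offset
    # cap forces a re-fetch at that timestamp, so a small set of those IDs
    # is enough to deduplicate - no global set of every yielded ID.
    seen_at_max = set()
    for ts, id_ in first_pass:
        if ts == max_ts:
            seen_at_max.add(id_)
        yield (ts, id_)
    for ts, id_ in second_pass:  # re-fetch of records at max_ts
        if id_ in seen_at_max:
            continue  # already yielded before the boundary
        yield (ts, id_)

first = [(1, "a"), (2, "b"), (2, "c")]
second = [(2, "b"), (2, "c"), (2, "d")]  # overlaps with the first pass
print([id_ for _, id_ in dedupe_at_boundary(first, second, max_ts=2)])
# → ['a', 'b', 'c', 'd']
```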
…ounded on delayed streams

The delayed stream was using `cache.should_yield()` which both checks and
inserts into the emitted changes cache. For high-volume accounts this could
accumulate millions of entries (one per unique ID in the 1-hour delayed
window), risking OOM. This commit switches to `cache.has_as_recent_as()`
so the delayed stream only reads the cache for deduplication against the
realtime stream without inserting its own entries. This is safe because
deduplication within the delayed stream is handled at the lower
`fetch_search_objects` level (per the previous commit), and
`fetch_delayed_changes` does not need to deduplicate against records it
has previously seen.
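The difference between the two cache operations can be illustrated with a minimal stand-in cache (hypothetical shape, not the connector's actual `emitted_changes_cache` implementation):

```python
class EmittedChangesCache:
    def __init__(self):
        self._cache: dict[str, int] = {}

    def should_yield(self, id_: str, ts: int) -> bool:
        # Check-and-insert: every call can grow the cache as a side effect,
        # which is what risked unbounded growth on the delayed stream.
        if self._cache.get(id_, -1) >= ts:
            return False
        self._cache[id_] = ts
        return True

    def has_as_recent_as(self, id_: str, ts: int) -> bool:
        # Pure read: no insertion, so cache size is driven only by what
        # the realtime stream actually emitted.
        return self._cache.get(id_, -1) >= ts

cache = EmittedChangesCache()
cache.should_yield("a", 5)             # realtime stream emitted "a" at ts=5
print(cache.has_as_recent_as("a", 3))  # → True  (delayed stream skips it)
print(cache.has_as_recent_as("b", 3))  # → False (delayed stream emits it)
print(len(cache._cache))               # → 1 (the reads did not insert)
```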