Skip to content

fix: Run Scopus contributor lookups concurrently#2533

Draft
LarsV123 wants to merge 6 commits into
mainfrom
NP-51299-scopus-import-timeout
Draft

fix: Run Scopus contributor lookups concurrently#2533
LarsV123 wants to merge 6 commits into
mainfrom
NP-51299-scopus-import-timeout

Conversation

@LarsV123

@LarsV123 LarsV123 commented Jun 2, 2026

Copy link
Copy Markdown
Contributor

Ticket: https://sikt.atlassian.net/browse/NP-51299

The Scopus import timed out (900s) on large XML files with thousands of contributors. ParallelizeListProcessing looked parallel but ran serially: the submitted futures were kept in a lazy Stream that was never materialized before being joined, so the terminal toList() fused submit and join and processed one task at a time. With ~15k external PIA/Cristin calls that blows past the Lambda ceiling.

Materialize the futures before joining so all tasks are submitted before any is awaited, and bound real concurrency with a Semaphore instead of using the partition chunk size (which spawned ~N/4 threads, the opposite of "polite"). Remove the silent sequential fallback that reprocessed the entire list on any single job failure.

…elism

The Scopus import timed out (900s) on large XML files with thousands of
contributors. ParallelizeListProcessing looked parallel but ran serially:
the submitted futures were kept in a lazy Stream that was never materialized
before being joined, so the terminal toList() fused submit and join and
processed one task at a time. With ~15k external PIA/Cristin calls that
blows past the Lambda ceiling.

Materialize the futures before joining so all tasks are submitted before any
is awaited, and bound real concurrency with a Semaphore instead of using the
partition chunk size (which spawned ~N/4 threads, the opposite of "polite").
Remove the silent sequential fallback that reprocessed the entire list on any
single job failure.

Refs NP-51299

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@github-actions

github-actions Bot commented Jun 2, 2026

Copy link
Copy Markdown

Test Results

   319 files  ±0     319 suites  ±0   21m 18s ⏱️ -53s
 5 412 tests +7   5 401 ✅ +7  11 💤 ±0  0 ❌ ±0 
10 457 runs  +7  10 446 ✅ +7  11 💤 ±0  0 ❌ ±0 

Results for commit 335e671. ± Comparison against base commit a190400.

This pull request removes 1 and adds 8 tests. Note that renamed tests count towards both.
no.sikt.nva.scopus.paralleliseutils.ParallelizeListProcessingTest ‑ shouldHandleListWithSizeThatDividedByNumberOfThreadsHaveAremainder()
no.sikt.nva.scopus.conversion.CristinConnectionTest ‑ shouldQueryProxyOnceForRepeatedPersonLookupsAndAgainAfterCacheIsCleared(WireMockRuntimeInfo)
no.sikt.nva.scopus.conversion.CristinConnectionTest ‑ shouldRetryServerErrorsUpToMaxAttemptsWhenFetchingPerson(WireMockRuntimeInfo)
no.sikt.nva.scopus.conversion.CristinConnectionTest ‑ shouldRetryTooManyRequestsUpToMaxAttemptsWhenFetchingPerson(WireMockRuntimeInfo)
no.sikt.nva.scopus.conversion.PiaConnectionTest ‑ shouldQueryPiaOnceForRepeatedOrganizationLookupsAndAgainAfterCacheIsCleared()
no.sikt.nva.scopus.conversion.PiaConnectionTest ‑ shouldRetryServerErrorsUpToMaxAttemptsWhenFetchingOrganization()
no.sikt.nva.scopus.paralleliseutils.ParallelizeListProcessingTest ‑ shouldPreserveInputOrderWhenListSizeExceedsConcurrencyLimit()
no.sikt.nva.scopus.paralleliseutils.ParallelizeListProcessingTest ‑ shouldRethrowAsCompletionExceptionWhenCallingThreadIsInterrupted()
no.sikt.nva.scopus.paralleliseutils.ParallelizeListProcessingTest ‑ shouldRunJobsConcurrentlyUpToTheGivenLimit()

♻️ This comment has been updated with latest results.

LarsV123 and others added 5 commits June 2, 2026 15:47
Large Scopus documents repeat the same affiliation, organization and country
lookups across thousands of author groups (e.g. 3014 author groups for 1864
unique affiliation ids). Each repeat previously hit PIA/Cristin again.

Memoize the per-key lookups in PiaConnection and CristinConnection with
ConcurrentHashMap (thread-safe under the now-real concurrency). The caches are
cleared at the start of each import via ContributorExtractor, so they scope to
a single document: no cross-invocation staleness and no unbounded growth in a
warm Lambda container.

Refs NP-51299

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replace the manual "submit each task, collect futures, then join" with
ExecutorService.invokeAll. It expresses the fan-out-then-barrier intent in one
call and structurally prevents reintroducing the lazy-stream serialization bug,
since invokeAll submits every task before returning. invokeAll declares
InterruptedException, so the interrupt is propagated as a runtime exception
after restoring the interrupt flag.

Refs NP-51299

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
PIA and Cristin calls previously threw on any non-2xx with no retry, so a
transient blip (5xx, or 429 under the now-real concurrent load) failed the
lookup and, with commit 2, that empty result got cached for the rest of the
import.

Wrap the individual HTTP send in both connections with a resilience4j Retry
(3 attempts, 100ms wait, retry on 429 and 5xx), mirroring the pattern already
used in nva-cristin-service. The retry sits inside the cached computation, so a
result is only cached once retries are exhausted.

Refs NP-51299

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ption

An interrupt during invokeAll is a failure of a concurrent computation, so
CompletionException is the precise wrapper for it. Replaces the generic
RuntimeException, which was a bad habit rather than an intentional choice.

Refs NP-51299

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant