From ff685170df269520f7c309f7a8c0d91a599d8b2f Mon Sep 17 00:00:00 2001 From: Dominic Byrd-McDevitt Date: Sun, 19 Apr 2026 11:47:34 -0400 Subject: [PATCH 1/2] Add Akka circuit breaker for ElasticSearch calls After maxFailures consecutive call timeouts, the breaker opens and immediately returns ElasticSearchResponseFailure for resetTimeout, giving ES breathing room to recover without continued request pressure. All three parameters are overrideable via env vars for production tuning. Resurrects dpla/api#34 (shelved 2022 for performance testing), ported to the current dpla.api.v2 package structure. Co-Authored-By: Claude Sonnet 4.6 --- src/main/resources/application.conf | 11 ++++++++ .../search/ElasticSearchResponseHandler.scala | 25 +++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index ddbe4ba3..310e6e2a 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -63,6 +63,17 @@ elasticSearch { itemsUrl = ${?ITEM_ELASTICSEARCH_URL} pssUrl = "" pssUrl = ${?PSS_ELASTICSEARCH_URL} + circuitBreaker { + # Trip after this many consecutive call timeouts + maxFailures = 3 + maxFailures = ${?ES_CIRCUIT_BREAKER_MAX_FAILURES} + # How long a single ES call may take before counting as a failure + callTimeout = 10s + callTimeout = ${?ES_CIRCUIT_BREAKER_CALL_TIMEOUT} + # How long to keep the breaker open before attempting ES again + resetTimeout = 20s + resetTimeout = ${?ES_CIRCUIT_BREAKER_RESET_TIMEOUT} + } } awsSes { emailFrom = "info@dp.la" diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala index bd510709..9e3594a0 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala @@ -2,11 +2,15 @@ package dpla.api.v2.search import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.TypedSchedulerOps import akka.http.scaladsl.model.{HttpResponse, StatusCode} import akka.http.scaladsl.model.HttpMessage.DiscardedEntity import akka.http.scaladsl.unmarshalling.Unmarshaller +import akka.pattern.CircuitBreaker +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.jdk.DurationConverters._ import scala.util.{Failure, Success} /** @@ -48,11 +52,28 @@ object ElasticSearchResponseHandler { def apply(): Behavior[ElasticSearchResponseHandlerCommand] = { Behaviors.setup { context => + + val cfg = context.system.settings.config.getConfig("elasticSearch.circuitBreaker") + + val maxFailures: Int = cfg.getInt("maxFailures") + val callTimeout: FiniteDuration = cfg.getDuration("callTimeout").toScala.toCoarsest + val resetTimeout: FiniteDuration = cfg.getDuration("resetTimeout").toScala.toCoarsest + + // If ES calls time out maxFailures times consecutively, stop sending + // requests for resetTimeout to let ES recover before retrying. + val breaker = CircuitBreaker( + scheduler = context.system.scheduler.toClassic, + maxFailures = maxFailures, + callTimeout = callTimeout, + resetTimeout = resetTimeout + ).onOpen(() => context.log.warn("ElasticSearch circuit breaker opened")) + .onClose(() => context.log.info("ElasticSearch circuit breaker closed")) + .onHalfOpen(() => context.log.info("ElasticSearch circuit breaker half-open, testing ES")) + Behaviors.receiveMessage[ElasticSearchResponseHandlerCommand] { case ProcessElasticSearchResponse(futureHttpResponse, replyTo) => - // Map the Future value to a message, handled by this actor. - context.pipeToSelf(futureHttpResponse) { + context.pipeToSelf(breaker.withCircuitBreaker(futureHttpResponse)) { case Success(httpResponse) => ProcessHttpResponse(httpResponse, replyTo) case Failure(e) => From 38ad54efe0f4b44a70b8c60c61e478a185204e32 Mon Sep 17 00:00:00 2001 From: Dominic Byrd-McDevitt Date: Mon, 20 Apr 2026 22:03:49 -0500 Subject: [PATCH 2/2] fix: apply circuit breaker before Http().singleRequest, not after Move CircuitBreaker creation from ElasticSearchResponseHandler to ElasticSearchClient so it wraps Http().singleRequest() directly inside withConcurrencyLimit. Previously the breaker wrapped an already-started Future, so it could not prevent requests from being sent to ES when open. Also distinguish CircuitBreakerOpenException in error log (warn vs error). Co-Authored-By: Claude Sonnet 4.6 --- .../api/v2/search/ElasticSearchClient.scala | 49 ++++++++++++++----- .../search/ElasticSearchResponseHandler.scala | 31 ++---------- 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 17b22ccb..9c2e6f5a 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -2,6 +2,7 @@ package dpla.api.v2.search import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.actor.typed.scaladsl.{Behaviors, LoggerOps} +import akka.actor.typed.scaladsl.adapter.TypedSchedulerOps import akka.http.scaladsl.Http import akka.http.scaladsl.model.{ ContentTypes, @@ -10,6 +11,7 @@ import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.pattern.CircuitBreaker import dpla.api.v2.search.ElasticSearchResponseHandler.{ ElasticSearchResponseHandlerCommand, ProcessElasticSearchResponse @@ -23,7 +25,9 @@ import dpla.api.v2.search.paramValidators.{ import spray.json.JsValue import java.util.concurrent.{Semaphore, TimeUnit} +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.DurationConverters._ import org.slf4j.LoggerFactory /** Sends requests to Elastic Search. @@ -126,6 +130,19 @@ object ElasticSearchClient { "ElasticSearchResponseHandler" ) + val cfg = context.system.settings.config.getConfig("elasticSearch.circuitBreaker") + val maxFailures: Int = cfg.getInt("maxFailures") + val callTimeout: FiniteDuration = cfg.getDuration("callTimeout").toScala.toCoarsest + val resetTimeout: FiniteDuration = cfg.getDuration("resetTimeout").toScala.toCoarsest + val breaker = CircuitBreaker( + scheduler = context.system.scheduler.toClassic, + maxFailures = maxFailures, + callTimeout = callTimeout, + resetTimeout = resetTimeout + ).onOpen(() => context.log.warn("ElasticSearch circuit breaker opened")) + .onClose(() => context.log.info("ElasticSearch circuit breaker closed")) + .onHalfOpen(() => context.log.info("ElasticSearch circuit breaker half-open, testing ES")) + Behaviors.receiveMessage[IntermediateSearchResult] { case SearchQuery(params, query, replyTo) => @@ -136,7 +153,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -150,7 +168,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -161,7 +180,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -174,7 +194,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -195,7 +216,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -210,7 +232,7 @@ object ElasticSearchClient { entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(request) + breaker.withCircuitBreaker { Http().singleRequest(request) } } context.log.info2( @@ -249,7 +271,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -259,7 +282,7 @@ object ElasticSearchClient { // Make an HTTP request to elastic search. val fetchUri = s"$endpoint/_doc/$id" val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(HttpRequest(uri = fetchUri)) + breaker.withCircuitBreaker { Http().singleRequest(HttpRequest(uri = fetchUri)) } } context.log.info("ElasticSearch fetch QUERY: {}", fetchUri) @@ -299,7 +322,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -314,7 +338,7 @@ object ElasticSearchClient { entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(request) + breaker.withCircuitBreaker { Http().singleRequest(request) } } context.log.info2( @@ -352,7 +376,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -367,7 +392,7 @@ object ElasticSearchClient { entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(request) + breaker.withCircuitBreaker { Http().singleRequest(request) } } context.log.info2( diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala index 9e3594a0..5223f1bb 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala @@ -2,15 +2,12 @@ package dpla.api.v2.search import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter.TypedSchedulerOps import akka.http.scaladsl.model.{HttpResponse, StatusCode} import akka.http.scaladsl.model.HttpMessage.DiscardedEntity import akka.http.scaladsl.unmarshalling.Unmarshaller -import akka.pattern.CircuitBreaker +import akka.pattern.CircuitBreakerOpenException -import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContextExecutor, Future} -import scala.jdk.DurationConverters._ import scala.util.{Failure, Success} /** @@ -53,27 +50,10 @@ object ElasticSearchResponseHandler { def apply(): Behavior[ElasticSearchResponseHandlerCommand] = { Behaviors.setup { context => - val cfg = context.system.settings.config.getConfig("elasticSearch.circuitBreaker") - - val maxFailures: Int = cfg.getInt("maxFailures") - val callTimeout: FiniteDuration = cfg.getDuration("callTimeout").toScala.toCoarsest - val resetTimeout: FiniteDuration = cfg.getDuration("resetTimeout").toScala.toCoarsest - - // If ES calls time out maxFailures times consecutively, stop sending - // requests for resetTimeout to let ES recover before retrying. - val breaker = CircuitBreaker( - scheduler = context.system.scheduler.toClassic, - maxFailures = maxFailures, - callTimeout = callTimeout, - resetTimeout = resetTimeout - ).onOpen(() => context.log.warn("ElasticSearch circuit breaker opened")) - .onClose(() => context.log.info("ElasticSearch circuit breaker closed")) - .onHalfOpen(() => context.log.info("ElasticSearch circuit breaker half-open, testing ES")) - Behaviors.receiveMessage[ElasticSearchResponseHandlerCommand] { case ProcessElasticSearchResponse(futureHttpResponse, replyTo) => - context.pipeToSelf(breaker.withCircuitBreaker(futureHttpResponse)) { + context.pipeToSelf(futureHttpResponse) { case Success(httpResponse) => ProcessHttpResponse(httpResponse, replyTo) case Failure(e) => @@ -117,12 +97,11 @@ object ElasticSearchResponseHandler { } case ReturnFinalResponse(response, replyTo, error) => - // Log error if there is one error match { + case Some(_: CircuitBreakerOpenException) => + context.log.warn("Request rejected: ElasticSearch circuit breaker is open") case Some(e) => - context.log.error( - "Failed to process ElasticSearch response:", e - ) + context.log.error("Failed to process ElasticSearch response:", e) case None => // no-op } // Send fully processed reply to original requester.