diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index ddbe4ba..310e6e2 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -63,6 +63,17 @@ elasticSearch { itemsUrl = ${?ITEM_ELASTICSEARCH_URL} pssUrl = "" pssUrl = ${?PSS_ELASTICSEARCH_URL} + circuitBreaker { + # Trip after this many consecutive failed calls (errors or call timeouts) + maxFailures = 3 + maxFailures = ${?ES_CIRCUIT_BREAKER_MAX_FAILURES} + # How long a single ES call may take before counting as a failure + callTimeout = 10s + callTimeout = ${?ES_CIRCUIT_BREAKER_CALL_TIMEOUT} + # How long to keep the breaker open before attempting ES again + resetTimeout = 20s + resetTimeout = ${?ES_CIRCUIT_BREAKER_RESET_TIMEOUT} + } } awsSes { emailFrom = "info@dp.la" diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala index 07f9aca..42b70ab 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchClient.scala @@ -2,6 +2,7 @@ package dpla.api.v2.search import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.actor.typed.scaladsl.{Behaviors, LoggerOps} +import akka.actor.typed.scaladsl.adapter.TypedSchedulerOps import akka.http.scaladsl.Http import akka.http.scaladsl.model.{ ContentTypes, @@ -10,6 +11,7 @@ import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } +import akka.pattern.CircuitBreaker import dpla.api.v2.search.ElasticSearchResponseHandler.{ ElasticSearchResponseHandlerCommand, ProcessElasticSearchResponse @@ -23,7 +25,9 @@ import dpla.api.v2.search.paramValidators.{ import spray.json.JsValue import java.util.concurrent.{Semaphore, TimeUnit} +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.DurationConverters._ import org.slf4j.LoggerFactory /** Sends requests to Elastic Search. 
@@ -126,6 +130,19 @@ object ElasticSearchClient { "ElasticSearchResponseHandler" ) + val cfg = context.system.settings.config.getConfig("elasticSearch.circuitBreaker") + val maxFailures: Int = cfg.getInt("maxFailures") + val callTimeout: FiniteDuration = cfg.getDuration("callTimeout").toScala.toCoarsest + val resetTimeout: FiniteDuration = cfg.getDuration("resetTimeout").toScala.toCoarsest + val breaker = CircuitBreaker( + scheduler = context.system.scheduler.toClassic, + maxFailures = maxFailures, + callTimeout = callTimeout, + resetTimeout = resetTimeout + ).onOpen(() => context.log.warn("ElasticSearch circuit breaker opened")) + .onClose(() => context.log.info("ElasticSearch circuit breaker closed")) + .onHalfOpen(() => context.log.info("ElasticSearch circuit breaker half-open, testing ES")) + Behaviors.receiveMessage[IntermediateSearchResult] { case SearchQuery(params, query, replyTo) => @@ -136,7 +153,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -150,7 +168,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -161,7 +180,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -174,7 +194,8 @@ object ElasticSearchClient { endpoint, replyTo, responseHandler, - nextPhase + nextPhase, + breaker ) context.spawnAnonymous(sessionChildActor) Behaviors.same @@ -195,7 +216,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -210,7 +232,7 @@ 
object ElasticSearchClient { entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(request) + breaker.withCircuitBreaker { Http().singleRequest(request) } } context.log.info2( @@ -252,7 +274,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -262,7 +285,7 @@ object ElasticSearchClient { // Make an HTTP request to elastic search. val fetchUri = s"$endpoint/_doc/$id" val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(HttpRequest(uri = fetchUri)) + breaker.withCircuitBreaker { Http().singleRequest(HttpRequest(uri = fetchUri)) } } context.log.info("ElasticSearch fetch QUERY: {}", fetchUri) @@ -302,7 +325,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -317,7 +341,7 @@ object ElasticSearchClient { entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(request) + breaker.withCircuitBreaker { Http().singleRequest(request) } } context.log.info2( @@ -355,7 +379,8 @@ object ElasticSearchClient { endpoint: String, replyTo: ActorRef[SearchResponse], responseHandler: ActorRef[ElasticSearchResponseHandlerCommand], - nextPhase: ActorRef[IntermediateSearchResult] + nextPhase: ActorRef[IntermediateSearchResult], + breaker: CircuitBreaker ): 
Behavior[ElasticSearchResponse] = { Behaviors.setup { context => @@ -370,7 +395,7 @@ object ElasticSearchClient { entity = HttpEntity(ContentTypes.`application/json`, query.toString) ) val futureResp: Future[HttpResponse] = withConcurrencyLimit { - Http().singleRequest(request) + breaker.withCircuitBreaker { Http().singleRequest(request) } } context.log.info2( diff --git a/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala b/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala index bd51070..5223f1b 100644 --- a/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala +++ b/src/main/scala/dpla/api/v2/search/ElasticSearchResponseHandler.scala @@ -5,6 +5,7 @@ import akka.actor.typed.scaladsl.Behaviors import akka.http.scaladsl.model.{HttpResponse, StatusCode} import akka.http.scaladsl.model.HttpMessage.DiscardedEntity import akka.http.scaladsl.unmarshalling.Unmarshaller +import akka.pattern.CircuitBreakerOpenException import scala.concurrent.{ExecutionContextExecutor, Future} import scala.util.{Failure, Success} @@ -48,10 +49,10 @@ object ElasticSearchResponseHandler { def apply(): Behavior[ElasticSearchResponseHandlerCommand] = { Behaviors.setup { context => + Behaviors.receiveMessage[ElasticSearchResponseHandlerCommand] { case ProcessElasticSearchResponse(futureHttpResponse, replyTo) => - // Map the Future value to a message, handled by this actor. 
context.pipeToSelf(futureHttpResponse) { case Success(httpResponse) => ProcessHttpResponse(httpResponse, replyTo) @@ -96,12 +97,11 @@ object ElasticSearchResponseHandler { } case ReturnFinalResponse(response, replyTo, error) => - // Log error if there is one error match { + case Some(_: CircuitBreakerOpenException) => + context.log.warn("Request rejected: ElasticSearch circuit breaker is open") case Some(e) => - context.log.error( - "Failed to process ElasticSearch response:", e - ) + context.log.error("Failed to process ElasticSearch response:", e) case None => // no-op } // Send fully processed reply to original requester.