TL;DR Don’t make any assumptions when switching between libraries. Creating akka http requests without consuming responses will drain your connection pool.

My team recently encountered some strange behaviour in one of our microservices (service A). One of the service's workflows involves:

  1. popping messages from a Kafka queue
  2. calling another service for data (service B)
  3. sending the data to another 3rd party service and then
  4. storing the results in Postgres.

The decision was made to disable the call to that 3rd party service, but leave the general flow in place. So I removed the lines that triggered and processed the communication with the 3rd party service.

After deploying this change, we experienced some trouble in an internal tool: 503 errors. We knew that the tool was calling service A, which was calling service B on the tool’s behalf (design flaw?). It took some time to identify the problem as Exceeded configured max-open-requests value of [...]. Looking at the akka http documentation we could exclude …
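For reference, the limit mentioned in that error lives in akka http's host connection pool settings. A sketch of the relevant configuration (the values shown are the library defaults; we had not tuned them):

```hocon
# application.conf — the pool settings behind the error above
akka.http.host-connection-pool {
  max-connections   = 4    # parallel connections per host
  max-open-requests = 32   # must be a power of 2; exceeding it yields
                           # "Exceeded configured max-open-requests value"
}
```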

  • high latency
  • high throughput
  • server too slow
  • network too slow

… because the communication between service A and B happens between two docker machines on the same network, with relatively low throughput and small payloads. The monitoring of the server (service B) also looked healthy.

It turned out that the actual reason was: response entities were not read or discarded. After removing the lines calling the 3rd party service, the request to service B was still fired, but nothing consumed that service's response.

The request to service B was wrapped in a ServiceB class that made an HTTP call like this:

Http().singleRequest(Get("https://yadayadayada")).map { response =>
  if (response.status.isSuccess()) {
    Some(response.entity)
  } else {
    logger.warn("oh no")
    None
  }
}
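The fix is to make sure every response entity is consumed or explicitly discarded, including the failure branch. A sketch of the corrected call, assuming the surrounding class provides a logger and the usual implicits (discardEntityBytes is the akka http API for draining an entity you don't need):

```scala
Http().singleRequest(Get("https://yadayadayada")).map { response =>
  if (response.status.isSuccess()) {
    Some(response.entity) // the caller is now responsible for consuming this
  } else {
    logger.warn("oh no")
    // drain the unused entity so the connection goes back to the pool
    response.discardEntityBytes()
    None
  }
}
```

Note that the success branch only shifts the responsibility: whoever receives the entity must still consume it, or the pool slot stays occupied.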

We pulled up a scratchpad and tried out what happens if we fire a batch of requests:

import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}

object Test {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val executionContext = system.dispatcher

  def fireRequest(x: Int)(implicit system: ActorSystem) = {
    System.out.println(s"${x} starting")
    Http().singleRequest(Get(s"https://httpbin.org/stream-bytes/1024")).map { response =>
      if (response.status.isSuccess) {
        System.out.println(s"${x} succeeded")
        Some(response.entity)
      } else {
        System.out.println(s"${x} failed")
        None
      }
    }
  }

  def spawnRequests(max: Int) = {
    for(x <- 1 to max) {
      fireRequest(x)
    }
    System.out.println(s"all spawned")
  }
}

Call Test.spawnRequests(16) and you will see that 16 requests are started and 8 return successfully, but further execution is blocked. If you change the line to fireRequest(x).map(_.get.discardBytes()), all requests start happily and all succeed.

My assumption was that when you create an HTTP request, the connection to the server (and therefore the allocation of a connection from the pool) only happens when you start to consume the stream, not at the time you create the request object.

This assumption came from using http4s and fs2:

val request = httpClient.expect[String]("https://yadayadayada").map(body => logger.info(body))
// absolutely nothing happened so far
request.unsafeRun() // this starts the request to the service.

If there were such a thing as a connection pool in this case, the connection would only be checked out of the pool when you call unsafeRun. Everything before that is just a description of the actions you want to perform; none of them is triggered before you explicitly say so.
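This separation of description from execution can be illustrated without any library: wrap the side effect in a thunk, and nothing runs until you invoke it. (A plain Scala sketch, not the http4s API.)

```scala
// "Description vs. execution": building the value performs no work;
// only invoking it does.
val request: () => String = () => {
  println("connection checked out") // side effect happens here, not at definition
  "response body"
}
// nothing has happened yet — `request` is just a description
val body = request() // only now does the side effect run
```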

So the lesson learned here is: never assume library A has similar behaviour to library B.


Published

08 December 2017
