Building Non-Blocking Web APIs in Java
December 26, 2021
A traditional Spring Web MVC application is blocking in nature. What does that mean? It means that from the time a controller gets a request to the time it returns a value, it ties up a thread. A lesson on threads themselves is a topic for another day - but for the purpose of this write-up, a simple definition is: a thread is a lightweight process. Where a process gets dedicated resources, a thread runs within the context of a program (or a process) and takes advantage of the resources allotted to that program. You can think of it as a single execution flow, but what it is and why it is important will become clearer as we go.
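As a tiny, self-contained illustration (not part of the experiment that follows), two execution flows inside one Java program look like this:

// the main flow creates a second flow; both run within the same process
Thread worker = new Thread(() ->
        System.out.println("working on: " + Thread.currentThread().getName()));
worker.start(); // runs concurrently with the main thread
System.out.println("main thread is free to keep going");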
Concurrent Request Handling
The simplest MVC application will look something like this:
Component A would call component B and then wait for its response. Component B gets a request and starts doing something.
Component A here could be anything: a separate service, or even the frontend of your website.
Component B is your backend application.
The problem here is that if your application runs on a single thread, then until Component B is done 'doing something' and has responded to Component A, that thread is tied up - it is not available to accept another incoming request. If another user were to open a browser and type in your website's domain, your server would reject their request, making the webpage load without whatever content your service provides.
A lot of application containers provide support for handling concurrent requests. In Python land, for example, gunicorn and uWSGI both allow you to start your container with multiple workers. They can also queue incoming requests instead of immediately rejecting them, providing some back-pressure. As workers free up, the server picks the next request from the queue and hands it off to a free worker (increasing the total time taken to service that particular request: time in queue + processing time).
In Java, on the other hand, you can achieve similar results in two ways:
A) web application servers can allow concurrent requests and provide a queueing mechanism (similar to what uWSGI does)
B) your application can keep its main thread free by handing off computation to a separate thread
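For option A, as a rough sketch, a Spring Boot application running on an embedded servlet container exposes properties for the worker pool size and the request queue. The names below assume Spring Boot 2.3+ with embedded Tomcat and are illustrative (the WebFlux examples later in this post run on Netty, where these particular properties do not apply):

# application.properties (illustrative values)
# maximum worker threads serving requests concurrently
server.tomcat.threads.max=200
# connections queued by the connector when all workers are busy
server.tomcat.accept-count=100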
A major difference between Java and Python here is that Python has the infamous GIL, which can potentially (and intentionally) slow down performance as you multi-thread your code.
So while our web application will not be single threaded - e.g. in the case of Netty it can be up to 2 * CPU cores - that does not mean our application will be optimal. You can allow 8 concurrent connections and still run out as demand scales.
IO Bound vs CPU Bound
If your application is doing CPU-intensive work, you will have to keep the number of threads low, as concurrency really doesn't get you anything - your CPU is busy doing actual work and rarely has time to switch context and start working on another computation.
On the other hand, if all your application is doing is making calls to an external service over HTTP, or making database calls, it is IO bound, because it spends most of its time waiting for network calls to resolve before returning results to the client. The CPU is mostly waiting on an operation to finish and sitting idle while it does. This is where threading really helps achieve optimal performance - you are essentially telling your CPU that instead of waiting for some IO-bound operation to finish, it should get started on another request. Depending on how you instrument your application, this becomes very evident when you visualize the CPU utilization of the instance your application runs on. If your application's CPU utilization sits in the low tens of percent, that is a strong sign you can benefit from parallelizing some of your code.
As an example, consider a service that queries a datastore and returns some value. Assume the database query takes about 100 milliseconds, and we get 1000 requests per second. If our server only allows 10 concurrent requests, a blocking API would process just 100 of those requests each second, either queueing the remaining 900 or rejecting them altogether. But what happens if you can hand off the database-querying part to a separate thread, immediately returning the main thread to the pool to accept more incoming connections? Let's find out.
Experiment
Let's build a simple MVC service with the following service component:
public CustomResponse get() throws InterruptedException {
    logger.info("Sleeping for 5 seconds");
    Thread.sleep(5000);
    logger.info("Still not done, sleeping for 5 seconds");
    Thread.sleep(5000);
    logger.info("Done");
    return new CustomResponse("Done");
}
Where CustomResponse is just a simple encapsulation:
public class CustomResponse {

    private String response;

    public CustomResponse(final String response) {
        this.response = response;
    }

    public String getResponse() {
        return response;
    }

    public void setResponse(String response) {
        this.response = response;
    }
}
Now, to expose this service call as an API using Spring Boot's annotation-based controller, we would do something like this:
@RestController
public class BlockingController {

    private final BlockingService blockingService;

    @Autowired
    public BlockingController(final BlockingService blockingService) {
        this.blockingService = blockingService;
    }

    @GetMapping("/blocking/resource")
    public Mono<ResponseEntity<CustomResponse>> hello() throws InterruptedException {
        return Mono.just(ResponseEntity.ok(blockingService.get()));
    }
}
NOTE: We are using Spring WebFlux, which has support for building non-blocking, reactive, and functional APIs, but we are deliberately writing a non-optimal, blocking API to illustrate our point.
The controller blocks the thread until the service returns a value, then wraps it into a Mono and returns it to the caller.
What if we wanted to free up the main thread as soon as the computation was handed off to the service? The easiest, Spring Boot-ish way is to use the extensive support baked into Spring WebFlux. An API like the following will schedule the computation on a separate thread using pre-defined schedulers (you can use a different scheduler per your needs):
@GetMapping("/non-blocking/resource/mono")public Mono<CustomResponse> hello2() {return Mono.fromSupplier(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {//eh, sad but don't care for tutorial}return new CustomResponse("Done");}).subscribeOn(Schedulers.boundedElastic());}
That works perfectly well, but to show what I mean by parallelizing logic, it helps to see how to multi-thread your code so that it works even outside the context of Spring WebFlux. If I were to rewrite my service to return a Future object instead of a computed value, the caller could decide whether to retrieve the value in a blocking call or not. The following code does not return a computed value; instead it hands the computation off to a separate thread - one from the executor - and returns the handle to whoever needs it. The caller can then wait for the value, or retrieve it asynchronously, using the CompletableFuture API.
public class NonBlockingService {

    private static final Logger logger = LogManager.getLogger(NonBlockingService.class);
    private static final ExecutorService executorService = Executors.newFixedThreadPool(100);

    public CompletableFuture<CustomResponse> get() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                logger.info("Starting get resource");
                logger.info("Sleeping for 5 seconds");
                Thread.sleep(5000);
                logger.info("Still not done, sleeping for 5 seconds");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.info("Something bad happened");
            }
            logger.info("Done");
            return new CustomResponse("Done");
        }, executorService);
    }
}
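To make that choice concrete, here is a minimal, self-contained sketch (illustrative only, not code from the project) of a caller either blocking on the future or staying asynchronous:

import java.util.concurrent.CompletableFuture;

public class CallerSketch {
    public static void main(String[] args) {
        // stand-in for nonBlockingService.get(); returns immediately with a handle
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "Done");

        // Option 1: block the calling thread until the value is ready
        String blockingResult = future.join();
        System.out.println("Blocking retrieval: " + blockingResult);

        // Option 2: stay non-blocking and attach a continuation that runs when the value arrives
        future.thenAccept(value -> System.out.println("Async retrieval: " + value))
              .join(); // join here only so the demo doesn't exit before the callback fires
    }
}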
A Spring WebFlux controller can then do the following to expose a non-blocking API for this service:
@GetMapping("/non-blocking/resource")public Mono<CustomResponse> hello() {return Mono.fromFuture(nonBlockingService.get());}
What is happening here?
We use the Executors factory from the java.util.concurrent API to create an ExecutorService.
private static final ExecutorService executorService = Executors.newFixedThreadPool(100);
This creates a thread pool of 100 threads. We have different options for the kind of thread pool we want; for example, you can also create a relatively more optimized work-stealing thread pool using Executors.newWorkStealingPool(100).
Once the thread pool is created, we can use this exclusive thread pool for our asynchronous code. The following code supplies the async task to be run on our dedicated thread pool. Note that you can schedule a future on the default thread pool too, in which case we call the method without passing the executor service as an argument (a sketch of that variant follows the snippet below).
CompletableFuture.supplyAsync(() -> {
    try {
        logger.info("Starting get resource");
        logger.info("Sleeping for 5 seconds");
        Thread.sleep(5000);
        logger.info("Still not done, sleeping for 5 seconds");
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        logger.info("Something bad happened");
    }
    logger.info("Done");
    return new CustomResponse("Done");
}, executorService)
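If you drop the executor argument, the task is scheduled on the JVM's shared ForkJoinPool.commonPool() instead; a minimal sketch of that variant:

// No executor passed: supplyAsync schedules the task on ForkJoinPool.commonPool()
CompletableFuture<CustomResponse> future = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return new CustomResponse("Done");
});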
A Primer On Java Futures
The Future interface was introduced in Java 5 to deliver the result of an asynchronous computation; however, doing anything slightly complex required nested callbacks and threatened to couple asynchronous plumbing with business logic. CompletableFuture was added in Java 8 to help compose future objects, reducing the need for nested callbacks and for unnecessarily coupling business logic with asynchronous code, by exposing an extensive set of APIs.
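As a quick, generic illustration of that composition (not code from this project), dependent and independent futures can be chained without nesting callbacks:

import java.util.concurrent.CompletableFuture;

public class CompositionSketch {
    public static void main(String[] args) {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 8);

        // thenApply: transform a result when it arrives
        CompletableFuture<Integer> discounted = price.thenApply(p -> p - 10);

        // thenCombine: join two independent futures once both complete
        CompletableFuture<Integer> total = discounted.thenCombine(tax, Integer::sum);

        System.out.println(total.join()); // prints 98
    }
}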
Testing Our APIs
We can also use CompletableFuture to create a test that bombards our API with 100 concurrent requests and waits for their completion. I have the following test, written in TestNG, for a quick and dirty comparison of performance:
public class NonBlockingAPITests {

    private static final Logger logger = Logger.getLogger(NonBlockingAPITests.class);

    @Test(dataProvider = "data-provider")
    public void testNonBlockingAPI(final String uri) throws InterruptedException, ExecutionException {
        HttpClient client = HttpClient.newHttpClient();
        int parallelTasks = 100;
        ExecutorService execService = Executors.newFixedThreadPool(parallelTasks);
        logger.info("Starting execution");
        long startMillis = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        List<CompletableFuture<Void>> completableFutureList = Lists.newArrayList();
        for (int i = 0; i < parallelTasks; i++) {
            completableFutureList.add(CompletableFuture.runAsync(() -> {
                try {
                    CompletableFuture<HttpResponse<String>> response = client.sendAsync(
                            HttpRequest.newBuilder().uri(URI.create(uri)).build(),
                            BodyHandlers.ofString());
                    response.get();
                } catch (Exception e) {
                    Assert.fail();
                }
            }, execService));
        }
        CompletableFuture<Object> allFutureResult =
                CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]))
                        .thenApply(f -> completableFutureList.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList()));
        allFutureResult.get();
        long endMillis = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        logger.info("Done executing");
        logger.info(String.format("Total Time Taken -- %d", endMillis - startMillis));
    }

    @DataProvider(name = "data-provider")
    public Object[][] dataProvider() {
        return new String[][]{
                {"http://localhost:8082/non-blocking/resource"},
                {"http://localhost:8082/non-blocking/resource/mono"},
                {"http://localhost:8081/blocking/resource"}
        };
    }
}
Running the tests against all three APIs, we get the following results:
Analyzing Results
If you notice, our non-blocking APIs took 10 and 20 seconds respectively. The call to the Mono endpoint took longer because of the scheduler we chose - it limited the number of threads available to process these calls.
Looking at the Javadoc for boundedElastic, we see it is upper-bounded by 10 * CPU cores. On a 4-core CPU that means 40 threads, hence 40 requests handled in parallel. The scaling is not actually going to be perfectly linear - thread context switching is an expensive operation.
The maximum number of created threads is bounded by a cap (by default ten times the number of available CPU cores, see DEFAULT_BOUNDED_ELASTIC_SIZE).
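If that default cap is too small for a workload like ours, Reactor also lets you create a dedicated bounded elastic scheduler with your own cap; a minimal sketch (the sizes and name here are illustrative):

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

// a dedicated scheduler capped at 100 threads and 100_000 queued tasks,
// instead of the shared default capped at 10 * CPU cores
Scheduler blockingCallScheduler =
        Schedulers.newBoundedElastic(100, 100_000, "blocking-calls");

// usage: .subscribeOn(blockingCallScheduler) instead of Schedulers.boundedElastic()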
The CompletableFuture API took 10 seconds because that is how long each API call takes. We had a thread pool of 100 threads and made 100 concurrent requests; each request got assigned a separate thread, and they all finished together in 10 seconds.
The blocking API call takes over 2 minutes on my computer because, while Netty allots 8 workers (2 * 4 cores), each worker takes 10 seconds to complete its task before the next 8 API requests can be serviced. 100 requests are serviced in 12.5 iterations, each iteration taking 10 seconds, bringing the total to roughly 125 seconds - just over 2 minutes.
Use Cases:
Some use cases where reactive, non-blocking APIs would be an obvious choice:
If you max out your service's throughput and are running your service on an orchestrator like Kubernetes, you can run into a problem where Kubernetes keeps killing your pods. Kubernetes allows us to write health check probes so it can monitor the status of a pod; if a pod goes into a bad state and is unable to serve traffic, Kubernetes will take it out of rotation and schedule a fresh pod to replace it. If our service's throughput is maxed out, health checks will start failing because all calls to the application are rejected by the server, putting the application in a recycle loop for as long as the traffic remains high.
Imagine a service that acts as an egress proxy for your company - all the microservices route their external network calls through this service. It is imperative that this service does not block incoming calls while it waits to hear back from the external services it called on its clients' behalf.
If you want to retrieve data from 5 distinct datasources, instead of doing it sequentially you can spawn multiple threads to do it all in parallel, since they are all network calls anyway - easing the back-pressure (see the sketch below).
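A minimal sketch of that fan-out with CompletableFuture - the data source names and the fetchFrom call are placeholders, not code from this project:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class FanOutSketch {
    // placeholder for a network call to one of the five datastores
    static String fetchFrom(String source) {
        try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return source + "-data";
    }

    public static void main(String[] args) {
        List<String> sources = List.of("users", "orders", "inventory", "billing", "shipping");

        // kick off all five calls at once
        List<CompletableFuture<String>> futures = sources.stream()
                .map(s -> CompletableFuture.supplyAsync(() -> fetchFrom(s)))
                .collect(Collectors.toList());

        // wait for all of them; total time is roughly the slowest single call
        // (pool size permitting) instead of the sum of all five
        List<String> results = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        System.out.println(results);
    }
}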
Parting Thoughts
If you have an IO-bound application, you can achieve dramatically better performance by moving to reactive Java. As compute gets cheaper and horizontal scaling becomes easier, we might be tempted to simply let the compute instances scale as we go - however, the above experiment shows why it is important to understand the requirements of our application in order to use resources effectively. Avoiding the cost of running 10 EC2 instances to do what a fraction of that compute could achieve saves astronomical cloud computing fees.
I have uploaded the code to my public GitHub if you want to play with it (I would suggest playing with the exception handling of CompletableFutures). It is the minimal amount of code needed to showcase our experiment - nothing fancy.
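If you do explore those error paths, here is a minimal, generic sketch (not from the repository) of the two most common hooks, exceptionally and handle:

import java.util.concurrent.CompletableFuture;

public class ErrorHandlingSketch {
    public static void main(String[] args) {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("downstream call failed");
        });

        // exceptionally: map the failure to a fallback value
        String fallback = failing.exceptionally(ex -> "fallback").join();
        System.out.println(fallback); // prints "fallback"

        // handle: inspect both the value and the exception in one place
        String handled = failing
                .handle((value, ex) -> ex != null ? "handled: " + ex.getMessage() : value)
                .join();
        System.out.println(handled);
    }
}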
As always, feel free to reach out with questions, concerns, disagreements, etc. at blog@haaris.dev