Know Thy Threadpool: A Worked Example with Dropwizard

Dealing with concurrent and parallel code is hard, so I’ll be using concrete examples with Dropwizard that demostrate an asynchronous server and client implementations. For our tests, we’ll have 100 requests sent to a server that waits for a second before returning hello world.

All java metrics were recorded using Java VisualVM

From the Jersey docs, all async server examples serve requests like the following:

@Path("/resource")
public class ThreaderResource {
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public void asyncGet(@Suspended final AsyncResponse resp) {
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                resp.resume("hello world");
            } catch (InterruptedException e) {
                resp.resume(e);
            }
        }).start();
    }
}

With every wave of 100 requests, 100 server threads are allocated only to be de-allocated shortly thereafter. Leading to about 30-60ms of overhead. This is wasteful, and the Jersey docs may be misleading. The behaviorly equivalent code using a cached thread pool is shown below, which will reuse threads instead of destroying them.

@Path("/resource")
public class ThreaderResource {
    private final Executor executor = Executors.newCachedThreadPool();

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public void asyncGet(@Suspended final AsyncResponse resp) {
        executor.execute(() -> {
            try {
                Thread.sleep(1000);
                resp.resume("hello world");
            } catch (InterruptedException e) {
                resp.resume(e);
            }
        });
    }
}

The result is that initial requests were slower by a few milliseconds slower, but after the requests are fulfilled, the threads are not destroyed. This allows for subsequent waves to piggyback off these threads, bringing request times down to nearly zero overhead (2-10ms over 1s).

But there is an issue the cached thread pool that is alluded to in the javadocs.

ExecutorService newCachedThreadPool():

Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters) may be created using ThreadPoolExecutor constructors.

There is no upper bound to the number of threads in the thread pool, and threads are not cheap. Java -Xssize sets the size of the stack for each thread. By deafult on x64 Linux, it’s 1024KB. With 100 requests that’d be about 100MB. Yes, it’s nice that all of our requests finished in about the minimum time required (1 second + some overhead), but at what cost. What if our server received 1,000 or 10,000 requests in a second? There is only so many requests until our server runs out of memory. Fixing the pool size to a given number of threads is one way to fix this issue.

private final Executor executor = Executors.newFixedThreadPool(50);

50 threads will permanently be created to service requests. With only 50 threads, 50 of our 100 requests will take two seconds to complete. This sounds undesirable, but compared to the server crashing, it’s much more reasonable. Better have slight degraded experience than no experience.

This can be fine tuned, such that n threads will always exist that can grow to m threads, and expire after o units of p time using the ThreadPoolExecutor. This is how dropwizard works using the ExecutorServiceBuilder that is part of an application’s environment.

I haven’t mentioned it yet, but in case anyone is confused, the server can listen and dispatch requests to the resource and handle multiple requests concurrently. The typical use case is that requests complete synchronously. What we just accomplished was allowing the server process other requests on the previously used thread while async request is still being processed.

Client Side

Moving onto the client side part of the post, we’ll be using a Jersey client configured through Dropwizard. Since this is a command line program, the following code sets up the Client (you wouldn’t normally do this in a Dropwizard application)

public static Client setupCommon() {
    // Stop logging so that the console is clear
    getLoggerContext().getLogger(ROOT_LOGGER_NAME).detachAndStopAllAppenders();
    final MetricRegistry metrics = new MetricRegistry();
    final Environment env = new Environment("threading-client", Jackson.newObjectMapper(),
        Validators.newValidator(), metrics, ThreaderClient.class.getClassLoader());
    final JerseyClientConfiguration config = new JerseyClientConfiguration();
    config.setMinThreads(1);
    config.setMaxThreads(128);
    config.setWorkQueueSize(8);

    // Our application takes longer than the default 500ms, so we manually set
    // the timeout to a minute
    config.setTimeout(minutes(1L));
    return new JerseyClientBuilder(env).using(config).build("client");
}

Jersey Async

A Jersey client contains an asynchronous API. It will schedule each request on our configured thread pool (1 thread min, 128 thread max). There are major ergonomic issues, though. One can either accept a Future<?>, which is poor excuse for a future, or register a callback. Registering a callback is the best choice, but it complicates the code as we have to keep track of all outstanding requests before printing elapsed time.

final Client client = setupCommon();
final CountDownLatch latch = new CountDownLatch(100);
final Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < 100; i++) {
    final int capI = i;
    client.target("http://localhost:8080/resource")
            .request().async().get(new InvocationCallback<Response>() {
        @Override
        public void completed(Response response) {
            System.out.println(capI);
            latch.countDown();
            response.close();
        }

        @Override
        public void failed(Throwable throwable) {
            System.err.println("oh no! " + throwable.getMessage());
            latch.countDown();
        }
    });
}

latch.await();
stopwatch.stop();
System.out.println("elapsed (ms) " + stopwatch.elapsed(TimeUnit.MILLISECONDS));

The code executes in about 1500-2500ms, the reason for the range is that it is slightly undeterministic if a future will gets its own thread (because the thread pool work queue is full) or if it will wait until another request has finished.

There’s too much cruft in this solution. I want an API where I can get rid of the latch and not deal with line-taxing callbacks.

Jersey Default Rx

There’s a way! Using the reactive Jersey client API, the code becomes much simpler.

final Client client = setupCommon();
final RxClient<RxCompletionStageInvoker> rxClient = RxCompletionStage.from(client);

final ArrayList<CompletableFuture<Void>> responses = Lists.newArrayList();
final Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < 100; i++) {
    final int capI = i;
    final CompletableFuture<Void> resp = rxClient.target("http://localhost:8080/resource")
            .request().rx().get()
            .thenAccept(Response::close)
            .thenRun(() -> System.out.println(Instant.now().getEpochSecond() + ": " + capI))
            .toCompletableFuture();
    responses.add(resp);
}

CompletableFuture.allOf(responses.toArray(new CompletableFuture[responses.size()])).get();
stopwatch.stop();
System.out.println("elapsed (ms) " + stopwatch.elapsed(TimeUnit.MILLISECONDS));

Running the code results in … wait … that can’t be right. The console is only printing seven requests a second. This is terrible! This API is taking 100 / 7 ~ 15 times longer.

The culprit is RxCompletionStage.from(client), which looks innocent enough until reading the docs.

Requests are by default invoked immediately. If not said otherwise the ForkJoinPool#commonPool() pool is used to obtain a thread which processed the request.

This is the problem. The common pool is a thread pool that contains number of CPUs - 1. My desktop contains 8 CPUs, so executing 7 requests at once is now starting to make sense. Except it doesn’t. By default, the common pool is optimized to saturate 100% of the CPU without overscheduling tasks.

Why then would the default thread pool for something that is the epitome of an IO operation be optimized for CPU bound tasks?

Why doesn’t the rx client use the thread pool we already built specifically for our client?

Jersey Dropwizard Rx

Dropwizard to the rescue. Now, Dropwizard will create the rx client for you that is configured to use the thread pool that will already be used for other types of requests (notably synchronous and asynchronous).

final RxClient<RxCompletionStageInvoker> client =
        new JerseyClientBuilder(env).using(config)
                .buildRx("client", RxCompletionStageInvoker.class);

The only bad part is that this feature is for the next version after Dropwizard 1.0.

Comments

If you'd like to leave a comment, please email [email protected]