8 minute read

Em sistemas que utilizam processamento assíncrono, como chamadas gRPC não bloqueantes, é comum encontrar cenários onde o gerenciamento inadequado de pools de threads pode levar a problemas de desempenho e até mesmo a um OutOfMemoryError (OOM). Um caso típico ocorre quando o mesmo pool de threads é utilizado tanto para enviar requisições quanto para processar as respostas. Isso pode resultar em um acúmulo de tarefas pendentes que consomem toda a memória disponível, causando um OOM.

Esse artigo descreve o problema que eu passei a semana investigando e como podemos atacá-lo! Vamos lá 👊!

Simulando o problema

Vamos considerar um exemplo onde um FixedThreadPool é utilizado para enviar milhões de requisições a um “fake” cliente gRPC e, simultaneamente, processar as respostas dessas requisições. O código abaixo demonstra como esse cenário pode levar a um OOM:

public class ThreadPoolsOOMExample {
    private static final int THREAD_POOL_SIZE = 10;
    private static final int TOTAL_TASKS = 1_000_000;

    private final ExecutorService mainExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    public static void main(String[] args) {
        ThreadPoolsOOMExample example = new ThreadPoolsOOMExample();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            mainExecutor.submit(this::submitRequest);
        }
    }

    private void submitRequest() {
        // Simula o envio de uma requisição para o cliente gRPC
        CompletableFuture<Response> future = asyncGrpcCall();

        // Processa a resposta usando o mesmo executor
        future.thenApplyAsync(this::processResponse, mainExecutor);
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        int queueSize = ((ThreadPoolExecutor) mainExecutor).getQueue().size();
        System.out.println("Current queue size: " + queueSize);
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta");
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}

Neste exemplo, o mainExecutor é utilizado tanto para enviar requisições quanto para processar as respostas. Como o número de tarefas é muito grande (TOTAL_TASKS = 1000000), as respostas ficam acumuladas no final da fila do mainExecutor, esperando que todas as requisições sejam enviadas primeiro. Isso leva a um acúmulo de respostas pendentes, consumindo muita memória e eventualmente causando um OutOfMemoryError.

Possíveis soluções

1. Use Pools de Threads Separados

Uma solução eficaz é utilizar pools de threads separados para a produção de requisições e o processamento de respostas. Isso garante que as respostas possam ser processadas independentemente do envio de novas requisições, evitando o acúmulo de tarefas na fila.

package com.hugodesmarques.threads;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class SeparateThreadPoolsExample {
    private static final int PRODUCER_THREAD_POOL_SIZE = 10;
    private static final int CONSUMER_THREAD_POOL_SIZE = 10;
    private static final int TOTAL_TASKS = 1_000_000;

    private final ExecutorService producerThreadPool = Executors.newFixedThreadPool(PRODUCER_THREAD_POOL_SIZE);
    private final ExecutorService consumerThreadPool = Executors.newFixedThreadPool(CONSUMER_THREAD_POOL_SIZE);

    public static void main(String[] args) {
        SeparateThreadPoolsExample example = new SeparateThreadPoolsExample();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            producerThreadPool.submit(this::submitRequest);
            int producerQueueSize = ((ThreadPoolExecutor) producerThreadPool).getQueue().size();
            System.out.println("Producer queue size: " + producerQueueSize);
        }
    }

    private void submitRequest() {
        // Simula o envio de uma requisição para o cliente gRPC
        CompletableFuture<Response> future = asyncGrpcCall();

        // Processa a resposta usando o mesmo executor
        future.thenApplyAsync(this::processResponse, consumerThreadPool);
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        int consumerQueueSize = ((ThreadPoolExecutor) consumerThreadPool).getQueue().size();
        System.out.println("Consumer queue size: " + consumerQueueSize);
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta");
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}

2. Use um Executor Com Capacidade Limitada

Outra abordagem é limitar a capacidade do ThreadPoolExecutor para evitar que ele aceite mais tarefas do que pode processar. Isso pode ser feito usando um BlockingQueue com capacidade limitada e uma política de rejeição adequada.

package com.hugodesmarques.threads;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolWithLimitsExample {

    private static final int TOTAL_TASKS = 1_000_000;
    private static final int THREAD_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;

    private final ExecutorService mainExecutor = new ThreadPoolExecutor(
            THREAD_POOL_SIZE,
            THREAD_POOL_SIZE,
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new NamedThreadFactory("Producer"), // ThreadFactory com expressão lambda)
            new RejectionLoggingPolicy(new ThreadPoolExecutor.CallerRunsPolicy()) // Política de rejeição com log
    );

    public static void main(String[] args) {
        ThreadPoolWithLimitsExample example = new ThreadPoolWithLimitsExample();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            mainExecutor.submit(this::submitRequest);
        }
    }

    private void submitRequest() {
        // Simula o envio de uma requisição para o cliente gRPC
        CompletableFuture<Response> future = asyncGrpcCall();

        // Processa a resposta usando o mesmo executor
        future.thenApplyAsync(this::processResponse, mainExecutor);
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        int queueSize = ((ThreadPoolExecutor) mainExecutor).getQueue().size();
        System.out.println("Current queue size: " + queueSize);
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta... thread: " + Thread.currentThread().getName());
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    // Política de rejeição personalizada que registra um log quando uma tarefa é rejeitada
    static class RejectionLoggingPolicy implements RejectedExecutionHandler {
        private final RejectedExecutionHandler handler;

        public RejectionLoggingPolicy(RejectedExecutionHandler handler) {
            this.handler = handler;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("Tarefa rejeitada: " + r.toString() + " thread: " + Thread.currentThread().getName());
            handler.rejectedExecution(r, executor);
        }
    }

    // ThreadFactory personalizado para nomear threads
    static class NamedThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        public NamedThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement());
        }
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}

3. Controle o fluxo de produção com o uso de semáforos

Implementar controle de fluxo é uma abordagem eficaz para garantir que a produção de novas requisições não ultrapasse a capacidade do sistema de processar respostas. Isso pode ser feito usando semáforos (Semaphore) ou outras técnicas de controle de fluxo. A ideia é limitar o número de requisições simultâneas para evitar sobrecarregar o sistema.

package com.hugodesmarques.threads;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class ThreadPoolsWithSemaphores {
    private static final int THREAD_POOL_SIZE = 10;
    private static final int TOTAL_TASKS = 1_000_000;
    private static final int MAX_CONCURRENT_REQUESTS = 100;
    private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS);

    private final ExecutorService mainExecutor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

    public static void main(String[] args) {
        ThreadPoolsWithSemaphores example = new ThreadPoolsWithSemaphores();
        example.run();
    }

    public void run() {
        for (int i = 0; i < TOTAL_TASKS; i++) {
            try {
                System.out.println("Submetendo request: " + i);
                semaphore.acquire();
                mainExecutor.submit(this::submitRequest);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void submitRequest() {
        try {
            // Simula o envio de uma requisição para o cliente gRPC
            CompletableFuture<Response> future = asyncGrpcCall();

            // Processa a resposta usando o mesmo executor
            future.thenApplyAsync(this::processResponse, mainExecutor)
                    .whenComplete((result, ex) -> semaphore.release());
        } catch (Exception e) {
            semaphore.release();
        }
    }

    private CompletableFuture<Response> asyncGrpcCall() {
        // Simula uma chamada gRPC assíncrona
        CompletableFuture<Response> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(100); // Simula o atraso da rede
                future.complete(new Response(512));
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        }).start();
        printHeapSize();
        return future;
    }

    private Response processResponse(Response response) {
        // Processa a resposta do cliente gRPC
        System.out.println("Processando resposta");
        return response;
    }

    private void printHeapSize() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        long maxMemory = runtime.maxMemory();

        System.out.println("Heap size (total): " + totalMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (used): " + usedMemory / (1024 * 1024) + " MB");
        System.out.println("Heap size (max): " + maxMemory / (1024 * 1024) + " MB");
    }

    public static class Response {
        private byte[] data;

        public Response(int sizeInKB) {
            this.data = new byte[sizeInKB * 1024]; // 1 KB = 1024 bytes
        }

        public byte[] getData() {
            return data;
        }
    }
}

Conclusão

O gerenciamento adequado de pools de threads em sistemas assíncronos é crucial para evitar problemas de desempenho e OutOfMemoryError (OOM). Utilizar pools de threads separados, limitar a capacidade do ThreadPoolExecutor e implementar controle de fluxo são abordagens eficazes para garantir que a produção de novas requisições não ultrapasse a capacidade do sistema de processar respostas.

Ao aplicar essas soluções, você pode garantir que seu sistema permaneça eficiente e responsivo, mesmo sob alta carga, evitando o acúmulo excessivo de tarefas na fila e prevenindo OOM.

Todos os exemplos deste post estão disponíveis no meu repositório: GitHub

Fica a dica, pra você não passar pela mesma dor de cabeça que eu passei 😅.

Updated: