Uso de CompletableFuture en Java

CompletableFuture en Java

CompletableFuture en Java


En este nuevo artículo vamos a ver el uso de CompletableFuture en Java, funcionalidad que fue introducida en Java 8 como mejoras para el tratamiento de la concurrencia.

¿Qué es CompletableFuture?

La clase Clase CompletableFuture tiene como objetivo ser usada para programación asíncrona. La programación asíncrona en Java nos va a permitir ejecutar un proceso o tarea paralela en un Thread diferente al principal, y la clase CompletableFuture nos va a permitir ver la resolución de la ejecución de ese Thread informándonos si la ejecución ha tenido éxito o ha fallado.

CompletableFuture es una mejora del Api de Futuros de Java (Future), la cual fue introducida en Java 5, pero no tuvieron en cuenta maneras de combinar diferentes llamadas o gestionar errores. Es por ese motivo que se creo CompletableFuture en Java 8.

CompletableFuture implementa la interfaz CompletionStage, y tiene múltiples métodos para combinar, ejecutar y componer llamadas.

Uso de CompletableFuture como Future

Como hemos comentado, CompletableFuture hace uso de la interfaz de Future, por lo que podemos hacer uso de CompletableFuture como si fuera Future. Esto es de gran utilidad para evitar tener que realizar migraciones de código.

Si tenemos un método que devuelve ya un Future podemos encapsular la respuesta del método, y únicamente si queremos tocar el interior:

  public Future<String> sayHi(){
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Future<Boolean> future = Executors.newCachedThreadPool().submit(() -> {
      Thread.sleep(2000); //schhhhhh
      return completableFuture.complete("Say Hi");
    });

    return completableFuture;
  }

Este método parte de un CompletableFuture para devolver un Future, para ello hacemos uso de complete. Hay que tener en cuenta que al hacer uso de complete se devuelve un valor boolean.

Hacemos uso de un ExecutorService para ejecutar el completableFuture y poder añadir un sleep.

Crear un CompletableFuture

Vamos a ver las diferentes maneras de crear un CompletableFuture (en el ejemplo anterior ya hemos visto la básica).

Crear un CompletableFuture básico

La manera más fácil y básica de crear un CompletableFuture es la siguiente:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

Esta es la manera más sencilla de crear un CompletableFuture y para obtener su resultado haríamos uso de .get().

String result = completableFuture.get()

Con .get() esperamos hasta que el futuro este resuelto, esto nos puede provocar un bloqueo, por lo que si queremos completar un Futuro a mano podemos hacer:

completableFuture.complete("finish")

Hay que tener en cuenta que al hacer uso de get o complete, hacemos un bloqueo de la llamada.

Crear CompletableFuture con runAsync

runAsync es uno de lo métodos de CompletableFuture que nos va a permitir hacer uso de procesamiento asíncrono. El método runAsync va a ejecutar una tarea sin que esperemos una respuesta. Haría algo parecido a lo que hace Spring con @Async.

Vamos a ver un ejemplo de CompletableFuture con runAsync:

CompletableFuture<Void> myFuture = CompletableFuture.runAsync(() -> {
 
    userRepository.save(user);
    
   log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());
});

Crear CompletableFuture con supplyAsync()

Cuando ejecutamos un proceso asíncrono pero queremos que el CompletableFuture nos devuelva el resultado, debemos hacer uso de supplyAsync(). Es decir, vamos a querer ejecutar una tarea o proceso de manera asíncrona, y cuando haya finalizado queremos el resultado.

SupplyAsync(), devuelve un completableFuture<T> partiendo de un supplier<T>. Vamos a ver un ejemplo con Lambdas de Java, ten en cuenta que todo lo realizado con Lambdas se puede hacer también sin ello:

CompletableFuture<User> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
});

Cuando hacemos uso de completableFuture, no es necesario que realicemos ningún tratamiento de hilos, es decir, que la gestión de los hilos se puede gestionar de manera programática o delegar. En el caso en el que queramos realizar una mayor gestión sobre los Threads nos podemos crear un Thread Pool y utilizarlos de ese Pool, ya que tanto runAsync como supplyAsync aceptan un executor:

static CompletableFuture<Void>	runAsync(Runnable runnable)
static CompletableFuture<Void>	runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U>	supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U>	supplyAsync(Supplier<U> supplier, Executor executor)

Vamos a ver un ejemplo en el que le pasemos un Executor:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<User> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
}, executor);

En el caso en el que se use el Pool por defecto será usado el ForkJoinPool.

Obtener resultado de CompletableFuture

Obtener CompletableFuture con get()

Cuando trabajamos con CompletableFuture lo ideal es que la llamada sea asíncrona y no tener que esperar por la respuesta, pero para el caso en el que necesitemos la respuesta para un procesamiento o un tratamiento posterior podemos hacer uso de .get(). Por ejemplo:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<User> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
}, executor);

User user = myFuture.get();

Con el get anterior lo que hemos hecho ha sido un bloqueo hasta obtener el resultado de User.

Para evitar estos bloqueos, podemos hacer uso de otros métodos como thenApply(), thenAccept() y thenRun().

Obtener resultado de CompletableFuture con thenApply()

Al aplicar thenApply() en un CompletableFuture lo que vamos a hacer es obtener el resultado cuando el proceso ha finalizado. ThenApply() es una interfaz funcional que acepta un argumento y produce un resultado. Vamos a ver un ejemplo:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<User> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
}, executor)
.thenApply(user -> {
   return "name " + user.getName();
});

User user = myFuture.get();

Con el .get() anterior bloqueamos pero como ya ha llegado el futuro no tenemos que esperar.

Uso de thenApplyAsync()

Con el ejemplo anterior, cuando hemos ejecutado la interfaz funcional thenApply() se ha ejecutado en el mismo Thread que la ejecución de supplyAsync(). Pero si queremos que nuestro método thenApply() se ejecute en otro Thread aparte podríamos hacer uso de thenApplyAsync(), vamos a ver un ejemplo:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<User> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
})
.thenApplyAsync(user -> {
    log.debug("THREAD WORKER 2 {} this is a different thread", Thread.currentThread().getName());

   return "name " + user.getName();

},executor);

Resultado de CompletableFuture con thenAccept()

El uso de thenAccept() nos va a permitir ejecutar código una vez se ha finalizado el futuro y obtener el valor, para ello haremos uso de thenAccept() y si quieres completar el futuro se hace uso de thenRun().

Vamos a ver un ejemplo:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
}, executor)
.thenAccept(user -> {
   System.out.println("then accept");
});

El código anterior al hacer uso de thenAccept obtenemos un CompletableFuture con el valor de User.

La diferencia con thenApply es que thenApply devuelve un objeto y thenAccept devuelve Void.

Finalmente, si no queremos procesar ni obtener valor al final, podemos usar thenRun(), la cual es una función lambda.

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<User> myFuture = CompletableFuture.supplyAsync(() -> {

    log.debug("THREAD WORKER {} this is a different thread", Thread.currentThread().getName());

    return userRepository.save(user);
}, executor)
.thenRun(() -> System.out.println("Computation finished."));

Y a continuación podríamos hacer uso de .get() para obtener el valor.

Combinar CompletableFuture

Combinar CompletableFuture con thenCompose()

Cuando queramos ejecutar dos futuros que dependen el uno del otro, por ejemplo, necesitamos obtener la información de un usuario y en función de ese usuario obtener sus movimientos bancarios. Vamos a ver un ejemplo:

CompletableFuture<User> getUserDetail(String userId) {
	return CompletableFuture.supplyAsync(() -> {
		return UserService.getDetails(userId);
	});	
}

CompletableFuture<List<Transaction>> getTransactions(User user) {
	return CompletableFuture.supplyAsync(() -> {
		return TransactionService.getTransactions(user);
	});
}

En el caso anterior el segundo futuro depende del primero, por lo que si queremos invocar ambos y componer la llamada podemos hacer uso de thenCompose():

CompletableFuture<List<Transaction>> result = getUserDetail(userId)
.thenCompose(user -> getTransactions(user));

Aunque podemos hacer uso de thenApply para sustituir thenCompose() para el ejemplo anterior, podríamos decir que lo más apropiado es thenCompose, además usaremos thenCompose() cuando podamos pasar como parámetro el estado anterior de las llamadas.

Uso de thenCombine() para combinar futuros independientes

Con el uso de thenCombine() podemos ejecutar dos completableFutures de manera independiente y luego combinarlos para realizar algún proceso u obtener algún resultado.

thenCombine es una función que acepta dos argumentos de tipo CompletableFuture.

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Nice to")
    .thenCombine(CompletableFuture.supplyAsync(
      () -> " meet you"), (k1, k2) -> k1 + k2));

La ejecución anterior cuando hagamos .get() devolverá «Nice to meet you», se ha realizado una unión de los dos futuros pasados por parámetro.

Ejecución de Futuros en paralelo con anyOf() y allOf()

Para aquellos casos en los que necesitamos ejecutar mútiples futuros en paralelo, por lo general vamos a esperar por la ejecución de todos y a continuación procesar y/o combinar sus resultados.

Para los casos en las que necesitamos ejecutar futuros en paralelo y procesar el resultado de todos podemos hacer uso de allOf() y para aquellos casos en los que únicamente esperamos por cualquiera haremos uso de anyOf(). Vamos a ver usos de allOf() y anyOf() para entenderlo mejor:

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Nice");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "to");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "meet");
CompletableFuture<String> future4  
  = CompletableFuture.supplyAsync(() -> "you");

CompletableFuture<Void> allOfFutures 
  = CompletableFuture.allOf(future1, future2, future3, future4);

En el ejemplo anterior hemos combinado todos los futuros una vez completados, pero allOf() nos devuelve void, por lo que si queremos obtener el valor de ellos podemos o ejecutar uno a uno, o hacer uso de join, con lo que obtenemos el valor, por ejemplo:

String allOfFutures = Stream.of(future1, future2, future3, future4)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

El ejemplo anterior nos devolverá Nice to meet you. Al hacer uso de CompletableFuture si algún futuro no termina bien se lanzará una excepción.

Si por el contrario no queremos esperar a que todos los futuros ejecutados terminen, podemos hacer uso de anyOf(), con lo que el primer futuro que termine será el que se devuelva. Por ejemplo:

CompletableFuture<String> future1  
  = CompletableFuture.supplyAsync(() -> "Nice");
CompletableFuture<String> future2  
  = CompletableFuture.supplyAsync(() -> "to");
CompletableFuture<String> future3  
  = CompletableFuture.supplyAsync(() -> "meet");
CompletableFuture<String> future4  
  = CompletableFuture.supplyAsync(() -> "you");

CompletableFuture<Object> anyFuture 
  = CompletableFuture.anyOf(future1, future2, future3, future4);

System.out.println(anyFuture.get()); // Result of future2

El código anterior se mostrará el que ha tardado menos en ejecutarse, en las pruebas fue el future2.

Gestión de Excepciones en CompletableFuture

Cuando estamos trabajando con futuros es necesario realizar tratamiento de excepciones para evitar propagar errores no controlados o no deseados.

Cuando queremos capturar y tratar errores con CompletableFutures vamos a hacer uso de handle(). Este método recibe dos parámetros, el resultado de la ejecución si termina con éxito y la excepción a lanzar si hay algún error en algún paso del futuro.

Vamos a ver el uso de handle() con un ejemplo:

User user = null;

CompletableFuture<User> completableFuture  
  =  CompletableFuture.supplyAsync(() -> {
      if (user == null) {
          throw new RuntimeException("User cannot be null");
      }
      return user;
  }).handle((s, t) -> s != null ? s : new User());

assertEquals(new User(), completableFuture.get());

En el ejemplo anterior hemos tratado la excepción que se produce devolviendo un new User(). Si en cambio queremos realizar un tratamiento lo más parecido a la programación síncrona, podemos hacer uso de completeExceptionally().

Con completeExceptionally() podremos devolver la excepción que se ha producido, si y solo sí todavía la ejecución no ha terminado.

En el ejemplo siguiente, si el future.get() se ejecuta antes de terminar el CompletableFuture se devolverá una RuntimeExcepcion, en cambio si ha terminado se ejecutará y saldrá por pantalla sayHi.

CompletableFuture<String> future = CompletableFuture.completedFuture("say Hi");

future.completeExceptionally(new RuntimeException("nooooooo"));
System.out.println(future.get());

En cambio si queremos lanzar la excepción independientemente del estado podemos hacer uso de obtrudeException, con el siguiente código siempre se ejecutaría la excepción:

CompletableFuture<String> future = CompletableFuture.completedFuture("say Hi");

future.completeExceptionally(new RuntimeException("nooooooo"));
System.out.println(future.get());

Conclusión

En esta entrada sobre Uso de CompletableFuture en Java, hemos profundizado sobre el uso de futuros en Java y como nos pueden ayudar en procesos paralelos y asíncronos. Un aspecto a tener en cuenta cuando hagamos uso de futuros será la creación o no de un Executor propio para pasarlo como parámetro al CompletableFuture.

Si necesitas más información puedes escribirnos un comentario o un correo electrónico a refactorizando.web@gmail.com o también nos puedes contactar por nuestras redes sociales Facebook o twitter y te ayudaremos encantados!


Deja una respuesta

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *