CompletableFuture in loop: How to collect all responses and handle errors

Multi tool use
CompletableFuture in loop: How to collect all responses and handle errors
I am trying to call a rest api for PUT
request in a loop. Each call is a CompletableFuture
. Each api call returns an object of type RoomTypes.RoomType
PUT
CompletableFuture
RoomTypes.RoomType
I want to collect the responses (both successful and error
responses) in different lists. How do I achieve that? I am sure I
cannot use allOf
because it would not get all the results if any
one call fails to update.
allOf
How do I log errors/exception for each call?
public void sendRequestsAsync(Map<Integer, List> map1) {
List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (Map.Entry<Integer, List> entry :map1.entrySet()) {
CompletableFuture requestCompletableFuture = CompletableFuture
.supplyAsync(
() ->
//API call which returns object of type RoomTypes.RoomType
updateService.updateRoom(51,33,759,entry.getKey(),
new RoomTypes.RoomType(entry.getKey(),map2.get(entry.getKey()),
entry.getValue())),
yourOwnExecutor
)//Supply the task you wanna run, in your case http request
.thenApply(responses::add);
completableFutures.add(requestCompletableFuture);
}
thenApply(responses::add)
ArrayList
allOf
@DidierL What do you mean by "wait"? I just tried it and I can see that the completable future returned by
allOf
calls the next stage method (e.g. handle
) as soon as one of the completable futures in the collection produces an exception.– Edwin Dalorzo
Jul 2 at 16:22
allOf
handle
@EdwinDalorzo In my test it's not the case: it waits until the last future is completed. Maybe it depends on what you do but that would be surprising.
– Didier L
Jul 2 at 16:31
@DidierL When you say "wait", do you mean the next stage method of the computable future is not invoked until all futures are resolved either successfully or not?
– Edwin Dalorzo
Jul 2 at 17:38
@EdwinDalorzo Indeed, yes: the
allOf
stage gets only completed after all futures are completed, even if some failed firs – AFAICT.– Didier L
Jul 2 at 18:11
allOf
2 Answers
2
Alternatively, perhaps you can approach the problem from a different perspective and instead of forcing the use of CompletableFuture
, you can use a CompletionService instead.
CompletableFuture
The whole idea of the CompletionService
is that as soon as an answer for a given future is ready, it gets placed in a queue from which you can consume results.
CompletionService
Alternative 1: Without CompletableFuture
CompletionService<String> cs = new ExecutorCompletionService<>(executor);
List<Future<String>> futures = new ArrayList<>();
futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));
List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();
while (futures.size() > 0) {
Future<String> f = cs.poll();
if (f != null) {
futures.remove(f);
try {
//at this point the future is guaranteed to be solved
//so there won't be any blocking here
String value = f.get();
successes.add(value);
} catch (Exception e) {
failures.add(e.getMessage());
}
}
}
System.out.println(successes);
System.out.println(failures);
Which yields:
[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]
Alternative 2: With CompletableFuture
However, if you really, really need to deal with CompletableFuture
you can submit those to the completion service as well, just by placing them directly into its queue:
CompletableFuture
For example, the following variation has the same result:
BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);
List<Future<String>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));
//places all futures in completion service queue
tasks.addAll(futures);
List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();
while (futures.size() > 0) {
Future<String> f = cs.poll();
if (f != null) {
futures.remove(f);
try {
//at this point the future is guaranteed to be solved
//so there won't be any blocking here
String value = f.get();
successes.add(value);
} catch (Exception e) {
failures.add(e.getMessage());
}
}
}
You can simply use allOf()
to get a future that is completed when all your initial futures are completed (exceptionally or not), and then split them between succeeded and failed using Collectors.partitioningBy()
:
allOf()
Collectors.partitioningBy()
List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (Map.Entry<Integer, List> entry : map1.entrySet()) {
CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
.supplyAsync(
() ->
//API call which returns object of type RoomTypes.RoomType
updateService.updateRoom(51, 33, 759, entry.getKey(),
new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
entry.getValue())),
yourOwnExecutor
);
completableFutures.add(requestCompletableFuture);
}
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
// avoid throwing an exception in the join() call
.exceptionally(ex -> null)
.join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
completableFutures.stream()
.collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally)));
The resulting map will contain one entry with true
for the failed futures, and another entry with false
key for the succeeded ones. You can then inspect the 2 entries to act accordingly.
true
false
Note that there are 2 slight changes compared to your original code:
requestCompletableFuture
CompletableFuture<RoomTypes.RoomType>
thenApply(responses::add)
responses
Concerning logging/exception handling, just add the relevant requestCompletableFuture.handle()
to log them individually, but keep the requestCompletableFuture
and not the one resulting from handle()
.
requestCompletableFuture.handle()
requestCompletableFuture
handle()
I get
Non-static method cant be accessed through static context
at CompletableFuture::isCompletedExceptionally
. But my method is not static.– Rudrani Angira
Jul 2 at 17:49
Non-static method cant be accessed through static context
CompletableFuture::isCompletedExceptionally
I have seen it when they type of
result
does not match exactly what collect
returns. Try removing the assignment to result
and use your IDE to extract a local variable from the whole expression again.– Didier L
Jul 2 at 18:14
result
collect
result
By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.
First thing, don't use
thenApply(responses::add)
on a non-thread-safe collection likeArrayList
, as it will likely break the collection structure. Additionally,allOf
actually waits for all successes/failures, but the documentation may not be very explicit on this point (I actually tested it myself).– Didier L
Jul 2 at 15:58