CompletableFuture in loop: How to collect all responses and handle errors

Multi tool use
Multi tool use


CompletableFuture in loop: How to collect all responses and handle errors



I am trying to call a rest api for PUT request in a loop. Each call is a CompletableFuture. Each api call returns an object of type RoomTypes.RoomType


PUT


CompletableFuture


RoomTypes.RoomType



I want to collect the responses (both successful and error
responses) in different lists. How do I achieve that? I am sure I
cannot use allOf because it would not get all the results if any
one call fails to update.


allOf



How do I log errors/exception for each call?





public void sendRequestsAsync(Map<Integer, List> map1) {
List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (Map.Entry<Integer, List> entry :map1.entrySet()) {
CompletableFuture requestCompletableFuture = CompletableFuture
.supplyAsync(
() ->
//API call which returns object of type RoomTypes.RoomType
updateService.updateRoom(51,33,759,entry.getKey(),
new RoomTypes.RoomType(entry.getKey(),map2.get(entry.getKey()),
entry.getValue())),
yourOwnExecutor
)//Supply the task you wanna run, in your case http request
.thenApply(responses::add);

completableFutures.add(requestCompletableFuture);
}





First thing, don't use thenApply(responses::add) on a non-thread-safe collection like ArrayList, as it will likely break the collection structure. Additionally, allOf actually waits for all successes/failures, but the documentation may not be very explicit on this point (I actually tested it myself).
– Didier L
Jul 2 at 15:58



thenApply(responses::add)


ArrayList


allOf





@DidierL What do you mean by "wait"? I just tried it and I can see that the completable future returned by allOf calls the next stage method (e.g. handle) as soon as one of the completable futures in the collection produces an exception.
– Edwin Dalorzo
Jul 2 at 16:22


allOf


handle





@EdwinDalorzo In my test it's not the case: it waits until the last future is completed. Maybe it depends on what you do but that would be surprising.
– Didier L
Jul 2 at 16:31





@DidierL When you say "wait", do you mean the next stage method of the computable future is not invoked until all futures are resolved either successfully or not?
– Edwin Dalorzo
Jul 2 at 17:38






@EdwinDalorzo Indeed, yes: the allOf stage gets only completed after all futures are completed, even if some failed firs – AFAICT.
– Didier L
Jul 2 at 18:11


allOf




2 Answers
2



Alternatively, perhaps you can approach the problem from a different perspective and instead of forcing the use of CompletableFuture, you can use a CompletionService instead.


CompletableFuture



The whole idea of the CompletionService is that as soon as an answer for a given future is ready, it gets placed in a queue from which you can consume results.


CompletionService



Alternative 1: Without CompletableFuture


CompletionService<String> cs = new ExecutorCompletionService<>(executor);

List<Future<String>> futures = new ArrayList<>();

futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));


List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
Future<String> f = cs.poll();
if (f != null) {
futures.remove(f);
try {
//at this point the future is guaranteed to be solved
//so there won't be any blocking here
String value = f.get();
successes.add(value);
} catch (Exception e) {
failures.add(e.getMessage());
}
}
}

System.out.println(successes);
System.out.println(failures);



Which yields:


[One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]



Alternative 2: With CompletableFuture



However, if you really, really need to deal with CompletableFuture you can submit those to the completion service as well, just by placing them directly into its queue:


CompletableFuture



For example, the following variation has the same result:


BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);

List<Future<String>> futures = new ArrayList<>();

futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));

//places all futures in completion service queue
tasks.addAll(futures);

List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
Future<String> f = cs.poll();
if (f != null) {
futures.remove(f);
try {
//at this point the future is guaranteed to be solved
//so there won't be any blocking here
String value = f.get();
successes.add(value);
} catch (Exception e) {
failures.add(e.getMessage());
}
}
}



You can simply use allOf() to get a future that is completed when all your initial futures are completed (exceptionally or not), and then split them between succeeded and failed using Collectors.partitioningBy():


allOf()


Collectors.partitioningBy()


List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (Map.Entry<Integer, List> entry : map1.entrySet()) {
CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
.supplyAsync(
() ->
//API call which returns object of type RoomTypes.RoomType
updateService.updateRoom(51, 33, 759, entry.getKey(),
new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
entry.getValue())),
yourOwnExecutor
);

completableFutures.add(requestCompletableFuture);
}

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
// avoid throwing an exception in the join() call
.exceptionally(ex -> null)
.join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
completableFutures.stream()
.collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally)));



The resulting map will contain one entry with true for the failed futures, and another entry with false key for the succeeded ones. You can then inspect the 2 entries to act accordingly.


true


false



Note that there are 2 slight changes compared to your original code:


requestCompletableFuture


CompletableFuture<RoomTypes.RoomType>


thenApply(responses::add)


responses



Concerning logging/exception handling, just add the relevant requestCompletableFuture.handle() to log them individually, but keep the requestCompletableFuture and not the one resulting from handle().


requestCompletableFuture.handle()


requestCompletableFuture


handle()





I get Non-static method cant be accessed through static context at CompletableFuture::isCompletedExceptionally . But my method is not static.
– Rudrani Angira
Jul 2 at 17:49



Non-static method cant be accessed through static context


CompletableFuture::isCompletedExceptionally





I have seen it when they type of result does not match exactly what collect returns. Try removing the assignment to result and use your IDE to extract a local variable from the whole expression again.
– Didier L
Jul 2 at 18:14


result


collect


result






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

s SGzyPfk
tXVHdQ0N6qlFhatEx,MWFUBAG2xKO9fFfjsVrE0RA SHhn2sUqDy5HT Wb

Popular posts from this blog

PHP contact form sending but not receiving emails

Do graphics cards have individual ID by which single devices can be distinguished?

Create weekly swift ios local notifications