Skip to content

Java 8 CompletableFuture

Java 8 CompletableFuture

參考資料:https://medium.com/javarevisited/java-completablefuture-c47ca8c885af

建構方法

方法
runAsync(Runnable runnable)
runAsync(Runnable runnable, Executor executor)
supplyAsync(Supplier supplier)
supplyAsync(Supplier supplier, Executor executor)

runAsync(): 執行非同步工作,沒有回傳值。 supplyAsync() 執行非同步工作,有一回傳值。 executor: 自定義Thread Pool。

// Example using runAsync()
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    // Perform some asynchronous task
    System.out.println("Task executed asynchronously");
});

// Example using supplyAsync()
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    // Perform some asynchronous computation
    return 42;
});

Blocking (阻斷) and Non-blocking (非阻塞)

阻斷

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> "hello!")
        .thenApplyAsync(s -> s.length());
try {
    Integer length = future1.get(); // 會等到 Fucure 物件處理完畢了,並且取到結果值了,才會往下執行。故,主執行緒被阻斷在這裡了。
    System.out.println("length:" + length);
} catch (InterruptedException | ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} 
System.out.println("end");

非阻斷

CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "hello!")
        .thenApplyAsync(s -> s.length())
        .thenAccept(length -> System.out.println("length: " + length));

// Non Blocking Code 

future2.join(); // Wait for the future to complete

// 等到 future2 執行完畢後,才會繼續執行以下。
System.out.println("end");

Exception Handling

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    int result = 10 / 0; // Causes an ArithmeticException
    return result;
});

future.exceptionally(ex -> {
    System.out.println("Exception occurred: " + ex.getMessage());
    return 0; // Default value to return if there's an exception
}).thenAccept(result -> {
    System.out.println("Result: " + result);
});

---
Exception occurred: java.lang.ArithmeticException: / by zero
Result: 0

Composible

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Integer> transformedFuture = future.thenApplyAsync(s -> {
    System.out.println("Thread: " + Thread.currentThread().getName());
    return s.length();
});

transformedFuture.thenAccept(length -> {
    System.out.println("Thread: " + Thread.currentThread().getName());
    System.out.println("Length of Hello: " + length);
});
  • xxxx(function): function會用前個執行的thread去呼叫。
  • xxxxAsync(function): function會用非同步的方式呼叫,並用預設的thread pool。
  • xxxxAsync(function, executor): function會用非同步的方式呼叫,並用指定的thread pool。
方法 描述
thenRun() 同一執行緒,執行回傳 void 的任務
thenRunAsync() 不同執行緒,執行回傳 void 的任務
thenApply() 同一執行緒,執行回傳物件的任務
thenApplyAsync() 不同執行緒,執行回傳物件的任務
CompletableFuture.supplyAsync(() -> "hello").thenApplyAsync(s -> s.toUpperCase())
        .thenApplyAsync(s -> s + ", world!").thenApplyAsync(s -> s.toUpperCase())
        .whenComplete((r, ex) -> System.out.println(r)).join();

CompletableFuture.runAsync(() -> sleep(1000)).thenRunAsync(() -> sleep(100)).thenRunAsync(() -> sleep(100))
        .whenComplete((r, ex) -> System.out.println(r)).join();

Compose 兩種比較:thenApplyAsync()、thenCompose()

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 5).thenApplyAsync(r -> r * 2);
future.thenAccept(s -> System.out.println("Final result: " + s));


CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> future2 = future1.thenCompose(result -> {
    int doubledResult = result * 2;
    return CompletableFuture.supplyAsync(() -> doubledResult);
});
future2.thenAccept(result -> System.out.println("Final result: " + result));

Demo

需求:

有一個計數器會統計目前的數字。由於有不同執行緒會按下計數器,所以需要將計數器物件進行鎖定,讓該計數器增加一下的動作,僅能有一個執行緒進行操作,以避免計數器數字不正確。

Count:
public class Count {
 int cnt = 0;

 public void addctn() {
  synchronized (this) {
   try {
    Thread.sleep(100);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   cnt++;
  }
 }

 public int getctn() {
  synchronized (this) {
   return cnt;
  }
 }
}
模擬 20 個執行緒,共同對計數器進行操作。並且使用固定的Thread Pool進行管理。
 public static void main(String[] args) throws InterruptedException {

  Count count = new Count();

  ExecutorService executor = Executors.newFixedThreadPool(5);

  for (int i = 0; i < 20; i++) {
   CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    String threadName = Thread.currentThread().getName();
    System.out.println("run " + threadName + " thread");
    count.addctn();
   }, executor);
   future.join();
  }

  executor.shutdown();

  System.out.println("計數器:" + count.getctn());
 }
---
run pool-1-thread-1 thread
run pool-1-thread-2 thread
run pool-1-thread-3 thread
run pool-1-thread-4 thread
run pool-1-thread-5 thread
run pool-1-thread-1 thread
run pool-1-thread-2 thread
run pool-1-thread-3 thread
run pool-1-thread-4 thread
run pool-1-thread-5 thread
run pool-1-thread-1 thread
run pool-1-thread-2 thread
run pool-1-thread-3 thread
run pool-1-thread-4 thread
run pool-1-thread-5 thread
run pool-1-thread-1 thread
run pool-1-thread-2 thread
run pool-1-thread-3 thread
run pool-1-thread-4 thread
run pool-1-thread-5 thread
計數器:20