Skip to content

Latest commit

ย 

History

History
714 lines (492 loc) ยท 28.4 KB

File metadata and controls

714 lines (492 loc) ยท 28.4 KB

๋ชฉ์ฐจ



CompletableFuture

์ด์ „ ๊ธ€ - ์ž๋ฐ” ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ์™€ Future์— ์ด์–ด์„œ ์ด๋ฒˆ ๊ธ€์€ ์ž๋ฐ” ๋น„๋™๊ธฐ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์ •๋ฆฌํ•˜๋Š” ์‹œ๋ฆฌ์ฆˆ์˜ ๋‘๋ฒˆ์งธ ๊ธ€์ด๋‹ค.

์ด์ „ ๊ธ€์—์„œ๋„ ์•Œ์•„๋ณด์•˜๋“ฏ์ด, JDK 1.5์— ๋‚˜์˜จ Future์€ ํ•œ๊ณ„๊ฐ€ ๋ถ„๋ช…ํ–ˆ๋‹ค.

์ด๋กœ์ธํ•ด JDK 1.8๋ถ€ํ„ฐ CompletableFuture ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ์†Œ๊ฐœ๋˜์—ˆ๊ณ , ์ด ์ธํ„ฐํŽ˜์ด์Šค๋Š” Future ์ธํ„ฐํŽ˜์ด์Šค๋„ ๊ตฌํ˜„ํ•˜๊ธฐ๋•Œ๋ฌธ์— ๊ธฐ์กด์˜ Future๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ์™ธ๋ถ€์—์„œ ์™„๋ฃŒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ CompletableFuture๋ž€ ์ด๋ฆ„์„ ๊ฐ–๊ฒŒ๋˜์—ˆ๋‹ค.

CompletableFuture์˜ ์‚ฌ์ „์  ์˜๋ฏธ๋Š” "์™„๋ฃŒ๋œ ๋ฏธ๋ž˜"์ด๋‹ค.

๊ทธ๋ฆฌ๊ณ  ๋™์‹œ์— CompletionStage ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•จ์œผ๋กœ์จ ๋น„๋™๊ธฐ ์—ฐ์‚ฐ ๋‹จ๊ณ„(Step)๋ฅผ ์ œ๊ณตํ•˜๋ฉฐ, ์ด๋Š” ๋น„๋™๊ธฐ ์—ฐ์‚ฐ์„ ๊ณ„์†ํ•ด์„œ ์ฒด์ด๋‹ ํ˜•ํƒœ๋กœ ์กฐํ•ฉํ•˜๋Š” ๊ฒƒ์ด ๊ฐ€๋Šฅํ•ด์กŒ๋‹ค.

Future์€ Collection, CompletableFuture์€ Stream์˜ ๊ด€๊ณ„๋กœ ๋น„์œ ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

์ด๋ฒˆ ๊ธ€์€ Future์˜ ํ•œ๊ณ„๋ฅผ ํ†ตํ•ด CompletableFuture๊ฐ€ ํ•„์š”ํ•œ ์ด์œ ๋ฅผ ์•Œ์•„๋ณด๊ณ , CompletableFuture์˜ ์‚ฌ์šฉ๋ฒ•์— ๋Œ€ํ•ด์„œ ์ •๋ฆฌํ•ด๋ณธ๋‹ค.


1 Future๋งŒ์œผ๋ก  ๋ณต์žกํ•œ ๋น„๋™๊ธฐ ๋กœ์ง์„ ๊ตฌ์„ฑํ•˜๊ธฐ ์–ด๋ ต๋‹ค


๐Ÿ’โ€โ™‚๏ธ Future์˜ ๊ฐ€์žฅ ํฐ ํ•จ๊ณ„์ ์€ ๋ธ”๋กœํ‚น ์ฝ”๋“œ (get())๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ ์„œ๋Š” Callback์„ ์‹คํ–‰ํ•  ์ˆ˜ ์—†๋Š” ๊ฒƒ์ด๋‹ค.

์ด์ „ ๊ธ€ - ์ž๋ฐ” ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ์™€ Future์—์„œ ์ •๋ฆฌํ–ˆ๋“ฏ์ด, Future์˜ ๊ฐ€์žฅ ํฐ ํ•œ๊ณ„์ ์€ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ๊ฒฐ๊ณผ์— ๋”ฐ๋ฅธ Callback ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ์–ด๋ ต๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

๋ฌผ๋ก  ์•ž์„  ๊ธ€์ฒ˜๋Ÿผ FutureTask๋ฅผ ๊ตฌํ˜„ํ•จ์œผ๋กœ์จ Callback ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•  ์ˆœ ์žˆ์ง€๋งŒ, ์ฝœ๋ฐฑ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ์ฝ”๋“œ๋ฅผ ๋”ฐ๋กœ ๋˜ ๊ตฌ์„ฑํ•ด์ค˜์•ผํ•˜๋Š” ๋ฒˆ๊ฑฐ๋กœ์›€์ด ์กด์žฌํ•œ๋‹ค.


๐Ÿ’โ€โ™‚๏ธ ์ด์™ธ์—๋„ Future์€ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ํ•œ๊ณ„๊ฐ€ ์กด์žฌํ•œ๋‹ค.

  • ์—ฌ๋Ÿฌ ๋น„๋™๊ธฐ Task ์ฒ˜๋ฆฌ์— ๋Œ€ํ•œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋‚˜ ํƒ€์ž„์•„์›ƒ ์„ค์ •ํ•˜๊ธฐ ์‰ฝ์ง€ ์•Š๋‹ค.
  • ์—ฌ๋Ÿฌ Future๋ฅผ ์กฐํ•ฉํ•˜๊ธฐ ์–ด๋ ต๋‹ค.
    • ๋ณดํ†ต ๋‘ ๊ฐœ์˜ ๋น„๋™๊ธฐ Task ์ฒ˜๋ฆฌ๋ฅผ ํ•ด์•ผํ•œ๋‹ค๋ฉด, ๋‘ ๊ฐœ์˜ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๋Š” ์„œ๋กœ ๋…๋ฆฝ์ ์ผ์ˆ˜๋„, ๋‘ ๋ฒˆ์งธ๊ฐ€ ์ฒซ ๋ฒˆ์งธ์— ์˜์กด์ ์ผ ์ˆ˜๋„ ์žˆ๋‹ค.
    • Future์€ ์œ„์™€ ๊ฐ™์€ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๋ผ๋ฆฌ์˜ ์กฐํ•ฉ์„ ๊ตฌ์„ฑํ•˜๋Š”๊ฒƒ์„ ์ง์ ‘ ๊ตฌํ˜„ํ•ด์ค˜์•ผํ•˜๊ธฐ๋•Œ๋ฌธ์— ์‰ฝ์ง€์•Š๋‹ค.
    • ์ด์™ธ์—๋„ ๋‹ค์–‘ํ•œ ์š”๊ตฌ์‚ฌํ•ญ์— ๋งž๋Š” Future๋ผ๋ฆฌ์˜ ์กฐํ•ฉ์„ ๊ตฌ์„ฑํ•˜๊ธฐ ํž˜๋“ค๋‹ค.
  • Future ์ง‘ํ•ฉ์ด ์‹คํ–‰ํ•˜๋Š” ๋ชจ๋“  Task์˜ ์™„๋ฃŒ๋ฅผ ๊ธฐ๋‹ค๋ ค์•ผํ•œ๋‹ค.
  • ์ฝ”๋“œ์ƒ์—์„œ Future๋ฅผ ์™„๋ฃŒ์‹œํ‚ฌ ์ˆ˜ ์—†๋‹ค.
    • ๊ธฐ์กด์˜ Future์€ ์™ธ๋ถ€์—์„œ ์™„๋ฃŒ์‹œํ‚ฌ ์ˆ˜ ์—†๋‹ค. ์ทจ์†Œํ•˜๊ฑฐ๋‚˜, get()์— ํƒ€์ž„์•„์›ƒ์„ ์„ค์ •ํ•ด์ค˜์•ผํ•œ๋‹ค.
  • Future ์ง‘ํ•ฉ์—์„œ ๊ฐ€์žฅ ๋นจ๋ฆฌ ์™„๋ฃŒ๋˜๋Š” Task๋งŒ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ๋‚˜๋จธ์ง€๋Š” ์ค‘์ง€ํ•˜๋Š” ์š”๊ตฌ์‚ฌํ•ญ์„ ์ง€ํ‚ค๊ธฐ ์–ด๋ ต๋‹ค.
    • ์˜ˆ๋ฅผ ๋“ค์–ด ์—ฌ๋Ÿฌ Task๊ฐ€ ๋‹ค์–‘ํ•œ ๋ฐฉ์‹์œผ๋กœ ๊ฐ™์€ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌํ•˜๋Š” ์ƒํ™ฉ.

๐Ÿ’โ€โ™‚๏ธ Future์˜ ํ•œ๊ณ„๋ฅผ ํ•ด๊ฒฐํ•œ CompletableFuture.

๊ธฐ์กด์˜ Futuru JDK 1.5๋ถ€ํ„ฐ ์ถ”๊ฐ€๋˜์–ด ๋น„๋™๊ธฐ ๊ฒฐ๊ณผ๋ฅผ ์–ป๋Š”๋ฐ ์ž˜ ์‚ฌ์šฉ๋˜์—ˆ์ง€๋งŒ, ์œ„์™€ ๊ฐ™์ด ๊ณ„์‚ฐ๋“ค์„ ๋‹ค์–‘ํ•œ ํ˜•ํƒœ๋กœ ๊ฒฐํ•ฉํ•˜๊ฑฐ๋‚˜ ์˜ค๋ฅ˜๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ด ๊นŒ๋‹ค๋กœ์› ๋‹ค.

๊ทธ๋กœ์ธํ•ด JDK 1.8๋ถ€ํ„ฐ CompletableFuture๊ฐ€ ์ถ”๊ฐ€๋˜์—ˆ๋‹ค.

CompletableFuture์€ ๊ธฐ์กด์˜ Future ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•  ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ, CompletionStage๋„ ๊ตฌํ˜„ํ•œ๋‹ค.

์ด๋Š” CompletableFuture๊ฐ€ ๊ธฐ์กด์˜ Future ๊ธฐ๋Šฅ์„ ๋ชจ๋‘ ์ง€์›ํ•  ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ, CompletionStage์˜ ๊ธฐ๋Šฅ๋„ ์ถ”๊ฐ€๋กœ ์ œ๊ณตํ•œ๋‹ค๊ณ  ์ดํ•ดํ•  ์ˆ˜ ์žˆ๋‹ค.


๐Ÿ’โ€โ™‚๏ธ CompletionStage

CompletionStage ๊ณต์‹ ๋ฌธ์„œ์—์„  ์•„๋ž˜์™€ ๊ฐ™์ด ์„ค๋ช…ํ•˜๊ณ ์žˆ๋‹ค.

A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes. 

A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.

ํ•ด์„ํ•˜๋ฉด CompletionStage๋Š” ๋‹ค๋ฅธ CompletionStage๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๊ฑฐ๋‚˜ ๊ฐ’์„ ๊ณ„์‚ฐํ•˜๋Š” ํ•˜๋‚˜์˜ ๋น„๋™๊ธฐ ๊ณ„์‚ฐ ๋‹จ๊ณ„(Stage)๋ผ๊ณ  ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

์ฆ‰, ์—ฌ๋Ÿฌ ๋‹จ๊ณ„์˜ ๋น„๋™๊ธฐ Task ์ฒ˜๋ฆฌ๋ฅผ ๋‹จ๊ณ„๋ณ„๋กœ ๊ตฌ์„ฑํ•˜์—ฌ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Œ์„ ๋‚˜ํƒ€๋‚ธ๋‹ค.

๊ฒŒ๋‹ค๊ฐ€ ๊ฐ ๋‹จ๊ณ„์—์„œ ๋ฐœ์ƒํ•œ ์—๋Ÿฌ๋ฅผ ๊ด€๋ฆฌํ•˜๊ณ  ์ „๋‹ฌํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ฐ„๋‹จํ•œ ์˜ˆ์‹œ.

stage.thenApply(x -> square(x))
      .thenAccept(x -> System.out.print(x))
      .thenRun(() -> System.out.println());

๋น„๋™๊ธฐ Task ์ฒ˜๋ฆฌ์˜ ๊ฒฐ๊ณผ๋ฅผ ์–ป๋Š” Future๋Š” Stream๊ณผ ์œ ์‚ฌํ•œ ๊ฒƒ ๊ฐ™๋‹ค.


2 CompletableFuture๋ฅผ ์ด์šฉํ•œ ๊ธฐ๋ณธ์ ์ธ ๋น„๋™๊ธฐ ์š”์ฒญ

CompletableFuture๋ฅผ ์ด์šฉํ•˜์—ฌ ๊ธฐ๋ณธ์ ์ธ ๋น„๋™๊ธฐ ์š”์ฒญ๊ณผ ์ฝœ๋ฐฑ์„ ์ง์ ‘ ๊ตฌํ˜„ํ•ด๋ณด๋ฉด์„œ ์–ด๋–ป๊ฒŒ ์‚ฌ์šฉํ•˜๋Š”์ง€ ์‚ดํŽด๋ณธ๋‹ค.

๋น„๋™๊ธฐ ์š”์ฒญ์€ ํฌ๊ฒŒ ์•„๋ž˜ ๋‘ ๊ฐ€์ง€๋กœ ๋‚˜๋‰œ๋‹ค.

  • ๋ฆฌํ„ด๊ฐ’์ด ์—†๋Š” ๊ฒฝ์šฐ: runAsync()
  • ๋ฆฌํ„ด๊ฐ’์ด ์žˆ๋Š” ๊ฒฝ์šฐ: supplyAsync()

2-1 ๋ฆฌํ„ด๊ฐ’์ด ์—†๋Š” ๊ฒฝ์šฐ - runAsync(Runnable)

runAsync()๋Š” ๊ฐ€์žฅ ๋Œ€ํ‘œ์ ์œผ๋กœ CompletableFuture๋ฅผ ์ด์šฉํ•˜์—ฌ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ์š”์ฒญํ•˜๋Š” ๋ฉ”์„œ๋“œ๋‹ค.

๊ทธ๋ฆฌ๊ณ  runAsync()๋Š” ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋™์ž‘ํ•˜๊ธธ ์›ํ•˜๋Š” ์ž‘์—…์ค‘ ๋ฆฌํ„ด๊ฐ’์ด ์—†๋Š” ๊ฒฝ์šฐ ์‚ฌ์šฉ๋œ๋‹ค.

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Hello " + currentThread().getName());
});

// get()์„ ํ˜ธ์ถœํ•ด์•ผ์ง€๋งŒ, ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋™์ž‘ํ•œ๋‹ค.
System.out.println(future.get()); // Hello ForkJoinPool.commonPool-worker-19

์œ„์™€ ๊ฐ™์ด ์‰ฝ๊ฒŒ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๋ฅผ ์š”์ฒญํ•  ์ˆ˜ ์žˆ๋‹ค.

runAsync()๋Š” get()์„ ํ˜ธ์ถœํ•ด์ค˜์•ผ ์‹คํ–‰๋œ๋‹ค.


๐Ÿ’โ€โ™‚๏ธ runAsync()๋Š” ์ž…๋ ฅ๊ฐ’์œผ๋กœ Runnable์„ ๋ฐ›๋Š”๋‹ค.

Runnable์€ ์ž…๋ ฅ๊ฐ’๊ณผ ๋ฆฌํ„ด๊ฐ’ ๋ชจ๋‘ ์—†๋Š” ํ•จ์ˆ˜ํ˜• ์ธํ„ฐํŽ˜์ด์Šค์ด๋‹ค.

๊ทธ๋Ÿฌ๋ฏ€๋กœ runAsync()๋Š” ๊ฒฐ๊ณผ ๊ฐ’์ด ํ•„์š”์—†๋Š” ๋น„๋™๊ธฐ ์š”์ฒญ์— ์‚ฌ์šฉ๋œ๋‹ค.


๐Ÿ’โ€โ™‚๏ธ Task๋ฅผ ๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌํ•  ๋•Œ ์‚ฌ์šฉ๋˜๋Š” ๋””ํดํŠธ ์Šค๋ ˆ๋“œ๋Š” ForkJoinPool์ด๋‹ค.

runAsync()์™€ ๋’ค์—์„œ ์‚ดํŽด๋ณผ supplyAsync()๋ชจ๋‘ ์‹คํ–‰ํ•  ๋•Œ ์Šค๋ ˆ๋“œ ํ’€์„ ์ง€์ •ํ•ด์ฃผ์ง€์•Š์œผ๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด ForkJoinPool์„ ์‚ฌ์šฉํ•œ๋‹ค.


2-2 ๋ฆฌํ„ด๊ฐ’์ด ์žˆ๋Š” ๊ฒฝ์šฐ - supplyAsync(Supplier)

supplyAsync()๋Š” runAsync()์™€ ๊ฐ™์ด ๋Œ€ํ‘œ์ ์œผ๋กœ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๋ฅผ ์š”์ฒญํ•˜๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.

์ฐจ์ด์ ์ด๋ผ๋ฉด supplyAsync()๋Š” ์ด๋ฆ„์—์„œ ์•Œ ์ˆ˜ ์žˆ๋“ฏ์ด ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋™์ž‘ํ•˜๊ธธ ์›ํ•˜๋Š” ์ž‘์—…์ค‘ ๋ฆฌํ„ด๊ฐ’์ด ์žˆ๋Š” ๊ฒฝ์šฐ ์‚ฌ์šฉ๋œ๋‹ค.

ExecutorService es = Executors.newFixedThreadPool(4);

// get()์„ ํ˜ธ์ถœํ•˜์ง€์•Š์•„๋„ ๋น„๋™๊ธฐ๋กœ Task๊ฐ€ ์‹คํ–‰๋œ๋‹ค.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello " + Thread.currentThread().getName(), es);

System.out.println(future.get()); // Hello pool-1-thread-1

es.shutdown();

์œ„์™€ ๊ฐ™์ด ์‰ฝ๊ฒŒ ๋น„๋™๊ธฐ ์š”์ฒญ์„ ํ•  ์ˆ˜ ์žˆ๋‹ค.

์ฃผ์˜ํ•  ์ ์€ ๊ธฐ์กด์˜ Future์™€ ๋™์ผํ•˜๊ฒŒ get()ํ˜ธ์ถœ์‹œ Blocking์ด ๋œ๋‹ค. ๊ทธ๋ฆฌ๊ณ  get()์„ ํ˜ธ์ถœํ•˜์ง€์•Š์•„๋„ supplyAsync()๋กœ ๋„˜๊ฒจ์ค€ Task๊ฐ€ ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰๋œ๋‹ค.


๐Ÿ’โ€โ™‚๏ธ applyAsync()๋Š” ์ž…๋ ฅ๊ฐ’์œผ๋กœ Supplier์„ ๋ฐ›๋Š”๋‹ค.

Supplier๋Š” ์ž…๋ ฅ ๊ฐ’์€ ์—†์ง€๋งŒ, ๋ฆฌํ„ด๊ฐ’์ด ์กด์žฌํ•˜๋Š” ํ•จ์ˆ˜ํ˜• ์ธํ„ฐํŽ˜์ด์Šค์ด๋‹ค.

๊ทธ๋Ÿฌ๋ฏ€๋กœ runAsync()๋Š” ๊ฒฐ๊ณผ ๊ฐ’์ด ํ•„์š”ํ•œ ๋น„๋™๊ธฐ ์š”์ฒญ์— ์‚ฌ์šฉ๋œ๋‹ค.


3 ๋น„๋™๊ธฐ ์ž‘์—… Callback

CompletableFuture์˜ ๊ฐ€์žฅ ํฐ ์žฅ์ ์€ ํ•จ์ˆ˜ํ˜• ํ˜•ํƒœ๋กœ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ๊ฒฐ๊ณผ์˜ Callback์„ ์ง€์›ํ•œ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

CompletableFuture์€ CompletionStage๋ฅผ ๊ตฌํ˜„ํ•จ์œผ๋กœ์จ ์•„๋ž˜ ๋ฉ”์„œ๋“œ๋ฅผ ์ง€์›ํ•œ๋‹ค.

  • thenApply(Function): ๋น„๋™๊ธฐ๋กœ ๋™์ž‘ํ•œ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ด๋ฐ›์•„์„œ ๋‹ค๋ฅธ ๊ฐ’์œผ๋กœ ๋ฐ”๊พธ๋Š” ์ฝœ๋ฐฑ
    • Stream.map๊ณผ ์œ ์‚ฌํ•˜๋‹ค.
  • thenAccept(Consumer): ๋น„๋™๊ธฐ๋กœ ๋™์ž‘ํ•œ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ด๋ฐ›์•„์„œ ๋˜ ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ (๋ฆฌํ„ด์—†์ด)
  • thenRun(Runnable): ๋น„๋™๊ธฐ๋กœ ๋™์ž‘ํ•œ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ด๋ฐ›์ง€ ์•Š๊ณ  ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ

์œ„ ๋ฉ”์„œ๋“œ๋“ค์„ ์ด์šฉํ•˜์—ฌ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ ์š”์ฒญ์‹œ ์ฝœ๋ฐฑ์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์ฒด์ด๋‹์„ ํ†ตํ•ด ๋‹ค์–‘ํ•œ ์กฐํ•ฉ์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค.


3-1 thenApply(Function)

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    return this.uniApplyStage((Executor)null, fn);
}

thenApply(Function)์€ CompletableFuture<U>๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉฐ, ์ด๋ฆ„์—์„œ๋„ ์•Œ ์ˆ˜ ์žˆ๋“ฏ์ด, ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ด๋ฐ›์•„์„œ ๋‹ค๋ฅธ ๊ฐ’์„ ๋ฐ”๊พธ๋Š” ์ฝœ๋ฐฑ์ด๋‹ค.

๋น„๋™๊ธฐ๋กœ์ฒ˜๋ฆฌํ•œ ๋ฌธ์ž์—ด ๊ฒฐ๊ณผ๋ฅผ ๋ชจ๋‘ ๋Œ€๋ฌธ์ž์—ด๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ์˜ˆ์‹œ.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello ";
}).thenApply((s) -> {
    System.out.println(Thread.currentThread().getName());
    return s.toUpperCase();
});

// get()์„ ํ˜ธ์ถœํ•ด์•ผ ๋น„๋™๊ธฐ ์ž‘์—…์ด ๋™์ž‘ํ•œ๋‹ค.
System.out.println(future.get()); // HELLO

์œ„ ์˜ˆ์‹œ์—์„œ ๋ณผ ์ˆ˜ ์žˆ๋“ฏ์ด, thenApply(Function)๋Š” Stream.map๊ณผ ๋™์ผํ•œ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

thenApply๋กœ ๋„˜์–ด์˜ค๋Š” Function์€ supplyAsync()์™€ ๋™์ผํ•œ ์Šค๋ ˆ๋“œ์—์„œ ๋™์ž‘ํ•œ๋‹ค. ๋ณ„๋„์˜ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ํ•˜๊ณ ์‹ถ๋‹ค๋ฉด thenApplyAsync()๋ฅผ ์‚ฌ์šฉํ•ด์•ผํ•œ๋‹ค.


3-2 thenAccept(Consumer)

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return this.uniAcceptStage((Executor)null, action);
}

thenAccept(Consumer)๋Š” CompletableFuture<Void>๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ์ฆ‰, ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜์ง€์•Š๋Š”๋‹ค.

์ด๋Š” ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ด๋ฐ›์•„์„œ ๋˜ ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ๋ผ๊ณ  ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

๋‹จ, thenApply(Function)์™€๋Š” ๋‹ค๋ฅด๊ฒŒ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ดํ•˜์ง€์•Š๋Š”๋‹ค. ๊ทธ์ € ์‚ฌ์šฉํ•˜๋Š” ๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌํ•œ ๊ฒฐ๊ณผ๋ฅผ ์ฝœ๋ฐฑ์œผ๋กœ Consumeํ•˜๋Š” ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello ";
}).thenAccept((s) -> {
    System.out.println(Thread.currentThread().getName());
    System.out.println(s.toLowerCase());
});

// get()์„ ํ˜ธ์ถœํ•ด์•ผ thenAccept๊ฐ€ ๋™์ž‘ํ•œ๋‹ค.
future.get();

์˜ˆ์‹œ์—์„œ ๋ณผ ์ˆ˜ ์žˆ๋“ฏ์ด, thenAccept๋Š” ์ž…๋ ฅ๊ฐ’์„ ๋ฐ›์•„์„œ ์†Œ๋น„๋งŒํ•  ๋ฟ, ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ดํ•˜์ง€์•Š๋Š”๋‹ค.

get()์„ ํ˜ธ์ถœํ•ด์•ผ thenAccept()๊ฐ€ ๋™์ž‘ํ•˜๋ฉฐ, get()์ด ์—†์„๊ฒฝ์šฐ supplyAsync()๋กœ ์ฃผ์–ด์ง„ ๋น„๋™๊ธฐ Task๋งŒ ์ˆ˜ํ–‰๋œ๋‹ค.

thenAccept๋กœ ๋„˜์–ด์˜ค๋Š” Consumer๋Š” supplyAsync์™€ ๋™์ผํ•œ ์Šค๋ ˆ๋“œ์—์„œ ๋™์ž‘ํ•œ๋‹ค. ๋ณ„๋„์˜ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ํ•˜๊ณ ์‹ถ๋‹ค๋ฉด thenAccepyAsync()๋ฅผ ์‚ฌ์šฉํ•ด์•ผํ•œ๋‹ค.


3-3 thenRun(Runnable)

public CompletableFuture<Void> thenRun(Runnable action) {
    return this.uniRunStage((Executor)null, action);
}

thenRun(Runnable)์€ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฆฌํ„ด๋ฐ›์ง€ ์•Š๊ณ  ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ์ด๋‹ค.

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());

    // 3์ดˆ๊ฐ„ Sleep
    try {
        Thread.sleep(3_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    return "Hello ";
}).thenRun(() -> {
    System.out.println("Computation Finished");
});

future.get();

์ฃผ์˜ํ•  ์ ์€ ๊ธฐ์กด์˜ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜์–ด์•ผ์ง€ thenRun์œผ๋กœ ์ฃผ์–ด์ง„ Runnable์ด ์‹คํ–‰๋œ๋‹ค.

๊ณ„์‚ฐ ๊ฐ’์ด ํ•„์š”ํ•˜์ง€ ์•Š๊ฑฐ๋‚˜ ์ฒด์ธ ๋์—์„œ ์ผ๋ถ€ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์œผ๋ ค๊ณ ํ•  ๋•Œ ์‚ฌ์šฉ๋  ์ˆ˜ ์žˆ๋‹ค.


4 Future์˜ ์กฐํ•ฉ

Callback์„ ์ง€์›ํ•˜๋Š” ๊ฒƒ ์™ธ์—๋„ CompletableFuture์—์„œ ๊ฐ€์žฅ ์ค‘์š”ํ•œ ๋ถ€๋ถ„์€ ์—ฌ๋Ÿฌ Future๋ฅผ ๋ณ‘๋ ฌ๋กœ ์กฐํ•ฉํ•ด์„œ ๋น„๋™๊ธฐ ์ž‘์—… ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

๋งค ์ฒด์ด๋‹๋งˆ๋‹ค CompletableFuture๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ธฐ๋•Œ๋ฌธ์— ์—ฌ๋Ÿฌ๊ฐ€์ง€ ๋น„๋™๊ธฐ Task๋ฅผ ์—ฐ๊ฒฐ ๋ฐ ๊ฒฐํ•ฉํ•  ์ˆ˜ ์žˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด, 2๊ฐœ์˜ CompletableFuture๋ฅผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ ํ•ฉ์น  ์ˆ˜ ์žˆ๋‹ค.

์กฐํ•ฉํ•  ๋•Œ ์‚ฌ์šฉ๋˜๋Š” ๋ฉ”์„œ๋“œ๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

  • thenCompose(): ๋‘ ์ž‘์—…์ด ์„œ๋กœ ์ด์–ด์„œ ์‹คํ–‰ํ•˜๋„๋ก ์กฐํ•ฉ.
    • ๋’ค ์ˆœ๋ฒˆ ์ž‘์—…์ด ์•ž ์ˆœ๋ฒˆ ์ž‘์—…์— ์˜์กด์ ์ด๋‹ค.
  • thenCombine(): ๋‘ ์ž‘์—…์„ ๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๊ณ  ๋‘˜ ๋‹ค ์ข…๋ฃŒ ํ–ˆ์„ ๋•Œ ์ฝœ๋ฐฑ์„ ์‹คํ–‰.

4-1 thenCompose()

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
    return this.uniComposeStage((Executor)null, fn);
}

thenCompose()๋Š” ๋‘ ๊ฐœ์˜ Future๋ฅผ ์ˆœ์ฐจ์ ์œผ๋กœ ์—ฐ๊ฒฐํ•œ๋‹ค.

CompletableFuture<String> combinedFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello ";
}).thenCompose((result) -> CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("World " + Thread.currentThread().getName());
    return result + " World";
}));

System.out.println(combinedFuture.get());
// Hello ForkJoinPool.commonPool-worker-19
// World ForkJoinPool.commonPool-worker-19
// Hello  World

๋’ค์— ์ฃผ์–ด์ง„ Future (์œ„ ์˜ˆ์‹œ์—์„œ World)๊ฐ€ ์•ž ์ˆœ๋ฒˆ์˜ Future (์œ„ ์˜ˆ์‹œ์—์„œ Hello)๊ฒฐ๊ณผ์— ์˜์กด์ ์ด๋ฏ€๋กœ, ์ˆœ์ฐจ์ ์œผ๋กœ Future๊ฐ€ ์‹คํ–‰๋œ๋‹ค.


thenApply() vs thenCompose()

thenApply()์™€ thenCompose()๊ฐ€ ํ•˜๋Š” ์—ญํ• ์ด ๋น„์Šทํ•˜๋‹ค๋ณด๋‹ˆ ๋‘ ๋ฉ”์„œ๋“œ๊ฐ€ ํฐ ์ฐจ์ด๊ฐ€ ์—†์–ด๋ณด์ธ๋‹ค.

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

๋ฉ”์„œ๋“œ ์‹œ๊ทธ๋‹ˆ์ฒ˜์—์„œ ์•Œ ์ˆ˜ ์žˆ๋“ฏ์ด, ๋‘ ๋ฉ”์„œ๋“œ๋Š” ๋ฐ˜ํ™˜๊ฐ’์€ ๊ฐ™์ง€๋งŒ ๋งค๊ฐœ๋ณ€์ˆ˜์˜ ํƒ€์ž…์ด ๋‹ค๋ฅด๋‹ค.

  • thenApply()๋Š” Future ๊ฒฐ๊ณผ๋ฅผ ๋ฐ›์•˜์„ ๋•Œ, ๋ฐ˜ํ™˜ ์ „์— ์–ด๋–ค ์ฒ˜๋ฆฌ๋ฅผ applyํ•  ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค.
    • map๊ณผ ๊ฐ™์ด Future์˜ ๊ฒฐ๊ณผ๋ฅผ ํŠน์ • ๊ฐ’์œผ๋กœ ๋ณ€ํ™˜ํ•  ๋•Œ ์‚ฌ์šฉ๋  ์ˆ˜ ์žˆ๋‹ค. (๋ฐ˜ํ™˜ ๊ฐ’์ด ์ด์ „ Future๋กœ ์ฒ˜๋ฆฌํ•œ ๊ฒฐ๊ณผ์ด๋‹ค.)
  • thenCompose()๋Š” Future ๋‹ค์Œ์— ๋˜ ๋‹ค๋ฅธ Future๋ฅผ ์ด์–ด์„œ ์‹คํ–‰ํ•˜๊ฒŒ๋” ์—ฐ๊ฒฐํ•  ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค.
    • flatmap๊ณผ ๊ฐ™์ด ๋˜ ๋‹ค๋ฅธ CompletableFuture๋ฅผ ํŒŒ์ดํ”„๋ผ์ธ ํ˜•์‹์œผ๋กœ ์—ฐ๊ฒฐํ•ด์„œ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•œ๋‹ค. (๋ฐ˜ํ™˜ ๊ฐ’์ด ์ƒˆ๋กœ์šด ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ์˜ ๊ฒฐ๊ณผ์ธ CompletionStage ํƒ€์ž…์ด์–ด์•ผํ•œ๋‹ค..)

์ข‹์€ ์ฐธ๊ณ  ๋ฐ ์˜ˆ์‹œ


4-2 thenCombine()

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
    return this.uniComposeStage((Executor)null, fn);
}

thenCompose()๋Š” ๋‘ ์ž‘์—…์ด ์„œ๋กœ ์ด์–ด์„œ ์‹คํ–‰๋˜๋„๋ก ์กฐํ•ฉํ•˜๊ธฐ๋•Œ๋ฌธ์—, ๋’ท ์ˆœ๋ฒˆ์˜ Task๊ฐ€ ์•ž ์ˆœ๋ฒˆ์˜ Task๋ฅผ ์˜์กดํ•œ๋‹ค.

๋ฐ˜๋ฉด์—, thenCombine()์€ ๋‘ ๊ฐœ์˜ Task๋ฅผ ์„œ๋กœ ๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰ํ•œ๋‹ค.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
});

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1_500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("World " + Thread.currentThread().getName());
    return "World";
});

CompletableFuture<String> future = hello.thenCombine(world, (h_result, w_result) -> {
    return h_result + " " + w_result;
});

System.out.println(future.get());
// World ForkJoinPool.commonPool-worker-5
// Hello ForkJoinPool.commonPool-worker-19
// Hello World

thenCombine()์€ ๋‘ Task๊ฐ€ ์„œ๋กœ ๋…๋ฆฝ์ ์ด๋ฏ€๋กœ, ์ˆœ๋ฒˆ์— ์ƒ๊ด€์—†์ด ๋™์‹œ์— ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰ํ•˜๊ณ ๋‚˜์„œ ๋‘˜ ๋‹ค ์™„๋ฃŒ๋˜์—ˆ์„๋•Œ Combine๋œ๋‹ค.


4-3 thenAcceptBoth()

thenAcceptBoth()๋Š” ๋‘ ๊ฐœ์˜ Future ๊ฒฐ๊ณผ๋กœ ๋ฌด์–ธ๊ฐ€๋ฅผ ํ•˜๊ณ ์‹ถ์ง€๋งŒ, ๊ฒฐ๊ณผ ๊ฐ’์„ Future ์ฒด์ธ์œผ๋กœ ์ „๋‹ฌํ•  ํ•„์š”๊ฐ€์—†์„ ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค.

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello ";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(2_000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("World " + Thread.currentThread().getName());
    return "World";
}), (s1, s2) -> System.out.println(s1 + s2));

completableFuture.get();
// World ForkJoinPool.commonPool-worker-5
// Hello ForkJoinPool.commonPool-worker-19
// Hello World

๋‘ ๊ฐœ์˜ Future ๊ฒฐ๊ณผ๋กœ Comsume๋งŒํ•˜๊ณ ์‹ถ์„๋•Œ ์‚ฌ์šฉ๋œ๋‹ค.

๋˜ํ•œ, ๋‘ Task๊ฐ€ ์„œ๋กœ ๋…๋ฆฝ์ ์ด๋ฏ€๋กœ, ์ˆœ๋ฒˆ์— ์ƒ๊ด€์—†์ด ๋™์‹œ์— ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰ํ•˜๊ณ ๋‚˜์„œ ๋‘˜ ๋‹ค ์™„๋ฃŒ๋˜์—ˆ์„ ๋•Œ Comsume๋œ๋‹ค.


5 ์—ฌ๋Ÿฌ Future ๋ณ‘๋ ฌ ์‹คํ–‰

CompletableFuture์€ ์—ฌ๋Ÿฌ Future๋ฅผ ์กฐํ•ฉํ•ด์„œ ๋น„๋™๊ธฐ ์ž‘์—… ํŒŒ์ดํ”„๋ผ์ธ์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ์„ ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ, ์—ฌ๋Ÿฌ Future๋ฅผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋“  Future๊ฐ€ ์‹คํ–‰๋ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ ค์„œ ๊ฒฐํ•ฉ๋œ ๊ฒฐ๊ณผ๋ฅผ ํ•œ๋ฒˆ์— ์–ป๋Š” ๋ฐฉ๋ฒ•๋„ ์ œ๊ณตํ•œ๋‹ค.

์‰ฝ๊ฒŒ ๋งํ•ด.. CompletableFuture์€ ์—ฌ๋Ÿฌ Future๋ฅผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๊ฒŒํ•ด์ฃผ๋Š” ๋‘ ๊ฐ€์ง€ ๋ฉ”์„œ๋“œ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

  • allOf(): ์—ฌ๋Ÿฌ Task๋ฅผ ๋ชจ๋‘ ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋“  ์ž‘์—… ๊ฒฐ๊ณผ์— ์ฝœ๋ฐฑ ์‹คํ–‰.
  • anyOf(): ์—ฌ๋Ÿฌ Task์ค‘์— ๊ฐ€์žฅ ๋นจ๋ฆฐ ๋๋‚œ ํ•˜๋‚˜์˜ ๊ฒฐ๊ณผ์— ์ฝœ๋ฐฑ ์‹คํ–‰.

5-1 allOf()

allOf()๋Š” ์—ฌ๋Ÿฌ ์ž‘์—…์„ ์กฐํ•ฉํ•˜๋ฉฐ, ์—ฌ๋Ÿฌ Task๋ฅผ ๋ชจ๋‘ ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋“  ์ž‘์—… ๊ฒฐ๊ณผ์— ์ฝœ๋ฐฑ ์‹คํ–‰ํ•œ๋‹ค.

allOf() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ œ๊ณต๋œ ๋ชจ๋“  Future๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ Blocking ๋œ๋‹ค.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);

// ... ๋น„๋™๊ธฐ ์š”์ฒญํ›„ ๋”ฐ๋กœ ์ฒ˜๋ฆฌํ•ด์•ผํ•  ์ฝ”๋“œ๊ฐ€ ์žˆ์œผ๋ฉด ์—ฌ๊ธฐ์— ์ •์˜ํ•˜๋ฉด ๋œ๋‹ค. ...

// allOf()๋Š” get()์„ ํ˜ธ์ถœํ•˜๋ฉด ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋„˜์–ด์˜จ ๋ชจ๋“  Future๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ Blocking๋œ๋‹ค.
combinedFuture.get();

// get()์˜ Blocking์ด ํ’€๋ฆฌ๋ฉด, allOf()์˜ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋„˜์–ด์˜จ ๋ชจ๋“  Future๋Š” ์™„๋ฃŒ๋œ ์ƒํƒœ์ด๋‹ค.
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

์ด๋Š” ์—ฌ๋Ÿฌ Future์˜ ๊ฒฐ๊ณผ๋ฅผ ๋ชจ๋‘ ์–ป๊ธฐ์œ„ํ•ด ํ•œ๋ฒˆ์— Blockingํ•  ๋•Œ ์•„์ฃผ ์œ ์šฉํ•˜๋‹ค.


๐Ÿ’โ€โ™‚๏ธ ํ•œ๊ฐ€์ง€ ์ฃผ์˜ํ•  ์ ์œผ๋กœ CompletableFuture.allOf()์˜ ๋ฐ˜ํ™˜ ์œ ํ˜•์€ CompletableFuture<Void>์ด๋‹ค.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFutures = CompletableFuture.allOf(future1, future2)
        .thenAccept(System.out::println);

System.out.println(combinedFutures);
// null

์œ„์™€ ๊ฐ™์ด allOf() ๊ฒฐ๊ณผ๋ฅผ ์ง์ ‘ ๋ฐ˜ํ™˜ํ•˜์ง„ ์•Š๋Š”๋‹ค. ๊ทธ๋ฆฌ๊ณ  ์‹ค์ œ๋กœ ์•„๋ž˜์™€ ๊ฐ™์ด ๋ฉ”์„œ๋“œ ์‹œ๊ทธ๋‹ˆ์ฒ˜๋ฅผ๋ณด๋ฉด CompletableFuture<Void>๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

์ด๋Š” allOf()์˜ ํ•œ๊ณ„์ด๊ธฐ๋„ํ•œ๋ฐ.. allOf()๋Š” ์œ„์™€ ๊ฐ™์ด ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ฃผ์–ด์ง„ ๋ชจ๋“  CompletableFuture์˜ ๊ฒฐํ•ฉ๋œ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š๋Š”๋‹ค.

๊ทธ ์ด์œ ๋Š” ๊ฐ๊ฐ์˜ CompletableFuture๊ฐ€ ๋ฐ˜ํ™˜ํ•˜๋Š” ํƒ€์ž…์ด ์„œ๋กœ ๋‹ค๋ฅผ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.


allOf ๋ฉ”์„œ๋“œ ์„ค๋ช…

๋ฉ”์„œ๋“œ์˜ ์ฃผ์„๋ถ€๋ถ„์—๋„ ์œ„์™€ ๊ฐ™์ด ๊ฐ ๊ฐœ๋ณ„ CompletableFuture์˜ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ์™€ ๊ฒฐ๊ณผ๊ฐ€ ๋‹ค๋ฅด๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ์ž ์ฒ˜๋ฆฌํ•ด์ค˜์•ผํ•œ๋‹ค๊ณ ํ•œ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด, future1์€ String์„ future2์€ Integer๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค๋ฉด, ํ•˜๋‚˜์˜ ๊ฒฐํ•ฉ๋œ ๊ฒฐ๊ณผ๋กœ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์—†๊ธฐ๋•Œ๋ฌธ์— ์–ด์ฉ” ์ˆ˜ ์—†์ด Void๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์ˆ˜๋™์œผ๋กœ ๊ฐ ์ž ์ฒ˜๋ฆฌํ•ด์ค˜์•ผํ•˜๋Š” ๊ฒƒ์ด๋‹ค.

์ด ๋•Œ๋ฌธ์—.. allOf๋ฅผ ํ†ตํ•ด ์„œ๋กœ ๋‹ค๋ฅธ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” CompletableFuture๋ฅผ ์‚ฌ์šฉํ•  ์‹œ ์ฝ”๋“œ๊ฐ€ ๋น„๊ต์  ๋”๋Ÿฌ์›Œ์ง„๋‹ค.


๐Ÿ’โ€โ™‚๏ธ ๋งŒ์•ฝ ๋ชจ๋“  Future์˜ ๊ฒฐํ•ฉ๋œ ๊ฒฐ๊ณผ๋ฅผ ์–ป๊ณ ์‹ถ์œผ๋ฉด ์ˆ˜๋™์œผ๋กœ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ€์ ธ์™€์•ผํ•œ๋‹ค.

๋‹คํ–‰ํžˆ JDK 1.8์˜ CompletableFuture๋Š” join() ๋ฉ”์„œ๋“œ๋ฅผ ์ง€์›ํ•จ์œผ๋กœ์จ ๋น„๊ต์  ๊ฐ„๋‹จํžˆ allOf()์˜ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ฃผ์–ด์ง„ ๋ชจ๋“  Future์˜ ๊ฒฐ๊ณผ๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ๊ฒŒํ•˜์˜€๋‹ค.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");

combinedFuture.get();

String combined = Stream.of(future1, future2, future3)
  .map(CompletableFuture::join)
  .collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

๐Ÿค” get()๊ณผ join()์˜ ์ฐจ์ด์ ์€?

Future.get()๊ณผ CompletableFuture.join()์˜ ๊ฐ€์žฅ ํฐ ์ฐจ์ด์ ์€ ์˜ˆ์™ธ๋ฅผ ๋˜์ง€๋Š” ๋ฐฉ์‹์— ์žˆ๋‹ค.

// Future.get()
V get() throws InterruptedException, ExecutionException;

// CompletableFuture.join()
public T join()

get()์€ ๋‘ ๊ฐœ์˜ CheckedException์„ ๋˜์ง€๊ธฐ๋•Œ๋ฌธ์— ์‚ฌ์šฉํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ์ธก์—์„œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋ฅผ ํ•ด์ฃผ์–ด์•ผํ•œ๋‹ค.

๋ฐ˜๋ฉด์—, join()์€ ์–ด๋– ํ•œ CheckedException์„ ๋˜์ง€์ง€์•Š๊ธฐ๋•Œ๋ฌธ์— ํฌ๋ผ๋ฆฌ์–ธํŠธ์—์„œ ๋”ฐ๋กœ ์˜ˆ์™ธ์ฒ˜๋ฆฌํ•ด์ค„ ํ•„์š”๊ฐ€์—†๋‹ค.

๋Œ€์‹  CompletionException๋ผ๋Š” UncheckedException๋ฅผ ๋˜์ง€๋Š”๋ฐ, ์ด๋Š” exceptionally()๋กœ ์˜ˆ์™ธ๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

๋” ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์—ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ .


๐Ÿ’โ€โ™‚๏ธ ์—ฌ๋Ÿฌ Future์˜ ๋ฐ˜ํ™˜ ๊ฐ’์ด ๊ฐ™์„ ๊ฒฝ์šฐ ๊ฐ๊ฐ์˜ ๊ฒฐ๊ณผ ๊ฐ’์„ ์–ป์–ด์˜ค๋Š” ์˜ˆ์‹œ

์—ฌ๋Ÿฌ Future์˜ ๊ฒฐ๊ณผ ๊ฐ’์„ ๊ฒฐํ•ฉํ•˜์ง€์•Š๊ณ  ๊ฐ๊ฐ ์–ป์–ด์˜ค๊ณ ์‹ถ์„๋•Œ ์‚ฌ์šฉ๋˜๋Š” ์˜ˆ์‹œ์ด๋‹ค.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
});

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    System.out.println("World " + Thread.currentThread().getName());
    return "World";
});

List<CompletableFuture<String>> futures = Arrays.asList(hello, world);
CompletableFuture[] futuresArray = futures.toArray(new CompletableFuture[futures.size()]);

// allOf()๋Š” futuresArray๋กœ ์ฃผ์–ด์ง„ Future๋“ค์ด ๋ชจ๋‘ ๋๋‚ฌ๋‹ค๋Š” ๊ฒƒ์„ ๋ณด์žฅํ•œ๋‹ค. 
// ๊ทธ๋Ÿฌ๋ฏ€๋กœ thenApply๋กœ ์ฃผ์–ด์ง„ ์ฝœ๋ฐฑ์ด ์‹คํ–‰๋  ๋• ์ด๋ฏธ ๋ชจ๋“  Future๊ฐ€ ์™„๋ฃŒ๋œ ์ƒํƒœ์ด๋‹ค.
// v๋Š” Void์˜ ์•ฝ์ž๋กœ, ์•„๋ฌด ์˜๋ฏธ์—†๋‹ค.
CompletableFuture<List<String>> results = CompletableFuture.allOf(futuresArray) 
        .thenApply(v -> futures.stream()
                .map(stringCompletableFuture -> stringCompletableFuture.join())
                .collect(Collectors.toList()));

results.get().forEach(System.out::println);
// World ForkJoinPool.commonPool-worker-19
// Hello ForkJoinPool.commonPool-worker-5
// Hello
// World

allOf()๋ฅผ ํ†ตํ•ด ๋ชจ๋“  ์ž‘์—…์ด ์™„๋ฃŒ๋˜์—ˆ์Œ์„ ๋ณด์žฅ๋ฐ›๊ณ ๋‚˜๋ฉด, thenApply()๋ฅผ ํ†ตํ•ด ์ง์ ‘ allOf()์˜ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋„˜๊ธด CompletableFuture์—์„œ join()๋ฅผ ํ†ตํ•ด ๊ฐ’์„ ๊บผ๋‚ด์ค˜์•ผํ•œ๋‹ค.

์ด๋ฅผ ๋ฉ”์„œ๋“œ๋กœ ๋ถ„๋ฆฌํ•ด์„œ ์žฌํ™œ์šฉํ•˜๊ณ ์‹ถ๋‹ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์ด ๊ตฌํ˜„ํ•˜๋ฉด๋œ๋‹ค.

public class Futures {
    public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> cfv = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return cfv.thenApply(future -> {
            return futures.stream()
                    .map(completableFuture -> completableFuture.join())
                    .collect(Collectors.toList());
        });
    }
}

5-2 anyOf()

anyOf()๋Š” allOf()์™€ ๋™์ผํ•œ ํŠน์ง•์„ ๊ฐ€์ง€์ง€๋งŒ, allOf()์™€ ๋‹ค๋ฅด๊ฒŒ ์—ฌ๋Ÿฌ Future์ค‘ ๊ฐ€์žฅ ๋นจ๋ฆฌ ๋๋‚œ ํ•˜๋‚˜์˜ ๊ฒฐ๊ณผ๋งŒ ๊ฐ€์ ธ์˜จ๋‹ค.

CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
});

CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
    System.out.println("World " + Thread.currentThread().getName());
    return "World";
});

CompletableFuture<Void> future = CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);

future.get();

์œ„ ์˜ˆ์‹œ์˜ ์ถœ๋ ฅ ๊ฒฐ๊ณผ๋Š” Hello ํ˜น์€ World์ค‘ ๊ฐ€์žฅ ๋จผ์ € ๋๋‚œ Task ํ•˜๋‚˜๋งŒ ์ถœ๋ ฅ๋œ๋‹ค.


6 ์˜ˆ์™ธ ์ฒ˜๋ฆฌ

๋น„๋™๊ธฐ๋กœ ์š”์ฒญํ•œ Task์—์„œ ์–ธ์ œ๋“  ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด CompletableFuture์€ ์•„๋ž˜ ๋‘ ๊ฐ€์ง€ ๋ฉ”์„œ๋“œ๋ฅผ ์ง€์›ํ•œ๋‹ค.

  • exceptionally(Function): Task๊ฐ€ ์˜ˆ์™ธ๋กœ ์ธํ•ด ์™„๋ฃŒ๋˜์—ˆ์„ ๋•Œ ์˜ˆ์™ธ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ์ฝœ๋ฐฑ.
  • handle(BiFunction): Task๊ฐ€ ์ •์ƒ ํ˜น์€ ์˜ˆ์™ธ๋กœ ์™„๋ฃŒ๋˜์—ˆ์„ ๋•Œ์˜ ์ฝœ๋ฐฑ

6-1 exceptionally(Function)

CompletableFuture์˜ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด ๊ต‰์žฅํžˆ ๊ฐ„๋‹จํ•˜๋‹ค.

boolean throwError = true;

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).exceptionally(ex -> {
    System.out.println(ex); // java.util.concurrent.CompletionException: java.lang.IllegalArgumentException
    return "ERROR!";
});

System.out.println(future.get()); // ERROR

supplyAsync, thenApply, thenAccept ๋“ฑ์˜ ๋ฉ”์„œ๋“œ์—์„œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ exceptionally()๋ฅผ ํ†ตํ•ด ์˜ˆ์™ธ๋ฅผ ํ•ธ๋“ค๋งํ•  ์ˆ˜ ์žˆ๋‹ค.


6-2 handle(BiFunction)

handle๋ฉ”์„œ๋“œ๋ฅผ ์ด์šฉํ•˜๋ฉด Future๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ์ฒ˜๋ฆฌ๋˜์—ˆ์„ ๋•Œ์™€ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์—ฌ ์ฒ˜๋ฆฌ๋˜์—ˆ์„๋•Œ์˜ ์ฝœ๋ฐฑ์„ ํ•œ๋ฒˆ์— ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

boolean throwError = true;

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("Hello " + Thread.currentThread().getName());
    return "Hello";
}).handle((result, ex) -> {
    if (ex != null) {
        System.out.println(ex);
        return "ERROR!";
    }
    return result;
});

System.out.println(future.get());

์ฐธ๊ณ