Итак, Java 8 была выпущена некоторое время назад, с кучей функций и изменений. Все мы, фанаты Java, ждали этого целую вечность, начиная с того момента, когда они первоначально анонсировали все замечательные функции, которые будут в Java 7, которые в конечном итоге были потрачены.
У меня совсем недавно было время, чтобы действительно начать придавать ему реальный вид, я обновил свои домашние проекты до 8, и я должен сказать, что в целом я вполне доволен тем, что мы получили. API-интерфейс java.time «имитирует» JodaTime — это большое улучшение, пакет java.util.stream становится полезным, лямбда-выражения изменят наш стиль кодирования, что может потребовать некоторого привыкания и к этим изменениям… цитата: «С большой силой приходит большая ответственность» звучит правдоподобно, я чувствую, что в будущем могут быть некоторые интересные времена, так как довольно легко написать какой-то сложный для расшифровки код. В качестве примера отладка кода, который я написал ниже, будет «забавной»…
Пример файла находится на моем репозитории блога Github
Этот пример прост: запускайте несколько потоков, выполняйте некоторую работу одновременно, а затем дождитесь их завершения. Я подумал, пока я играю с Java 8, позвольте мне пойти на это полностью …
Вот что я придумал:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
package net.briandupreez.blog.java8.futures;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.util.Collection;import java.util.List;import java.util.concurrent.*;import java.util.stream.Collectors;/** * Generified future running and completion * * @param <T> the result type * @param <S> the task input */public class WaitingFuturesRunner<T, S> { private transient static final Log logger = LogFactory.getLog(WaitingFuturesRunner.class); private final Collection<Task<T, S>> tasks; private final long timeOut; private final TimeUnit timeUnit; private final ExecutorService executor; /** * Constructor, used to initialise with the required tasks * * @param tasks the list of tasks to execute * @param timeOut max length of time to wait * @param timeUnit time out timeUnit */ public WaitingFuturesRunner(final Collection<Task<T, S>> tasks, final long timeOut, final TimeUnit timeUnit) { this.tasks = tasks; this.timeOut = timeOut; this.timeUnit = timeUnit; this.executor = Executors.newFixedThreadPool(tasks.size()); } /** * Go! * * @param taskInput The input to the task * @param consolidatedResult a container of all the completed results */ public void go(final S taskInput, final ConsolidatedResult<T> consolidatedResult) { final CountDownLatch latch = new CountDownLatch(tasks.size()); final List<CompletableFuture<T>> theFutures = tasks.stream() .map(aSearch -> CompletableFuture.supplyAsync(() -> processTask(aSearch, taskInput, latch), executor)) .collect(Collectors.<CompletableFuture<T>>toList()); final CompletableFuture<List<T>> allDone = collectTasks(theFutures); try { latch.await(timeOut, timeUnit); logger.debug("complete... adding results"); allDone.get().forEach(consolidatedResult::addResult); } catch (final InterruptedException | ExecutionException e) { logger.error("Thread Error", e); throw new RuntimeException("Thread Error, could not complete processing", e); } } private <E> CompletableFuture<List<E>> collectTasks(final List<CompletableFuture<E>> futures) { final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> futures.stream() .map(CompletableFuture<E>::join) .collect(Collectors.<E>toList()) ); } private T processTask(final Task<T, S> task, final S searchTerm, final CountDownLatch latch) { logger.debug("Starting: " + task); T searchResults = null; try { searchResults = task.process(searchTerm, latch); } catch (final Exception e) { e.printStackTrace(); } return searchResults; }} |
Тест:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package net.briandupreez.blog.java8.futures;import net.briandupreez.blog.java8.futures.example.StringInputTask;import net.briandupreez.blog.java8.futures.example.StringResults;import org.apache.log4j.BasicConfigurator;import org.junit.Assert;import org.junit.BeforeClass;import org.junit.Test;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;/** * Test * Created by brian on 4/26/14. */public class CompletableFuturesRunnerTest { @BeforeClass public static void init() { BasicConfigurator.configure(); } /** * 5tasks at 3000ms concurrently should not be more than 3100 * @throws Exception error */ @Test(timeout = 3100) public void testGo() throws Exception { final List<Task<String, String>> taskList = setupTasks(); final WaitingFuturesRunner<String, String> completableFuturesRunner = new WaitingFuturesRunner<>(taskList, 4, TimeUnit.SECONDS); final StringResults consolidatedResults = new StringResults(); completableFuturesRunner.go("Something To Process", consolidatedResults); Assert.assertEquals(5, consolidatedResults.getResults().size()); for (final String s : consolidatedResults.getResults()) { Assert.assertTrue(s.contains("complete")); Assert.assertTrue(s.contains("Something To Process")); } } private List<Task<String, String>> setupTasks() { final List<Task<String, String>> taskList = new ArrayList<>(); final StringInputTask stringInputTask = new StringInputTask("Task 1"); final StringInputTask stringInputTask2 = new StringInputTask("Task 2"); final StringInputTask stringInputTask3 = new StringInputTask("Task 3"); final StringInputTask stringInputTask4 = new StringInputTask("Task 4"); final StringInputTask stringInputTask5 = new StringInputTask("Task 5"); taskList.add(stringInputTask); taskList.add(stringInputTask2); taskList.add(stringInputTask3); taskList.add(stringInputTask4); taskList.add(stringInputTask5); return taskList; }} |
Выход:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
0 [pool-1-thread-1] Starting: StringInputTask{taskName='Task 1'}0 [pool-1-thread-5] Starting: StringInputTask{taskName='Task 5'}0 [pool-1-thread-2] Starting: StringInputTask{taskName='Task 2'}2 [pool-1-thread-4] Starting: StringInputTask{taskName='Task 4'}2 [pool-1-thread-3] Starting: StringInputTask{taskName='Task 3'}3003 [pool-1-thread-5] Done: Task 53004 [pool-1-thread-3] Done: Task 33003 [pool-1-thread-1] Done: Task 13003 [pool-1-thread-4] Done: Task 43003 [pool-1-thread-2] Done: Task 23007 [Thread-0] WaitingFuturesRunner - complete... adding results |
Некоторые полезные статьи / ссылки, которые я нашел и прочитал, делая это:
Oracle: лямбда-урок
IBM: параллелизм Java 8
Томаш Нуркевич: полное руководство по CompletableFuture
| Ссылка: | Игра с Java 8 — Lambdas и Concurrency от нашего партнера по JCG Брайана Дю Приза в блоге Zen в искусстве ИТ . |