Итак, Java 8 была выпущена некоторое время назад, с кучей функций и изменений. Все мы, фанаты Java, ждали этого целую вечность, начиная с того момента, когда они первоначально анонсировали все замечательные функции, которые будут в Java 7, которые в конечном итоге были потрачены.
У меня совсем недавно было время, чтобы действительно начать придавать ему реальный вид, я обновил свои домашние проекты до 8, и я должен сказать, что в целом я вполне доволен тем, что мы получили. API-интерфейс java.time «имитирует» JodaTime — это большое улучшение, пакет java.util.stream становится полезным, лямбда-выражения изменят наш стиль кодирования, что может потребовать некоторого привыкания и к этим изменениям… цитата: «С большой силой приходит большая ответственность» звучит правдоподобно, я чувствую, что в будущем могут быть некоторые интересные времена, так как довольно легко написать какой-то сложный для расшифровки код. В качестве примера отладка кода, который я написал ниже, будет «забавной»…
Пример файла находится на моем репозитории блога Github
Этот пример прост: запускайте несколько потоков, выполняйте некоторую работу одновременно, а затем дождитесь их завершения. Я подумал, пока я играю с Java 8, позвольте мне пойти на это полностью …
Вот что я придумал:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
package net.briandupreez.blog.java8.futures; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.Collection; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; /** * Generified future running and completion * * @param <T> the result type * @param <S> the task input */ public class WaitingFuturesRunner<T, S> { private transient static final Log logger = LogFactory.getLog(WaitingFuturesRunner. class ); private final Collection<Task<T, S>> tasks; private final long timeOut; private final TimeUnit timeUnit; private final ExecutorService executor; /** * Constructor, used to initialise with the required tasks * * @param tasks the list of tasks to execute * @param timeOut max length of time to wait * @param timeUnit time out timeUnit */ public WaitingFuturesRunner( final Collection<Task<T, S>> tasks, final long timeOut, final TimeUnit timeUnit) { this .tasks = tasks; this .timeOut = timeOut; this .timeUnit = timeUnit; this .executor = Executors.newFixedThreadPool(tasks.size()); } /** * Go! * * @param taskInput The input to the task * @param consolidatedResult a container of all the completed results */ public void go( final S taskInput, final ConsolidatedResult<T> consolidatedResult) { final CountDownLatch latch = new CountDownLatch(tasks.size()); final List<CompletableFuture<T>> theFutures = tasks.stream() .map(aSearch -> CompletableFuture.supplyAsync(() -> processTask(aSearch, taskInput, latch), executor)) .collect(Collectors.<CompletableFuture<T>>toList()); final CompletableFuture<List<T>> allDone = collectTasks(theFutures); try { latch.await(timeOut, timeUnit); logger.debug( "complete... adding results" ); allDone.get().forEach(consolidatedResult::addResult); } catch ( final InterruptedException | ExecutionException e) { logger.error( "Thread Error" , e); throw new RuntimeException( "Thread Error, could not complete processing" , e); } } private <E> CompletableFuture<List<E>> collectTasks( final List<CompletableFuture<E>> futures) { final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray( new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> futures.stream() .map(CompletableFuture<E>::join) .collect(Collectors.<E>toList()) ); } private T processTask( final Task<T, S> task, final S searchTerm, final CountDownLatch latch) { logger.debug( "Starting: " + task); T searchResults = null ; try { searchResults = task.process(searchTerm, latch); } catch ( final Exception e) { e.printStackTrace(); } return searchResults; } } |
Тест:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package net.briandupreez.blog.java8.futures; import net.briandupreez.blog.java8.futures.example.StringInputTask; import net.briandupreez.blog.java8.futures.example.StringResults; import org.apache.log4j.BasicConfigurator; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; /** * Test * Created by brian on 4/26/14. */ public class CompletableFuturesRunnerTest { @BeforeClass public static void init() { BasicConfigurator.configure(); } /** * 5tasks at 3000ms concurrently should not be more than 3100 * @throws Exception error */ @Test (timeout = 3100 ) public void testGo() throws Exception { final List<Task<String, String>> taskList = setupTasks(); final WaitingFuturesRunner<String, String> completableFuturesRunner = new WaitingFuturesRunner<>(taskList, 4 , TimeUnit.SECONDS); final StringResults consolidatedResults = new StringResults(); completableFuturesRunner.go( "Something To Process" , consolidatedResults); Assert.assertEquals( 5 , consolidatedResults.getResults().size()); for ( final String s : consolidatedResults.getResults()) { Assert.assertTrue(s.contains( "complete" )); Assert.assertTrue(s.contains( "Something To Process" )); } } private List<Task<String, String>> setupTasks() { final List<Task<String, String>> taskList = new ArrayList<>(); final StringInputTask stringInputTask = new StringInputTask( "Task 1" ); final StringInputTask stringInputTask2 = new StringInputTask( "Task 2" ); final StringInputTask stringInputTask3 = new StringInputTask( "Task 3" ); final StringInputTask stringInputTask4 = new StringInputTask( "Task 4" ); final StringInputTask stringInputTask5 = new StringInputTask( "Task 5" ); taskList.add(stringInputTask); taskList.add(stringInputTask2); taskList.add(stringInputTask3); taskList.add(stringInputTask4); taskList.add(stringInputTask5); return taskList; } } |
Выход:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
0 [pool- 1 -thread- 1 ] Starting: StringInputTask{taskName= 'Task 1' } 0 [pool- 1 -thread- 5 ] Starting: StringInputTask{taskName= 'Task 5' } 0 [pool- 1 -thread- 2 ] Starting: StringInputTask{taskName= 'Task 2' } 2 [pool- 1 -thread- 4 ] Starting: StringInputTask{taskName= 'Task 4' } 2 [pool- 1 -thread- 3 ] Starting: StringInputTask{taskName= 'Task 3' } 3003 [pool- 1 -thread- 5 ] Done: Task 5 3004 [pool- 1 -thread- 3 ] Done: Task 3 3003 [pool- 1 -thread- 1 ] Done: Task 1 3003 [pool- 1 -thread- 4 ] Done: Task 4 3003 [pool- 1 -thread- 2 ] Done: Task 2 3007 [Thread- 0 ] WaitingFuturesRunner - complete... adding results |
Некоторые полезные статьи / ссылки, которые я нашел и прочитал, делая это:
Oracle: лямбда-урок
IBM: параллелизм Java 8
Томаш Нуркевич: полное руководство по CompletableFuture
Ссылка: | Игра с Java 8 — Lambdas и Concurrency от нашего партнера по JCG Брайана Дю Приза в блоге Zen в искусстве ИТ . |