Статьи

Игра с Java 8 — лямбды и параллелизм

Итак, Java 8 была выпущена некоторое время назад, с кучей функций и изменений. Все мы, фанаты Java, ждали этого целую вечность, начиная с того момента, когда они первоначально анонсировали все замечательные функции, которые будут в Java 7, которые в конечном итоге были потрачены.

У меня совсем недавно было время, чтобы действительно начать придавать ему реальный вид, я обновил свои домашние проекты до 8, и я должен сказать, что в целом я вполне доволен тем, что мы получили. API-интерфейс java.time «имитирует» JodaTime — это большое улучшение, пакет java.util.stream становится полезным, лямбда-выражения изменят наш стиль кодирования, что может потребовать некоторого привыкания и к этим изменениям… цитата: «С большой силой приходит большая ответственность» звучит правдоподобно, я чувствую, что в будущем могут быть некоторые интересные времена, так как довольно легко написать какой-то сложный для расшифровки код. В качестве примера отладка кода, который я написал ниже, будет «забавной»…

Пример файла находится на моем репозитории блога Github

Этот пример прост: запускайте несколько потоков, выполняйте некоторую работу одновременно, а затем дождитесь их завершения. Я подумал, пока я играю с Java 8, позвольте мне пойти на это полностью …
Вот что я придумал:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package net.briandupreez.blog.java8.futures;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
 
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
 
 
/**
 * Generified future running and completion
 *
 * @param <T> the result type
 * @param <S> the task input
 */
public class WaitingFuturesRunner<T, S> {
    private transient static final Log logger = LogFactory.getLog(WaitingFuturesRunner.class);
    private final Collection<Task<T, S>> tasks;
    private final long timeOut;
    private final TimeUnit timeUnit;
    private final ExecutorService executor;
 
    /**
     * Constructor, used to initialise with the required tasks
     *
     * @param tasks the list of tasks to execute
     * @param timeOut  max length of time to wait
     * @param timeUnit     time out timeUnit
     */
    public WaitingFuturesRunner(final Collection<Task<T, S>> tasks, final long timeOut, final TimeUnit timeUnit) {
        this.tasks = tasks;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        this.executor = Executors.newFixedThreadPool(tasks.size());
    }
 
    /**
     * Go!
     *
     * @param taskInput          The input to the task
     * @param consolidatedResult a container of all the completed results
     */
    public void go(final S taskInput, final ConsolidatedResult<T> consolidatedResult) {
        final CountDownLatch latch = new CountDownLatch(tasks.size());
        final List<CompletableFuture<T>> theFutures = tasks.stream()
                .map(aSearch -> CompletableFuture.supplyAsync(() -> processTask(aSearch, taskInput, latch), executor))
                .collect(Collectors.<CompletableFuture<T>>toList());
 
        final CompletableFuture<List<T>> allDone = collectTasks(theFutures);
        try {
            latch.await(timeOut, timeUnit);
            logger.debug("complete... adding results");
            allDone.get().forEach(consolidatedResult::addResult);
        } catch (final InterruptedException | ExecutionException e) {
            logger.error("Thread Error", e);
            throw new RuntimeException("Thread Error, could not complete processing", e);
        }
    }
 
    private <E> CompletableFuture<List<E>> collectTasks(final List<CompletableFuture<E>> futures) {
        final CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v -> futures.stream()
                        .map(CompletableFuture<E>::join)
                        .collect(Collectors.<E>toList())
        );
    }
 
    private T processTask(final Task<T, S> task, final S searchTerm, final CountDownLatch latch) {
        logger.debug("Starting: " + task);
        T searchResults = null;
        try {
            searchResults = task.process(searchTerm, latch);
        } catch (final Exception e) {
            e.printStackTrace();
        }
        return searchResults;
    }
 
}

Тест:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package net.briandupreez.blog.java8.futures;
 
import net.briandupreez.blog.java8.futures.example.StringInputTask;
import net.briandupreez.blog.java8.futures.example.StringResults;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
/**
 * Test
 * Created by brian on 4/26/14.
 */
public class CompletableFuturesRunnerTest {
 
    @BeforeClass
    public static void init() {
        BasicConfigurator.configure();
    }
 
    /**
     *  5tasks at 3000ms concurrently should not be more than 3100
     * @throws Exception error
     */
    @Test(timeout = 3100)
    public void testGo() throws Exception {
        final List<Task<String, String>> taskList = setupTasks();
 
        final WaitingFuturesRunner<String, String> completableFuturesRunner = new WaitingFuturesRunner<>(taskList, 4, TimeUnit.SECONDS);
        final StringResults consolidatedResults = new StringResults();
 
        completableFuturesRunner.go("Something To Process", consolidatedResults);
 
        Assert.assertEquals(5, consolidatedResults.getResults().size());
        for (final String s : consolidatedResults.getResults()) {
            Assert.assertTrue(s.contains("complete"));
            Assert.assertTrue(s.contains("Something To Process"));
        }
 
 
    }
 
    private List<Task<String, String>> setupTasks() {
        final List<Task<String, String>> taskList = new ArrayList<>();
        final StringInputTask stringInputTask = new StringInputTask("Task 1");
        final StringInputTask stringInputTask2 = new StringInputTask("Task 2");
        final StringInputTask stringInputTask3 = new StringInputTask("Task 3");
        final StringInputTask stringInputTask4 = new StringInputTask("Task 4");
        final StringInputTask stringInputTask5 = new StringInputTask("Task 5");
        taskList.add(stringInputTask);
        taskList.add(stringInputTask2);
        taskList.add(stringInputTask3);
        taskList.add(stringInputTask4);
        taskList.add(stringInputTask5);
        return taskList;
    }
}

Выход:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
0 [pool-1-thread-1] Starting: StringInputTask{taskName='Task 1'}
 
0 [pool-1-thread-5] Starting: StringInputTask{taskName='Task 5'}
 
0 [pool-1-thread-2] Starting: StringInputTask{taskName='Task 2'}
 
2 [pool-1-thread-4] Starting: StringInputTask{taskName='Task 4'}
 
2 [pool-1-thread-3] Starting: StringInputTask{taskName='Task 3'}
 
3003 [pool-1-thread-5] Done: Task 5
 
3004 [pool-1-thread-3] Done: Task 3
 
3003 [pool-1-thread-1] Done: Task 1
 
3003 [pool-1-thread-4] Done: Task 4
 
3003 [pool-1-thread-2] Done: Task 2
 
3007 [Thread-0] WaitingFuturesRunner  - complete... adding results

Некоторые полезные статьи / ссылки, которые я нашел и прочитал, делая это:

Oracle: лямбда-урок

IBM: параллелизм Java 8

Томаш Нуркевич: полное руководство по CompletableFuture

Ссылка: Игра с Java 8 — Lambdas и Concurrency от нашего партнера по JCG Брайана Дю Приза в блоге Zen в искусстве ИТ .