Статьи

Получение обратной связи от параллельных задач

Продолжая с того места, где я остановился в моем последнем посте о пакете java.util.concurrent, интересно и иногда обязательно получать отзывы о параллельных задачах после их запуска.

Например, представьте себе приложение, которое должно отправлять пакеты электронной почты, помимо использования многопоточного механизма, вы хотите знать, сколько запланированных электронных писем было успешно отправлено, а в процессе фактической отправки — в режиме реального времени прогресс всей партия.

Для реализации такого рода многопоточности с обратной связью мы можем использовать интерфейс Callable . Этот интерфейс работает в основном так же, как и Runnable , но метод выполнения (call ()) возвращает значение, которое должно отражать результат выполненных вычислений.

Давайте сначала определим класс, который будет выполнять реальную задачу:

package com.ricardozuasti;
 
import java.util.concurrent.Callable;
 
public class FictionalEmailSender implements Callable<Boolean> {
    public FictionalEmailSender (String to, String subject, String body){
        this.to = to;
        this.subject = subject;
        this.body = body;
    }
 
    @Override
    public Boolean call() throws InterruptedException {
        // Simulate that sending the email takes between 0 and 0.5 seconds
        Thread.sleep(Math.round(Math.random()* 0.5 * 1000));
 
        // Lets say we have an 80% chance of successfully sending our email
        if (Math.random()>0.2){
            return true;
        } else {
            return false;
        }
    }
 
    private String to;
    private String subject;
    private String body;
}

Обратите внимание, что ваш Callable может использовать любой тип возврата, поэтому ваша задача может вернуть любую необходимую вам информацию.

Теперь мы можем использовать пул потоков ExecutorService для отправки наших электронных писем, и, поскольку наша задача реализована как Callable , мы получаем ссылку Future для каждой новой задачи, которую мы отправляем на выполнение. Обратите внимание, что мы создадим наш ExecutorService, используя прямой конструктор вместо служебного метода от Executors , это потому, что использование определенного класса ( ThreadPoolExecutor ) предоставляет некоторые методы, которые пригодятся (их нет в интерфейсе ExecutorService).

package com.ricardozuasti;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class Concurrency2 {
 
    public static void main(String[] args) {
        try {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 1, TimeUnit.SECONDS,
                    new LinkedBlockingQueue());
 
            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(9000);
 
            // Lets spam every 4 digit numeric user on that silly domain
            for (int i = 1000; i < 10000; i++) {
                futures.add(executor.submit(new FictionalEmailSender(i + "@wesellnumericusers.com",
                        "Knock, knock, Neo", "The Matrix has you...")));
            }
 
            // All tasks have been submitted, wen can begin the shutdown of our executor
            System.out.println("Starting shutdown...");
            executor.shutdown();
 
            // Every second we print our progress
            while (!executor.isTerminated()) {
                executor.awaitTermination(1, TimeUnit.SECONDS);
                int progress = Math.round((executor.getCompletedTaskCount() * 100) /
                                          executor.getTaskCount());
 
                System.out.println(progress + "% done (" + executor.getCompletedTaskCount() +
                                   " emails have been sent).");
            }
 
            // Now that we are finished sending all the emails, we can review the futures
            // and see how many were successfully sent
            int errorCount = 0;
            int successCount = 0;
            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    successCount++;
                } else {
                    errorCount++;
                }
            }
 
            System.out.println(successCount + " emails were successfully sent, but "
                    + errorCount + " failed.");
 
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

После того, как все задачи переданы в ExecutorService , мы начинаем его отключение (предотвращая отправку новых задач) и используем цикл (в реальном сценарии вы должны продолжать делать что-то еще, если это возможно), чтобы дождаться завершения всех задач, вычисляя и печать прогресса, достигнутого до сих пор на каждой итерации. Обратите внимание, что вы можете в любое время сохранить ссылку на исполнителя и запросить ее у других потоков, чтобы рассчитать и сообщить о ходе процесса.

Наконец, используя коллекцию ссылок на будущее, которые мы получили для каждого Callable, отправленного в ExecutorService , мы можем сообщить количество успешно отправленных электронных писем и число неудачных попыток.

Эта инфраструктура не только проста в использовании, но и способствует четкому разделению интересов, обеспечивая заранее определенный механизм связи между программой диспетчера и актуальными задачами.