Некоторое время назад мой друг спросил меня о возможностях ускорения следующего процесса: они генерируют некоторые данные в два этапа, считывают из базы данных и обрабатывают результаты. Чтение занимает примерно 70% времени, а обработка занимает оставшиеся 30%. К сожалению, они не могут просто загрузить все данные в память, поэтому они разбивают чтение на гораздо более мелкие куски (страницы) и обрабатывают эти страницы после их извлечения, чередуя эти два этапа в цикле. Вот псевдокод того, что они имеют до сих пор:
01
02
03
04
05
06
07
08
09
10
11
12
|
public Data loadData(int page) { //70% of time... } public void process(Data data) { //30% of time... } for (int i = 0 ; i < MAX; ++i) { Data data = loadData(i); process(data); } |
Его идея улучшить алгоритм состояла в том, чтобы каким-то образом начать извлекать следующую страницу данных, когда текущая страница еще обрабатывается, тем самым сокращая общее время выполнения алгоритма. Он был прав, но не знал, как поместить это в Java-код, не имея большого опыта работы с великолепным пакетом java.util.concurrent
. Эта статья предназначена для таких людей, вкратце представляя основные принципы параллельного программирования на Java, такие как пулы потоков и тип Future<T>
. Сначала давайте представим начальную и желаемую реализацию с помощью диаграммы Ганта :
Вторая диаграмма представляет решение, к которому мы стремимся. Первое наблюдение, которое вы должны сделать, заключается в том, что второй процесс заканчивается раньше, и это хорошо. Второй: когда мы обрабатываем первую страницу (желтая 1), вторая страница уже загружается (зеленая 2). Когда мы начинаем обработку страницы 2, страница 3 начала загружаться. И так далее. Мы вернемся к этому графику позже, когда у нас будет рабочая реализация. Давайте поместим это в код.
Потоки — это способ добиться фоновой загрузки данных (зеленые блоки). Однако простой запуск потока для каждого зеленого блока является медленным и неудобным. Пул потоков, состоящий из одного потока, гораздо более гибкий и простой в использовании. Сначала давайте loadData()
наш вызов loadData()
в Callable<Data>
:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
private class LoadDataTask implements Callable<Data> { private final int page; private LoadDataTask( int page) { this .page = page; } @Override public Data call() throws Exception { return loadData(page); } } |
Когда у нас есть такой класс, можно легко заполнить пул потоков (представленный ExecutorService
) и ждать ответа. Вот полная реализация:
01
02
03
04
05
06
07
08
09
10
11
|
ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Data> next = executorService.submit( new LoadDataTask( 0 )); for ( int i = 0 ; i < MAX; ++i) { Future<Data> current = next; if (i + 1 < MAX) { next = executorService.submit( new LoadDataTask(i + 1 )); } Data data = current.get(); //this can block process(data); } executorService.shutdownNow(); |
Executors.newSingleThreadExecutor()
основном создает фоновый поток, ожидающий выполнения задач. Мы не можем использовать больший пул (с большим количеством потоков), потому что тогда мы рискуем хранить слишком много данных в памяти, прежде чем они будут обработаны.
Для примера предположим, что загрузка страницы (зеленые блоки) занимает 700 мс, а обработка (желтые блоки) — 300 мс. В начале мы отправляем начальную задачу для загрузки страницы 0 (первая синяя стрелка указывает вниз). Таким образом, мы должны ждать полных 700 мсек для первого блока. Однако, как только данные становятся доступны, прежде чем мы начнем их обрабатывать, мы немедленно запрашиваем следующую страницу. Когда мы запускаем вторую итерацию, нам не нужно снова ждать полных 700 мс, потому что загрузка данных уже продлилась на 300 мс, поэтому Future.get () блокируется только на 400 мс. Мы повторяем этот процесс, пока не обработаем последнюю страницу. Конечно, у нас нет загрузки следующей страницы данных, потому что мы уже обработали все из них, таким образом это уродливое условие внутри цикла. Этого легко избежать, возвращая нулевой объект из loadData()
когда страница выходит за границы, но давайте оставим это для ясности примера.
Этот подход настолько распространен на предприятии, что выделенная поддержка была добавлена как в Spring, так и в EJB . Давайте использовать Spring в качестве примера. Единственное, что нам нужно изменить, это настроить возвращаемое значение loadData()
из Data в Future<Data>
. Обтекание значения результата с помощью AsyncResult
требуется для компиляции:
1
2
3
4
5
|
@Async public Future<Data> loadData( int page) { //... return new AsyncResult<Data>( new Data(...)); } |
Конечно, этот класс является частью некоторого компонента Spring (скажем, dao
). API теперь намного чище:
1
2
3
4
5
6
7
8
9
|
Future<Data> next = dao.loadData( 0 ); for ( int i = 0 ; i < MAX; ++i) { Future<Data> current = next; if (i + 1 < MAX) { next = dao.loadData(i + 1 ); } Data data = current.get(); processor.process(data); } |
нам больше не нужно использовать Callable
и взаимодействовать с некоторыми пулами потоков. Кроме того, начальная загрузка Spring никогда не была такой простой (так что не говорите мне, что Spring тяжеловесна!):
01
02
03
04
05
06
07
08
09
10
11
|
@Configuration @ComponentScan ( "com.blogspot.nurkiewicz.async" ) @EnableAsync public class Config implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { return Executors.newSingleThreadExecutor(); } } |
Технически getAsyncExecutor()
не требуется, но по умолчанию Spring создаст пул потоков с 10 потоками для методов @Async (а нам нужен только один). Теперь просто запустите это где-нибудь в своем коде.
1
2
|
ApplicationContext context = new AnnotationConfigApplicationContext(Config. class ); |
Урок, извлеченный из этой статьи: не бойтесь параллелизма, это намного проще, чем вы думаете, при условии, что вы используете встроенные абстракции и понимаете их.
Ссылка: Распараллеливание простого варианта использования объяснено нашим партнером по JCG Томашем Нуркевичем в блоге Java и соседей .