Статьи

Реализация Producer / Consumer с использованием SynchronousQueue

Среди множества полезных классов, которые Java предоставляет для поддержки параллелизма, есть один, о котором я хотел бы поговорить: SynchronousQueue . В частности, я хотел бы пройтись по реализации Producer / Consumer, используя удобный SynchronousQueue в качестве механизма обмена.

Может показаться неясным, зачем использовать этот тип очереди для связи между производителем и потребителем, если только мы не заглянем за пределы реализации SynchronousQueue . Оказывается, это не совсем очередь, как мы привыкли думать об очередях. Аналогия была бы просто коллекцией, содержащей не более одного элемента.

Почему это полезно? Ну, есть несколько причин. С точки зрения производителя, только один элемент (или сообщение) может быть сохранен в очереди. Чтобы перейти к следующему элементу (или сообщению), производитель должен дождаться, пока потребитель не использует тот, который в данный момент находится в очереди. С точки зрения потребителя, он просто опрашивает очередь на наличие следующего доступного элемента (или сообщения). Все очень просто, но большое преимущество в том, что производитель не может отправлять сообщения быстрее, чем потребитель может их обработать.

Вот один из случаев использования, с которыми я столкнулся в последнее время: сравните две таблицы базы данных (возможно, просто огромные) и определите, содержат ли они разные данные или данные совпадают (копия). SynchronousQueue — довольно удобный инструмент для этой проблемы: он позволяет обрабатывать каждую таблицу в собственном потоке, а также компенсировать возможные таймауты / задержки при чтении из двух разных баз данных.

Давайте начнем с определения нашей функции сравнения, которая принимает источники данных источника и назначения, а также имя таблицы (для сравнения). Я использую весьма полезный класс JdbcTemplate из среды Spring, поскольку он очень хорошо абстрагирует все скучные детали, касающиеся соединений и подготовленных операторов.

1
2
3
4
public boolean compare( final DataSource source, final DataSource destination, final String table )  {
    final JdbcTemplate from = new  JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );
}

Перед выполнением какого-либо фактического сравнения данных рекомендуется сравнить количество строк в таблице исходной и целевой баз данных:

1
2
3
if( from.queryForLong('SELECT count(1) FROM ' + table ) != to.queryForLong('SELECT count(1) FROM ' + table ) ) {
    return false;
}

Теперь, по крайней мере, зная, что таблица содержит одинаковое количество строк в обеих базах данных, мы можем начать со сравнения данных. Алгоритм очень прост:

  • создать отдельный поток для баз данных источника (производителя) и назначения (потребителя)
  • поток-производитель читает одну строку из таблицы и помещает ее в SynchronousQueue
  • Потребительский поток также читает одну строку из таблицы, затем запрашивает в очереди доступную строку для сравнения (ожидает при необходимости) и, наконец, сравнивает два набора результатов.

Используя еще одну замечательную часть параллельных утилит Java для пула потоков, давайте определим пул потоков с фиксированным количеством потоков (2).

1
2
final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();

Следуя описанному алгоритму, функциональность производителя может быть представлена ​​как один вызываемый:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
Callable< Void > producer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        from.query( 'SELECT * FROM ' + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {                  
                        List< ? > row = ...; // convert ResultSet to List
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( 'Having more data but consumer has already completed' );
                        }
                    } catch( InterruptedException ex ) {
                        throw new SQLException( 'Having more data but producer has been interrupted' );
                    }
                }
            }
        );
 
        return  null;
    }
};

Код немного многословен из-за синтаксиса Java, но на самом деле он мало что делает. Каждый результирующий набор, считанный из таблицы производителя, преобразуется в список (реализация была опущена, поскольку он является образцом) и помещает в очередь ( предложение ). Если очередь не пуста, производитель блокируется, ожидая, пока потребитель завершит свою работу. Соответственно, потребитель может быть представлен следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Callable< Void > consumer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        to.query( 'SELECT * FROM ' + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( 'Having more data but producer has already completed' );
                        }                                    
 
                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( 'Row data is not the same' );
                        }
                    } catch ( InterruptedException ex ) {
                        throw new SQLException( 'Having more data but consumer has been interrupted' );
                    }
                }
            }
        );
 
        return  null;
    }
};

Потребитель выполняет обратную операцию в очереди: вместо помещения данных он извлекает их ( опрос ) из очереди. Если очередь пуста, потребитель блокируется, ожидая, когда производитель опубликует следующую строку. Оставшаяся часть отправляет только те вызовы для исполнения. Любое исключение, возвращаемое методом get Future, указывает, что таблица не содержит те же данные (или существуют проблемы с получением данных из базы данных):

1
2
3
4
List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
for( final Future< Void > future: futures ) {
    future.get( 5, TimeUnit.MINUTES );
}

Ссылка: Реализация Producer / Consumer с использованием SynchronousQueue от нашего партнера по JCG Андрея Редько в блоге Андрея Редько {devmind} .