Статьи

Пакет масштабирующей пружины — пошаговое разделение

Мы поговорили о том, как начать работу с Spring Batch, в нескольких предыдущих статьях. Теперь мы собираемся начать обсуждение некоторых стратегий, доступных для масштабирования Spring Batch.

В этой статье основное внимание будет уделено разделению шага, чтобы на этом шаге было несколько потоков, каждый из которых обрабатывал кусок данных параллельно. Это полезно, если у вас есть большой кусок данных, который можно логически разделить на более мелкие куски, которые можно обрабатывать параллельно. Это работает так, что вы определяете главный шаг, который отвечает за определение основы блоков, а затем перерабатывает все эти блоки в набор подчиненных шагов для обработки каждого фрагмента.

Разметка

Если бы я мог вернуться к прошлому опыту, прекрасным примером этого была бы обработка всех ежедневных счетов для каждой компании в рамках большой системы закупок. Наши обрабатываемые данные могут быть логически разделены по каждой обрабатываемой компании. Скажем, в этой системе закупок участвуют 250 компаний, и наш разделенный шаг определен в 15 потоков. Наш Партнер , скорее всего, выполнит запрос, чтобы найти все компании, чьи счета-фактуры ожидают обработки в этот день. Ответственность Разделителя на этом этапе будет заключаться в создании ExecutionContext для каждой из этих компаний и добавлении этого на карту с уникальным ключом. Этот ExecutionContext должен содержать любую информацию, необходимую для обработки счетов для этой компании, такую ​​как идентификатор компании и любую другую соответствующую информацию. Когда Partitioner возвращает карту ExecutionContexts, Spring Batch создаст новый шаг для каждой записи на карте и использует значение ключа как часть имени шага. На основе конфигурации, такой как 15 потоков, он затем создаст пул из 15 потоков и начнет выполнять шаги параллельно 15 за раз. Например, если у вас 85 шагов, Spring Batch начнет выполнять 15 шагов, и после завершения каждого шага поток, завершивший этот шаг, будет выбирать следующий шаг и начинать выполнение, пока все шаги не будут завершены.

Пример

Итак, теперь, когда у нас есть общее представление о том, как работает разбиение, давайте рассмотрим простой пример. В нашем случае мы будем проверять входящий каталог, куда будут выгружаться входящие файлы каталога поставщиков. Итак, чтобы создать разделитель Spring Batch, нам нужно создать класс, который реализует интерфейс Partitioner Spring Batch. Поскольку это нечто общее и его можно использовать повторно, мы будем называть этот класс MultiFileResourcePartitioner , который является простым POJO и имеет только одно имя поля «inboundDir», представляющее путь к каталогу, содержащему файлы для обработки. Интерфейсы Partitioner указывают, что класс должен реализовывать метод с именем «partition», который принимает параметр int, представляющий размер сетки, и возвращает Map, который содержит ExecutionContext.

Вот список классов для MultiFileResourcePartitioner:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.keyhole.example.partition;
 
import java.io.File;
import java.util.HashMap;
import java.util.Map;
 
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.core.io.FileSystemResource;
 
public class MultiFileResourcePartitioner implements Partitioner {
 
    private String inboundDir;
 
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
 
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        File dir = new File(inboundDir);
        if (dir.isDirectory()) {
            File[] files = dir.listFiles();
            for (File file : files) {
                ExecutionContext context = new ExecutionContext();
                context.put("fileResource", file.toURI().toString());
                partitionMap.put(file.getName(), context);
            }
        }
        return partitionMap;
    }
 
    public String getInboundDir() {
        return inboundDir;
    }
 
    public void setInboundDir(String inboundDir) {
        this.inboundDir = inboundDir;
    }
 
}

Конфигурация этого компонента в контексте нашего приложения будет выглядеть следующим образом:

1
2
3
4
5
<bean id="inventoryFilePartitioner"
        class="com.keyhole.example.partition.MultiFileResourcePartitioner"
        scope="step">
        <property name="inboundDir" value="/data/inbound" />
</bean>

Рассматривая реализованный метод разбиения, мы просто перечислим все файлы, найденные в указанном входящем каталоге, и создадим ExecutionContext для каждого найденного файла, добавив его к возвращаемой карте. Уникальный ключ, который используется для размещения каждого ExecutionContext на карте, также станет частью имени шага, созданного для каждой записи на карте. Spring Batch будет использовать эту карту разделов для создания подчиненного шага из каждого ключа, найденного на карте.

Чтобы разделить шаг, необходимо сначала создать шаг, на который будет ссылаться конфигурация раздела. Этот шаг должен быть настроен так же, как любой другой шаг в Spring Batch, и для этого примера мы определим FlatFileItemReader и простой ItemWriter , который просто вызовет метод toString () и зарегистрирует его на консоли.

Вот детали конфигурации для шага и связанных с ним пружинных бобов. Здесь важно отметить, что у нас есть ItemReader на уровне шага, чтобы мы не сталкивались с проблемами с несколькими потоками, использующими одни и те же компоненты для обработки данных. Нам также нужно, чтобы они были ограничены таким образом, чтобы мы могли использовать позднюю привязку Spring, чтобы указать значение, содержащее ресурс файла, в ExecutionContext шага.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
<batch:step id="inventoryLoadStep"
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="inventoryLoadReader" writer="logItemWriter"
            commit-interval="5000" />
    </batch:tasklet>
</batch:step>
<bean name="inventoryLoadReader" scope="step"
    class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource"
        value="#{stepExecutionContext['fileResource']}" />
    <property name="lineMapper" ref="inventoryLineMapper" />
<property name="linesToSkip" value="1" />
</bean>
 
<bean name="inventoryLineMapper"
    class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="fieldSetMapper" ref="inventoryFieldMapper" />
    <property name="lineTokenizer" ref="inventoryLineTokenizer" />
</bean>
 
<bean name="inventoryLineTokenizer"      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

Поскольку в этом примере мы читаем и обрабатываем текстовые файлы с разделителями-запятыми, у нас очень мало кода для написания этого шага настройки. Единственный код, который нам нужно реализовать, — это FieldSetMapper, необходимый для отображения содержимого строки на объект, представляющий запись файла. Каждая строка в файле будет содержать поля «категория», «подкатегория», «описание», «номер каталога», «цвет», «размер», «цена» и «количество». Таким образом, наш объект будет содержать эти поля, и наш листинг кода FieldSetMapper будет выглядеть следующим образом.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.keyhole.example.partition;
 
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;
 
@Component("inventoryFieldMapper")
public class InventoryItemFieldSetMapper implements FieldSetMapper<InventoryItem> {
 
    @Override
    public InventoryItem mapFieldSet(FieldSet fieldSet) throws BindException {
        InventoryItem item = new InventoryItem();
        item.setCategory(fieldSet.readString(0));
        item.setSubCategory(fieldSet.readString(1));
        item.setDescription(fieldSet.readString(2));
        item.setCatalogNum(fieldSet.readString(3));
        item.setColor(fieldSet.readString(4));
        item.setSize(fieldSet.readString(5));
        item.setPrice(fieldSet.readDouble(6));
        item.setQty(fieldSet.readInt(7));
        return item;
    }
 
}

Теперь, когда у нас есть созданный и настроенный Partitioner и Step, остается только настроить сам разделенный шаг! И это так просто, как это:

1
2
3
4
5
6
7
<batch:job id="InventoryLoader">
    <batch:step id="partitionedInventoryLoadStep">
        <batch:partition step="inventoryLoadStep" partitioner="inventoryFilePartitioner">
            <batch:handler grid-size="10" task-executor="inventoryLoadTaskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>

При настройке сегментированного шага вы определяете шаг так же, как и любой другой шаг, присваивая ему идентификатор и, если требуется, значение следующего шага. Вместо того, чтобы определять содержимое шага как обычный чанк или тасклет, Spring Batch предоставляет тег раздела, который требует от вас указать шаг задания, подлежащий разбиению, и Partitioner, который будет использоваться для определения фрагментов данных. Вам также нужно будет определить обработчик раздела, который будет обрабатывать эти шаги, в этом случае мы будем использовать ThreadPoolTaskExecutor, который будет иметь размер пула потоков 10 и разрешить им тайм-аут, если они не используются.

1
2
3
4
5
6
<bean id="inventoryLoadTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="10" />
    <property name="maxPoolSize" value="10" />
    <property name="allowCoreThreadTimeOut" value="true" />
</bean>

Итак, если у вас есть процесс Spring Batch, в котором есть шаг, обрабатывающий большое количество записей, и вы заинтересованы в повышении производительности, попробуйте попробовать поэтапное разбиение. Он должен быть прост в реализации и обеспечивать дополнительную производительность, чтобы ускорить время обработки.

Дополнительные ресурсы

Для примера кода, связанного с этой статьей, я загрузил исходный код в репозиторий github по адресу https://github.com/jonny-hackett/batch-example . Для выполнения примера кода, связанного с этой статьей, существует тестовое имя JUnit InventoryLoadTest . Файлы данных находятся в папке src / test / resources / data / inbound и должны быть помещены в локальный каталог, соответствующий входящему каталогу Partitioner. Также ознакомьтесь с http://docs.spring.io/spring-batch/reference/html/scalability.html .