В этом примере мы покажем, как читать данные из одной таблицы и записывать их в другую таблицу той же схемы, используя AsyncItemProcessor и AsyncItemWriter .
Асинхронные процессоры помогают вам масштабировать обработку элементов. В случае использования асинхронного процессора AsyncItemProcessor выполняет функции диспетчера, выполняя логику ItemProcessor для элемента в новом потоке. Как только элемент завершается, Future передается в AsynchItemWriter для записи.
Вам также может понравиться: Введение в Spring Batch
Следовательно, вы можете повысить производительность, используя асинхронную обработку элементов, что в основном позволяет реализовать сценарии fork-join. AsyncItemWriter собирает результаты и записывает обратно порцию , как только все результаты становятся доступными.
В следующем примере показано, как настроить AsyncItemProcessor
.
Джава
1
<bean id="processor" class="org.springframework.batch.integration.async.AsyncItemProcessor">
2
<p>
3
<bean class="your.ItemProcessor"/>
4
</property>
5
<p>
6
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
7
</property>
8
</bean>
Джава
xxxxxxxxxx
1
2
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
3
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
4
asyncItemProcessor.setTaskExecutor(taskExecutor);
5
asyncItemProcessor.setDelegate(itemProcessor);
6
return asyncItemProcessor;
7
}
delegate
Свойство относится к вашему ItemProcessor
бобу, и taskExecutor
свойство относится к TaskExecutor
вашему выбору.
В следующем примере показано, как настроить AsyncItemWriter
:
Конфигурация XML
Джава
xxxxxxxxxx
1
<bean id="itemWriter" class="org.springframework.batch.integration.async.AsyncItemWriter">
2
<p>
3
<bean id="itemWriter" class="your.ItemWriter"/>
4
</property>
5
</bean>
Конфигурации Java
Джава
xxxxxxxxxx
1
2
public AsyncItemWriter writer(ItemWriter itemWriter) {
3
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
4
asyncItemWriter.setDelegate(itemWriter);
5
return asyncItemWriter;
6
}
Опять же, delegate
свойство на самом деле является ссылкой на ваш ItemWriter
боб.
Джава
xxxxxxxxxx
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0"
3
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5
<modelVersion>4.0.0</modelVersion>
6
<p>
7
<groupId>org.springframework.boot</groupId>
8
<artifactId>spring-boot-starter-parent</artifactId>
9
<version>2.2.2.RELEASE</version>
10
<relativePath /> <!-- lookup parent from repository -->
11
</parent>
12
<groupId>com.example</groupId>
13
<artifactId>asyncItemProcessorItemWriter</artifactId>
14
<version>0.0.1-SNAPSHOT</version>
15
<p>jar</packaging>
16
<name>asyncItemProcessorItemWriter</name>
17
<description>Demo project for Spring Boot</description>
18
<p>
20
<java.version>1.8</java.version>
21
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
22
</properties>
23
<dependencies>
25
<dependency>
26
<groupId>org.springframework.boot</groupId>
27
<artifactId>spring-boot-starter-batch</artifactId>
28
</dependency>
29
<dependency>
30
<groupId>org.springframework.boot</groupId>
31
<artifactId>spring-boot-starter-jdbc</artifactId>
32
</dependency>
33
<dependency>
34
<groupId>org.springframework.batch</groupId>
35
<artifactId>spring-batch-integration</artifactId>
36
</dependency>
37
<dependency>
38
<groupId>com.h2database</groupId>
39
<artifactId>h2</artifactId>
40
<scope>runtime</scope>
41
</dependency>
42
<dependency>
43
<groupId>mysql</groupId>
44
<artifactId>mysql-connector-java</artifactId>
45
<scope>runtime</scope>
46
</dependency>
47
<dependency>
48
<groupId>org.projectlombok</groupId>
49
<artifactId>lombok</artifactId>
50
<optional>true</optional>
51
</dependency>
52
<dependency>
53
<groupId>org.springframework.boot</groupId>
54
<artifactId>spring-boot-starter-test</artifactId>
55
<scope>test</scope>
56
</dependency>
57
<dependency>
58
<groupId>org.springframework.batch</groupId>
59
<artifactId>spring-batch-test</artifactId>
60
<scope>test</scope>
61
</dependency>
62
</dependencies>
63
<build>
65
<p>
66
<p>
67
<groupId>org.springframework.boot</groupId>
68
<artifactId>spring-boot-maven-plugin</artifactId>
69
</plugin>
70
</plugins>
71
</build>
72
</project>
73
CustomerRowMapper - интерфейс, используемый для сопоставления строк для каждой строки. Реализации этого интерфейса выполняют фактическую работу по отображению каждой строки в результирующий объект, но не нужно беспокоиться о том, что обработка исключений будет обнаружена и обработана вызывающим JdbcTemplate.
xxxxxxxxxx
1
public class CustomerRowMapper implements RowMapper<Customer> {
2
3
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
4
return Customer.builder().id(rs.getLong("id"))
5
.firstName(rs.getString("firstName"))
6
.lastName(rs.getString("lastName"))
7
.birthdate(rs.getString("birthdate"))
8
.build();
9
}
10
}
Клиент - доменный объект, который содержит данные клиента.
Джава
xxxxxxxxxx
1
2
3
4
5
public class Customer {
6
private Long id;
7
private String firstName;
8
private String lastName;
9
private String birthdate;
10
}
JobConfiguration - удобный интерфейс, который объединяет
Джава
xxxxxxxxxx
1
2
public class JobConfiguration {
3
4
private JobBuilderFactory jobBuilderFactory;
5
6
7
private StepBuilderFactory stepBuilderFactory;
8
9
10
private DataSource dataSource;
11
12
13
14
public JdbcPagingItemReader<Customer> customerPagingItemReader(){
15
// reading database records using JDBC in a paging fashion
16
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
17
reader.setDataSource(this.dataSource);
18
reader.setFetchSize(1000);
19
reader.setRowMapper(new CustomerRowMapper());
20
21
// Sort Keys
22
Map<String, Order> sortKeys = new HashMap<>();
23
sortKeys.put("id", Order.ASCENDING);
24
25
// MySQL implementation of a PagingQueryProvider using database specific features.
26
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
27
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
28
queryProvider.setFromClause("from customer");
29
queryProvider.setSortKeys(sortKeys);
30
31
reader.setQueryProvider(queryProvider);
32
33
return reader;
34
}
35
36
37
public ItemProcessor itemProcessor(){
38
return new ItemProcessor<Customer, Customer>() {
39
41
public Customer process(Customer item) throws Exception {
42
Thread.sleep(new Random().nextInt(10));
43
return Customer.builder().id(item.getId()).firstName(item.getFirstName())
44
.lastName(item.getLastName()).birthdate(item.getBirthdate()).build();
45
}
46
};
47
}
48
49
50
public AsyncItemProcessor asyncItemProcessor() throws Exception{
51
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
52
asyncItemProcessor.setDelegate(itemProcessor());
53
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
54
asyncItemProcessor.afterPropertiesSet();
55
return asyncItemProcessor;
56
}
57
58
59
public JdbcBatchItemWriter<Customer> customerItemWriter(){
60
JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
61
writer.setDataSource(dataSource);
62
writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
63
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
64
writer.afterPropertiesSet();
65
66
return writer;
67
}
68
69
70
public AsyncItemWriter<Customer> asyncItemWriter() throws Exception{
71
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
72
asyncItemWriter.setDelegate(customerItemWriter());
73
asyncItemWriter.afterPropertiesSet();
74
return asyncItemWriter;
75
}
76
77
"unchecked") (
78
79
public Step step1() throws Exception {
80
return stepBuilderFactory.get("step1")
81
.chunk(1000)
82
.reader(customerPagingItemReader())
83
.processor(asyncItemProcessor())
84
.writer(asyncItemWriter())
85
.build();
86
}
87
88
89
public Job job() throws Exception {
90
return jobBuilderFactory.get("job")
91
.start(step1())
92
.build();
93
}
94
}
95
MainApp - Запустите это приложение как приложение Spring Boot.
xxxxxxxxxx
1
2
3
public class AsyncItemProcessorItemWriterApplication implements CommandLineRunner{
4
5
private JobLauncher jobLauncher;
6
8
private Job job;
9
10
public static void main(String[] args) {
11
SpringApplication.run(AsyncItemProcessorItemWriterApplication.class, args);
12
}
13
14
15
public void run(String... args) throws Exception {
16
JobParameters jobParameters = new JobParametersBuilder()
17
.addString("JobId", String.valueOf(System.currentTimeMillis()))
18
.addDate("date", new Date())
19
.addLong("time",System.currentTimeMillis()).toJobParameters();
20
21
JobExecution execution = jobLauncher.run(job, jobParameters);
22
System.out.println("STATUS :: "+execution.getStatus());
23
}
24
}
25
schema.sql
xxxxxxxxxx
1
CREATE TABLE `test`.`customer` (
2
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
3
`firstName` VARCHAR(255) NULL,
4
`lastName` VARCHAR(255) NULL,
5
`birthdate` VARCHAR(255) NULL,
6
PRIMARY KEY (`id`)
7
);
8
10
11
CREATE TABLE `test`.`new_customer` (
12
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
13
`firstName` VARCHAR(255) NULL,
14
`lastName` VARCHAR(255) NULL,
15
`birthdate` VARCHAR(255) NULL,
16
PRIMARY KEY (`id`)
17
);
Application.properties
Джава
xxxxxxxxxx
1
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
2
spring.datasource.url=jdbc:mysql://localhost:3306/test
3
spring.datasource.username=root
4
spring.datasource.password=root
5
spring.batch.initialize-schema=always
6
spring.batch.job.enabled=false
Вывод
Исходя из результатов, мы можем сделать вывод, что мы смогли успешно перенести данные из одной таблицы в другую, используя AsyncItemProcessor и AsyncItemWriter .
Джава
xxxxxxxxxx
1
mysql> use test;
2
Database changed
3
mysql> show tables;
4
+------------------------------+
5
| Tables_in_test |
6
+------------------------------+
7
| batch_job_execution |
8
| batch_job_execution_context |
9
| batch_job_execution_params |
10
| batch_job_execution_seq |
11
| batch_job_instance |
12
| batch_job_seq |
13
| batch_step_execution |
14
| batch_step_execution_context |
15
| batch_step_execution_seq |
16
| customer |
17
| new_customer |
18
+------------------------------+
19
11 rows in set (0.00 sec)
20
mysql> select * from new_customer limit 5;
22
+----+-----------+----------+---------------------+
23
| id | firstName | lastName | birthdate |
24
+----+-----------+----------+---------------------+
25
| 1 | John | Doe | 10-10-1952 10:10:10 |
26
| 2 | Amy | Eugene | 05-07-1985 17:10:00 |
27
| 3 | Laverne | Mann | 11-12-1988 10:10:10 |
28
| 4 | Janice | Preston | 19-02-1960 10:10:10 |
29
| 5 | Pauline | Rios | 29-08-1977 10:10:10 |
30
+----+-----------+----------+---------------------+
31
5 rows in set (0.00 sec)
32
mysql>
Дальнейшее чтение
Spring Batch: чтение XML-файла и запись в базу данных Oracle.
Пакетная обработка больших наборов данных с помощью Spring Boot и Spring Batch