В этой статье мы узнаем, как использовать CompositeItemProcessor при чтении данных из БД MySQL в файловые системы. Это один из декораторов семейства Item Reader и Writer Implementations.
Вариант использования: в некоторых случаях пользователю необходимо добавить специальное поведение к уже существующему ItemReader
. Spring Batch предлагает некоторые из-из-коробки декораторов , которые могут добавить дополнительное поведение для ваших ItemReader
и ItemWriter
реализаций.
Spring Batch включает в себя следующие декораторы:
ClassifierCompositeItemProcessor
ClassifierCompositeItemProcessor
Является , ItemProcessor
что вызывает один из набора ItemProcessor
реализаций на основе шаблона маршрутизатора реализуется через предоставленный Classifier
. Spring Batch обеспечивает ClassifierCompositeItemProcessorBuilder
создание экземпляра ClassifierCompositeItemProcessor
.
Составной элемент, который пропускает элемент через последовательность введенных ItemTransformer
s (возвращаемое значение предыдущего преобразования является входным значением следующего).
Вы также можете быть заинтересованы в: Spring Batch — чтение из XML и запись в Mongo
Обратите внимание, что пользователь отвечает за внедрение цепочки s, которая соответствует объявленным типам ввода и вывода.
Maven Dependency : зависимость проекта, необходимая для запуска этого проекта.
XML
xxxxxxxxxx
1
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4
<modelVersion>4.0.0</modelVersion>
5
<parent>
6
<groupId>org.springframework.boot</groupId>
7
<artifactId>spring-boot-starter-parent</artifactId>
8
<version>2.2.2.RELEASE</version>
9
<relativePath></relativePath> <!-- lookup parent from repository -->
10
</parent>
11
<groupId>com.example</groupId>
12
<artifactId>compositeItemProcess</artifactId>
13
<version>0.0.1-SNAPSHOT</version>
14
<packaging>jar</packaging>
15
<name>compositeItemProcess</name>
16
<description>Demo project for Spring Boot</description>
17
<properties>
19
<java.version>1.8</java.version>
20
</properties>
21
<dependencies>
23
<dependency>
24
<groupId>org.springframework.boot</groupId>
25
<artifactId>spring-boot-starter-batch</artifactId>
26
</dependency>
27
<dependency>
28
<groupId>org.springframework.boot</groupId>
29
<artifactId>spring-boot-starter-jdbc</artifactId>
30
</dependency>
31
<dependency>
33
<groupId>com.h2database</groupId>
34
<artifactId>h2</artifactId>
35
<scope>runtime</scope>
36
</dependency>
37
<dependency>
38
<groupId>mysql</groupId>
39
<artifactId>mysql-connector-java</artifactId>
40
<scope>runtime</scope>
41
</dependency>
42
<dependency>
43
<groupId>org.projectlombok</groupId>
44
<artifactId>lombok</artifactId>
45
<version>1.18.2</version>
46
<optional>true</optional>
47
</dependency>
48
<dependency>
49
<groupId>org.springframework.boot</groupId>
50
<artifactId>spring-boot-starter-test</artifactId>
51
<scope>test</scope>
52
</dependency>
53
<dependency>
54
<groupId>org.springframework.batch</groupId>
55
<artifactId>spring-batch-test</artifactId>
56
<scope>test</scope>
57
</dependency>
58
</dependencies>
59
<build>
61
<plugins>
62
<plugin>
63
<groupId>org.springframework.boot</groupId>
64
<artifactId>spring-boot-maven-plugin</artifactId>
65
</plugin>
66
</plugins>
67
</build>
68
</project>
CustomLineAggregator: интерфейс, используемый для создания строки, представляющей объект.
Джава
xxxxxxxxxx
1
package com.example.aggregator;
2
import org.springframework.batch.item.file.transform.LineAggregator;
4
import com.example.model.Customer;
6
import com.fasterxml.jackson.databind.ObjectMapper;
7
public class CustomLineAggregator implements LineAggregator<Customer> {
9
private ObjectMapper objectMapper = new ObjectMapper();
10
12
public String aggregate(Customer item) {
13
try {
14
return objectMapper.writeValueAsString(item);
15
} catch (Exception e) {
16
throw new RuntimeException("Unable to serialize Customer", e);
17
}
18
}
19
}
CustomerRowMapper: интерфейс, используемый для сопоставления строк для каждой строки. Реализации этого интерфейса выполняют фактическую работу по отображению каждой строки в объект результата, но вам не нужно беспокоиться об обработке исключений. Он будет перехвачен и обработан вызывающим JdbcTemplate.
Джава
xxxxxxxxxx
1
package com.example.mapper;
2
import java.sql.ResultSet;
4
import java.sql.SQLException;
5
import org.springframework.jdbc.core.RowMapper;
7
import com.example.model.Customer;
9
public class CustomerRowMapper implements RowMapper<Customer> {
11
13
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
14
return Customer.builder().id(rs.getLong("id"))
15
.firstName(rs.getString("firstName"))
16
.lastName(rs.getString("lastName"))
17
.birthdate(rs.getString("birthdate")).build();
18
}
19
}
Клиент: класс модели, который содержит данные клиента.
Джава
xxxxxxxxxx
1
2
3
4
5
public class Customer {
6
private Long id;
7
private String firstName;
8
private String lastName;
9
private String birthdate;
10
}
FilteringItemProcessor: интерфейс для преобразования элемента. Учитывая элемент в качестве входных данных, этот интерфейс предоставляет точку расширения, которая позволяет применять бизнес-логику в сценарии обработки, ориентированной на элементы. Следует отметить, что хотя можно вернуть тип, отличный от предоставленного, это не является строго обязательным. Кроме того, возвращение нуля указывает, что элемент не должен быть продолжен для обработки.
Здесь возвращаются только четные идентификаторы клиентов для записи в файл.
Джава
xxxxxxxxxx
1
package com.example.processor;
2
import org.springframework.batch.item.ItemProcessor;
4
import com.example.model.Customer;
6
public class FilteringItemProcessor implements ItemProcessor<Customer, Customer> {
8
10
public Customer process(Customer item) throws Exception {
11
if(item.getId() % 2 == 0)
12
return null;
13
else
14
return item;
15
}
16
}
UpperCaseItemProcessor: здесь мы делаем firstName и LastName в верхнем регистре.
Джава
xxxxxxxxxx
1
package com.example.processor;
2
import org.springframework.batch.item.ItemProcessor;
4
import com.example.model.Customer;
6
public class UpperCaseItemProcessor implements ItemProcessor<Customer, Customer> {
8
10
public Customer process(Customer item) throws Exception {
11
12
return Customer.builder()
13
.id(item.getId())
14
.firstName(item.getFirstName().toUpperCase())
15
.lastName(item.getLastName().toUpperCase())
16
.birthdate(item.getBirthdate())
17
.build();
18
}
19
}
JobConfiguration: это основной файл конфигурации.
Джава
xxxxxxxxxx
1
package com.example.configuration;
2
import java.io.File;
4
import java.util.ArrayList;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import javax.sql.DataSource;
10
import org.springframework.batch.core.Job;
12
import org.springframework.batch.core.Step;
13
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
14
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
15
import org.springframework.batch.item.ItemProcessor;
16
import org.springframework.batch.item.database.JdbcPagingItemReader;
17
import org.springframework.batch.item.database.Order;
18
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
19
import org.springframework.batch.item.file.FlatFileItemWriter;
20
import org.springframework.batch.item.support.CompositeItemProcessor;
21
import org.springframework.beans.factory.annotation.Autowired;
22
import org.springframework.context.annotation.Bean;
23
import org.springframework.context.annotation.Configuration;
24
import org.springframework.core.io.FileSystemResource;
25
import com.example.aggregator.CustomLineAggregator;
27
import com.example.mapper.CustomerRowMapper;
28
import com.example.model.Customer;
29
import com.example.processor.FilteringItemProcessor;
30
import com.example.processor.UpperCaseItemProcessor;
31
33
public class JobConfiguration {
34
35
private JobBuilderFactory jobBuilderFactory;
36
37
38
private StepBuilderFactory stepBuilderFactory;
39
40
41
private DataSource dataSource;
42
43
44
public JdbcPagingItemReader<Customer> customerPagingItemReader(){
45
// reading database records using JDBC in a paging fashion
46
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
47
reader.setDataSource(this.dataSource);
48
reader.setFetchSize(1000);
49
reader.setRowMapper(new CustomerRowMapper());
50
51
// Sort Keys
52
Map<String, Order> sortKeys = new HashMap<>();
53
sortKeys.put("id", Order.ASCENDING);
54
55
// MySQL implementation of a PagingQueryProvider using database specific features.
56
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
57
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
58
queryProvider.setFromClause("from customer");
59
queryProvider.setSortKeys(sortKeys);
60
61
reader.setQueryProvider(queryProvider);
62
63
return reader;
64
}
65
66
67
68
public FlatFileItemWriter<Customer> customerItemWriter() throws Exception{
69
String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
70
System.out.println(">> Output Path = "+customerOutputPath);
71
72
FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
73
writer.setLineAggregator(new CustomLineAggregator());
74
writer.setResource(new FileSystemResource(customerOutputPath));
75
writer.afterPropertiesSet();
76
77
return writer;
78
}
79
80
81
public CompositeItemProcessor<Customer, Customer> compositeItemProcessor() throws Exception{
82
List<ItemProcessor<Customer, Customer>> delegates = new ArrayList<>();
83
delegates.add(new FilteringItemProcessor());
84
delegates.add(new UpperCaseItemProcessor());
85
86
CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>();
87
// Establishes the ItemProcessor delegates that will work on the item to be processed.
88
processor.setDelegates(delegates);
89
processor.afterPropertiesSet();
90
91
return processor;
92
}
93
94
95
96
public Step step1() throws Exception {
97
return stepBuilderFactory.get("step1")
98
.<Customer, Customer>chunk(100)
99
.reader(customerPagingItemReader())
100
.processor(compositeItemProcessor())
101
.writer(customerItemWriter())
102
.build();
103
}
104
105
106
public Job job() throws Exception {
107
return jobBuilderFactory.get("job")
108
.start(step1())
109
.build();
110
}
111
}
112
CompositeItemProcessApplication:
xxxxxxxxxx
1
package com.example;
2
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
4
import org.springframework.boot.SpringApplication;
5
import org.springframework.boot.autoconfigure.SpringBootApplication;
6
8
9
public class CompositeItemProcessApplication {
10
public static void main(String[] args) {
12
SpringApplication.run(CompositeItemProcessApplication.class, args);
13
}
14
}
application.properties:
Джава
xxxxxxxxxx
1
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
2
spring.datasource.url=jdbc:mysql://localhost:3306/test
3
spring.datasource.username=root
4
spring.datasource.password=root
5
spring.batch.initialize-schema=always
schema.sql:
SQL
xxxxxxxxxx
1
CREATE TABLE `test`.`customer` (
2
`id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
3
`firstName` VARCHAR(255) NULL,
4
`lastName` VARCHAR(255) NULL,
5
`birthdate` VARCHAR(255) NULL,
6
PRIMARY KEY (`id`)
7
) AUTO_INCREMENT=1;
data.sql:
Джава
xxxxxxxxxx
1
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
2
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
3
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
4
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
5
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
6
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
7
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
8
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
9
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
10
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
11
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
12
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
13
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
14
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
15
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
16
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
17
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
18
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
19
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
20
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
21
Вывод:
xxxxxxxxxx
1
/ __/ _ \| '_ ` _ \| '_ \| / __| __/ _ \_____| | __/ _ \ '_ ` _ \ _____| '_ \| '__/ _ \ / __/ _ \/ __/ __|/ _ \| '__|
2
| (_| (_) | | | | | | |_) | \__ \ || __/_____| | || __/ | | | | |_____| |_) | | | (_) | (_| __/\__ \__ \ (_) | |
3
\___\___/|_| |_| |_| .__/|_|___/\__\___| |_|\__\___|_| |_| |_| | .__/|_| \___/ \___\___||___/___/\___/|_|
4
|_| |_|
5
2020-01-08 21:33:27.621 INFO 18608 --- [ main] c.e.CompositeItemProcessApplication : Starting CompositeItemProcessApplication on with PID 18608 (C:\Learnings\spring-batch-latest\Minella\Spring-Batch-by-Michael-Minella\compositeItemProcess\target\classes started by in C:\Learnings\spring-batch-latest\Minella\Spring-Batch-by-Michael-Minella\compositeItemProcess)
6
2020-01-08 21:33:27.624 INFO 18608 --- [ main] c.e.CompositeItemProcessApplication : No active profile set, falling back to default profiles: default
7
>> Output Path = C:\Users\AppData\Local\Temp\1\customerOutput2849649057381160317.out
8
2020-01-08 21:33:28.409 INFO 18608 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
9
2020-01-08 21:33:29.063 INFO 18608 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
10
2020-01-08 21:33:29.806 INFO 18608 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
11
2020-01-08 21:33:29.824 INFO 18608 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
12
2020-01-08 21:33:29.890 INFO 18608 --- [ main] c.e.CompositeItemProcessApplication : Started CompositeItemProcessApplication in 2.58 seconds (JVM running for 3.782)
13
2020-01-08 21:33:29.891 INFO 18608 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: [--spring.output.ansi.enabled=always]
14
2020-01-08 21:33:29.985 INFO 18608 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{-spring.output.ansi.enabled=always}]
15
2020-01-08 21:33:30.062 INFO 18608 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
16
2020-01-08 21:33:30.162 INFO 18608 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 100ms
17
2020-01-08 21:33:30.188 INFO 18608 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 166ms
18
2020-01-08 21:33:30.192 INFO 18608 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
19
2020-01-08 21:33:30.199 INFO 18608 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
20
Спасибо за прочтение!