В нашей фирме у нас был сценарий использования для обработки некоторых файлов, сгенерированных основным кадром. В этом посте я хотел бы описать, как мы реализовали вариант использования с помощью Spring Batch.
Вариант использования: Мэйнфрейм сгенерирует некоторые файлы (которые мы называем это куба) и поместит их на некоторый сетевой диск, доступный для нашего приложения для чтения / переименования / удаления.
Было бы здорово взглянуть на нашу конфигурацию XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<job-repository id="jobRepository"
data-source="appDataSource"
transaction-manager="jtaTransactionManager"
xmlns="http://www.springframework.org/schema/batch"
/>
<job id="userStoreProvisioning" xmlns="http://www.springframework.org/schema/batch" job-repository="jobRepository" restartable="true">
<step id="readWrite" next="clean">
<tasklet transaction-manager="jtaTransactionManager">
<chunk
reader="userStoreMultiResourceReader"
processor="userStoreProcessors"
writer="userStoreItemWriter"
commit-interval="1">
<streams>
<stream ref="userStoreMultiResourceReader" />
</streams>
</chunk>
</tasklet>
</step>
<step id="clean">
<tasklet ref="userStoreBatchCleanupTasklet" transaction-manager="jtaTransactionManager"/>
</step>
<listeners>
<listener ref="userStoreProvisionListener"/>
</listeners>
</job>
<bean id="userStoreBatchCleanupTasklet" class="UserStoreBatchCleanupTasklet"
p:directoryResource="${userStoreProvisioning.multiResourceReader.directory}"/>
<bean id="userStoreProvisionListener" class="UserStoreProvisionListener" />
<bean id="userStoreItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<bean class="UserStoreJdbcItemWriter">
<property name="jdbcTemplate" ref="userStoreDataSource"/>
</bean>
<bean class="UserStoreJmsItemWriter"/>
</list>
</property>
</bean>
<bean id="userStoreProcessors" class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<property name="validator">
<bean class="MainframeBeanWrapperValidator"/>
</property>
</bean>
</list>
</property>
</bean>
<bean id="userStoreMultiResourceReader" class="MultiResourceItemReader" scope="step" >
<property name="resourcePredicate">
<bean class="ResourceFilter" />
</property>
<property name="resources"
value="${userStoreProvisioning.multiResourceReader.directory}${userStoreProvisioning.multiResourceReader.resources}"/>
<property name="comparator">
<bean class="MultiResourceReaderComparator"/>
</property>
<property name="delegate" ref="userStoreItemReader"/>
<property name="saveState" value="true" />
</bean>
<bean id="userStoreItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="prototype">
<!-- Used to determine where the line endings are and do things like continue over a line ending if inside a quoted string. -->
<property name="recordSeparatorPolicy">
<bean class="org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy"/>
</property>
<property name="saveState" value="true" />
<!-- Interface for mapping lines (strings) to domain objects typically used to map lines read from a file to domain objects on a per line basis. -->
<property name="lineMapper">
<bean id="userStoreLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<!-- A LineTokenizer implementation that splits the input String on a configurable delimiter. -->
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value=";"/>
<property name="names" value="userId,actionType,userType"/>
</bean>
</property>
<!--Implementation based on bean property paths-->
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="mainframeBeanWrapper"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean id="mainframeBeanWrapper" class="MainframeBeanWrapper" scope="prototype"/>
</beans>
В приведенном выше XML-файле есть несколько элементов, которые требуют более подробного описания:| Вещь | Описание |
| jobRepository |
репозиторий заданий является ключевым элементом инфраструктуры Spring Batch, поскольку он предоставляет некоторые метаданные о пакетной обработке. |
| userStoreProvisioning | Это основная работа нашего варианта использования. |
| меры | В этом случае использования есть два шага. Первый шаг (readWrite) отвечает за чтение из файла и сохранение в базе данных, а также за отправку сообщения в соответствующую JMS. Последний шаг — очистка, которая отвечает за удаление обработанного файла. |
| userStoreMultiResourceReader | Это наш класс, который мы расширили от org.springframework.batch.item.file.MultiResourceItemReader; По сути, мы добавили в Spring Batch функцию: фильтр ресурсов. С помощью концепции фильтра мы можем легко управлять тем, какой файл мы хотим обработать в первую очередь. Например, мы хотим сначала обработать сбойные, а затем и другие файлы.
Внутри этого класса мы используем компаратор для обработки сортировки файлов. |
| userStoreItemReader | По сути, это предустановленная функция из Batch, которая поможет нам прочитать значения файла. Сам XML явно самоописывается, поэтому единственное, что заслуживает того, чтобы его озвучить, это: saveState; это элемент, который будет сохранять состояние показаний, и в случае сбоя процесса заполнения, когда пакетный режим хочет повторить попытку, он начнет работу с последнего сохраненного состояния. |
| userStoreItemWriter | У нас есть сценарий использования для записи прочитанных строк файла в базу данных и JMS. Следовательно, мы достигаем этой цели, потребляя CompositeItemWriter. |
| UserStoreJdbcItemWriter | Это пользовательский ItemWriter для JDBC. Потому что у нас была логика, которую нам иногда приходилось вставлять, а иногда нам приходилось обновлять. |
| userStoreBatchCleanupTasklet | Это пользовательский тасклет для удаления обработанных файлов. |
Здесь вы можете найти некоторые исходные коды вышеупомянутых предметов.
MultiResourceItemReader
/**
* This class is basically identical with {@link org.springframework.batch.item.file.MultiResourceItemReader}. This
* class has added a feature on filtering the resources before processing them.
*
* @author Chris Shayan
*/
public class MultiResourceItemReader<T> extends org.springframework.batch.item.file.MultiResourceItemReader<T> {
ResourcePredicate<Resource> resourcePredicate;
@Override
public void setResources(Resource[] resources) {
Validate.notNull(resources, "The resources must not be null");
List<Resource> resourcesList = new ArrayList<Resource>(Arrays.asList(resources));
if (resourcePredicate.isPredicateActive()) {
for (Iterator<Resource> it = resourcesList.iterator(); it.hasNext(); ) {
if (resourcePredicate.evaluate(it.next()) == false) {
it.remove();
}
}
}
super.setResources(resourcesList.toArray(new Resource[resourcesList.size()]));
}
public void setResourcePredicate(ResourcePredicate<Resource> resourcePredicate) {
Validate.notNull(resourcePredicate);
this.resourcePredicate = resourcePredicate;
}
}
ResourceFilter
/**
* This class will ensure what resource files should be processed in this cycle of execution.
* <p><b>Warning:</b> This class should not be singleton at all.</p>
*
* @author Chris Shayan
*/
public class ResourceFilter implements ResourcePredicate<Resource> {
@Autowired
JobExplorer jobExplorer;
private JobParameters lastJobInstanceJobParameters;
@Override
public boolean evaluate(Resource object) {
Validate.notNull(object);
Validate.notEmpty(object.getFilename());
return lastJobInstanceJobParameters.getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY).contains(object.getFilename());
}
@Override
public boolean isPredicateActive() {
final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(SchedulingLauncher.USER_STORE_PROVISIONING, 0, 1);
if (CollectionUtils.isNotEmpty(userStoreJobInstances)) {
final JobInstance lastJobInstance = userStoreJobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance);
final JobExecution lastJobExecution = jobExecutions.get(0); // get the first instance which is the last job execution.
if (ExitStatus.FAILED.equals(lastJobExecution.getExitStatus())) {
lastJobInstanceJobParameters = lastJobInstance.getJobParameters();
return true;
}
}
return false;
}
}
MainframeBeanWrapper
/**
* A bean class wrapping User entity object and with additional properties read from file.
*/
public class MainframeBeanWrapper {
private String userId;
private String userType;
private String actionType;
/**
* @return the userId
*/
public String getUserId() {
return userId;
}
/**
* @param userId the userId to set
*/
public void setUserId(String userId) {
this.userId = userId;
}
/**
* @return
*/
public String getUserType() {
return userType;
}
/**
* @param userType
*/
public void setUserType(String userType) {
this.userType = userType;
}
/**
* @return
*/
public String getActionType() {
return actionType;
}
/**
* @param actionType
*/
public void setActionType(String actionType) {
this.actionType = actionType;
}
@Override
public String toString() {
return "userId: " + userId + ", userType: " + userType + ", actionType: " + actionType;
}
}
UserStoreBatchCleanupTasklet
/**
* This step is in charge of deleting the processed files.
* @author Chris Shayan
*/
public class UserStoreBatchCleanupTasklet implements Tasklet, InitializingBean {
private Resource directoryResource;
@Override
public RepeatStatus execute(StepContribution contribution, final ChunkContext chunkContext) throws Exception {
final File directory = directoryResource.getFile();
if (directory.isDirectory()) {
final File[] processedFiles = directory.listFiles(new FileFilter() {
@Override
public boolean accept(File pathName) {
String fileNames = chunkContext
.getStepContext()
.getStepExecution()
.getJobParameters()
.getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
return fileNames.contains(pathName.getName());
}
});
for (File processedFile : processedFiles) {
FileUtils.forceDelete(processedFile);
}
}
return RepeatStatus.FINISHED;
}
@Override
public void afterPropertiesSet() throws Exception {
Validate.notNull(directoryResource, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]");
}
public void setDirectoryResource(Resource directory) {
this.directoryResource = directory;
}
}
UserStoreProvisionListener
/**
* This is an instance of {@link JobExecutionListener} which on {@link JobExecutionListener#afterJob(org.springframework.batch.core.JobExecution)}
* it will do a propper logging; with current format: Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]
*
* @author Chris Shayan
*/
public class UserStoreProvisionListener implements JobExecutionListener {
private static final Logger LOG = Logger.getLogger(UserStoreProvisionListener.class);
private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd-HH.mm.ss");
@Override
public void beforeJob(JobExecution jobExecution) {
}
/**
* Do a proper logging.
*
* @param jobExecution See {@link JobExecution}
*/
@Override
public void afterJob(JobExecution jobExecution) {
final String fileNames = jobExecution.getJobInstance().getJobParameters().getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
if (StringUtils.isNotBlank(fileNames)) { // If there are no file which are process, then no need to log anything.
log(jobExecution, fileNames);
}
}
/**
* It will log the proper message based on {@link BatchStatus}. If it is {@link BatchStatus#COMPLETED} it will use {@link Logger#info(Object)} otherwise it will use {@link Logger#error(Object)}
* @param jobExecution See {@link JobExecution}
* @param fileNames The file(s) which have been processed by this spring batch job.
*/
public void log(JobExecution jobExecution, String fileNames) {
final BatchStatus status = jobExecution.getStatus();
final String startTime = DATETIME_FORMATTER.print(new DateTime(jobExecution.getStartTime().getTime()));
final String endTime = DATETIME_FORMATTER.print(new DateTime(jobExecution.getEndTime().getTime()));
final String jobName = jobExecution.getJobInstance().getJobName();
final String logMessage = MessageFormat.format("Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]",
jobName, startTime, endTime, fileNames);
if (BatchStatus.COMPLETED.equals(status)) {
LOG.info(logMessage);
} else {
LOG.error(logMessage);
}
}
}
SchedulingLauncher
/**
* This class is in charge of launching the user store provision spring batch jobs.
* @author Chris Shayan
*/
public class SchedulingLauncher implements InitializingBean {
private static final Logger LOG = Logger.getLogger(SchedulingLauncher.class);
public static final String USER_STORE_PROVISIONING = "userStoreProvisioning";
@Autowired
JobExplorer jobExplorer;
private Resource resourcesPath;
private Job job;
private JobLauncher jobLauncher;
/**
* It is a field to be used as a key for data sharing between
* {@link org.springframework.batch.item.file.MultiResourceItemReader} and {@link UserStoreBatchCleanupTasklet}
*/
public static final String USERSTORE_FILE_NAMES_KEY = String.format("%s.userstore_file_names_key", SchedulingLauncher.class.getSimpleName());
@Override
public void afterPropertiesSet() throws Exception {
Validate.notNull(resourcesPath, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]");
}
public void setResources(Resource resources) {
this.resourcesPath = resources;
}
/**
* This method is defined as a scheduler within batch.xml
*
* @throws Exception See {@link Exception}
*/
public void launch() throws Exception {
// if there is no files in the folder, do not launch the job.
if (StringUtils.isNotBlank(getFileNames())) {
jobLauncher.run(job, getJobParameters());
} else {
LOG.info("There is no files in the specific folder to be processed.");
}
}
/**
* Prepare a list of file names
*
* @return comma separated file names
* @throws IOException
*/
private String getFileNames() throws IOException {
File[] fileList = resourcesPath.getFile().listFiles();
String names = "";
if (fileList == null || fileList.length == 0) {
names = "";
} else {
for (File file : fileList) {
names += String.format(",%s", file.getName());
}
}
return names;
}
/**
* @return an appropriate JobParameter for jobLauncher. If there is a FAILED job execution in the last job instance,
* return its JobParameter to restart the job again.
*/
private JobParameters getJobParameters() throws IOException {
final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(USER_STORE_PROVISIONING, 0, 1);
if (CollectionUtils.isNotEmpty(userStoreJobInstances)) {
final JobInstance lastJobInstance = userStoreJobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance);
final JobExecution lastJobExecution = jobExecutions.get(0); // get the first instance which is the last job execution.
if (ExitStatus.FAILED.equals(lastJobExecution.getExitStatus())) {
final JobParameters lastJobInstanceJobParameters = lastJobInstance.getJobParameters();
return new JobParametersBuilder()
.addString(USERSTORE_FILE_NAMES_KEY, lastJobInstanceJobParameters.getString(USERSTORE_FILE_NAMES_KEY))
.toJobParameters();
}
}
return getDefaultJobParameters();
}
/**
* @return default JobParameter which consist of the files' name to be processed.
*
* @throws IOException
*/
private JobParameters getDefaultJobParameters() throws IOException {
return new JobParametersBuilder()
.addString(USERSTORE_FILE_NAMES_KEY, getFileNames())
.toJobParameters();
}
public void setJob(Job job) {
this.job = job;
}
public void setJobLauncher(JobLauncher jobLauncher) {
this.jobLauncher = jobLauncher;
}
}
Я надеюсь, что этот брифинг был кратким руководством для Spring Batch в ваших случаях использования. В то же время, я должен поблагодарить одного из моих товарищей, Энди Нга, который внес большой вклад в реализацию этого варианта использования.