В нашей фирме у нас был сценарий использования для обработки некоторых файлов, сгенерированных основным кадром. В этом посте я хотел бы описать, как мы реализовали вариант использования с помощью Spring Batch.
Вариант использования: Мэйнфрейм сгенерирует некоторые файлы (которые мы называем это куба) и поместит их на некоторый сетевой диск, доступный для нашего приложения для чтения / переименования / удаления.
Было бы здорово взглянуть на нашу конфигурацию XML:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd"> <job-repository id="jobRepository" data-source="appDataSource" transaction-manager="jtaTransactionManager" xmlns="http://www.springframework.org/schema/batch" /> <job id="userStoreProvisioning" xmlns="http://www.springframework.org/schema/batch" job-repository="jobRepository" restartable="true"> <step id="readWrite" next="clean"> <tasklet transaction-manager="jtaTransactionManager"> <chunk reader="userStoreMultiResourceReader" processor="userStoreProcessors" writer="userStoreItemWriter" commit-interval="1"> <streams> <stream ref="userStoreMultiResourceReader" /> </streams> </chunk> </tasklet> </step> <step id="clean"> <tasklet ref="userStoreBatchCleanupTasklet" transaction-manager="jtaTransactionManager"/> </step> <listeners> <listener ref="userStoreProvisionListener"/> </listeners> </job> <bean id="userStoreBatchCleanupTasklet" class="UserStoreBatchCleanupTasklet" p:directoryResource="${userStoreProvisioning.multiResourceReader.directory}"/> <bean id="userStoreProvisionListener" class="UserStoreProvisionListener" /> <bean id="userStoreItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter"> <property name="delegates"> <list> <bean class="UserStoreJdbcItemWriter"> <property name="jdbcTemplate" ref="userStoreDataSource"/> </bean> <bean class="UserStoreJmsItemWriter"/> </list> </property> </bean> <bean id="userStoreProcessors" class="org.springframework.batch.item.support.CompositeItemProcessor"> <property name="delegates"> <list> <bean class="org.springframework.batch.item.validator.ValidatingItemProcessor"> <property name="validator"> <bean class="MainframeBeanWrapperValidator"/> </property> </bean> </list> </property> </bean> <bean id="userStoreMultiResourceReader" class="MultiResourceItemReader" scope="step" > <property name="resourcePredicate"> <bean class="ResourceFilter" /> </property> <property name="resources" value="${userStoreProvisioning.multiResourceReader.directory}${userStoreProvisioning.multiResourceReader.resources}"/> <property name="comparator"> <bean class="MultiResourceReaderComparator"/> </property> <property name="delegate" ref="userStoreItemReader"/> <property name="saveState" value="true" /> </bean> <bean id="userStoreItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="prototype"> <!-- Used to determine where the line endings are and do things like continue over a line ending if inside a quoted string. --> <property name="recordSeparatorPolicy"> <bean class="org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy"/> </property> <property name="saveState" value="true" /> <!-- Interface for mapping lines (strings) to domain objects typically used to map lines read from a file to domain objects on a per line basis. --> <property name="lineMapper"> <bean id="userStoreLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <!-- A LineTokenizer implementation that splits the input String on a configurable delimiter. --> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="delimiter" value=";"/> <property name="names" value="userId,actionType,userType"/> </bean> </property> <!--Implementation based on bean property paths--> <property name="fieldSetMapper"> <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"> <property name="prototypeBeanName" value="mainframeBeanWrapper"/> </bean> </property> </bean> </property> </bean> <bean id="mainframeBeanWrapper" class="MainframeBeanWrapper" scope="prototype"/> </beans>В приведенном выше XML-файле есть несколько элементов, которые требуют более подробного описания:
Вещь | Описание |
jobRepository |
репозиторий заданий является ключевым элементом инфраструктуры Spring Batch, поскольку он предоставляет некоторые метаданные о пакетной обработке. |
userStoreProvisioning | Это основная работа нашего варианта использования. |
меры | В этом случае использования есть два шага. Первый шаг (readWrite) отвечает за чтение из файла и сохранение в базе данных, а также за отправку сообщения в соответствующую JMS. Последний шаг — очистка, которая отвечает за удаление обработанного файла. |
userStoreMultiResourceReader | Это наш класс, который мы расширили от org.springframework.batch.item.file.MultiResourceItemReader; По сути, мы добавили в Spring Batch функцию: фильтр ресурсов. С помощью концепции фильтра мы можем легко управлять тем, какой файл мы хотим обработать в первую очередь. Например, мы хотим сначала обработать сбойные, а затем и другие файлы.
Внутри этого класса мы используем компаратор для обработки сортировки файлов. |
userStoreItemReader | По сути, это предустановленная функция из Batch, которая поможет нам прочитать значения файла. Сам XML явно самоописывается, поэтому единственное, что заслуживает того, чтобы его озвучить, это: saveState; это элемент, который будет сохранять состояние показаний, и в случае сбоя процесса заполнения, когда пакетный режим хочет повторить попытку, он начнет работу с последнего сохраненного состояния. |
userStoreItemWriter | У нас есть сценарий использования для записи прочитанных строк файла в базу данных и JMS. Следовательно, мы достигаем этой цели, потребляя CompositeItemWriter. |
UserStoreJdbcItemWriter | Это пользовательский ItemWriter для JDBC. Потому что у нас была логика, которую нам иногда приходилось вставлять, а иногда нам приходилось обновлять. |
userStoreBatchCleanupTasklet | Это пользовательский тасклет для удаления обработанных файлов. |
Здесь вы можете найти некоторые исходные коды вышеупомянутых предметов.
MultiResourceItemReader
/** * This class is basically identical with {@link org.springframework.batch.item.file.MultiResourceItemReader}. This * class has added a feature on filtering the resources before processing them. * * @author Chris Shayan */ public class MultiResourceItemReader<T> extends org.springframework.batch.item.file.MultiResourceItemReader<T> { ResourcePredicate<Resource> resourcePredicate; @Override public void setResources(Resource[] resources) { Validate.notNull(resources, "The resources must not be null"); List<Resource> resourcesList = new ArrayList<Resource>(Arrays.asList(resources)); if (resourcePredicate.isPredicateActive()) { for (Iterator<Resource> it = resourcesList.iterator(); it.hasNext(); ) { if (resourcePredicate.evaluate(it.next()) == false) { it.remove(); } } } super.setResources(resourcesList.toArray(new Resource[resourcesList.size()])); } public void setResourcePredicate(ResourcePredicate<Resource> resourcePredicate) { Validate.notNull(resourcePredicate); this.resourcePredicate = resourcePredicate; } }
ResourceFilter
/** * This class will ensure what resource files should be processed in this cycle of execution. * <p><b>Warning:</b> This class should not be singleton at all.</p> * * @author Chris Shayan */ public class ResourceFilter implements ResourcePredicate<Resource> { @Autowired JobExplorer jobExplorer; private JobParameters lastJobInstanceJobParameters; @Override public boolean evaluate(Resource object) { Validate.notNull(object); Validate.notEmpty(object.getFilename()); return lastJobInstanceJobParameters.getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY).contains(object.getFilename()); } @Override public boolean isPredicateActive() { final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(SchedulingLauncher.USER_STORE_PROVISIONING, 0, 1); if (CollectionUtils.isNotEmpty(userStoreJobInstances)) { final JobInstance lastJobInstance = userStoreJobInstances.get(0); List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance); final JobExecution lastJobExecution = jobExecutions.get(0); // get the first instance which is the last job execution. if (ExitStatus.FAILED.equals(lastJobExecution.getExitStatus())) { lastJobInstanceJobParameters = lastJobInstance.getJobParameters(); return true; } } return false; } }
MainframeBeanWrapper
/** * A bean class wrapping User entity object and with additional properties read from file. */ public class MainframeBeanWrapper { private String userId; private String userType; private String actionType; /** * @return the userId */ public String getUserId() { return userId; } /** * @param userId the userId to set */ public void setUserId(String userId) { this.userId = userId; } /** * @return */ public String getUserType() { return userType; } /** * @param userType */ public void setUserType(String userType) { this.userType = userType; } /** * @return */ public String getActionType() { return actionType; } /** * @param actionType */ public void setActionType(String actionType) { this.actionType = actionType; } @Override public String toString() { return "userId: " + userId + ", userType: " + userType + ", actionType: " + actionType; } }
UserStoreBatchCleanupTasklet
/** * This step is in charge of deleting the processed files. * @author Chris Shayan */ public class UserStoreBatchCleanupTasklet implements Tasklet, InitializingBean { private Resource directoryResource; @Override public RepeatStatus execute(StepContribution contribution, final ChunkContext chunkContext) throws Exception { final File directory = directoryResource.getFile(); if (directory.isDirectory()) { final File[] processedFiles = directory.listFiles(new FileFilter() { @Override public boolean accept(File pathName) { String fileNames = chunkContext .getStepContext() .getStepExecution() .getJobParameters() .getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY); return fileNames.contains(pathName.getName()); } }); for (File processedFile : processedFiles) { FileUtils.forceDelete(processedFile); } } return RepeatStatus.FINISHED; } @Override public void afterPropertiesSet() throws Exception { Validate.notNull(directoryResource, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]"); } public void setDirectoryResource(Resource directory) { this.directoryResource = directory; } }
UserStoreProvisionListener
/** * This is an instance of {@link JobExecutionListener} which on {@link JobExecutionListener#afterJob(org.springframework.batch.core.JobExecution)} * it will do a propper logging; with current format: Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}] * * @author Chris Shayan */ public class UserStoreProvisionListener implements JobExecutionListener { private static final Logger LOG = Logger.getLogger(UserStoreProvisionListener.class); private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd-HH.mm.ss"); @Override public void beforeJob(JobExecution jobExecution) { } /** * Do a proper logging. * * @param jobExecution See {@link JobExecution} */ @Override public void afterJob(JobExecution jobExecution) { final String fileNames = jobExecution.getJobInstance().getJobParameters().getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY); if (StringUtils.isNotBlank(fileNames)) { // If there are no file which are process, then no need to log anything. log(jobExecution, fileNames); } } /** * It will log the proper message based on {@link BatchStatus}. If it is {@link BatchStatus#COMPLETED} it will use {@link Logger#info(Object)} otherwise it will use {@link Logger#error(Object)} * @param jobExecution See {@link JobExecution} * @param fileNames The file(s) which have been processed by this spring batch job. */ public void log(JobExecution jobExecution, String fileNames) { final BatchStatus status = jobExecution.getStatus(); final String startTime = DATETIME_FORMATTER.print(new DateTime(jobExecution.getStartTime().getTime())); final String endTime = DATETIME_FORMATTER.print(new DateTime(jobExecution.getEndTime().getTime())); final String jobName = jobExecution.getJobInstance().getJobName(); final String logMessage = MessageFormat.format("Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]", jobName, startTime, endTime, fileNames); if (BatchStatus.COMPLETED.equals(status)) { LOG.info(logMessage); } else { LOG.error(logMessage); } } }
SchedulingLauncher
/** * This class is in charge of launching the user store provision spring batch jobs. * @author Chris Shayan */ public class SchedulingLauncher implements InitializingBean { private static final Logger LOG = Logger.getLogger(SchedulingLauncher.class); public static final String USER_STORE_PROVISIONING = "userStoreProvisioning"; @Autowired JobExplorer jobExplorer; private Resource resourcesPath; private Job job; private JobLauncher jobLauncher; /** * It is a field to be used as a key for data sharing between * {@link org.springframework.batch.item.file.MultiResourceItemReader} and {@link UserStoreBatchCleanupTasklet} */ public static final String USERSTORE_FILE_NAMES_KEY = String.format("%s.userstore_file_names_key", SchedulingLauncher.class.getSimpleName()); @Override public void afterPropertiesSet() throws Exception { Validate.notNull(resourcesPath, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]"); } public void setResources(Resource resources) { this.resourcesPath = resources; } /** * This method is defined as a scheduler within batch.xml * * @throws Exception See {@link Exception} */ public void launch() throws Exception { // if there is no files in the folder, do not launch the job. if (StringUtils.isNotBlank(getFileNames())) { jobLauncher.run(job, getJobParameters()); } else { LOG.info("There is no files in the specific folder to be processed."); } } /** * Prepare a list of file names * * @return comma separated file names * @throws IOException */ private String getFileNames() throws IOException { File[] fileList = resourcesPath.getFile().listFiles(); String names = ""; if (fileList == null || fileList.length == 0) { names = ""; } else { for (File file : fileList) { names += String.format(",%s", file.getName()); } } return names; } /** * @return an appropriate JobParameter for jobLauncher. If there is a FAILED job execution in the last job instance, * return its JobParameter to restart the job again. */ private JobParameters getJobParameters() throws IOException { final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(USER_STORE_PROVISIONING, 0, 1); if (CollectionUtils.isNotEmpty(userStoreJobInstances)) { final JobInstance lastJobInstance = userStoreJobInstances.get(0); List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance); final JobExecution lastJobExecution = jobExecutions.get(0); // get the first instance which is the last job execution. if (ExitStatus.FAILED.equals(lastJobExecution.getExitStatus())) { final JobParameters lastJobInstanceJobParameters = lastJobInstance.getJobParameters(); return new JobParametersBuilder() .addString(USERSTORE_FILE_NAMES_KEY, lastJobInstanceJobParameters.getString(USERSTORE_FILE_NAMES_KEY)) .toJobParameters(); } } return getDefaultJobParameters(); } /** * @return default JobParameter which consist of the files' name to be processed. * * @throws IOException */ private JobParameters getDefaultJobParameters() throws IOException { return new JobParametersBuilder() .addString(USERSTORE_FILE_NAMES_KEY, getFileNames()) .toJobParameters(); } public void setJob(Job job) { this.job = job; } public void setJobLauncher(JobLauncher jobLauncher) { this.jobLauncher = jobLauncher; } }
Я надеюсь, что этот брифинг был кратким руководством для Spring Batch в ваших случаях использования. В то же время, я должен поблагодарить одного из моих товарищей, Энди Нга, который внес большой вклад в реализацию этого варианта использования.