Статьи

Корпоративный вариант использования для Spring Batch

В нашей фирме у нас был сценарий использования для обработки некоторых файлов, сгенерированных основным кадром. В этом посте я хотел бы описать, как мы реализовали вариант использования с помощью Spring Batch.

Вариант использования: Мэйнфрейм сгенерирует некоторые файлы (которые мы называем это куба) и поместит их на некоторый сетевой диск, доступный для нашего приложения для чтения / переименования / удаления.

Было бы здорово взглянуть на нашу конфигурацию XML:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
						http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <job-repository id="jobRepository"
                    data-source="appDataSource"
                    transaction-manager="jtaTransactionManager"
                    xmlns="http://www.springframework.org/schema/batch"
            />
    <job id="userStoreProvisioning" xmlns="http://www.springframework.org/schema/batch" job-repository="jobRepository" restartable="true">
        <step id="readWrite" next="clean">
            <tasklet transaction-manager="jtaTransactionManager">
                <chunk
                        reader="userStoreMultiResourceReader"
                        processor="userStoreProcessors"
                        writer="userStoreItemWriter"
                        commit-interval="1">
                    <streams>
                        <stream ref="userStoreMultiResourceReader" />
                    </streams>
                </chunk>
            </tasklet>
        </step>
        <step id="clean">
            <tasklet ref="userStoreBatchCleanupTasklet" transaction-manager="jtaTransactionManager"/>
        </step>
        <listeners>
            <listener ref="userStoreProvisionListener"/>
        </listeners>
    </job>

    <bean id="userStoreBatchCleanupTasklet" class="UserStoreBatchCleanupTasklet"
          p:directoryResource="${userStoreProvisioning.multiResourceReader.directory}"/>

    <bean id="userStoreProvisionListener" class="UserStoreProvisionListener" />

    <bean id="userStoreItemWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
        <property name="delegates">
            <list>
                <bean class="UserStoreJdbcItemWriter">
                    <property name="jdbcTemplate" ref="userStoreDataSource"/>
                </bean>
                <bean class="UserStoreJmsItemWriter"/>
            </list>
        </property>
    </bean>

    <bean id="userStoreProcessors" class="org.springframework.batch.item.support.CompositeItemProcessor">
        <property name="delegates">
            <list>
                <bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
                    <property name="validator">
                        <bean class="MainframeBeanWrapperValidator"/>
                    </property>
                </bean>
            </list>
        </property>
    </bean>

    <bean id="userStoreMultiResourceReader" class="MultiResourceItemReader" scope="step" >
        <property name="resourcePredicate">
            <bean class="ResourceFilter" />
        </property>
        <property name="resources"
                  value="${userStoreProvisioning.multiResourceReader.directory}${userStoreProvisioning.multiResourceReader.resources}"/>
        <property name="comparator">
            <bean class="MultiResourceReaderComparator"/>
        </property>
        <property name="delegate" ref="userStoreItemReader"/>
        <property name="saveState" value="true" />
    </bean>


    <bean id="userStoreItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="prototype">
        <!-- Used to determine where the line endings are and do things like continue over a line ending if inside a quoted string. -->
        <property name="recordSeparatorPolicy">
            <bean class="org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy"/>
        </property>
        <property name="saveState" value="true" />
        <!-- Interface for mapping lines (strings) to domain objects typically used to map lines read from a file to domain objects on a per line basis. -->
        <property name="lineMapper">
            <bean id="userStoreLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

                <!-- A LineTokenizer implementation that splits the input String on a configurable delimiter. -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="delimiter" value=";"/>
                        <property name="names" value="userId,actionType,userType"/>
                    </bean>
                </property>

                <!--Implementation based on bean property paths-->
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="prototypeBeanName" value="mainframeBeanWrapper"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="mainframeBeanWrapper" class="MainframeBeanWrapper" scope="prototype"/>

</beans>

В приведенном выше XML-файле есть несколько элементов, которые требуют более подробного описания:

Вещь Описание
jobRepository

репозиторий заданий является ключевым элементом инфраструктуры Spring Batch, поскольку он предоставляет некоторые метаданные о пакетной обработке.
userStoreProvisioning Это основная работа нашего варианта использования.
меры В этом случае использования есть два шага. Первый шаг (readWrite) отвечает за чтение из файла и сохранение в базе данных, а также за отправку сообщения в соответствующую JMS. Последний шаг — очистка, которая отвечает за удаление обработанного файла.
 userStoreMultiResourceReader  Это наш класс, который мы расширили от org.springframework.batch.item.file.MultiResourceItemReader; По сути, мы добавили в Spring Batch функцию: фильтр ресурсов. С помощью концепции фильтра мы можем легко управлять тем, какой файл мы хотим обработать в первую очередь. Например, мы хотим сначала обработать сбойные, а затем и другие файлы.

Внутри этого класса мы используем компаратор для обработки сортировки файлов.

 userStoreItemReader  По сути, это предустановленная функция из Batch, которая поможет нам прочитать значения файла. Сам XML явно самоописывается, поэтому единственное, что заслуживает того, чтобы его озвучить, это: saveState; это элемент, который будет сохранять состояние показаний, и в случае сбоя процесса заполнения, когда пакетный режим хочет повторить попытку, он начнет работу с последнего сохраненного состояния.
 userStoreItemWriter  У нас есть сценарий использования для записи прочитанных строк файла в базу данных и JMS. Следовательно, мы достигаем этой цели, потребляя CompositeItemWriter. 
 UserStoreJdbcItemWriter  Это пользовательский ItemWriter для JDBC. Потому что у нас была логика, которую нам иногда приходилось вставлять, а иногда нам приходилось обновлять.
 userStoreBatchCleanupTasklet  Это пользовательский тасклет для удаления обработанных файлов.

Здесь вы можете найти некоторые исходные коды вышеупомянутых предметов.

MultiResourceItemReader

/**
 * This class is basically identical with {@link org.springframework.batch.item.file.MultiResourceItemReader}. This
 * class has added a feature on filtering the resources before processing them.
 *
 * @author Chris Shayan
 */
public class MultiResourceItemReader<T> extends org.springframework.batch.item.file.MultiResourceItemReader<T> {
    ResourcePredicate<Resource> resourcePredicate;

    @Override
    public void setResources(Resource[] resources) {
        Validate.notNull(resources, "The resources must not be null");

        List<Resource> resourcesList = new ArrayList<Resource>(Arrays.asList(resources));
        if (resourcePredicate.isPredicateActive()) {
            for (Iterator<Resource> it = resourcesList.iterator(); it.hasNext(); ) {
                if (resourcePredicate.evaluate(it.next()) == false) {
                    it.remove();
                }
            }
        }

        super.setResources(resourcesList.toArray(new Resource[resourcesList.size()]));
    }

    public void setResourcePredicate(ResourcePredicate<Resource> resourcePredicate) {
        Validate.notNull(resourcePredicate);

        this.resourcePredicate = resourcePredicate;
    }
}

ResourceFilter

/**
 * This class will ensure what resource files should be processed in this cycle of execution.
 * <p><b>Warning:</b> This class should not be singleton at all.</p>
 *
 * @author Chris Shayan
 */
public class ResourceFilter implements ResourcePredicate<Resource> {
    @Autowired
    JobExplorer jobExplorer;

    private JobParameters lastJobInstanceJobParameters;

    @Override
    public boolean evaluate(Resource object) {
        Validate.notNull(object);
        Validate.notEmpty(object.getFilename());

        return lastJobInstanceJobParameters.getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY).contains(object.getFilename());
    }

    @Override
    public boolean isPredicateActive() {
        final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(SchedulingLauncher.USER_STORE_PROVISIONING, 0, 1);
        if (CollectionUtils.isNotEmpty(userStoreJobInstances)) {
            final JobInstance lastJobInstance = userStoreJobInstances.get(0);
            List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance);

            final JobExecution lastJobExecution = jobExecutions.get(0); // get the first instance which is the last job execution.
            if (ExitStatus.FAILED.equals(lastJobExecution.getExitStatus())) {
                lastJobInstanceJobParameters = lastJobInstance.getJobParameters();
                return true;
            }
        }
        return false;
    }
}

MainframeBeanWrapper

/**
 * A bean class wrapping User entity object and with additional properties read from file.
 */
public class MainframeBeanWrapper {
	private String userId;
	
    private String userType;

    private String actionType;

    /**
	 * @return the userId
	 */
	public String getUserId() {
		return userId;
	}

	/**
	 * @param userId the userId to set
	 */
	public void setUserId(String userId) {
		this.userId = userId;
	}

	/**
	 * @return
	 */
	public String getUserType() {
        return userType;
    }

    /**
     * @param userType
     */
    public void setUserType(String userType) {
        this.userType = userType;
    }

    /**
     * @return
     */
    public String getActionType() {
        return actionType;
    }

    /**
     * @param actionType
     */
    public void setActionType(String actionType) {
        this.actionType = actionType;
    }

    @Override
    public String toString() {
    	return "userId: " + userId + ", userType: " + userType + ", actionType: " + actionType;
    }

}

UserStoreBatchCleanupTasklet

/**
 * This step is in charge of deleting the processed files.
 * @author Chris Shayan
 */
public class UserStoreBatchCleanupTasklet implements Tasklet, InitializingBean {
    private Resource directoryResource;

    @Override
    public RepeatStatus execute(StepContribution contribution, final ChunkContext chunkContext) throws Exception {
        final File directory = directoryResource.getFile();

        if (directory.isDirectory()) {
            final File[] processedFiles = directory.listFiles(new FileFilter() {
                @Override
                public boolean accept(File pathName) {
                    String fileNames = chunkContext
                            .getStepContext()
                            .getStepExecution()
                            .getJobParameters()
                            .getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
                    return fileNames.contains(pathName.getName());
                }
            });

            for (File processedFile : processedFiles) {
                FileUtils.forceDelete(processedFile);
            }
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Validate.notNull(directoryResource, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]");
    }

    public void setDirectoryResource(Resource directory) {
        this.directoryResource = directory;
    }

}

UserStoreProvisionListener

/**
 * This is an instance of {@link JobExecutionListener} which on {@link JobExecutionListener#afterJob(org.springframework.batch.core.JobExecution)}
 * it will do a propper logging; with current format: Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]
 *
 * @author Chris Shayan
 */
public class UserStoreProvisionListener implements JobExecutionListener {
    private static final Logger LOG = Logger.getLogger(UserStoreProvisionListener.class);
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd-HH.mm.ss");

    @Override
    public void beforeJob(JobExecution jobExecution) {
    }

    /**
     * Do a proper logging.
     *
     * @param jobExecution See {@link JobExecution}
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        final String fileNames = jobExecution.getJobInstance().getJobParameters().getString(SchedulingLauncher.USERSTORE_FILE_NAMES_KEY);
        if (StringUtils.isNotBlank(fileNames)) { // If there are no file which are process, then no need to log anything.
            log(jobExecution, fileNames);
        }
    }

    /**
     * It will log the proper message based on {@link BatchStatus}. If it is {@link BatchStatus#COMPLETED} it will use {@link Logger#info(Object)} otherwise it will use {@link Logger#error(Object)}
     * @param jobExecution See {@link JobExecution}
     * @param fileNames The file(s) which have been processed by this spring batch job.
     */
    public void log(JobExecution jobExecution, String fileNames) {
        final BatchStatus status = jobExecution.getStatus();
        final String startTime = DATETIME_FORMATTER.print(new DateTime(jobExecution.getStartTime().getTime()));
        final String endTime = DATETIME_FORMATTER.print(new DateTime(jobExecution.getEndTime().getTime()));
        final String jobName = jobExecution.getJobInstance().getJobName();

        final String logMessage = MessageFormat.format("Spring Batch with jobName[{0}], started at [{1}], ended at [{2}]. It processed these files [{3}]",
                jobName, startTime, endTime, fileNames);

        if (BatchStatus.COMPLETED.equals(status)) {
            LOG.info(logMessage);
        } else {
            LOG.error(logMessage);
        }
    }

}

SchedulingLauncher

/**
 * This class is in charge of launching the user store provision spring batch jobs.
 * @author Chris Shayan
 */
public class SchedulingLauncher implements InitializingBean {
	private static final Logger LOG = Logger.getLogger(SchedulingLauncher.class);
    public static final String USER_STORE_PROVISIONING = "userStoreProvisioning";

    @Autowired
    JobExplorer jobExplorer;

    private Resource resourcesPath;
    private Job job;
    private JobLauncher jobLauncher;

    /**
     * It is a field to be used as a key for data sharing between
     * {@link org.springframework.batch.item.file.MultiResourceItemReader} and {@link UserStoreBatchCleanupTasklet}
     */
    public static final String USERSTORE_FILE_NAMES_KEY = String.format("%s.userstore_file_names_key", SchedulingLauncher.class.getSimpleName());

    @Override
    public void afterPropertiesSet() throws Exception {
        Validate.notNull(resourcesPath, "directory must be set. Make sure your service.properties has following key [userStoreProvisioning.multiResourceReader.directory]");
    }

    public void setResources(Resource resources) {
        this.resourcesPath = resources;
    }

    /**
     * This method is defined as a scheduler within batch.xml
     *
     * @throws Exception See {@link Exception}
     */
    public void launch() throws Exception {
    	// if there is no files in the folder, do not launch the job.
        if (StringUtils.isNotBlank(getFileNames())) {
            jobLauncher.run(job, getJobParameters());
        } else {
        	LOG.info("There is no files in the specific folder to be processed.");
        }
    }

    /**
     * Prepare a list of file names
     *
     * @return comma separated file names
     * @throws IOException
     */
    private String getFileNames() throws IOException {
        File[] fileList = resourcesPath.getFile().listFiles();
        String names = "";
        if (fileList == null || fileList.length == 0) {
            names = "";
        } else {
            for (File file : fileList) {
                names += String.format(",%s", file.getName());
            }
        }
        return names;
    }

    /**
     * @return an appropriate JobParameter for jobLauncher. If there is a FAILED job execution in the last job instance,
     * return its JobParameter to restart the job again.
     */
    private JobParameters getJobParameters() throws IOException {
        final List<JobInstance> userStoreJobInstances = jobExplorer.getJobInstances(USER_STORE_PROVISIONING, 0, 1);
        if (CollectionUtils.isNotEmpty(userStoreJobInstances)) {
            final JobInstance lastJobInstance = userStoreJobInstances.get(0);
            List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(lastJobInstance);

            final JobExecution lastJobExecution = jobExecutions.get(0); // get the first instance which is the last job execution.
            if (ExitStatus.FAILED.equals(lastJobExecution.getExitStatus())) {
                final JobParameters lastJobInstanceJobParameters = lastJobInstance.getJobParameters();
                return new JobParametersBuilder()
                        .addString(USERSTORE_FILE_NAMES_KEY, lastJobInstanceJobParameters.getString(USERSTORE_FILE_NAMES_KEY))
                        .toJobParameters();
            }
        }

        return getDefaultJobParameters();
    }

    /**
     * @return default JobParameter which consist of the files' name to be processed.
     *
     * @throws IOException
     */
    private JobParameters getDefaultJobParameters() throws IOException {
        return new JobParametersBuilder()
        		.addString(USERSTORE_FILE_NAMES_KEY, getFileNames())
        		.toJobParameters();
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }
}

Я надеюсь, что этот брифинг был кратким руководством для Spring Batch в ваших случаях использования. В то же время, я должен поблагодарить одного из моих товарищей, Энди Нга, который внес большой вклад в реализацию этого варианта использования.