Я работал над переносом некоторых пакетных заданий для Podcastpedia.org на Spring Batch. Раньше эти рабочие места разрабатывались по-моему, и я подумал, что пора использовать более «стандартизированный» подход. Поскольку я никогда раньше не использовал Spring с настройкой Java, я подумал, что это хорошая возможность узнать об этом, настроив задания Spring Batch в Java. И так как я все пытаюсь что-то новое с Spring, почему бы не бросить Spring Boot в лодку …
  Замечания: 
  Перед тем, как вы начнете с этого учебника, я рекомендую вам сначала прочитать « Начало работы Spring — создание пакетной службы» , поскольку представленная здесь структура и код основаны на этом оригинале. 
1. Что я буду строить
Итак, как уже упоминалось, в этом посте я представлю Spring Batch в контексте его настройки и разработки с ним некоторых пакетных заданий для Podcastpedia.org . Вот краткое описание двух заданий, которые в настоящее время являются частью проекта Podcastpedia-batch :
-   addNewPodcastJob
- считывает метаданные подкаста (URL фида, идентификатор, категории и т. д.) из плоского файла
 - преобразует (анализирует и готовит эпизоды для вставки с помощью Http Apache Client ) данные
 - и на последнем шаге вставьте его в базу данных Podcastpedia и сообщите об этом отправителю по электронной почте.
 
 - notifyEmailSubscribeersJob — люди могут подписаться на свои любимые подкасты на Podcastpedia.org по электронной почте. Для тех, кто сделал это, регулярно проверяется (ЕЖЕДНЕВНО, ЕЖЕНЕДЕЛЬНО, ЕЖЕМЕСЯЧНО), доступны ли новые эпизоды, и если они есть, подписчики информируются об этом по электронной почте; читать из базы данных , расширять данные чтения через JPA, перегруппировать их и уведомлять подписчика по электронной почте
 
  Исходный код: 
  Исходный код этого руководства доступен на GitHub — Podcastpedia-batch. 
Примечание. Перед началом работы я также настоятельно рекомендую ознакомиться с языком пакетной обработки доменов , чтобы такие термины, как «Задания», «Шаги» или «ItemReaders» не казались вам странными.
2. Что вам нужно
- Любимый текстовый редактор или IDE
 - JDK 1,7 или позже
 - Maven 3.0+
 
3. Настройте проект
Проект построен с Maven . Он использует Spring Boot, что упрощает создание автономных приложений на основе Spring, которые вы можете «просто запустить». Вы можете узнать больше о Spring Boot, посетив веб-сайт проекта.
3.1. Maven файл сборки
 Поскольку он использует Spring Boot, он будет иметь spring-boot-starter-parent качестве родителя и пару других spring-boot-starters, которые получат для нас некоторые библиотеки, необходимые для проекта: 
pom.xml проекта podcastpedia-batch
| 
 001 
002 
003 
004 
005 
006 
007 
008 
009 
010 
011 
012 
013 
014 
015 
016 
017 
018 
019 
020 
021 
022 
023 
024 
025 
026 
027 
028 
029 
030 
031 
032 
033 
034 
035 
036 
037 
038 
039 
040 
041 
042 
043 
044 
045 
046 
047 
048 
049 
050 
051 
052 
053 
054 
055 
056 
057 
058 
059 
060 
061 
062 
063 
064 
065 
066 
067 
068 
069 
070 
071 
072 
073 
074 
075 
076 
077 
078 
079 
080 
081 
082 
083 
084 
085 
086 
087 
088 
089 
090 
091 
092 
093 
094 
095 
096 
097 
098 
099 
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 
 | 
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    <modelVersion>4.0.0</modelVersion>    <groupId>org.podcastpedia.batch</groupId>    <artifactId>podcastpedia-batch</artifactId>    <version>0.1.0</version>         <properties>        <sprinb.boot.version>1.1.6.RELEASE</sprinb.boot.version>        <java.version>1.7</java.version>    </properties>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>1.1.6.RELEASE</version>    </parent>         <dependencies>            <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-batch</artifactId>                  </dependency>          <dependency>          <groupId>org.springframework.boot</groupId>          <artifactId>spring-boot-starter-data-jpa</artifactId>                </dependency>                         <dependency>            <groupId>org.apache.httpcomponents</groupId>            <artifactId>httpclient</artifactId>            <version>4.3.5</version>        </dependency>             <dependency>            <groupId>org.apache.httpcomponents</groupId>            <artifactId>httpcore</artifactId>            <version>4.3.2</version>        </dependency>        <!-- velocity -->        <dependency>            <groupId>org.apache.velocity</groupId>            <artifactId>velocity</artifactId>            <version>1.7</version>              </dependency>        <dependency>            <groupId>org.apache.velocity</groupId>            <artifactId>velocity-tools</artifactId>            <version>2.0</version>            <exclusions>                <exclusion>                    <groupId>org.apache.struts</groupId>                    <artifactId>struts-core</artifactId>                </exclusion>            </exclusions>                     </dependency>                                 <!-- Project rome rss, atom -->        <dependency>            <groupId>rome</groupId>            <artifactId>rome</artifactId>            <version>1.0</version>        </dependency>        <!-- option this fetcher thing -->        <dependency>            <groupId>rome</groupId>            <artifactId>rome-fetcher</artifactId>            <version>1.0</version>        </dependency>        <dependency>            <groupId>org.jdom</groupId>            <artifactId>jdom</artifactId>            <version>1.1</version>        </dependency>             <!-- PID 1 -->        <dependency>            <groupId>xerces</groupId>            <artifactId>xercesImpl</artifactId>            <version>2.9.1</version>        </dependency>                                 <!-- MySQL JDBC connector -->        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>5.1.31</version>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-freemarker</artifactId>           </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-remote-shell</artifactId>               <exclusions>                <exclusion>                    <groupId>javax.mail</groupId>                    <artifactId>mail</artifactId>                </exclusion>            </exclusions>                     </dependency>        <dependency>            <groupId>javax.mail</groupId>            <artifactId>mail</artifactId>            <version>1.4.7</version>        </dependency>             <dependency>            <groupId>javax.inject</groupId>            <artifactId>javax.inject</artifactId>            <version>1</version>        </dependency>             <dependency>            <groupId>org.twitter4j</groupId>            <artifactId>twitter4j-core</artifactId>            <version>[4.0,)</version>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                 <artifactId>maven-compiler-plugin</artifactId>             </plugin>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project> | 
  Замечания: 
  Одним из больших преимуществ использования spring-boot-starter-parent качестве spring-boot-starter-parent проекта является то, что вам нужно только обновить версию родительского элемента, и он получит «последние» библиотеки для вас.  Когда я запустил проект, весенняя загрузка была в версии 1.1.3.RELEASE и к моменту окончания написания этого поста уже была в 1.1.6.RELEASE . 
3.2. Структура каталога проекта
Я структурировал проект следующим образом:
Структура каталога проекта
| 
 1 
 | 
└── src └── main └── java └── org └── podcastpedia └── batch └── common └── jobs └── addpodcast └── notifysubscribers | 
Замечания:
-   пакет 
org.podcastpedia.batch.jobsсодержитorg.podcastpedia.batch.jobsимеющие определенные классы для определенных заданий. -   пакет 
org.podcastpedia.batch.jobs.commonсодержит классы, используемые всеми заданиями, например, такими как объекты JPA, которые требуются для обоих текущих заданий. 
4. Создайте пакетное задание
Я начну с представления класса конфигурации Java для первого пакетного задания:
Конфигурация пакетного задания
| 
 001 
002 
003 
004 
005 
006 
007 
008 
009 
010 
011 
012 
013 
014 
015 
016 
017 
018 
019 
020 
021 
022 
023 
024 
025 
026 
027 
028 
029 
030 
031 
032 
033 
034 
035 
036 
037 
038 
039 
040 
041 
042 
043 
044 
045 
046 
047 
048 
049 
050 
051 
052 
053 
054 
055 
056 
057 
058 
059 
060 
061 
062 
063 
064 
065 
066 
067 
068 
069 
070 
071 
072 
073 
074 
075 
076 
077 
078 
079 
080 
081 
082 
083 
084 
085 
086 
087 
088 
089 
090 
091 
092 
093 
094 
095 
096 
097 
098 
099 
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
 | 
package org.podcastpedia.batch.jobs.addpodcast;import org.podcastpedia.batch.common.configuration.DatabaseAccessConfiguration;import org.podcastpedia.batch.common.listeners.LogProcessListener;import org.podcastpedia.batch.common.listeners.ProtocolListener;import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.ItemReader;import org.springframework.batch.item.ItemWriter;import org.springframework.batch.item.file.FlatFileItemReader;import org.springframework.batch.item.file.LineMapper;import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;import org.springframework.batch.item.file.mapping.DefaultLineMapper;import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Import;import org.springframework.core.io.ClassPathResource;import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException;@Configuration@EnableBatchProcessing@Import({DatabaseAccessConfiguration.class, ServicesConfiguration.class})public class AddPodcastJobConfiguration {    @Autowired    private JobBuilderFactory jobs;      @Autowired    private StepBuilderFactory stepBuilderFactory;         // tag::jobstep[]    @Bean    public Job addNewPodcastJob(){        return jobs.get("addNewPodcastJob")                .listener(protocolListener())                .start(step())                .build();    }            @Bean    public Step step(){        return stepBuilderFactory.get("step")                .<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read                .reader(reader())                .processor(processor())                .writer(writer())                .listener(logProcessListener())                .faultTolerant()                .skipLimit(10) //default is set to 0                .skip(MySQLIntegrityConstraintViolationException.class)                .build();    }       // end::jobstep[]         // tag::readerwriterprocessor[]    @Bean    public ItemReader<SuggestedPodcast> reader(){        FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();        reader.setLinesToSkip(1);//first line is title definition         reader.setResource(new ClassPathResource("suggested-podcasts.txt"));        reader.setLineMapper(lineMapper());        return reader;     }    @Bean    public LineMapper<SuggestedPodcast> lineMapper() {        DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();                 DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();        lineTokenizer.setDelimiter(";");        lineTokenizer.setStrict(false);        lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});                 BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();        fieldSetMapper.setTargetType(SuggestedPodcast.class);                 lineMapper.setLineTokenizer(lineTokenizer);        lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());                 return lineMapper;    }    @Bean    public SuggestedPodcastFieldSetMapper suggestedPodcastFieldSetMapper() {        return new SuggestedPodcastFieldSetMapper();    }    /** configure the processor related stuff */    @Bean    public ItemProcessor<SuggestedPodcast, SuggestedPodcast> processor() {        return new SuggestedPodcastItemProcessor();    }         @Bean    public ItemWriter<SuggestedPodcast> writer() {        return new Writer();    }    // end::readerwriterprocessor[]         @Bean    public ProtocolListener protocolListener(){        return new ProtocolListener();    }      @Bean    public LogProcessListener logProcessListener(){        return new LogProcessListener();    }    } | 
  Аннотация @EnableBatchProcessing добавляет много критических компонентов, поддерживающих задания, и экономит нам работу по настройке.  Например, вы также сможете @Autowired некоторые полезные вещи в вашем контексте: 
-   
JobRepository(имя компонента «jobRepository») -   
JobLauncher(имя компонента «jobLauncher») -   
JobRegistry(бобовое имя «jobRegistry») -   
PlatformTransactionManager(имя компонента «транзакционный менеджер») -   
JobBuilderFactory(имя компонента «jobBuilders») для удобства, чтобы вам не приходилось вставлять репозиторий заданий в каждую работу, как в примерах выше -   
StepBuilderFactory(имя компонента «stepBuilders») для удобства, чтобы вам не приходилось внедрять репозиторий заданий и менеджер транзакций на каждом шаге 
Первая часть посвящена фактической конфигурации работы:
Пакетное задание и пошаговая настройка
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
 | 
@Beanpublic Job addNewPodcastJob(){    return jobs.get("addNewPodcastJob")            .listener(protocolListener())            .start(step())            .build();}   @Beanpublic Step step(){    return stepBuilderFactory.get("step")            .<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read            .reader(reader())            .processor(processor())            .writer(writer())            .listener(logProcessListener())            .faultTolerant()            .skipLimit(10) //default is set to 0            .skip(MySQLIntegrityConstraintViolationException.class)            .build();} | 
Первый метод определяет работу, а второй — один шаг. Как вы уже читали в разделе «Доменный язык пакетной обработки» , задания строятся из шагов, где каждый шаг может включать в себя читателя, процессор и писатель.
В определении шага вы определяете, сколько данных записывать за один раз (в нашем случае 1 запись за один раз). Далее вы указываете читатель, процессор и писатель.
5. Пружинные блоки обработки
  Большую часть пакетной обработки можно описать как чтение данных, выполнение каких-либо преобразований и последующую запись результатов.  Это как-то отражает процесс извлечения, преобразования, загрузки (ETL) , если вы знаете об этом больше.  Spring Batch предоставляет три ключевых интерфейса, которые помогают выполнять массовое чтение и запись: ItemReader , ItemProcessor и ItemWriter . 
5.1. Читатели
ItemReader — это абстракция, обеспечивающая возможность извлечения данных из множества различных типов ввода: плоские файлы , XML-файлы , базы данных , JMS и т. Д., По одному элементу за раз. См. Приложение A. Список ItemReaders и ItemWriters для полного списка доступных считывателей элементов.
В пакетных заданиях Podcastpedia я использую следующие специализированные ItemReaders:
5.1.1. FlatFileItemReader
  который, как следует из названия, считывает строки данных из плоского файла, которые обычно описывают записи с полями данных, заданными фиксированными позициями в файле или разделенными каким-либо специальным символом (например, запятой).  Этот тип ItemReader используется в первом пакетном задании addNewPodcastJob .  Используемый входной файл с именем Sugges-podcasts.in находится в classpath ( src / main / resources ) и выглядит примерно так: 
Входной файл для FlatFileItemReader
| 
 1 
2 
3 
 | 
FEED_URL; IDENTIFIER_ON_PODCASTPEDIA; CATEGORIES; LANGUAGE; MEDIA_TYPE; UPDATE_FREQUENCY; KEYWORDS; FB_PAGE; TWITTER_PAGE; GPLUS_PAGE; NAME_SUBMITTER; EMAIL_SUBMITTERhttp://www.5minutebiographies.com/feed/; 5minutebiographies; people_society, history; en; Audio; WEEKLY; biography, biographies, short biography, short biographies, 5 minute biographies, five minute biographies, 5 minute biography, five minute biography; https://www.facebook.com/5minutebiographies; https://twitter.com/5MinuteBios; ; Adrian Matei; adrianmatei@gmail.comhttp://notanotherpodcast.libsyn.com/rss; NotAnotherPodcast; entertainment; en; Audio; WEEKLY; Comedy, Sports, Cinema, Movies, Pop Culture, Food, Games; https://www.facebook.com/notanotherpodcastusa; https://twitter.com/NAPodcastUSA; https://plus.google.com/u/0/103089891373760354121/posts; Adrian Matei; adrianmatei@gmail.com | 
Как видите, первая строка определяет имена «столбцов», а следующие строки содержат фактические данные (разделенные «;»), которые необходимо преобразовать в объекты домена, соответствующие контексту.
  Давайте теперь посмотрим, как настроить FlatFileItemReader : 
Пример FlatFileItemReader
| 
 1 
2 
3 
4 
5 
6 
7 
8 
 | 
@Beanpublic ItemReader<SuggestedPodcast> reader(){    FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();    reader.setLinesToSkip(1);//first line is title definition     reader.setResource(new ClassPathResource("suggested-podcasts.in"));    reader.setLineMapper(lineMapper());    return reader; } | 
Вы можете указать, среди прочего, входной ресурс, количество пропускаемых строк и отображение строк.
5.1.1.1. LineMapper
  LineMapper — это интерфейс для отображения строк (строк) в объектах домена, обычно используемый для отображения строк, считанных из файла, в объекты домена для каждой строки.  Для задания Podcastpedia я использовал DefaultLineMapper , который представляет собой двухфазную реализацию, состоящую из токенизации строки в FieldSet последующим отображением в элемент: 
Пример реализации LineMapper по умолчанию
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
 | 
@Beanpublic LineMapper<SuggestedPodcast> lineMapper() {    DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();         DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();    lineTokenizer.setDelimiter(";");    lineTokenizer.setStrict(false);    lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});         BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();    fieldSetMapper.setTargetType(SuggestedPodcast.class);         lineMapper.setLineTokenizer(lineTokenizer);    lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());         return lineMapper;} | 
-   
DelimitedLineTokenizerразделяет входную строку через «;» разделитель. -   если вы установите флаг 
strictрежима в значениеfalseто строки с меньшим количеством токенов будут допущены и дополнены пустыми столбцами, а строки с большим количеством токенов будут просто обрезаны. -   имена столбцов из первой строки устанавливаются 
lineTokenizer.setNames(...); -   и 
fieldMapperустановлен (строка 14) 
  Замечания: 
  FieldSet — это «интерфейс, используемый входными источниками плоских файлов для инкапсуляции проблем преобразования массива Strings в нативные типы Java.  Немного похоже на роль ResultSet в JDBC, клиенты будут знать имя или позицию строго типизированных полей, которые они хотят извлечь ». 
5.1.1.2. FieldSetMapper
  FieldSetMapper — это интерфейс, который используется для отображения данных, полученных из FieldSet в объект.  Вот моя реализация, которая отображает fieldSet в объекте домена AdditionalPodcast, который затем будет передан процессору: 
Реализация FieldSetMapper
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
 | 
public class SuggestedPodcastFieldSetMapper implements FieldSetMapper<SuggestedPodcast> {    @Override    public SuggestedPodcast mapFieldSet(FieldSet fieldSet) throws BindException {                 SuggestedPodcast suggestedPodcast = new SuggestedPodcast();                 suggestedPodcast.setCategories(fieldSet.readString("CATEGORIES"));        suggestedPodcast.setEmail(fieldSet.readString("EMAIL_SUBMITTER"));        suggestedPodcast.setName(fieldSet.readString("NAME_SUBMITTER"));        suggestedPodcast.setTags(fieldSet.readString("KEYWORDS"));                 //some of the attributes we can map directly into the Podcast entity that we'll insert later into the database        Podcast podcast = new Podcast();        podcast.setUrl(fieldSet.readString("FEED_URL"));        podcast.setIdentifier(fieldSet.readString("IDENTIFIER_ON_PODCASTPEDIA"));        podcast.setLanguageCode(LanguageCode.valueOf(fieldSet.readString("LANGUAGE")));        podcast.setMediaType(MediaType.valueOf(fieldSet.readString("MEDIA_TYPE")));        podcast.setUpdateFrequency(UpdateFrequency.valueOf(fieldSet.readString("UPDATE_FREQUENCY")));        podcast.setFbPage(fieldSet.readString("FB_PAGE"));        podcast.setTwitterPage(fieldSet.readString("TWITTER_PAGE"));        podcast.setGplusPage(fieldSet.readString("GPLUS_PAGE"));                 suggestedPodcast.setPodcast(podcast);        return suggestedPodcast;    }     } | 
5.2. JdbcCursorItemReader
Во втором задании notifyEmailSubscribeersJob в считывателе я читаю подписчиков электронной почты только из одной таблицы базы данных, но далее в процессоре выполняется более подробное чтение (через JPA) для извлечения всех новых эпизодов подкастов, на которые подписан пользователь , Это распространенная модель, используемая в мире пакетной обработки. Перейдите по этой ссылке для получения более распространенных шаблонов партии.
  Для начального чтения я выбрал JdbcCursorItemReader , который представляет собой простую реализацию для чтения, которая открывает курсор JDBC и постоянно извлекает следующую строку в ResultSet : 
Пример JdbcCursorItemReader
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
 | 
@Beanpublic ItemReader<User> notifySubscribersReader(){         JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();    String sql = "select * from users where is_email_subscriber is not null";         reader.setSql(sql);    reader.setDataSource(dataSource);    reader.setRowMapper(rowMapper());           return reader;} | 
  Обратите внимание, я должен был установить sql , datasource для чтения и RowMapper . 
5.2.1. RowMapper
  RowMapper — это интерфейс, используемый JdbcTemplate для отображения строк Result’set для каждой строки.  Моя реализация этого интерфейса выполняет фактическую работу по отображению каждой строки в объект результата, но мне не нужно беспокоиться об обработке исключений: 
Реализация RowMapper
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
 | 
public class UserRowMapper implements RowMapper<User> {    @Override    public User mapRow(ResultSet rs, int rowNum) throws SQLException {        User user = new User();        user.setEmail(rs.getString("email"));                 return user;    }} | 
5.2. писатели
  ItemWriter — это абстракция, которая представляет выходные данные Step , одного пакета или части элементов за раз.  Как правило, средство записи элемента не знает о вводе, которое он получит следующим, только элемент, который был передан в его текущем вызове. 
  Авторы двух представленных работ довольно просты.  Они просто используют внешние сервисы для отправки уведомлений по электронной почте и публикации твитов на аккаунте Podcastpedia .  Вот реализация ItemWriter для первого задания — addNewPodcast : 
Писательская реализация ItemWriter
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
57 
58 
59 
60 
61 
62 
63 
64 
65 
66 
67 
68 
 | 
package org.podcastpedia.batch.jobs.addpodcast;import java.util.Date;import java.util.List;import javax.inject.Inject;import javax.persistence.EntityManager;import org.podcastpedia.batch.common.entities.Podcast;import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;import org.podcastpedia.batch.jobs.addpodcast.service.EmailNotificationService;import org.podcastpedia.batch.jobs.addpodcast.service.SocialMediaService;import org.springframework.batch.item.ItemWriter;import org.springframework.beans.factory.annotation.Autowired;public class Writer implements ItemWriter<SuggestedPodcast>{    @Autowired    private EntityManager entityManager;         @Inject    private EmailNotificationService emailNotificationService;         @Inject    private SocialMediaService socialMediaService;         @Override    public void write(List<? extends SuggestedPodcast> items) throws Exception {        if(items.get(0) != null){            SuggestedPodcast suggestedPodcast = items.get(0);                         //first insert the data in the database             Podcast podcast = suggestedPodcast.getPodcast();                         podcast.setInsertionDate(new Date());            entityManager.persist(podcast);            entityManager.flush();                         //notify submitter about the insertion and post a twitt about it             String url = buildUrlOnPodcastpedia(podcast);                         emailNotificationService.sendPodcastAdditionConfirmation(                    suggestedPodcast.getName(), suggestedPodcast.getEmail(),                    url);            if(podcast.getTwitterPage() != null){                socialMediaService.postOnTwitterAboutNewPodcast(podcast,                url);                           }                           }    }    private String buildUrlOnPodcastpedia(Podcast podcast) {        StringBuffer urlOnPodcastpedia = new StringBuffer(        if (podcast.getIdentifier() != null) {            urlOnPodcastpedia.append("/" + podcast.getIdentifier());        } else {            urlOnPodcastpedia.append("/podcasts/");            urlOnPodcastpedia.append(String.valueOf(podcast.getPodcastId()));            urlOnPodcastpedia.append("/" + podcast.getTitleInUrl());        }               String url = urlOnPodcastpedia.toString();        return url;    }} | 
  Как вы можете видеть, здесь нет ничего особенного, за исключением того, что метод write должен быть переопределен, и именно здесь EmailNotificationService внешние службы EmailNotificationService и SocialMediaService используются для информирования отправителя подкаста по электронной почте о добавлении в каталог подкастов, а также, если используется Twitter. страница была отправлена, твит будет размещен на стене Podcastpedia .  Вы можете найти подробное объяснение о том, как отправлять электронную почту через Velocity и как размещать в Twitter с Java, в следующих сообщениях: 
- Как создавать HTML письма в Java с помощью Spring и Velocity
 - Как опубликовать в Твиттере с Java с Twitter4J за 10 минут
 
5.3. процессоры
  ItemProcessor — это абстракция, представляющая бизнес-обработку элемента.  В то время как ItemReader считывает один элемент, а ItemWriter записывает их, ItemProcessor предоставляет доступ для преобразования или применения другой бизнес-обработки.  При использовании ваших собственных Processors вы должны реализовать ItemProcessor<I,O> , его единственный метод O process(I item) throws Exception , возвращая потенциально измененный или новый элемент для продолжения обработки.  Если возвращаемый результат равен нулю, предполагается, что обработка элемента не должна продолжаться. 
  В то время как процессор первого задания требует немного больше логики, потому что я должен установить атрибуты заголовка etag и last-modified атрибуты фида, эпизоды, категории и ключевые слова подкаста: 
Реализация ItemProcessor для задания addNewPodcast
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
 | 
public class SuggestedPodcastItemProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> {    private static final int TIMEOUT = 10;    @Autowired    ReadDao readDao;         @Autowired    PodcastAndEpisodeAttributesService podcastAndEpisodeAttributesService;         @Autowired    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager;           @Autowired    private SyndFeedService syndFeedService;    /**     * Method used to build the categories, tags and episodes of the podcast     */    @Override    public SuggestedPodcast process(SuggestedPodcast item) throws Exception {                 if(isPodcastAlreadyInTheDirectory(item.getPodcast().getUrl())) {            return null;        }                 String[] categories = item.getCategories().trim().split("\\s*,\\s*");               item.getPodcast().setAvailability(org.apache.http.HttpStatus.SC_OK);                 //set etag and last modified attributes for the podcast        setHeaderFieldAttributes(item.getPodcast());                 //set the other attributes of the podcast from the feed         podcastAndEpisodeAttributesService.setPodcastFeedAttributes(item.getPodcast());                         //set the categories        List<Category> categoriesByNames = readDao.findCategoriesByNames(categories);        item.getPodcast().setCategories(categoriesByNames);                 //set the tags        setTagsForPodcast(item);                 //build the episodes         setEpisodesForPodcast(item.getPodcast());                 return item;    }    ......} | 
процессор из второго задания использует подход «Driving Query» , в котором я расширяю данные, полученные из Reader, с помощью «JPA-чтения» и группирую элементы в подкастах с эпизодами, чтобы они хорошо выглядели в электронных письмах, которые я получаю. рассылка подписчикам:
Реализация ItemProcessor второго задания — notifySubscribeers
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
 | 
@Scope("step")public class NotifySubscribersItemProcessor implements ItemProcessor<User, User> {    @Autowired    EntityManager em;         @Value("#{jobParameters[updateFrequency]}")    String updateFrequency;         @Override    public User process(User item) throws Exception {                         String sqlInnerJoinEpisodes = "select e from User u JOIN u.podcasts p JOIN p.episodes e WHERE u.email=?1 AND p.updateFrequency=?2 AND"                + " e.isNew IS NOT NULL  AND e.availability=200 ORDER BY e.podcast.podcastId ASC, e.publicationDate ASC";        TypedQuery<Episode> queryInnerJoinepisodes = em.createQuery(sqlInnerJoinEpisodes, Episode.class);        queryInnerJoinepisodes.setParameter(1, item.getEmail());        queryInnerJoinepisodes.setParameter(2, UpdateFrequency.valueOf(updateFrequency));                                List<Episode> newEpisodes = queryInnerJoinepisodes.getResultList();                 return regroupPodcastsWithEpisodes(item, newEpisodes);                     }    .......} | 
  Замечания: 
  Если вы хотите узнать больше о том, как использовать Apache Http Client, чтобы получить etag и last-modified заголовки, вы можете взглянуть на мой пост — Как использовать новый Apache Http Client, чтобы сделать запрос HEAD 
6. Выполните пакетное приложение
  Пакетная обработка может быть встроена в веб-приложения и файлы WAR, но вначале я выбрал более простой подход, который создает автономное приложение, которое можно запустить методом main() Java: 
Пакетная обработка Java main () метод
| 
 01 
02 
03 
04 
05 
06 
07 
08 
09 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
50 
51 
52 
53 
54 
55 
56 
 | 
package org.podcastpedia.batch;//imports ...;@ComponentScan@EnableAutoConfigurationpublic class Application {    private static final String NEW_EPISODES_NOTIFICATION_JOB = "newEpisodesNotificationJob";    private static final String ADD_NEW_PODCAST_JOB = "addNewPodcastJob";    public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException {                 Log log = LogFactory.getLog(Application.class);                         SpringApplication app = new SpringApplication(Application.class);        app.setWebEnvironment(false);        ConfigurableApplicationContext ctx= app.run(args);        JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);                                 if(ADD_NEW_PODCAST_JOB.equals(args[0])){            //addNewPodcastJob            Job addNewPodcastJob = ctx.getBean(ADD_NEW_PODCAST_JOB, Job.class);            JobParameters jobParameters = new JobParametersBuilder()            .addDate("date", new Date())            .toJobParameters();                           JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);                         BatchStatus batchStatus = jobExecution.getStatus();            while(batchStatus.isRunning()){                log.info("*********** Still running.... **************");                Thread.sleep(1000);            }            log.info(String.format("*********** Exit status: %s", jobExecution.getExitStatus().getExitCode()));            JobInstance jobInstance = jobExecution.getJobInstance();            log.info(String.format("********* Name of the job %s", jobInstance.getJobName()));                         log.info(String.format("*********** job instance Id: %d", jobInstance.getId()));                         System.exit(0);                     } else if(NEW_EPISODES_NOTIFICATION_JOB.equals(args[0])){            JobParameters jobParameters = new JobParametersBuilder()            .addDate("date", new Date())            .addString("updateFrequency", args[1])            .toJobParameters();                           jobLauncher.run(ctx.getBean(NEW_EPISODES_NOTIFICATION_JOB,  Job.class), jobParameters);           } else {            throw new IllegalArgumentException("Please provide a valid Job name as first application parameter");        }              System.exit(0);    }     } | 
  Лучшее объяснение SpringApplication -, @ComponentScan — и @ComponentScan — @EnableAutoConfiguration которую вы получаете из источника — Начало работы — Создание пакетной службы: 
  «Метод main() ссылается на вспомогательный класс SpringApplication , предоставляя Application.class в качестве аргумента для своего метода run() .  Это заставляет Spring читать метаданные аннотации из Application и управлять ими как компонентом в контексте приложения Spring . 
  Аннотация @ComponentScan указывает Spring рекурсивно выполнять поиск в пакете org.podcastpedia.batch и его дочерних классах, помеченных прямо или косвенно аннотацией @Component .  Эта директива гарантирует, что Spring найдет и зарегистрирует BatchConfiguration , поскольку он помечен @Configuration , что, в свою очередь, является своего рода аннотацией @Component . 
  Аннотация @EnableAutoConfiguration включает разумное поведение по умолчанию в зависимости от содержимого вашего пути к классам.  Например, он ищет любой класс, который реализует интерфейс CommandLineRunner и вызывает его метод run() ». 
Выполнение этапов строительства:
-   
JobLauncher, который является простым интерфейсом для управления заданиями, извлекается из ApplicationContext. Помните, что это автоматически доступно через аннотацию@EnableBatchProcessing. -   Теперь, основываясь на первом параметре приложения ( 
args[0]), я получу соответствующееJobизApplicationContext -   затем 
JobParameters, где я использую текущую дату —.addDate("date", new Date()), так что выполнение задания всегда уникально. -   как только все будет 
JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);, задание можно выполнить:JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters); -   Вы можете использовать возвращенное 
jobExecutionчтобы получить доступ кBatchStatus, кодуBatchStatusили к имени и идентификатору задания. 
Примечание: я настоятельно рекомендую вам прочитать и понять схему метаданных для Spring Batch . Это также поможет вам лучше понять объекты домена Spring Batch.
6.1. Запуск приложения в средах разработки и разработки
Чтобы иметь возможность запускать приложение Spring Batch / Spring Boot в разных средах, я использую возможность Spring Profiles. По умолчанию приложение работает с данными разработки (база данных). Но если я хочу, чтобы работа использовала производственную базу данных, я должен сделать следующее:
-   предоставить следующий аргумент среды 
-Dspring.profiles.active=prod -   настроить свойства производственной базы данных в файле 
application-prod.propertiesв classpath, справа от файлаapplication.propertiesпо умолчанию 
Резюме
В этом руководстве мы узнали, как настроить проект Spring Batch с конфигурацией Spring Boot и Java, как использовать некоторые из наиболее распространенных программ чтения в пакетной обработке, как настроить некоторые простые задания и как запускать задания Spring Batch из основной метод.
| Ссылка: | Spring Batch Tutorial с Spring Boot и настройкой Java от нашего партнера JCG Адриана Матеи в блоге Codingpedia.org . |