Статьи

Spring Batch Tutorial с Spring Boot и настройкой Java

Я работал над переносом некоторых пакетных заданий для Podcastpedia.org на Spring Batch. Раньше эти рабочие места разрабатывались по-моему, и я подумал, что пора использовать более «стандартизированный» подход. Поскольку я никогда раньше не использовал Spring с настройкой Java, я подумал, что это хорошая возможность узнать об этом, настроив задания Spring Batch в Java. И так как я все пытаюсь что-то новое с Spring, почему бы не бросить Spring Boot в лодку …

Замечания:
Перед тем, как вы начнете с этого учебника, я рекомендую вам сначала прочитать « Начало работы Spring — создание пакетной службы» , поскольку представленная здесь структура и код основаны на этом оригинале.

1. Что я буду строить

Итак, как уже упоминалось, в этом посте я представлю Spring Batch в контексте его настройки и разработки с ним некоторых пакетных заданий для Podcastpedia.org . Вот краткое описание двух заданий, которые в настоящее время являются частью проекта Podcastpedia-batch :

  1. addNewPodcastJob
    1. считывает метаданные подкаста (URL фида, идентификатор, категории и т. д.) из плоского файла
    2. преобразует (анализирует и готовит эпизоды для вставки с помощью Http Apache Client ) данные
    3. и на последнем шаге вставьте его в базу данных Podcastpedia и сообщите об этом отправителю по электронной почте.
  2. notifyEmailSubscribeersJob — люди могут подписаться на свои любимые подкасты на Podcastpedia.org по электронной почте. Для тех, кто сделал это, регулярно проверяется (ЕЖЕДНЕВНО, ЕЖЕНЕДЕЛЬНО, ЕЖЕМЕСЯЧНО), доступны ли новые эпизоды, и если они есть, подписчики информируются об этом по электронной почте; читать из базы данных , расширять данные чтения через JPA, перегруппировать их и уведомлять подписчика по электронной почте

Исходный код:
Исходный код этого руководства доступен на GitHub — Podcastpedia-batch.

Примечание. Перед началом работы я также настоятельно рекомендую ознакомиться с языком пакетной обработки доменов , чтобы такие термины, как «Задания», «Шаги» или «ItemReaders» не казались вам странными.

2. Что вам нужно

  • Любимый текстовый редактор или IDE
  • JDK 1,7 или позже
  • Maven 3.0+

3. Настройте проект

Проект построен с Maven . Он использует Spring Boot, что упрощает создание автономных приложений на основе Spring, которые вы можете «просто запустить». Вы можете узнать больше о Spring Boot, посетив веб-сайт проекта.

3.1. Maven файл сборки

Поскольку он использует Spring Boot, он будет иметь spring-boot-starter-parent качестве родителя и пару других spring-boot-starters, которые получат для нас некоторые библиотеки, необходимые для проекта:

pom.xml проекта podcastpedia-batch

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
<?xml version="1.0" encoding="UTF-8"?>
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>org.podcastpedia.batch</groupId>
    <artifactId>podcastpedia-batch</artifactId>
    <version>0.1.0</version>
     
    <properties>
        <sprinb.boot.version>1.1.6.RELEASE</sprinb.boot.version>
        <java.version>1.7</java.version>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.1.6.RELEASE</version>
    </parent>
     
    <dependencies>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
          
        </dependency
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-jpa</artifactId>       
        </dependency>       
         
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.5</version>
        </dependency>    
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.3.2</version>
        </dependency>
        <!-- velocity -->
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity</artifactId>
            <version>1.7</version>     
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-tools</artifactId>
            <version>2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.struts</groupId>
                    <artifactId>struts-core</artifactId>
                </exclusion>
            </exclusions>            
        </dependency>
                         
        <!-- Project rome rss, atom -->
        <dependency>
            <groupId>rome</groupId>
            <artifactId>rome</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- option this fetcher thing -->
        <dependency>
            <groupId>rome</groupId>
            <artifactId>rome-fetcher</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.jdom</groupId>
            <artifactId>jdom</artifactId>
            <version>1.1</version>
        </dependency>    
        <!-- PID 1 -->
        <dependency>
            <groupId>xerces</groupId>
            <artifactId>xercesImpl</artifactId>
            <version>2.9.1</version>
        </dependency>
                         
        <!-- MySQL JDBC connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>  
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-remote-shell</artifactId>  
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
            </exclusions>            
        </dependency>
        <dependency>
            <groupId>javax.mail</groupId>
            <artifactId>mail</artifactId>
            <version>1.4.7</version>
        </dependency>    
        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>    
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>[4.0,)</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Замечания:
Одним из больших преимуществ использования spring-boot-starter-parent качестве spring-boot-starter-parent проекта является то, что вам нужно только обновить версию родительского элемента, и он получит «последние» библиотеки для вас. Когда я запустил проект, весенняя загрузка была в версии 1.1.3.RELEASE и к моменту окончания написания этого поста уже была в 1.1.6.RELEASE .

3.2. Структура каталога проекта

Я структурировал проект следующим образом:

Структура каталога проекта

1
└── src └── main └── java └── org └── podcastpedia └── batch └── common └── jobs └── addpodcast └── notifysubscribers

Замечания:

  • пакет org.podcastpedia.batch.jobs содержит org.podcastpedia.batch.jobs имеющие определенные классы для определенных заданий.
  • пакет org.podcastpedia.batch.jobs.common содержит классы, используемые всеми заданиями, например, такими как объекты JPA, которые требуются для обоих текущих заданий.

4. Создайте пакетное задание

Я начну с представления класса конфигурации Java для первого пакетного задания:

Конфигурация пакетного задания

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package org.podcastpedia.batch.jobs.addpodcast;
 
import org.podcastpedia.batch.common.configuration.DatabaseAccessConfiguration;
import org.podcastpedia.batch.common.listeners.LogProcessListener;
import org.podcastpedia.batch.common.listeners.ProtocolListener;
import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;
 
import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException;
 
@Configuration
@EnableBatchProcessing
@Import({DatabaseAccessConfiguration.class, ServicesConfiguration.class})
public class AddPodcastJobConfiguration {
 
    @Autowired
    private JobBuilderFactory jobs;
  
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
     
    // tag::jobstep[]
    @Bean
    public Job addNewPodcastJob(){
        return jobs.get("addNewPodcastJob")
                .listener(protocolListener())
                .start(step())
                .build();
    }  
     
    @Bean
    public Step step(){
        return stepBuilderFactory.get("step")
                .<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .listener(logProcessListener())
                .faultTolerant()
                .skipLimit(10) //default is set to 0
                .skip(MySQLIntegrityConstraintViolationException.class)
                .build();
    }  
    // end::jobstep[]
     
    // tag::readerwriterprocessor[]
    @Bean
    public ItemReader<SuggestedPodcast> reader(){
        FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();
        reader.setLinesToSkip(1);//first line is title definition
        reader.setResource(new ClassPathResource("suggested-podcasts.txt"));
        reader.setLineMapper(lineMapper());
        return reader;
    }
 
    @Bean
    public LineMapper<SuggestedPodcast> lineMapper() {
        DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();
         
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(";");
        lineTokenizer.setStrict(false);
        lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});
         
        BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();
        fieldSetMapper.setTargetType(SuggestedPodcast.class);
         
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());
         
        return lineMapper;
    }
 
    @Bean
    public SuggestedPodcastFieldSetMapper suggestedPodcastFieldSetMapper() {
        return new SuggestedPodcastFieldSetMapper();
    }
 
    /** configure the processor related stuff */
    @Bean
    public ItemProcessor<SuggestedPodcast, SuggestedPodcast> processor() {
        return new SuggestedPodcastItemProcessor();
    }
     
    @Bean
    public ItemWriter<SuggestedPodcast> writer() {
        return new Writer();
    }
    // end::readerwriterprocessor[]
     
    @Bean
    public ProtocolListener protocolListener(){
        return new ProtocolListener();
    }
  
    @Bean
    public LogProcessListener logProcessListener(){
        return new LogProcessListener();
    }   
 
}

Аннотация @EnableBatchProcessing добавляет много критических компонентов, поддерживающих задания, и экономит нам работу по настройке. Например, вы также сможете @Autowired некоторые полезные вещи в вашем контексте:

  • JobRepository (имя компонента «jobRepository»)
  • JobLauncher (имя компонента «jobLauncher»)
  • JobRegistry (бобовое имя «jobRegistry»)
  • PlatformTransactionManager (имя компонента «транзакционный менеджер»)
  • JobBuilderFactory (имя компонента «jobBuilders») для удобства, чтобы вам не приходилось вставлять репозиторий заданий в каждую работу, как в примерах выше
  • StepBuilderFactory (имя компонента «stepBuilders») для удобства, чтобы вам не приходилось внедрять репозиторий заданий и менеджер транзакций на каждом шаге

Первая часть посвящена фактической конфигурации работы:

Пакетное задание и пошаговая настройка

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
@Bean
public Job addNewPodcastJob(){
    return jobs.get("addNewPodcastJob")
            .listener(protocolListener())
            .start(step())
            .build();
}  
 
@Bean
public Step step(){
    return stepBuilderFactory.get("step")
            .<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(logProcessListener())
            .faultTolerant()
            .skipLimit(10) //default is set to 0
            .skip(MySQLIntegrityConstraintViolationException.class)
            .build();
}

Первый метод определяет работу, а второй — один шаг. Как вы уже читали в разделе «Доменный язык пакетной обработки» , задания строятся из шагов, где каждый шаг может включать в себя читателя, процессор и писатель.

В определении шага вы определяете, сколько данных записывать за один раз (в нашем случае 1 запись за один раз). Далее вы указываете читатель, процессор и писатель.

5. Пружинные блоки обработки

Большую часть пакетной обработки можно описать как чтение данных, выполнение каких-либо преобразований и последующую запись результатов. Это как-то отражает процесс извлечения, преобразования, загрузки (ETL) , если вы знаете об этом больше. Spring Batch предоставляет три ключевых интерфейса, которые помогают выполнять массовое чтение и запись: ItemReader , ItemProcessor и ItemWriter .

5.1. Читатели

ItemReader — это абстракция, обеспечивающая возможность извлечения данных из множества различных типов ввода: плоские файлы , XML-файлы , базы данных , JMS и т. Д., По одному элементу за раз. См. Приложение A. Список ItemReaders и ItemWriters для полного списка доступных считывателей элементов.

В пакетных заданиях Podcastpedia я использую следующие специализированные ItemReaders:

5.1.1. FlatFileItemReader

который, как следует из названия, считывает строки данных из плоского файла, которые обычно описывают записи с полями данных, заданными фиксированными позициями в файле или разделенными каким-либо специальным символом (например, запятой). Этот тип ItemReader используется в первом пакетном задании addNewPodcastJob . Используемый входной файл с именем Sugges-podcasts.in находится в classpath ( src / main / resources ) и выглядит примерно так:

Входной файл для FlatFileItemReader

1
2
3
FEED_URL; IDENTIFIER_ON_PODCASTPEDIA; CATEGORIES; LANGUAGE; MEDIA_TYPE; UPDATE_FREQUENCY; KEYWORDS; FB_PAGE; TWITTER_PAGE; GPLUS_PAGE; NAME_SUBMITTER; EMAIL_SUBMITTER
http://www.5minutebiographies.com/feed/; 5minutebiographies; people_society, history; en; Audio; WEEKLY; biography, biographies, short biography, short biographies, 5 minute biographies, five minute biographies, 5 minute biography, five minute biography; https://www.facebook.com/5minutebiographies; https://twitter.com/5MinuteBios; ; Adrian Matei; [email protected]
http://notanotherpodcast.libsyn.com/rss; NotAnotherPodcast; entertainment; en; Audio; WEEKLY; Comedy, Sports, Cinema, Movies, Pop Culture, Food, Games; https://www.facebook.com/notanotherpodcastusa; https://twitter.com/NAPodcastUSA; https://plus.google.com/u/0/103089891373760354121/posts; Adrian Matei; [email protected]

Как видите, первая строка определяет имена «столбцов», а следующие строки содержат фактические данные (разделенные «;»), которые необходимо преобразовать в объекты домена, соответствующие контексту.

Давайте теперь посмотрим, как настроить FlatFileItemReader :

Пример FlatFileItemReader

1
2
3
4
5
6
7
8
@Bean
public ItemReader<SuggestedPodcast> reader(){
    FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();
    reader.setLinesToSkip(1);//first line is title definition
    reader.setResource(new ClassPathResource("suggested-podcasts.in"));
    reader.setLineMapper(lineMapper());
    return reader;
}

Вы можете указать, среди прочего, входной ресурс, количество пропускаемых строк и отображение строк.

5.1.1.1. LineMapper

LineMapper — это интерфейс для отображения строк (строк) в объектах домена, обычно используемый для отображения строк, считанных из файла, в объекты домена для каждой строки. Для задания Podcastpedia я использовал DefaultLineMapper , который представляет собой двухфазную реализацию, состоящую из токенизации строки в FieldSet последующим отображением в элемент:

Пример реализации LineMapper по умолчанию

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
@Bean
public LineMapper<SuggestedPodcast> lineMapper() {
    DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();
     
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setDelimiter(";");
    lineTokenizer.setStrict(false);
    lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});
     
    BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();
    fieldSetMapper.setTargetType(SuggestedPodcast.class);
     
    lineMapper.setLineTokenizer(lineTokenizer);
    lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());
     
    return lineMapper;
}
  • DelimitedLineTokenizer разделяет входную строку через «;» разделитель.
  • если вы установите флаг strict режима в значение false то строки с меньшим количеством токенов будут допущены и дополнены пустыми столбцами, а строки с большим количеством токенов будут просто обрезаны.
  • имена столбцов из первой строки устанавливаются lineTokenizer.setNames(...);
  • и fieldMapper установлен (строка 14)

Замечания:
FieldSet — это «интерфейс, используемый входными источниками плоских файлов для инкапсуляции проблем преобразования массива Strings в нативные типы Java. Немного похоже на роль ResultSet в JDBC, клиенты будут знать имя или позицию строго типизированных полей, которые они хотят извлечь ».

5.1.1.2. FieldSetMapper

FieldSetMapper — это интерфейс, который используется для отображения данных, полученных из FieldSet в объект. Вот моя реализация, которая отображает fieldSet в объекте домена AdditionalPodcast, который затем будет передан процессору:

Реализация FieldSetMapper

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SuggestedPodcastFieldSetMapper implements FieldSetMapper<SuggestedPodcast> {
 
    @Override
    public SuggestedPodcast mapFieldSet(FieldSet fieldSet) throws BindException {
         
        SuggestedPodcast suggestedPodcast = new SuggestedPodcast();
         
        suggestedPodcast.setCategories(fieldSet.readString("CATEGORIES"));
        suggestedPodcast.setEmail(fieldSet.readString("EMAIL_SUBMITTER"));
        suggestedPodcast.setName(fieldSet.readString("NAME_SUBMITTER"));
        suggestedPodcast.setTags(fieldSet.readString("KEYWORDS"));
         
        //some of the attributes we can map directly into the Podcast entity that we'll insert later into the database
        Podcast podcast = new Podcast();
        podcast.setUrl(fieldSet.readString("FEED_URL"));
        podcast.setIdentifier(fieldSet.readString("IDENTIFIER_ON_PODCASTPEDIA"));
        podcast.setLanguageCode(LanguageCode.valueOf(fieldSet.readString("LANGUAGE")));
        podcast.setMediaType(MediaType.valueOf(fieldSet.readString("MEDIA_TYPE")));
        podcast.setUpdateFrequency(UpdateFrequency.valueOf(fieldSet.readString("UPDATE_FREQUENCY")));
        podcast.setFbPage(fieldSet.readString("FB_PAGE"));
        podcast.setTwitterPage(fieldSet.readString("TWITTER_PAGE"));
        podcast.setGplusPage(fieldSet.readString("GPLUS_PAGE"));
         
        suggestedPodcast.setPodcast(podcast);
 
        return suggestedPodcast;
    }
     
}

5.2. JdbcCursorItemReader

Во втором задании notifyEmailSubscribeersJob в считывателе я читаю подписчиков электронной почты только из одной таблицы базы данных, но далее в процессоре выполняется более подробное чтение (через JPA) для извлечения всех новых эпизодов подкастов, на которые подписан пользователь , Это распространенная модель, используемая в мире пакетной обработки. Перейдите по этой ссылке для получения более распространенных шаблонов партии.

Для начального чтения я выбрал JdbcCursorItemReader , который представляет собой простую реализацию для чтения, которая открывает курсор JDBC и постоянно извлекает следующую строку в ResultSet :

Пример JdbcCursorItemReader

01
02
03
04
05
06
07
08
09
10
11
12
@Bean
public ItemReader<User> notifySubscribersReader(){
     
    JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();
    String sql = "select * from users where is_email_subscriber is not null";
     
    reader.setSql(sql);
    reader.setDataSource(dataSource);
    reader.setRowMapper(rowMapper());      
 
    return reader;
}

Обратите внимание, я должен был установить sql , datasource для чтения и RowMapper .

5.2.1. RowMapper

RowMapper — это интерфейс, используемый JdbcTemplate для отображения строк Result’set для каждой строки. Моя реализация этого интерфейса выполняет фактическую работу по отображению каждой строки в объект результата, но мне не нужно беспокоиться об обработке исключений:

Реализация RowMapper

01
02
03
04
05
06
07
08
09
10
11
public class UserRowMapper implements RowMapper<User> {
 
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setEmail(rs.getString("email"));
         
        return user;
    }
 
}

5.2. писатели

ItemWriter — это абстракция, которая представляет выходные данные Step , одного пакета или части элементов за раз. Как правило, средство записи элемента не знает о вводе, которое он получит следующим, только элемент, который был передан в его текущем вызове.

Авторы двух представленных работ довольно просты. Они просто используют внешние сервисы для отправки уведомлений по электронной почте и публикации твитов на аккаунте Podcastpedia . Вот реализация ItemWriter для первого задания — addNewPodcast :

Писательская реализация ItemWriter

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package org.podcastpedia.batch.jobs.addpodcast;
 
import java.util.Date;
import java.util.List;
 
import javax.inject.Inject;
import javax.persistence.EntityManager;
 
import org.podcastpedia.batch.common.entities.Podcast;
import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast;
import org.podcastpedia.batch.jobs.addpodcast.service.EmailNotificationService;
import org.podcastpedia.batch.jobs.addpodcast.service.SocialMediaService;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
 
public class Writer implements ItemWriter<SuggestedPodcast>{
 
    @Autowired
    private EntityManager entityManager;
     
    @Inject
    private EmailNotificationService emailNotificationService;
     
    @Inject
    private SocialMediaService socialMediaService;
     
    @Override
    public void write(List<? extends SuggestedPodcast> items) throws Exception {
 
        if(items.get(0) != null){
            SuggestedPodcast suggestedPodcast = items.get(0);
             
            //first insert the data in the database
            Podcast podcast = suggestedPodcast.getPodcast();
             
            podcast.setInsertionDate(new Date());
            entityManager.persist(podcast);
            entityManager.flush();
             
            //notify submitter about the insertion and post a twitt about it
            String url = buildUrlOnPodcastpedia(podcast);
             
            emailNotificationService.sendPodcastAdditionConfirmation(
                    suggestedPodcast.getName(), suggestedPodcast.getEmail(),
                    url);
            if(podcast.getTwitterPage() != null){
                socialMediaService.postOnTwitterAboutNewPodcast(podcast,
                url);              
            }                  
        }
 
    }
 
    private String buildUrlOnPodcastpedia(Podcast podcast) {
        StringBuffer urlOnPodcastpedia = new StringBuffer(
                "http://www.podcastpedia.org");
        if (podcast.getIdentifier() != null) {
            urlOnPodcastpedia.append("/" + podcast.getIdentifier());
        } else {
            urlOnPodcastpedia.append("/podcasts/");
            urlOnPodcastpedia.append(String.valueOf(podcast.getPodcastId()));
            urlOnPodcastpedia.append("/" + podcast.getTitleInUrl());
        }      
        String url = urlOnPodcastpedia.toString();
        return url;
    }
 
}

Как вы можете видеть, здесь нет ничего особенного, за исключением того, что метод write должен быть переопределен, и именно здесь EmailNotificationService внешние службы EmailNotificationService и SocialMediaService используются для информирования отправителя подкаста по электронной почте о добавлении в каталог подкастов, а также, если используется Twitter. страница была отправлена, твит будет размещен на стене Podcastpedia . Вы можете найти подробное объяснение о том, как отправлять электронную почту через Velocity и как размещать в Twitter с Java, в следующих сообщениях:

5.3. процессоры

ItemProcessor — это абстракция, представляющая бизнес-обработку элемента. В то время как ItemReader считывает один элемент, а ItemWriter записывает их, ItemProcessor предоставляет доступ для преобразования или применения другой бизнес-обработки. При использовании ваших собственных Processors вы должны реализовать ItemProcessor<I,O> , его единственный метод O process(I item) throws Exception , возвращая потенциально измененный или новый элемент для продолжения обработки. Если возвращаемый результат равен нулю, предполагается, что обработка элемента не должна продолжаться.

В то время как процессор первого задания требует немного больше логики, потому что я должен установить атрибуты заголовка etag и last-modified атрибуты фида, эпизоды, категории и ключевые слова подкаста:

Реализация ItemProcessor для задания addNewPodcast

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SuggestedPodcastItemProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> {
 
    private static final int TIMEOUT = 10;
 
    @Autowired
    ReadDao readDao;
     
    @Autowired
    PodcastAndEpisodeAttributesService podcastAndEpisodeAttributesService;
     
    @Autowired
    private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager; 
     
    @Autowired
    private SyndFeedService syndFeedService;
 
    /**
     * Method used to build the categories, tags and episodes of the podcast
     */
    @Override
    public SuggestedPodcast process(SuggestedPodcast item) throws Exception {
         
        if(isPodcastAlreadyInTheDirectory(item.getPodcast().getUrl())) {
            return null;
        }
         
        String[] categories = item.getCategories().trim().split("\\s*,\\s*");      
 
        item.getPodcast().setAvailability(org.apache.http.HttpStatus.SC_OK);
         
        //set etag and last modified attributes for the podcast
        setHeaderFieldAttributes(item.getPodcast());
         
        //set the other attributes of the podcast from the feed
        podcastAndEpisodeAttributesService.setPodcastFeedAttributes(item.getPodcast());
                 
        //set the categories
        List<Category> categoriesByNames = readDao.findCategoriesByNames(categories);
        item.getPodcast().setCategories(categoriesByNames);
         
        //set the tags
        setTagsForPodcast(item);
         
        //build the episodes
        setEpisodesForPodcast(item.getPodcast());
         
        return item;
    }
    ......
}

процессор из второго задания использует подход «Driving Query» , в котором я расширяю данные, полученные из Reader, с помощью «JPA-чтения» и группирую элементы в подкастах с эпизодами, чтобы они хорошо выглядели в электронных письмах, которые я получаю. рассылка подписчикам:

Реализация ItemProcessor второго задания — notifySubscribeers

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Scope("step")
public class NotifySubscribersItemProcessor implements ItemProcessor<User, User> {
 
    @Autowired
    EntityManager em;
     
    @Value("#{jobParameters[updateFrequency]}")
    String updateFrequency;
     
    @Override
    public User process(User item) throws Exception {
                 
        String sqlInnerJoinEpisodes = "select e from User u JOIN u.podcasts p JOIN p.episodes e WHERE u.email=?1 AND p.updateFrequency=?2 AND"
                + " e.isNew IS NOT NULL  AND e.availability=200 ORDER BY e.podcast.podcastId ASC, e.publicationDate ASC";
        TypedQuery<Episode> queryInnerJoinepisodes = em.createQuery(sqlInnerJoinEpisodes, Episode.class);
        queryInnerJoinepisodes.setParameter(1, item.getEmail());
        queryInnerJoinepisodes.setParameter(2, UpdateFrequency.valueOf(updateFrequency));      
                 
        List<Episode> newEpisodes = queryInnerJoinepisodes.getResultList();
         
        return regroupPodcastsWithEpisodes(item, newEpisodes);
                 
    }
    .......
}

Замечания:
Если вы хотите узнать больше о том, как использовать Apache Http Client, чтобы получить etag и last-modified заголовки, вы можете взглянуть на мой пост — Как использовать новый Apache Http Client, чтобы сделать запрос HEAD

6. Выполните пакетное приложение

Пакетная обработка может быть встроена в веб-приложения и файлы WAR, но вначале я выбрал более простой подход, который создает автономное приложение, которое можно запустить методом main() Java:

Пакетная обработка Java main () метод

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.podcastpedia.batch;
//imports ...;
 
@ComponentScan
@EnableAutoConfiguration
public class Application {
 
    private static final String NEW_EPISODES_NOTIFICATION_JOB = "newEpisodesNotificationJob";
    private static final String ADD_NEW_PODCAST_JOB = "addNewPodcastJob";
 
    public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException {
         
        Log log = LogFactory.getLog(Application.class);
                 
        SpringApplication app = new SpringApplication(Application.class);
        app.setWebEnvironment(false);
        ConfigurableApplicationContext ctx= app.run(args);
        JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
                         
        if(ADD_NEW_PODCAST_JOB.equals(args[0])){
            //addNewPodcastJob
            Job addNewPodcastJob = ctx.getBean(ADD_NEW_PODCAST_JOB, Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
            .addDate("date", new Date())
            .toJobParameters(); 
             
            JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
             
            BatchStatus batchStatus = jobExecution.getStatus();
            while(batchStatus.isRunning()){
                log.info("*********** Still running.... **************");
                Thread.sleep(1000);
            }
            log.info(String.format("*********** Exit status: %s", jobExecution.getExitStatus().getExitCode()));
            JobInstance jobInstance = jobExecution.getJobInstance();
            log.info(String.format("********* Name of the job %s", jobInstance.getJobName()));
             
            log.info(String.format("*********** job instance Id: %d", jobInstance.getId()));
             
            System.exit(0);
             
        } else if(NEW_EPISODES_NOTIFICATION_JOB.equals(args[0])){
            JobParameters jobParameters = new JobParametersBuilder()
            .addDate("date", new Date())
            .addString("updateFrequency", args[1])
            .toJobParameters(); 
             
            jobLauncher.run(ctx.getBean(NEW_EPISODES_NOTIFICATION_JOB,  Job.class), jobParameters);  
        } else {
            throw new IllegalArgumentException("Please provide a valid Job name as first application parameter");
        }
      
        System.exit(0);
    }
     
}

Лучшее объяснение SpringApplication -, @ComponentScan — и @ComponentScan@EnableAutoConfiguration которую вы получаете из источника — Начало работы — Создание пакетной службы:

«Метод main() ссылается на вспомогательный класс SpringApplication , предоставляя Application.class в качестве аргумента для своего метода run() . Это заставляет Spring читать метаданные аннотации из Application и управлять ими как компонентом в контексте приложения Spring .

Аннотация @ComponentScan указывает Spring рекурсивно выполнять поиск в пакете org.podcastpedia.batch и его дочерних классах, помеченных прямо или косвенно аннотацией @Component . Эта директива гарантирует, что Spring найдет и зарегистрирует BatchConfiguration , поскольку он помечен @Configuration , что, в свою очередь, является своего рода аннотацией @Component .

Аннотация @EnableAutoConfiguration включает разумное поведение по умолчанию в зависимости от содержимого вашего пути к классам. Например, он ищет любой класс, который реализует интерфейс CommandLineRunner и вызывает его метод run() ».

Выполнение этапов строительства:

  • JobLauncher , который является простым интерфейсом для управления заданиями, извлекается из ApplicationContext. Помните, что это автоматически доступно через аннотацию @EnableBatchProcessing .
  • Теперь, основываясь на первом параметре приложения ( args[0] ), я получу соответствующее Job из ApplicationContext
  • затем JobParameters , где я использую текущую дату — .addDate("date", new Date()) , так что выполнение задания всегда уникально.
  • как только все будет JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters); , задание можно выполнить: JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
  • Вы можете использовать возвращенное jobExecution чтобы получить доступ к BatchStatus , коду BatchStatus или к имени и идентификатору задания.

Примечание: я настоятельно рекомендую вам прочитать и понять схему метаданных для Spring Batch . Это также поможет вам лучше понять объекты домена Spring Batch.

6.1. Запуск приложения в средах разработки и разработки

Чтобы иметь возможность запускать приложение Spring Batch / Spring Boot в разных средах, я использую возможность Spring Profiles. По умолчанию приложение работает с данными разработки (база данных). Но если я хочу, чтобы работа использовала производственную базу данных, я должен сделать следующее:

  • предоставить следующий аргумент среды -Dspring.profiles.active=prod
  • настроить свойства производственной базы данных в файле application-prod.properties в classpath, справа от файла application.properties по умолчанию

Резюме

В этом руководстве мы узнали, как настроить проект Spring Batch с конфигурацией Spring Boot и Java, как использовать некоторые из наиболее распространенных программ чтения в пакетной обработке, как настроить некоторые простые задания и как запускать задания Spring Batch из основной метод.

Ссылка: Spring Batch Tutorial с Spring Boot и настройкой Java от нашего партнера JCG Адриана Матеи в блоге Codingpedia.org .