Я работал над переносом некоторых пакетных заданий для Podcastpedia.org на Spring Batch. Раньше эти рабочие места разрабатывались по-моему, и я подумал, что пора использовать более «стандартизированный» подход. Поскольку я никогда раньше не использовал Spring с настройкой Java, я подумал, что это хорошая возможность узнать об этом, настроив задания Spring Batch в Java. И так как я все пытаюсь что-то новое с Spring, почему бы не бросить Spring Boot в лодку …
Замечания:
Перед тем, как вы начнете с этого учебника, я рекомендую вам сначала прочитать « Начало работы Spring — создание пакетной службы» , поскольку представленная здесь структура и код основаны на этом оригинале.
1. Что я буду строить
Итак, как уже упоминалось, в этом посте я представлю Spring Batch в контексте его настройки и разработки с ним некоторых пакетных заданий для Podcastpedia.org . Вот краткое описание двух заданий, которые в настоящее время являются частью проекта Podcastpedia-batch :
- addNewPodcastJob
- считывает метаданные подкаста (URL фида, идентификатор, категории и т. д.) из плоского файла
- преобразует (анализирует и готовит эпизоды для вставки с помощью Http Apache Client ) данные
- и на последнем шаге вставьте его в базу данных Podcastpedia и сообщите об этом отправителю по электронной почте.
- notifyEmailSubscribeersJob — люди могут подписаться на свои любимые подкасты на Podcastpedia.org по электронной почте. Для тех, кто сделал это, регулярно проверяется (ЕЖЕДНЕВНО, ЕЖЕНЕДЕЛЬНО, ЕЖЕМЕСЯЧНО), доступны ли новые эпизоды, и если они есть, подписчики информируются об этом по электронной почте; читать из базы данных , расширять данные чтения через JPA, перегруппировать их и уведомлять подписчика по электронной почте
Исходный код:
Исходный код этого руководства доступен на GitHub — Podcastpedia-batch.
Примечание. Перед началом работы я также настоятельно рекомендую ознакомиться с языком пакетной обработки доменов , чтобы такие термины, как «Задания», «Шаги» или «ItemReaders» не казались вам странными.
2. Что вам нужно
- Любимый текстовый редактор или IDE
- JDK 1,7 или позже
- Maven 3.0+
3. Настройте проект
Проект построен с Maven . Он использует Spring Boot, что упрощает создание автономных приложений на основе Spring, которые вы можете «просто запустить». Вы можете узнать больше о Spring Boot, посетив веб-сайт проекта.
3.1. Maven файл сборки
Поскольку он использует Spring Boot, он будет иметь spring-boot-starter-parent
качестве родителя и пару других spring-boot-starters, которые получат для нас некоторые библиотеки, необходимые для проекта:
pom.xml проекта podcastpedia-batch
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
<? xml version = "1.0" encoding = "UTF-8" ?> < project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" < modelVersion >4.0.0</ modelVersion > < groupId >org.podcastpedia.batch</ groupId > < artifactId >podcastpedia-batch</ artifactId > < version >0.1.0</ version > < properties > < sprinb.boot.version >1.1.6.RELEASE</ sprinb.boot.version > < java.version >1.7</ java.version > </ properties > < parent > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-parent</ artifactId > < version >1.1.6.RELEASE</ version > </ parent > < dependencies > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-batch</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-data-jpa</ artifactId > </ dependency > < dependency > < groupId >org.apache.httpcomponents</ groupId > < artifactId >httpclient</ artifactId > < version >4.3.5</ version > </ dependency > < dependency > < groupId >org.apache.httpcomponents</ groupId > < artifactId >httpcore</ artifactId > < version >4.3.2</ version > </ dependency > <!-- velocity --> < dependency > < groupId >org.apache.velocity</ groupId > < artifactId >velocity</ artifactId > < version >1.7</ version > </ dependency > < dependency > < groupId >org.apache.velocity</ groupId > < artifactId >velocity-tools</ artifactId > < version >2.0</ version > < exclusions > < exclusion > < groupId >org.apache.struts</ groupId > < artifactId >struts-core</ artifactId > </ exclusion > </ exclusions > </ dependency > <!-- Project rome rss, atom --> < dependency > < groupId >rome</ groupId > < artifactId >rome</ artifactId > < version >1.0</ version > </ dependency > <!-- option this fetcher thing --> < dependency > < groupId >rome</ groupId > < artifactId >rome-fetcher</ artifactId > < version >1.0</ version > </ dependency > < dependency > < groupId >org.jdom</ groupId > < artifactId >jdom</ artifactId > < version >1.1</ version > </ dependency > <!-- PID 1 --> < dependency > < groupId >xerces</ groupId > < artifactId >xercesImpl</ artifactId > < version >2.9.1</ version > </ dependency > <!-- MySQL JDBC connector --> < dependency > < groupId >mysql</ groupId > < artifactId >mysql-connector-java</ artifactId > < version >5.1.31</ version > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-freemarker</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-remote-shell</ artifactId > < exclusions > < exclusion > < groupId >javax.mail</ groupId > < artifactId >mail</ artifactId > </ exclusion > </ exclusions > </ dependency > < dependency > < groupId >javax.mail</ groupId > < artifactId >mail</ artifactId > < version >1.4.7</ version > </ dependency > < dependency > < groupId >javax.inject</ groupId > < artifactId >javax.inject</ artifactId > < version >1</ version > </ dependency > < dependency > < groupId >org.twitter4j</ groupId > < artifactId >twitter4j-core</ artifactId > < version >[4.0,)</ version > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-test</ artifactId > </ dependency > </ dependencies > < build > < plugins > < plugin > < artifactId >maven-compiler-plugin</ artifactId > </ plugin > < plugin > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-maven-plugin</ artifactId > </ plugin > </ plugins > </ build > </ project > |
Замечания:
Одним из больших преимуществ использования spring-boot-starter-parent
качестве spring-boot-starter-parent
проекта является то, что вам нужно только обновить версию родительского элемента, и он получит «последние» библиотеки для вас. Когда я запустил проект, весенняя загрузка была в версии 1.1.3.RELEASE
и к моменту окончания написания этого поста уже была в 1.1.6.RELEASE
.
3.2. Структура каталога проекта
Я структурировал проект следующим образом:
Структура каталога проекта
1
|
└── src └── main └── java └── org └── podcastpedia └── batch └── common └── jobs └── addpodcast └── notifysubscribers |
Замечания:
- пакет
org.podcastpedia.batch.jobs
содержитorg.podcastpedia.batch.jobs
имеющие определенные классы для определенных заданий. - пакет
org.podcastpedia.batch.jobs.common
содержит классы, используемые всеми заданиями, например, такими как объекты JPA, которые требуются для обоих текущих заданий.
4. Создайте пакетное задание
Я начну с представления класса конфигурации Java для первого пакетного задания:
Конфигурация пакетного задания
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
package org.podcastpedia.batch.jobs.addpodcast; import org.podcastpedia.batch.common.configuration.DatabaseAccessConfiguration; import org.podcastpedia.batch.common.listeners.LogProcessListener; import org.podcastpedia.batch.common.listeners.ProtocolListener; import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.io.ClassPathResource; import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException; @Configuration @EnableBatchProcessing @Import ({DatabaseAccessConfiguration. class , ServicesConfiguration. class }) public class AddPodcastJobConfiguration { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory stepBuilderFactory; // tag::jobstep[] @Bean public Job addNewPodcastJob(){ return jobs.get( "addNewPodcastJob" ) .listener(protocolListener()) .start(step()) .build(); } @Bean public Step step(){ return stepBuilderFactory.get( "step" ) .<SuggestedPodcast,SuggestedPodcast>chunk( 1 ) //important to be one in this case to commit after every line read .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit( 10 ) //default is set to 0 .skip(MySQLIntegrityConstraintViolationException. class ) .build(); } // end::jobstep[] // tag::readerwriterprocessor[] @Bean public ItemReader<SuggestedPodcast> reader(){ FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>(); reader.setLinesToSkip( 1 ); //first line is title definition reader.setResource( new ClassPathResource( "suggested-podcasts.txt" )); reader.setLineMapper(lineMapper()); return reader; } @Bean public LineMapper<SuggestedPodcast> lineMapper() { DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>(); DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter( ";" ); lineTokenizer.setStrict( false ); lineTokenizer.setNames( new String[]{ "FEED_URL" , "IDENTIFIER_ON_PODCASTPEDIA" , "CATEGORIES" , "LANGUAGE" , "MEDIA_TYPE" , "UPDATE_FREQUENCY" , "KEYWORDS" , "FB_PAGE" , "TWITTER_PAGE" , "GPLUS_PAGE" , "NAME_SUBMITTER" , "EMAIL_SUBMITTER" }); BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>(); fieldSetMapper.setTargetType(SuggestedPodcast. class ); lineMapper.setLineTokenizer(lineTokenizer); lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper()); return lineMapper; } @Bean public SuggestedPodcastFieldSetMapper suggestedPodcastFieldSetMapper() { return new SuggestedPodcastFieldSetMapper(); } /** configure the processor related stuff */ @Bean public ItemProcessor<SuggestedPodcast, SuggestedPodcast> processor() { return new SuggestedPodcastItemProcessor(); } @Bean public ItemWriter<SuggestedPodcast> writer() { return new Writer(); } // end::readerwriterprocessor[] @Bean public ProtocolListener protocolListener(){ return new ProtocolListener(); } @Bean public LogProcessListener logProcessListener(){ return new LogProcessListener(); } } |
Аннотация @EnableBatchProcessing
добавляет много критических компонентов, поддерживающих задания, и экономит нам работу по настройке. Например, вы также сможете @Autowired
некоторые полезные вещи в вашем контексте:
-
JobRepository
(имя компонента «jobRepository») -
JobLauncher
(имя компонента «jobLauncher») -
JobRegistry
(бобовое имя «jobRegistry») -
PlatformTransactionManager
(имя компонента «транзакционный менеджер») -
JobBuilderFactory
(имя компонента «jobBuilders») для удобства, чтобы вам не приходилось вставлять репозиторий заданий в каждую работу, как в примерах выше -
StepBuilderFactory
(имя компонента «stepBuilders») для удобства, чтобы вам не приходилось внедрять репозиторий заданий и менеджер транзакций на каждом шаге
Первая часть посвящена фактической конфигурации работы:
Пакетное задание и пошаговая настройка
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@Bean public Job addNewPodcastJob(){ return jobs.get( "addNewPodcastJob" ) .listener(protocolListener()) .start(step()) .build(); } @Bean public Step step(){ return stepBuilderFactory.get( "step" ) .<SuggestedPodcast,SuggestedPodcast>chunk( 1 ) //important to be one in this case to commit after every line read .reader(reader()) .processor(processor()) .writer(writer()) .listener(logProcessListener()) .faultTolerant() .skipLimit( 10 ) //default is set to 0 .skip(MySQLIntegrityConstraintViolationException. class ) .build(); } |
Первый метод определяет работу, а второй — один шаг. Как вы уже читали в разделе «Доменный язык пакетной обработки» , задания строятся из шагов, где каждый шаг может включать в себя читателя, процессор и писатель.
В определении шага вы определяете, сколько данных записывать за один раз (в нашем случае 1 запись за один раз). Далее вы указываете читатель, процессор и писатель.
5. Пружинные блоки обработки
Большую часть пакетной обработки можно описать как чтение данных, выполнение каких-либо преобразований и последующую запись результатов. Это как-то отражает процесс извлечения, преобразования, загрузки (ETL) , если вы знаете об этом больше. Spring Batch предоставляет три ключевых интерфейса, которые помогают выполнять массовое чтение и запись: ItemReader
, ItemProcessor
и ItemWriter
.
5.1. Читатели
ItemReader — это абстракция, обеспечивающая возможность извлечения данных из множества различных типов ввода: плоские файлы , XML-файлы , базы данных , JMS и т. Д., По одному элементу за раз. См. Приложение A. Список ItemReaders и ItemWriters для полного списка доступных считывателей элементов.
В пакетных заданиях Podcastpedia я использую следующие специализированные ItemReaders:
5.1.1. FlatFileItemReader
который, как следует из названия, считывает строки данных из плоского файла, которые обычно описывают записи с полями данных, заданными фиксированными позициями в файле или разделенными каким-либо специальным символом (например, запятой). Этот тип ItemReader
используется в первом пакетном задании addNewPodcastJob . Используемый входной файл с именем Sugges-podcasts.in находится в classpath ( src / main / resources ) и выглядит примерно так:
Входной файл для FlatFileItemReader
1
2
3
|
FEED_URL; IDENTIFIER_ON_PODCASTPEDIA; CATEGORIES; LANGUAGE; MEDIA_TYPE; UPDATE_FREQUENCY; KEYWORDS; FB_PAGE; TWITTER_PAGE; GPLUS_PAGE; NAME_SUBMITTER; EMAIL_SUBMITTER http: //www.5minutebiographies.com/feed/; 5minutebiographies; people_society, history; en; Audio; WEEKLY; biography, biographies, short biography, short biographies, 5 minute biographies, five minute biographies, 5 minute biography, five minute biography; https://www.facebook.com/5minutebiographies; https://twitter.com/5MinuteBios; ; Adrian Matei; [email protected] http: //notanotherpodcast.libsyn.com/rss; NotAnotherPodcast; entertainment; en; Audio; WEEKLY; Comedy, Sports, Cinema, Movies, Pop Culture, Food, Games; https://www.facebook.com/notanotherpodcastusa; https://twitter.com/NAPodcastUSA; https://plus.google.com/u/0/103089891373760354121/posts; Adrian Matei; [email protected] |
Как видите, первая строка определяет имена «столбцов», а следующие строки содержат фактические данные (разделенные «;»), которые необходимо преобразовать в объекты домена, соответствующие контексту.
Давайте теперь посмотрим, как настроить FlatFileItemReader
:
Пример FlatFileItemReader
1
2
3
4
5
6
7
8
|
@Bean public ItemReader<SuggestedPodcast> reader(){ FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>(); reader.setLinesToSkip( 1 ); //first line is title definition reader.setResource( new ClassPathResource( "suggested-podcasts.in" )); reader.setLineMapper(lineMapper()); return reader; } |
Вы можете указать, среди прочего, входной ресурс, количество пропускаемых строк и отображение строк.
5.1.1.1. LineMapper
LineMapper
— это интерфейс для отображения строк (строк) в объектах домена, обычно используемый для отображения строк, считанных из файла, в объекты домена для каждой строки. Для задания Podcastpedia я использовал DefaultLineMapper
, который представляет собой двухфазную реализацию, состоящую из токенизации строки в FieldSet
последующим отображением в элемент:
Пример реализации LineMapper по умолчанию
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
@Bean public LineMapper<SuggestedPodcast> lineMapper() { DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>(); DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter( ";" ); lineTokenizer.setStrict( false ); lineTokenizer.setNames( new String[]{ "FEED_URL" , "IDENTIFIER_ON_PODCASTPEDIA" , "CATEGORIES" , "LANGUAGE" , "MEDIA_TYPE" , "UPDATE_FREQUENCY" , "KEYWORDS" , "FB_PAGE" , "TWITTER_PAGE" , "GPLUS_PAGE" , "NAME_SUBMITTER" , "EMAIL_SUBMITTER" }); BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>(); fieldSetMapper.setTargetType(SuggestedPodcast. class ); lineMapper.setLineTokenizer(lineTokenizer); lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper()); return lineMapper; } |
-
DelimitedLineTokenizer
разделяет входную строку через «;» разделитель. - если вы установите флаг
strict
режима в значениеfalse
то строки с меньшим количеством токенов будут допущены и дополнены пустыми столбцами, а строки с большим количеством токенов будут просто обрезаны. - имена столбцов из первой строки устанавливаются
lineTokenizer.setNames(...);
- и
fieldMapper
установлен (строка 14)
Замечания:
FieldSet
— это «интерфейс, используемый входными источниками плоских файлов для инкапсуляции проблем преобразования массива Strings в нативные типы Java. Немного похоже на роль ResultSet
в JDBC, клиенты будут знать имя или позицию строго типизированных полей, которые они хотят извлечь ».
5.1.1.2. FieldSetMapper
FieldSetMapper
— это интерфейс, который используется для отображения данных, полученных из FieldSet
в объект. Вот моя реализация, которая отображает fieldSet в объекте домена AdditionalPodcast, который затем будет передан процессору:
Реализация FieldSetMapper
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class SuggestedPodcastFieldSetMapper implements FieldSetMapper<SuggestedPodcast> { @Override public SuggestedPodcast mapFieldSet(FieldSet fieldSet) throws BindException { SuggestedPodcast suggestedPodcast = new SuggestedPodcast(); suggestedPodcast.setCategories(fieldSet.readString( "CATEGORIES" )); suggestedPodcast.setEmail(fieldSet.readString( "EMAIL_SUBMITTER" )); suggestedPodcast.setName(fieldSet.readString( "NAME_SUBMITTER" )); suggestedPodcast.setTags(fieldSet.readString( "KEYWORDS" )); //some of the attributes we can map directly into the Podcast entity that we'll insert later into the database Podcast podcast = new Podcast(); podcast.setUrl(fieldSet.readString( "FEED_URL" )); podcast.setIdentifier(fieldSet.readString( "IDENTIFIER_ON_PODCASTPEDIA" )); podcast.setLanguageCode(LanguageCode.valueOf(fieldSet.readString( "LANGUAGE" ))); podcast.setMediaType(MediaType.valueOf(fieldSet.readString( "MEDIA_TYPE" ))); podcast.setUpdateFrequency(UpdateFrequency.valueOf(fieldSet.readString( "UPDATE_FREQUENCY" ))); podcast.setFbPage(fieldSet.readString( "FB_PAGE" )); podcast.setTwitterPage(fieldSet.readString( "TWITTER_PAGE" )); podcast.setGplusPage(fieldSet.readString( "GPLUS_PAGE" )); suggestedPodcast.setPodcast(podcast); return suggestedPodcast; } } |
5.2. JdbcCursorItemReader
Во втором задании notifyEmailSubscribeersJob в считывателе я читаю подписчиков электронной почты только из одной таблицы базы данных, но далее в процессоре выполняется более подробное чтение (через JPA) для извлечения всех новых эпизодов подкастов, на которые подписан пользователь , Это распространенная модель, используемая в мире пакетной обработки. Перейдите по этой ссылке для получения более распространенных шаблонов партии.
Для начального чтения я выбрал JdbcCursorItemReader
, который представляет собой простую реализацию для чтения, которая открывает курсор JDBC и постоянно извлекает следующую строку в ResultSet
:
Пример JdbcCursorItemReader
01
02
03
04
05
06
07
08
09
10
11
12
|
@Bean public ItemReader<User> notifySubscribersReader(){ JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>(); String sql = "select * from users where is_email_subscriber is not null" ; reader.setSql(sql); reader.setDataSource(dataSource); reader.setRowMapper(rowMapper()); return reader; } |
Обратите внимание, я должен был установить sql
, datasource
для чтения и RowMapper
.
5.2.1. RowMapper
RowMapper
— это интерфейс, используемый JdbcTemplate
для отображения строк Result’set для каждой строки. Моя реализация этого интерфейса выполняет фактическую работу по отображению каждой строки в объект результата, но мне не нужно беспокоиться об обработке исключений:
Реализация RowMapper
01
02
03
04
05
06
07
08
09
10
11
|
public class UserRowMapper implements RowMapper<User> { @Override public User mapRow(ResultSet rs, int rowNum) throws SQLException { User user = new User(); user.setEmail(rs.getString( "email" )); return user; } } |
5.2. писатели
ItemWriter
— это абстракция, которая представляет выходные данные Step
, одного пакета или части элементов за раз. Как правило, средство записи элемента не знает о вводе, которое он получит следующим, только элемент, который был передан в его текущем вызове.
Авторы двух представленных работ довольно просты. Они просто используют внешние сервисы для отправки уведомлений по электронной почте и публикации твитов на аккаунте Podcastpedia . Вот реализация ItemWriter
для первого задания — addNewPodcast :
Писательская реализация ItemWriter
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
package org.podcastpedia.batch.jobs.addpodcast; import java.util.Date; import java.util.List; import javax.inject.Inject; import javax.persistence.EntityManager; import org.podcastpedia.batch.common.entities.Podcast; import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast; import org.podcastpedia.batch.jobs.addpodcast.service.EmailNotificationService; import org.podcastpedia.batch.jobs.addpodcast.service.SocialMediaService; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; public class Writer implements ItemWriter<SuggestedPodcast>{ @Autowired private EntityManager entityManager; @Inject private EmailNotificationService emailNotificationService; @Inject private SocialMediaService socialMediaService; @Override public void write(List<? extends SuggestedPodcast> items) throws Exception { if (items.get( 0 ) != null ){ SuggestedPodcast suggestedPodcast = items.get( 0 ); //first insert the data in the database Podcast podcast = suggestedPodcast.getPodcast(); podcast.setInsertionDate( new Date()); entityManager.persist(podcast); entityManager.flush(); //notify submitter about the insertion and post a twitt about it String url = buildUrlOnPodcastpedia(podcast); emailNotificationService.sendPodcastAdditionConfirmation( suggestedPodcast.getName(), suggestedPodcast.getEmail(), url); if (podcast.getTwitterPage() != null ){ socialMediaService.postOnTwitterAboutNewPodcast(podcast, url); } } } private String buildUrlOnPodcastpedia(Podcast podcast) { StringBuffer urlOnPodcastpedia = new StringBuffer( if (podcast.getIdentifier() != null ) { urlOnPodcastpedia.append( "/" + podcast.getIdentifier()); } else { urlOnPodcastpedia.append( "/podcasts/" ); urlOnPodcastpedia.append(String.valueOf(podcast.getPodcastId())); urlOnPodcastpedia.append( "/" + podcast.getTitleInUrl()); } String url = urlOnPodcastpedia.toString(); return url; } } |
Как вы можете видеть, здесь нет ничего особенного, за исключением того, что метод write
должен быть переопределен, и именно здесь EmailNotificationService
внешние службы EmailNotificationService
и SocialMediaService
используются для информирования отправителя подкаста по электронной почте о добавлении в каталог подкастов, а также, если используется Twitter. страница была отправлена, твит будет размещен на стене Podcastpedia . Вы можете найти подробное объяснение о том, как отправлять электронную почту через Velocity и как размещать в Twitter с Java, в следующих сообщениях:
- Как создавать HTML письма в Java с помощью Spring и Velocity
- Как опубликовать в Твиттере с Java с Twitter4J за 10 минут
5.3. процессоры
ItemProcessor
— это абстракция, представляющая бизнес-обработку элемента. В то время как ItemReader
считывает один элемент, а ItemWriter
записывает их, ItemProcessor
предоставляет доступ для преобразования или применения другой бизнес-обработки. При использовании ваших собственных Processors
вы должны реализовать ItemProcessor<I,O>
, его единственный метод O process(I item) throws Exception
, возвращая потенциально измененный или новый элемент для продолжения обработки. Если возвращаемый результат равен нулю, предполагается, что обработка элемента не должна продолжаться.
В то время как процессор первого задания требует немного больше логики, потому что я должен установить атрибуты заголовка etag
и last-modified
атрибуты фида, эпизоды, категории и ключевые слова подкаста:
Реализация ItemProcessor для задания addNewPodcast
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
public class SuggestedPodcastItemProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> { private static final int TIMEOUT = 10 ; @Autowired ReadDao readDao; @Autowired PodcastAndEpisodeAttributesService podcastAndEpisodeAttributesService; @Autowired private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager; @Autowired private SyndFeedService syndFeedService; /** * Method used to build the categories, tags and episodes of the podcast */ @Override public SuggestedPodcast process(SuggestedPodcast item) throws Exception { if (isPodcastAlreadyInTheDirectory(item.getPodcast().getUrl())) { return null ; } String[] categories = item.getCategories().trim().split( "\\s*,\\s*" ); item.getPodcast().setAvailability(org.apache.http.HttpStatus.SC_OK); //set etag and last modified attributes for the podcast setHeaderFieldAttributes(item.getPodcast()); //set the other attributes of the podcast from the feed podcastAndEpisodeAttributesService.setPodcastFeedAttributes(item.getPodcast()); //set the categories List<Category> categoriesByNames = readDao.findCategoriesByNames(categories); item.getPodcast().setCategories(categoriesByNames); //set the tags setTagsForPodcast(item); //build the episodes setEpisodesForPodcast(item.getPodcast()); return item; } ...... } |
процессор из второго задания использует подход «Driving Query» , в котором я расширяю данные, полученные из Reader, с помощью «JPA-чтения» и группирую элементы в подкастах с эпизодами, чтобы они хорошо выглядели в электронных письмах, которые я получаю. рассылка подписчикам:
Реализация ItemProcessor второго задания — notifySubscribeers
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Scope ( "step" ) public class NotifySubscribersItemProcessor implements ItemProcessor<User, User> { @Autowired EntityManager em; @Value ( "#{jobParameters[updateFrequency]}" ) String updateFrequency; @Override public User process(User item) throws Exception { String sqlInnerJoinEpisodes = "select e from User u JOIN u.podcasts p JOIN p.episodes e WHERE u.email=?1 AND p.updateFrequency=?2 AND" + " e.isNew IS NOT NULL AND e.availability=200 ORDER BY e.podcast.podcastId ASC, e.publicationDate ASC" ; TypedQuery<Episode> queryInnerJoinepisodes = em.createQuery(sqlInnerJoinEpisodes, Episode. class ); queryInnerJoinepisodes.setParameter( 1 , item.getEmail()); queryInnerJoinepisodes.setParameter( 2 , UpdateFrequency.valueOf(updateFrequency)); List<Episode> newEpisodes = queryInnerJoinepisodes.getResultList(); return regroupPodcastsWithEpisodes(item, newEpisodes); } ....... } |
Замечания:
Если вы хотите узнать больше о том, как использовать Apache Http Client, чтобы получить etag
и last-modified
заголовки, вы можете взглянуть на мой пост — Как использовать новый Apache Http Client, чтобы сделать запрос HEAD
6. Выполните пакетное приложение
Пакетная обработка может быть встроена в веб-приложения и файлы WAR, но вначале я выбрал более простой подход, который создает автономное приложение, которое можно запустить методом main()
Java:
Пакетная обработка Java main () метод
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package org.podcastpedia.batch; //imports ...; @ComponentScan @EnableAutoConfiguration public class Application { private static final String NEW_EPISODES_NOTIFICATION_JOB = "newEpisodesNotificationJob" ; private static final String ADD_NEW_PODCAST_JOB = "addNewPodcastJob" ; public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException { Log log = LogFactory.getLog(Application. class ); SpringApplication app = new SpringApplication(Application. class ); app.setWebEnvironment( false ); ConfigurableApplicationContext ctx= app.run(args); JobLauncher jobLauncher = ctx.getBean(JobLauncher. class ); if (ADD_NEW_PODCAST_JOB.equals(args[ 0 ])){ //addNewPodcastJob Job addNewPodcastJob = ctx.getBean(ADD_NEW_PODCAST_JOB, Job. class ); JobParameters jobParameters = new JobParametersBuilder() .addDate( "date" , new Date()) .toJobParameters(); JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters); BatchStatus batchStatus = jobExecution.getStatus(); while (batchStatus.isRunning()){ log.info( "*********** Still running.... **************" ); Thread.sleep( 1000 ); } log.info(String.format( "*********** Exit status: %s" , jobExecution.getExitStatus().getExitCode())); JobInstance jobInstance = jobExecution.getJobInstance(); log.info(String.format( "********* Name of the job %s" , jobInstance.getJobName())); log.info(String.format( "*********** job instance Id: %d" , jobInstance.getId())); System.exit( 0 ); } else if (NEW_EPISODES_NOTIFICATION_JOB.equals(args[ 0 ])){ JobParameters jobParameters = new JobParametersBuilder() .addDate( "date" , new Date()) .addString( "updateFrequency" , args[ 1 ]) .toJobParameters(); jobLauncher.run(ctx.getBean(NEW_EPISODES_NOTIFICATION_JOB, Job. class ), jobParameters); } else { throw new IllegalArgumentException( "Please provide a valid Job name as first application parameter" ); } System.exit( 0 ); } } |
Лучшее объяснение SpringApplication
-, @ComponentScan
— и @ComponentScan
— @EnableAutoConfiguration
которую вы получаете из источника — Начало работы — Создание пакетной службы:
«Метод main()
ссылается на вспомогательный класс SpringApplication
, предоставляя Application.class
в качестве аргумента для своего метода run()
. Это заставляет Spring читать метаданные аннотации из Application
и управлять ими как компонентом в контексте приложения Spring .
Аннотация @ComponentScan
указывает Spring рекурсивно выполнять поиск в пакете org.podcastpedia.batch
и его дочерних классах, помеченных прямо или косвенно аннотацией @Component
. Эта директива гарантирует, что Spring найдет и зарегистрирует BatchConfiguration
, поскольку он помечен @Configuration
, что, в свою очередь, является своего рода аннотацией @Component
.
Аннотация @EnableAutoConfiguration
включает разумное поведение по умолчанию в зависимости от содержимого вашего пути к классам. Например, он ищет любой класс, который реализует интерфейс CommandLineRunner
и вызывает его метод run()
».
Выполнение этапов строительства:
-
JobLauncher
, который является простым интерфейсом для управления заданиями, извлекается из ApplicationContext. Помните, что это автоматически доступно через аннотацию@EnableBatchProcessing
. - Теперь, основываясь на первом параметре приложения (
args[0]
), я получу соответствующееJob
изApplicationContext
- затем
JobParameters
, где я использую текущую дату —.addDate("date", new Date())
, так что выполнение задания всегда уникально. - как только все будет
JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
, задание можно выполнить:JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
- Вы можете использовать возвращенное
jobExecution
чтобы получить доступ кBatchStatus
, кодуBatchStatus
или к имени и идентификатору задания.
Примечание: я настоятельно рекомендую вам прочитать и понять схему метаданных для Spring Batch . Это также поможет вам лучше понять объекты домена Spring Batch.
6.1. Запуск приложения в средах разработки и разработки
Чтобы иметь возможность запускать приложение Spring Batch / Spring Boot в разных средах, я использую возможность Spring Profiles. По умолчанию приложение работает с данными разработки (база данных). Но если я хочу, чтобы работа использовала производственную базу данных, я должен сделать следующее:
- предоставить следующий аргумент среды
-Dspring.profiles.active=prod
- настроить свойства производственной базы данных в файле
application-prod.properties
в classpath, справа от файлаapplication.properties
по умолчанию
Резюме
В этом руководстве мы узнали, как настроить проект Spring Batch с конфигурацией Spring Boot и Java, как использовать некоторые из наиболее распространенных программ чтения в пакетной обработке, как настроить некоторые простые задания и как запускать задания Spring Batch из основной метод.
Ссылка: | Spring Batch Tutorial с Spring Boot и настройкой Java от нашего партнера JCG Адриана Матеи в блоге Codingpedia.org . |