Это была одна из моих сессий на последнем JavaOne . Этот пост будет расширять тему и смотреть на реальное приложение, использующее Batch JSR-352 API. Это приложение интегрируется с MMORPG World of Warcraft .
Поскольку JSR-352 — это новая спецификация в мире Java EE, я думаю, что многие люди не знают, как правильно ее использовать. Также может быть проблемой определить случаи использования, к которым применяется данная спецификация. Надеюсь, этот пример поможет вам лучше понять варианты использования.
Аннотация
World of Warcraft — игра, в которую играют более 8 миллионов игроков по всему миру. Услуга предоставляется по регионам: США (США) , Европе (ЕС) , Китаю и Корее. В каждом регионе есть набор серверов под названием Realm, которые вы используете для подключения, чтобы иметь возможность играть в игру. Для этого примера мы рассматриваем только регионы США и ЕС .
Одна из самых интересных особенностей игры заключается в том, что вы можете покупать и продавать внутриигровые товары, называемые Предметами , используя Аукционный Дом . У каждого Царства есть два Аукционных Дома . В среднем каждое Царство торгует около 70.000 Предметов . Давайте разберемся с некоторыми цифрами:
- 512 Королевств ( США и ЕС )
- 70 K предметов в мире
- Всего более 35 млн. Шт.
Данные
Еще одна интересная вещь в World of Warcraft — разработчики предоставляют REST API для доступа к большей части игровой информации, включая данные аукционного дома . Проверьте здесь полный API.
Данные аукционного дома получаются в два этапа. Сначала нам нужно запросить соответствующую конечную точку REST Auction House Realm, чтобы получить ссылку на файл JSON. Затем нам нужно получить доступ к этому URL и загрузить файл со всей информацией об объекте аукциона . Вот пример:
http://eu.battle.net/api/wow/auction/data/aggra-portugues
Приложение
Наша цель здесь — создать приложение, которое загружает Аукционный Дом , обрабатывает его и извлекает метрики. Эти метрики будут строить историю эволюции цены Предметов во времени. Кто знает? Возможно, с помощью этой информации мы можем предсказать колебания цен и покупать или продавать товары в лучшие времена.
Настройка
Для настройки мы собираемся использовать несколько дополнительных вещей в Java EE 7:
работы
Основную работу он будет выполнять Batch JSR-352 Jobs. Задание — это объект, который инкапсулирует весь пакетный процесс. Работа будет соединена вместе через язык спецификации работы. В JSR-352 задание — это просто контейнер для шагов. Он объединяет несколько шагов, которые логически связаны друг с другом в потоке.
Мы собираемся разделить бизнес-логин на три вакансии:
- Подготовка — Создает все необходимые вспомогательные данные. Список областей , создавать папки для копирования файлов.
- Файлы — области запросов для проверки новых файлов для обработки.
- Обработка — загрузка файла, обработка данных, извлечение метрик.
Код
Серверная часть — Java EE 7 с Java 8
Большая часть кода будет в конце. Нам нужен пакетный JSR-352 , но мы также собираемся использовать множество других технологий из Java EE: например, JPA , JAX-RS , CDI и JSON-P .
Поскольку задание « Подготовка» предназначено только для инициализации ресурсов приложения для обработки, я пропускаю его и углубляюсь в самые интересные части.
Работа с файлами
Файловое задание является реализацией AbstractBatchlet . Batchlet — это самый простой стиль обработки, доступный в спецификации Batch. Это шаг, ориентированный на задачу, когда задача вызывается один раз, выполняется и возвращает статус выхода. Этот тип наиболее полезен для выполнения различных задач, не ориентированных на элементы, таких как выполнение команды или передача файла. В этом случае наш Batchlet будет выполнять итерацию для каждого Realm, отправлять REST-запрос каждому и получать URL-адрес с файлом, содержащим данные, которые мы хотим обработать. Вот код:
LoadAuctionFilesBatchlet
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
@Namedpublic class LoadAuctionFilesBatchlet extends AbstractBatchlet { @Inject private WoWBusiness woWBusiness; @Inject @BatchProperty(name = "region") private String region; @Inject @BatchProperty(name = "target") private String target; @Override public String process() throws Exception { List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region)); realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation); return "COMPLETED"; } void getRealmAuctionFileInformation(Realm realm) { try { Client client = ClientBuilder.newClient(); Files files = client.target(target + realm.getSlug()) .request(MediaType.TEXT_PLAIN).async() .get(Files.class) .get(2, TimeUnit.SECONDS); files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile)); } catch (Exception e) { getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail()); } } void createAuctionFile(Realm realm, AuctionFile auctionFile) { auctionFile.setRealm(realm); auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json"); auctionFile.setFileStatus(FileStatus.LOADED); if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) { woWBusiness.createAuctionFile(auctionFile); } }} |
Крутая вещь по этому поводу — это использование Java 8. С parallelStream() вызов нескольких запросов REST очень прост! Вы действительно можете заметить разницу. Если вы хотите попробовать это, просто запустите пример и замените функцию parallelStream() на stream() и проверьте его. На моей машине, с помощью parallelStream() задача выполняется примерно в 5 или 6 раз быстрее.
Обновить
Обычно я бы не использовал этот подход. Я сделал это, потому что часть логики заключается в вызове медленных REST-запросов, и параллельные потоки действительно сияют здесь. Делать это с помощью пакетных разделов возможно, но сложно реализовать. Нам также нужно каждый раз объединять серверы для новых данных, так что это не страшно, если мы пропустим один или два файла. Имейте в виду, что если вы не хотите пропустить ни одной записи, стиль обработки чанков больше подходит. Спасибо Саймону Мартинелли за то, что он привлек мое внимание.
Поскольку Королевства США и ЕС требуют для вызова разных конечных точек REST, они идеально подходят для разделения. Разбиение означает, что задача будет выполняться в несколько потоков. Один поток на раздел. В этом случае у нас есть два раздела.
Чтобы завершить определение задания, нам нужно предоставить файл JoB XML. Это должно быть помещено в каталог META-INF/batch-jobs . Вот files-job.xml для этой работы:
файлы-job.xml
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
<step id="loadRealmAuctionFileStep"> <batchlet ref="loadAuctionFilesBatchlet"> <properties> <property name="region" value="#{partitionPlan['region']}"/> <property name="target" value="#{partitionPlan['target']}"/> </properties> </batchlet> <partition> <plan partitions="2"> <properties partition="0"> <property name="region" value="US"/> </properties> <properties partition="1"> <property name="region" value="EU"/> </properties> </plan> </partition> </step></job> |
В files-job.xml нам нужно определить наш Batchlet в элементе batchlet . Для разделов просто определите элемент partition и назначьте разные properties каждому plan . Затем эти properties можно использовать для позднего связывания значения в LoadAuctionFilesBatchlet с выражениями #{partitionPlan['region']} и #{partitionPlan['target']} . Это очень простой механизм привязки выражений, который работает только для простых свойств и строк.
Процесс работы
Теперь мы хотим обработать файл данных аукциона Realm . Используя информацию из предыдущей работы, мы теперь можем загрузить файл и что-то сделать с данными. Файл JSON имеет следующую структуру:
пункт-аукционы-sample.json
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
{ "realm": { "name": "Grim Batol", "slug": "grim-batol" }, "alliance": { "auctions": [ { "auc": 279573567, // Auction Id "item": 22792, // Item for sale Id "owner": "Miljanko", // Seller Name "ownerRealm": "GrimBatol", // Realm "bid": 3800000, // Bid Value "buyout": 4000000, // Buyout Value "quantity": 20, // Numbers of items in the Auction "timeLeft": "LONG", // Time left for the Auction "rand": 0, "seed": 1069994368 }, { "auc": 278907544, "item": 40195, "owner": "Mongobank", "ownerRealm": "GrimBatol", "bid": 38000, "buyout": 40000, "quantity": 1, "timeLeft": "VERY_LONG", "rand": 0, "seed": 1978036736 } ] }, "horde": { "auctions": [ { "auc": 278268046, "item": 4306, "owner": "Thuglifer", "ownerRealm": "GrimBatol", "bid": 570000, "buyout": 600000, "quantity": 20, "timeLeft": "VERY_LONG", "rand": 0, "seed": 1757531904 }, { "auc": 278698948, "item": 4340, "owner": "Celticpala", "ownerRealm": "Aggra(Português)", "bid": 1000000, "buyout": 1000000, "quantity": 10, "timeLeft": "LONG", "rand": 0, "seed": 0 } ] }} |
В файле есть список аукционов из области , из которой он был загружен. В каждой записи мы можем проверить товар на продажу, цены, продавца и время, оставшееся до конца аукциона. Аукционы агрегированы по типу Аукционного Дома : Альянс и Орда .
Для process-job мы хотим прочитать файл JSON, преобразовать данные и сохранить их в базе данных. Это может быть достигнуто с помощью Chunk Processing. Чанк — это стиль обработки ETL (Extract — Transform — Load), который подходит для обработки больших объемов данных. Блок читает данные по одному элементу за раз и создает фрагменты, которые будут записаны в транзакции. Один элемент считывается из ItemReader , передается ItemProcessor и агрегируется. Как только количество прочитанных элементов становится равным интервалу фиксации, весь блок записывается с помощью ItemWriter , а затем транзакция ItemWriter .
ItemReader
Реальные файлы настолько велики, что не могут быть полностью загружены в память, иначе у вас может не хватить. Вместо этого мы используем JSON-P API для потокового анализа данных.
AuctionDataItemReader
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
@Namedpublic class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader { private JsonParser parser; private AuctionHouse auctionHouse; @Inject private JobContext jobContext; @Inject private WoWBusiness woWBusiness; @Override public void open(Serializable checkpoint) throws Exception { setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP)))); AuctionFile fileToProcess = getContext().getFileToProcess(); fileToProcess.setFileStatus(FileStatus.PROCESSING); woWBusiness.updateAuctionFile(fileToProcess); } @Override public void close() throws Exception { AuctionFile fileToProcess = getContext().getFileToProcess(); fileToProcess.setFileStatus(FileStatus.PROCESSED); woWBusiness.updateAuctionFile(fileToProcess); } @Override public Object readItem() throws Exception { while (parser.hasNext()) { JsonParser.Event event = parser.next(); Auction auction = new Auction(); switch (event) { case KEY_NAME: updateAuctionHouseIfNeeded(auction); if (readAuctionItem(auction)) { return auction; } break; } } return null; } @Override public Serializable checkpointInfo() throws Exception { return null; } protected void updateAuctionHouseIfNeeded(Auction auction) { if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) { auctionHouse = AuctionHouse.ALLIANCE; } else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) { auctionHouse = AuctionHouse.HORDE; } else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) { auctionHouse = AuctionHouse.NEUTRAL; } auction.setAuctionHouse(auctionHouse); } protected boolean readAuctionItem(Auction auction) { if (parser.getString().equalsIgnoreCase("auc")) { parser.next(); auction.setAuctionId(parser.getLong()); parser.next(); parser.next(); auction.setItemId(parser.getInt()); parser.next(); parser.next(); parser.next(); parser.next(); auction.setOwnerRealm(parser.getString()); parser.next(); parser.next(); auction.setBid(parser.getInt()); parser.next(); parser.next(); auction.setBuyout(parser.getInt()); parser.next(); parser.next(); auction.setQuantity(parser.getInt()); return true; } return false; } public void setParser(JsonParser parser) { this.parser = parser; }} |
Чтобы открыть поток Json.createParser JSON, нам нужен Json.createParser и передать ссылку на Json.createParser поток. Для чтения элементов нам просто нужно вызвать hasNext() и next() . Это возвращает JsonParser.Event который позволяет нам проверять положение парсера в потоке. Элементы читаются и возвращаются в метод readItem() из Batch API ItemReader . Если больше нет доступных элементов для чтения, верните null чтобы завершить обработку. Обратите внимание, что мы также реализуем метод open и close из ItemReader . Они используются для инициализации и очистки ресурсов. Они исполняются только один раз.
ItemProcessor
ItemProcessor является необязательным. Он используется для преобразования данных, которые были прочитаны. В этом случае нам нужно добавить дополнительную информацию на аукцион .
AuctionDataItemProcessor
|
01
02
03
04
05
06
07
08
09
10
11
12
|
@Namedpublic class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor { @Override public Object processItem(Object item) throws Exception { Auction auction = (Auction) item; auction.setRealm(getContext().getRealm()); auction.setAuctionFile(getContext().getFileToProcess()); return auction; }} |
ItemWriter
Наконец, нам просто нужно записать данные в базу данных:
AuctionDataItemWriter
|
01
02
03
04
05
06
07
08
09
10
|
@Namedpublic class AuctionDataItemWriter extends AbstractItemWriter { @PersistenceContext protected EntityManager em; @Override public void writeItems(List<Object> items) throws Exception { items.forEach(em::persist); }} |
Весь процесс с файлом записи 70 К занимает около 20 секунд на моей машине. Я заметил кое-что очень интересное. До этого кода я использовал встроенный EJB, который вызывал метод с операцией persist. В общей сложности это заняло 30 секунд, поэтому внедрение EntityManager и выполнение постоянного сохранения сэкономили мне треть времени обработки. Я могу только предположить, что задержка вызвана увеличением стекового вызова с перехватчиками EJB в середине. Это происходило в Wildfly. Я буду исследовать это дальше.
Чтобы определить чанк, нам нужно добавить его в файл process-job.xml:
Процесс-job.xml
|
1
2
3
4
5
6
7
|
<step id="processFile" next="moveFileToProcessed"> <chunk item-count="100"> <reader ref="auctionDataItemReader"/> <processor ref="auctionDataItemProcessor"/> <writer ref="auctionDataItemWriter"/> </chunk></step> |
В свойстве item-count мы определяем, сколько элементов вписывается в каждый блок обработки. Это означает, что для каждых 100 транзакция совершается. Это полезно для сохранения размера транзакции на низком уровне и для проверки данных. Если нам нужно остановить, а затем перезапустить операцию, мы можем сделать это без необходимости повторной обработки каждого элемента. Мы должны сами кодировать эту логику. Это не входит в пример, но я сделаю это в будущем.
Бег
Чтобы запустить работу, нам нужно получить ссылку на JobOperator . JobOperator предоставляет интерфейс для управления всеми аспектами обработки заданий, включая такие рабочие команды, как запуск, перезапуск и останов, а также команды, связанные с хранилищем заданий, такие как извлечение задания и выполнение шагов.
Для запуска предыдущего files-job.xml Job мы выполняем:
Выполнить работу
|
1
2
|
JobOperator jobOperator = BatchRuntime.getJobOperator();jobOperator.start("files-job", new Properties()); |
Обратите внимание, что мы используем имя XML-файла задания без расширения в JobOperator .
Следующие шаги
Нам все еще нужно объединить данные, чтобы извлечь метрики и отобразить их на веб-странице. Этот пост уже длинный, поэтому я опишу следующие шаги в следующем посте. Во всяком случае, код для этой части уже находится в репозитории Github. Проверьте раздел Ресурсы.
Ресурсы
Вы можете клонировать полную рабочую копию из моего репозитория github и развернуть ее в Wildfly. Вы можете найти там инструкции по его развертыванию.
| Ссылка: | Java EE 7 Batch Processing и World of Warcraft — часть 1 от нашего партнера по JCG Роберто Кортеса в блоге |
