Статьи

Пакетная обработка Java EE 7 и World of Warcraft — часть 1

Это была одна из моих сессий на последнем JavaOne . Этот пост будет расширять тему и смотреть на реальное приложение, использующее Batch JSR-352 API. Это приложение интегрируется с MMORPG World of Warcraft .

Поскольку JSR-352 — это новая спецификация в мире Java EE, я думаю, что многие люди не знают, как правильно ее использовать. Также может быть проблемой определить случаи использования, к которым применяется данная спецификация. Надеюсь, этот пример поможет вам лучше понять варианты использования.

Аннотация

World of Warcraft — игра, в которую играют более 8 миллионов игроков по всему миру. Услуга предоставляется по регионам: США (США) , Европе (ЕС) , Китаю и Корее. В каждом регионе есть набор серверов под названием Realm, которые вы используете для подключения, чтобы иметь возможность играть в игру. Для этого примера мы рассматриваем только регионы США и ЕС .

Оргриммар-аукцион дом

Одна из самых интересных особенностей игры заключается в том, что вы можете покупать и продавать внутриигровые товары, называемые Предметами , используя Аукционный Дом . У каждого Царства есть два Аукционных Дома . В среднем каждое Царство торгует около 70.000 Предметов . Давайте разберемся с некоторыми цифрами:

  • 512 Королевств ( США и ЕС )
  • 70 K предметов в мире
  • Всего более 35 млн. Шт.

Данные

Еще одна интересная вещь в World of Warcraft — разработчики предоставляют REST API для доступа к большей части игровой информации, включая данные аукционного дома . Проверьте здесь полный API.

Данные аукционного дома получаются в два этапа. Сначала нам нужно запросить соответствующую конечную точку REST Auction House Realm, чтобы получить ссылку на файл JSON. Затем нам нужно получить доступ к этому URL и загрузить файл со всей информацией об объекте аукциона . Вот пример:

http://eu.battle.net/api/wow/auction/data/aggra-portugues

Приложение

Наша цель здесь — создать приложение, которое загружает Аукционный Дом , обрабатывает его и извлекает метрики. Эти метрики будут строить историю эволюции цены Предметов во времени. Кто знает? Возможно, с помощью этой информации мы можем предсказать колебания цен и покупать или продавать товары в лучшие времена.

Настройка

Для настройки мы собираемся использовать несколько дополнительных вещей в Java EE 7:

работы

Основную работу он будет выполнять Batch JSR-352 Jobs. Задание — это объект, который инкапсулирует весь пакетный процесс. Работа будет соединена вместе через язык спецификации работы. В JSR-352 задание — это просто контейнер для шагов. Он объединяет несколько шагов, которые логически связаны друг с другом в потоке.

Мы собираемся разделить бизнес-логин на три вакансии:

  • Подготовка — Создает все необходимые вспомогательные данные. Список областей , создавать папки для копирования файлов.
  • Файлы — области запросов для проверки новых файлов для обработки.
  • Обработка — загрузка файла, обработка данных, извлечение метрик.

Код

Серверная часть — Java EE 7 с Java 8

Большая часть кода будет в конце. Нам нужен пакетный JSR-352 , но мы также собираемся использовать множество других технологий из Java EE: например, JPA , JAX-RS , CDI и JSON-P .

Поскольку задание « Подготовка» предназначено только для инициализации ресурсов приложения для обработки, я пропускаю его и углубляюсь в самые интересные части.

Работа с файлами

Файловое задание является реализацией AbstractBatchlet . Batchlet — это самый простой стиль обработки, доступный в спецификации Batch. Это шаг, ориентированный на задачу, когда задача вызывается один раз, выполняется и возвращает статус выхода. Этот тип наиболее полезен для выполнения различных задач, не ориентированных на элементы, таких как выполнение команды или передача файла. В этом случае наш Batchlet будет выполнять итерацию для каждого Realm, отправлять REST-запрос каждому и получать URL-адрес с файлом, содержащим данные, которые мы хотим обработать. Вот код:

LoadAuctionFilesBatchlet

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Named
public class LoadAuctionFilesBatchlet extends AbstractBatchlet {
    @Inject
    private WoWBusiness woWBusiness;
 
    @Inject
    @BatchProperty(name = "region")
    private String region;
    @Inject
    @BatchProperty(name = "target")
    private String target;
 
    @Override
    public String process() throws Exception {
        List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));
        realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);
 
        return "COMPLETED";
    }
 
    void getRealmAuctionFileInformation(Realm realm) {
        try {
            Client client = ClientBuilder.newClient();
            Files files = client.target(target + realm.getSlug())
                                .request(MediaType.TEXT_PLAIN).async()
                                .get(Files.class)
                                .get(2, TimeUnit.SECONDS);
 
            files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile));
        } catch (Exception e) {
            getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail());
        }
    }
 
    void createAuctionFile(Realm realm, AuctionFile auctionFile) {
        auctionFile.setRealm(realm);
        auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json");
        auctionFile.setFileStatus(FileStatus.LOADED);
 
        if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {
            woWBusiness.createAuctionFile(auctionFile);
        }
    }
}

Крутая вещь по этому поводу — это использование Java 8. С parallelStream() вызов нескольких запросов REST очень прост! Вы действительно можете заметить разницу. Если вы хотите попробовать это, просто запустите пример и замените функцию parallelStream() на stream() и проверьте его. На моей машине, с помощью parallelStream() задача выполняется примерно в 5 или 6 раз быстрее.

Обновить
Обычно я бы не использовал этот подход. Я сделал это, потому что часть логики заключается в вызове медленных REST-запросов, и параллельные потоки действительно сияют здесь. Делать это с помощью пакетных разделов возможно, но сложно реализовать. Нам также нужно каждый раз объединять серверы для новых данных, так что это не страшно, если мы пропустим один или два файла. Имейте в виду, что если вы не хотите пропустить ни одной записи, стиль обработки чанков больше подходит. Спасибо Саймону Мартинелли за то, что он привлек мое внимание.

Поскольку Королевства США и ЕС требуют для вызова разных конечных точек REST, они идеально подходят для разделения. Разбиение означает, что задача будет выполняться в несколько потоков. Один поток на раздел. В этом случае у нас есть два раздела.

Чтобы завершить определение задания, нам нужно предоставить файл JoB XML. Это должно быть помещено в каталог META-INF/batch-jobs . Вот files-job.xml для этой работы:

файлы-job.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <step id="loadRealmAuctionFileStep">
        <batchlet ref="loadAuctionFilesBatchlet">
            <properties>
                <property name="region" value="#{partitionPlan['region']}"/>
                <property name="target" value="#{partitionPlan['target']}"/>
            </properties>
        </batchlet>
        <partition>
            <plan partitions="2">
                <properties partition="0">
                    <property name="region" value="US"/>
                    <property name="target" value="http://us.battle.net/api/wow/auction/data/"/>
                </properties>
                <properties partition="1">
                    <property name="region" value="EU"/>
                    <property name="target" value="http://eu.battle.net/api/wow/auction/data/"/>
                </properties>
            </plan>
        </partition>
    </step>
</job>

В files-job.xml нам нужно определить наш Batchlet в элементе batchlet . Для разделов просто определите элемент partition и назначьте разные properties каждому plan . Затем эти properties можно использовать для позднего связывания значения в LoadAuctionFilesBatchlet с выражениями #{partitionPlan['region']} и #{partitionPlan['target']} . Это очень простой механизм привязки выражений, который работает только для простых свойств и строк.

Процесс работы

Теперь мы хотим обработать файл данных аукциона Realm . Используя информацию из предыдущей работы, мы теперь можем загрузить файл и что-то сделать с данными. Файл JSON имеет следующую структуру:

пункт-аукционы-sample.json

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
{
    "realm": {
        "name": "Grim Batol",
        "slug": "grim-batol"
    },
    "alliance": {
        "auctions": [
            {
                "auc": 279573567,            // Auction Id
                "item": 22792,               // Item for sale Id
                "owner": "Miljanko",         // Seller Name
                "ownerRealm": "GrimBatol",   // Realm
                "bid": 3800000,              // Bid Value
                "buyout": 4000000,           // Buyout Value
                "quantity": 20,              // Numbers of items in the Auction
                "timeLeft": "LONG",          // Time left for the Auction
                "rand": 0,
                "seed": 1069994368
            },
            {
                "auc": 278907544,
                "item": 40195,
                "owner": "Mongobank",
                "ownerRealm": "GrimBatol",
                "bid": 38000,
                "buyout": 40000,
                "quantity": 1,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1978036736
            }
        ]
    },
    "horde": {
        "auctions": [
            {
                "auc": 278268046,
                "item": 4306,
                "owner": "Thuglifer",
                "ownerRealm": "GrimBatol",
                "bid": 570000,
                "buyout": 600000,
                "quantity": 20,
                "timeLeft": "VERY_LONG",
                "rand": 0,
                "seed": 1757531904
            },
            {
                "auc": 278698948,
                "item": 4340,
                "owner": "Celticpala",
                "ownerRealm": "Aggra(Português)",
                "bid": 1000000,
                "buyout": 1000000,
                "quantity": 10,
                "timeLeft": "LONG",
                "rand": 0,
                "seed": 0
            }
        ]
    }
}

В файле есть список аукционов из области , из которой он был загружен. В каждой записи мы можем проверить товар на продажу, цены, продавца и время, оставшееся до конца аукциона. Аукционы агрегированы по типу Аукционного Дома : Альянс и Орда .

Для process-job мы хотим прочитать файл JSON, преобразовать данные и сохранить их в базе данных. Это может быть достигнуто с помощью Chunk Processing. Чанк — это стиль обработки ETL (Extract — Transform — Load), который подходит для обработки больших объемов данных. Блок читает данные по одному элементу за раз и создает фрагменты, которые будут записаны в транзакции. Один элемент считывается из ItemReader , передается ItemProcessor и агрегируется. Как только количество прочитанных элементов становится равным интервалу фиксации, весь блок записывается с помощью ItemWriter , а затем транзакция ItemWriter .

ItemReader

Реальные файлы настолько велики, что не могут быть полностью загружены в память, иначе у вас может не хватить. Вместо этого мы используем JSON-P API для потокового анализа данных.

AuctionDataItemReader

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@Named
public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {
    private JsonParser parser;
    private AuctionHouse auctionHouse;
 
    @Inject
    private JobContext jobContext;
    @Inject
    private WoWBusiness woWBusiness;
 
    @Override
    public void open(Serializable checkpoint) throws Exception {
        setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));
 
        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSING);
        woWBusiness.updateAuctionFile(fileToProcess);
    }
 
    @Override
    public void close() throws Exception {
        AuctionFile fileToProcess = getContext().getFileToProcess();
        fileToProcess.setFileStatus(FileStatus.PROCESSED);
        woWBusiness.updateAuctionFile(fileToProcess);
    }
 
    @Override
    public Object readItem() throws Exception {
        while (parser.hasNext()) {
            JsonParser.Event event = parser.next();
            Auction auction = new Auction();
            switch (event) {
                case KEY_NAME:
                    updateAuctionHouseIfNeeded(auction);
 
                    if (readAuctionItem(auction)) {
                        return auction;
                    }
                    break;
            }
        }
        return null;
    }
 
    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
 
    protected void updateAuctionHouseIfNeeded(Auction auction) {
        if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {
            auctionHouse = AuctionHouse.ALLIANCE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {
            auctionHouse = AuctionHouse.HORDE;
        } else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {
            auctionHouse = AuctionHouse.NEUTRAL;
        }
 
        auction.setAuctionHouse(auctionHouse);
    }
 
    protected boolean readAuctionItem(Auction auction) {
        if (parser.getString().equalsIgnoreCase("auc")) {
            parser.next();
            auction.setAuctionId(parser.getLong());
            parser.next();
            parser.next();
            auction.setItemId(parser.getInt());
            parser.next();
            parser.next();
            parser.next();
            parser.next();
            auction.setOwnerRealm(parser.getString());
            parser.next();
            parser.next();
            auction.setBid(parser.getInt());
            parser.next();
            parser.next();
            auction.setBuyout(parser.getInt());
            parser.next();
            parser.next();
            auction.setQuantity(parser.getInt());
            return true;
        }
        return false;
    }
 
    public void setParser(JsonParser parser) {
        this.parser = parser;
    }
}

Чтобы открыть поток Json.createParser JSON, нам нужен Json.createParser и передать ссылку на Json.createParser поток. Для чтения элементов нам просто нужно вызвать hasNext() и next() . Это возвращает JsonParser.Event который позволяет нам проверять положение парсера в потоке. Элементы читаются и возвращаются в метод readItem() из Batch API ItemReader . Если больше нет доступных элементов для чтения, верните null чтобы завершить обработку. Обратите внимание, что мы также реализуем метод open и close из ItemReader . Они используются для инициализации и очистки ресурсов. Они исполняются только один раз.

ItemProcessor

ItemProcessor является необязательным. Он используется для преобразования данных, которые были прочитаны. В этом случае нам нужно добавить дополнительную информацию на аукцион .

AuctionDataItemProcessor

01
02
03
04
05
06
07
08
09
10
11
12
@Named
public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
    @Override
    public Object processItem(Object item) throws Exception {
        Auction auction = (Auction) item;
 
        auction.setRealm(getContext().getRealm());
        auction.setAuctionFile(getContext().getFileToProcess());
 
        return auction;
    }
}

ItemWriter

Наконец, нам просто нужно записать данные в базу данных:

AuctionDataItemWriter

01
02
03
04
05
06
07
08
09
10
@Named
public class AuctionDataItemWriter extends AbstractItemWriter {
    @PersistenceContext
    protected EntityManager em;
 
    @Override
    public void writeItems(List<Object> items) throws Exception {
        items.forEach(em::persist);
    }
}

Весь процесс с файлом записи 70 К занимает около 20 секунд на моей машине. Я заметил кое-что очень интересное. До этого кода я использовал встроенный EJB, который вызывал метод с операцией persist. В общей сложности это заняло 30 секунд, поэтому внедрение EntityManager и выполнение постоянного сохранения сэкономили мне треть времени обработки. Я могу только предположить, что задержка вызвана увеличением стекового вызова с перехватчиками EJB в середине. Это происходило в Wildfly. Я буду исследовать это дальше.

Чтобы определить чанк, нам нужно добавить его в файл process-job.xml:

Процесс-job.xml

1
2
3
4
5
6
7
<step id="processFile" next="moveFileToProcessed">
    <chunk item-count="100">
        <reader ref="auctionDataItemReader"/>
        <processor ref="auctionDataItemProcessor"/>
        <writer ref="auctionDataItemWriter"/>
    </chunk>
</step>

В свойстве item-count мы определяем, сколько элементов вписывается в каждый блок обработки. Это означает, что для каждых 100 транзакция совершается. Это полезно для сохранения размера транзакции на низком уровне и для проверки данных. Если нам нужно остановить, а затем перезапустить операцию, мы можем сделать это без необходимости повторной обработки каждого элемента. Мы должны сами кодировать эту логику. Это не входит в пример, но я сделаю это в будущем.

Бег

Чтобы запустить работу, нам нужно получить ссылку на JobOperator . JobOperator предоставляет интерфейс для управления всеми аспектами обработки заданий, включая такие рабочие команды, как запуск, перезапуск и останов, а также команды, связанные с хранилищем заданий, такие как извлечение задания и выполнение шагов.

Для запуска предыдущего files-job.xml Job мы выполняем:

Выполнить работу

1
2
JobOperator jobOperator = BatchRuntime.getJobOperator();
jobOperator.start("files-job", new Properties());

Обратите внимание, что мы используем имя XML-файла задания без расширения в JobOperator .

Следующие шаги

Нам все еще нужно объединить данные, чтобы извлечь метрики и отобразить их на веб-странице. Этот пост уже длинный, поэтому я опишу следующие шаги в следующем посте. Во всяком случае, код для этой части уже находится в репозитории Github. Проверьте раздел Ресурсы.

Ресурсы

Вы можете клонировать полную рабочую копию из моего репозитория github и развернуть ее в Wildfly. Вы можете найти там инструкции по его развертыванию.

Ссылка: Java EE 7 Batch Processing и World of Warcraft — часть 1 от нашего партнера по JCG Роберто Кортеса в блоге