Статьи

Пакетная обработка Java EE 7 и World of Warcraft — часть 2

Сегодня я представляю вам вторую часть моего предыдущего поста о пакетной обработке Java EE 7 и World of Warcraft — часть 1 . В этом посте мы увидим, как агрегировать и извлекать метрики из данных, которые мы получили в первой части .

Оргриммар-аукцион дом

резюмировать

Цель пакета — загрузить данные аукционного дома World of Warcraft , обработать аукционы и извлечь метрики. Эти метрики будут строить историю эволюции цен Аукционных предметов во времени. В первой части мы уже загрузили и вставили данные в базу данных.

Приложение

Процесс работы

После добавления необработанных данных в базу данных, мы собираемся добавить еще один шаг с обработкой стиля Chunk. В блоке мы собираемся прочитать агрегированные данные, а затем вставить их в другую таблицу в базе данных для быстрого доступа. Это делается в process-job.xml :

Процесс-job.xml

1
2
3
4
5
6
7
<step id="importStatistics">
    <chunk item-count="100">
        <reader ref="processedAuctionsReader"/>
        <processor ref="processedAuctionsProcessor"/>
        <writer ref="processedAuctionsWriter"/>
    </chunk>
</step>

Блок читает данные по одному элементу за раз и создает фрагменты, которые будут записаны в транзакции. Один элемент считывается из ItemReader , передается ItemProcessor и агрегируется. Как только количество прочитанных элементов становится равным интервалу фиксации, весь блок записывается с помощью ItemWriter , а затем транзакция ItemWriter .

ProcessedAuctionsReader

В читателе мы собираемся выбирать и объединять метрики, используя функции базы данных.

ProcessedAuctionsReader.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Named
public class ProcessedAuctionsReader extends AbstractAuctionFileProcess implements ItemReader {
    @Resource(name = "java:comp/DefaultDataSource")
    protected DataSource dataSource;
  
    private PreparedStatement preparedStatement;
    private ResultSet resultSet;
  
    @Override
    public void open(Serializable checkpoint) throws Exception {
        Connection connection = dataSource.getConnection();
  
        preparedStatement = connection.prepareStatement(
                        "SELECT" +
                        "   itemid as itemId," +
                        "   sum(quantity)," +
                        "   sum(bid)," +
                        "   sum(buyout)," +
                        "   min(bid / quantity)," +
                        "   min(buyout / quantity)," +
                        "   max(bid / quantity)," +
                        "   max(buyout / quantity)" +
                        " FROM auction" +
                        " WHERE auctionfile_id = " +
                        getContext().getFileToProcess().getId() +
                        " GROUP BY itemid" +
                        " ORDER BY 1",
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT
        );
  
        // Weird bug here. Check https://java.net/bugzilla/show_bug.cgi?id=5315
        //preparedStatement.setLong(1, getContext().getFileToProcess().getId());
  
        resultSet = preparedStatement.executeQuery();
    }
  
    @Override
    public void close() throws Exception {
        DbUtils.closeQuietly(resultSet);
        DbUtils.closeQuietly(preparedStatement);
    }
  
    @Override
    public Object readItem() throws Exception {
        return resultSet.next() ? resultSet : null;
    }
  
    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }

В этом примере мы получаем лучшие результаты производительности, используя простой JDBC с простым прокручиваемым набором результатов. Таким образом, выполняется только один запрос и результаты извлекаются по мере необходимости в readItem . Возможно, вы захотите изучить другие альтернативы.

Обычный JPA не имеет прокручиваемого результата в стандартах, поэтому вам нужно разбить результаты на страницы. Это приведет к нескольким запросам, которые замедляют чтение. Другой вариант — использовать новый API Java 8 Streams для выполнения операций агрегирования. Операции выполняются быстро, но вам нужно выбрать весь набор данных из базы данных в потоки. В конечном итоге это убьет вашу производительность.

Я попробовал оба подхода и получил лучшие результаты, используя возможности агрегации баз данных. Я не говорю, что это всегда лучший вариант, но в данном конкретном случае это был лучший вариант.

Во время реализации я также обнаружил ошибку в пакетном режиме. Вы можете проверить это здесь . Исключение выдается при установке параметров в PreparedStatement. Обходной путь должен был ввести параметры непосредственно в запрос SQL. Гадкий, я знаю …

ProcessedAuctionsProcessor

В процессоре давайте сохраним все агрегированные значения в объекте-держателе для хранения в базе данных.

ProcessedAuctionsProcessor.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Named
public class ProcessedAuctionsProcessor extends AbstractAuctionFileProcess implements ItemProcessor {
    @Override
    @SuppressWarnings("unchecked")
    public Object processItem(Object item) throws Exception {
        ResultSet resultSet = (ResultSet) item;
  
        AuctionItemStatistics auctionItemStatistics = new AuctionItemStatistics();
        auctionItemStatistics.setItemId(resultSet.getInt(1));
        auctionItemStatistics.setQuantity(resultSet.getLong(2));
        auctionItemStatistics.setBid(resultSet.getLong(3));
        auctionItemStatistics.setBuyout(resultSet.getLong(4));
        auctionItemStatistics.setMinBid(resultSet.getLong(5));
        auctionItemStatistics.setMinBuyout(resultSet.getLong(6));
        auctionItemStatistics.setMaxBid(resultSet.getLong(7));
        auctionItemStatistics.setMaxBuyout(resultSet.getLong(8));
  
        auctionItemStatistics.setTimestamp(getContext().getFileToProcess().getLastModified());
  
        auctionItemStatistics.setAvgBid(
                (double) (auctionItemStatistics.getBid() / auctionItemStatistics.getQuantity()));
        auctionItemStatistics.setAvgBuyout(
                (double) (auctionItemStatistics.getBuyout() / auctionItemStatistics.getQuantity()));
  
        auctionItemStatistics.setRealm(getContext().getRealm());
  
        return auctionItemStatistics;
    }
}

Поскольку метрики записывают точный моментальный снимок данных во времени, вычисление необходимо выполнить только один раз. Вот почему мы сохраняем агрегированные показатели. Они никогда не изменятся, и мы можем легко проверить историю.

Если вы знаете, что ваши исходные данные являются неизменными, и вам нужно выполнять над ними операции, я рекомендую вам сохранить результат где-нибудь. Это сэкономит ваше время. Конечно, вам нужно балансировать, если в будущем к этим данным будет обращаться много раз. Если нет, возможно, вам не нужно проходить через проблему сохранения данных.

ProcessedAuctionsWriter

Наконец, нам просто нужно записать данные в базу данных:

ProcessedAuctionsWriter.java

01
02
03
04
05
06
07
08
09
10
11
12
@Named
public class ProcessedAuctionsWriter extends AbstractItemWriter {
    @PersistenceContext
    protected EntityManager em;
  
    @Override
    @SuppressWarnings("unchecked")
    public void writeItems(List items) throws Exception {
        List<AuctionItemStatistics> statistis = (List<AuctionItemStatistics>) items;
        statistis.forEach(em::persist);
    }
}

метрика

Теперь, чтобы сделать что-то полезное с данными, мы собираемся предоставить конечную точку REST для выполнения запросов по вычисленным метрикам. Вот как:

WowBusinessBean.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
@Override @GET
    @Path("items")
    public List<AuctionItemStatistics> findAuctionItemStatisticsByRealmAndItem(@QueryParam("realmId") Long realmId,
                                                                               @QueryParam("itemId") Integer itemId) {
  
        Realm realm = (Realm) em.createNamedQuery("Realm.findRealmsWithConnectionsById")
                                .setParameter("id", realmId)
                                .getSingleResult();
  
        // Workaround for https://bugs.eclipse.org/bugs/show_bug.cgi?id=433075 if using EclipseLink
        List<Realm> connectedRealms = new ArrayList<>();
        connectedRealms.addAll(realm.getConnectedRealms());
        List<Long> ids = connectedRealms.stream().map(Realm::getId).collect(Collectors.toList());
        ids.add(realmId);
  
        return em.createNamedQuery("AuctionItemStatistics.findByRealmsAndItem")
                 .setParameter("realmIds", ids)
                 .setParameter("itemId", itemId)
                 .getResultList();
    }

Если вы помните некоторые детали поста части 1 , серверы World of Warcraft называются Realms . Эти области могут быть связаны друг с другом и совместно использовать один и тот же Аукционный Дом . Для этого у нас также есть информация о том, как сферы связаны друг с другом. Это важно, потому что мы можем искать предмет аукциона во всех связанных сферах. Остальная часть логики — это простые запросы для вывода данных.

Во время разработки я также обнаружил ошибку в Eclipse Link (если вы работаете в Glassfish) и Java 8. По-видимому, базовая коллекция, возвращаемая Eclipse Link, имеет число элементов, равное 0. Это не очень хорошо работает с Streams, если вы попытаться встроить запрос запроса плюс операцию потока. Поток будет думать, что он пуст и результаты не возвращаются. Вы можете прочитать немного больше об этом здесь .

Интерфейс

Я также разработал небольшой интерфейс, используя Angular и Google Charts для отображения метрик. Посмотри:

вау-аукцион-поиск

Здесь я ищу в Царстве под названием «Aggra (Português)» и идентификатор предмета аукциона 72092, который соответствует Призрачной железной руде . Как вы можете видеть, мы можем проверить количество для продажи, цену покупки и выкупа и колебания цены во времени. Аккуратные? Я могу написать еще один пост о создании веб-интерфейса в будущем.

Ресурсы

Вы можете клонировать полную рабочую копию из моего репозитория github и развернуть ее в Wildfly или Glassfish . Вы можете найти там инструкции по его развертыванию: Аукционы World of Warcraft

Проверьте также примеры проектов Java EE с большим количеством пакетных примеров, полностью документированных.

Ссылка: Java EE 7 Batch Processing и World of Warcraft — часть 2 от нашего партнера по JCG Роберто Кортеса в блоге