Статьи

Простая обработка событий с помощью var, Lombok и Fluxtion

Вступление

В этой статье я объединяю два продукта Lombok и Fluxtion, чтобы продемонстрировать, как инструменты могут сократить как написанный код, так и время доставки, одновременно улучшая читабельность кода. Использование var из java 10 еще больше улучшает ситуацию. Оба продукта и var используют вывод во время сборки для ускорения разработки.

Идея Fluxtion состоит в том, чтобы минимизировать отходы, наша цель здесь состоит в том, чтобы удалить код пластины котла, уменьшить шум кода и упростить задачи интеграции. Мы хотим потратить как можно меньше времени на разработку, одновременно предоставляя эффективное и высокопроизводительное решение, способное обрабатывать миллионы сообщений в секунду.

Используя описанные методы, я сравниваю реализацию Fluxtion / Lombok с примером scala, использующим потоки Akka, для версии Java требуется меньше кода, и ее проще построить.

Домашнее хозяйство, извинения за не признание Ричарда Уорбертона
Опсиан , в моем первом блоге .

Кодовое соотношение сигнал / шум

Когда мы кодируем, мы решаем две основные задачи:

  • Перевод бизнес-требований в программную логику
  • Взаимодействие логики со средой развертывания

В идеале мы хотели бы потратить все свое время на первое и ничего не на втором. Кроме того, общий объем написанного кода также должен быть уменьшен. Сбалансировать абстракцию, в то же время расширяя возможности разработчика, нелегко, слишком большая абстракция, и мы убираем выразительную силу. Я надеюсь найти хороший баланс с подходом, принятым в этой статье.

Представьте себе, что вы пишете логику для расчета налогов, которая занимает 50 строк, но для написания кода для баз данных, веб-серверов, сортировки, ведения журналов и т. Д. Требуется 1000 строк. Хотя демонстрация технических возможностей не представляет никакой коммерческой ценности в чисто технических деталях реализации. Рассматривая это с другой стороны, мы могли видеть бизнес-логику как сигнал, а код инфраструктуры — как шум. Решения, которые мы пишем, могут быть измерены с отношением сигнал / шум относительно полезной бизнес-логики.

Википедия определяет отношение сигнал / шум как:

Отношение сигнал / шум (сокращенно SNR или S / N) является мерой, используемой в
наука и техника, которая сравнивает уровень полезного сигнала с уровнем фонового шума . SNR определяется как отношение мощности сигнала к мощности шума, часто выражаемое в децибелах . Соотношение выше 1: 1 (больше 0 дБ) указывает больше сигнала, чем шума.

Желательно стремиться к высокому отношению SNR в большинстве систем, с точки зрения программирования некоторые из преимуществ высокого SNR:

  • Меньше кода для записи
  • Более простая бизнес-логика для понимания и поддержки
  • Более короткая кривая обучения
  • Более простая отладка / поиск ошибок, меньше ошибок
  • Более эффективное развитие

В java мы ощущали это давление для улучшения SNR кода на протяжении многих лет, переходя от тяжелого j2ee-контейнера к более простым фреймворкам, таким как spark и spring boot . Сам язык учел этот сдвиг, введя такие изменения, как лямбда-выражения, потоки, ссылки на методы и объявление переменных var.

Сочетание Fluxtion и Lombok

Перед примером быстрый учебник по Fluxtion и Lombok.

Флуктионный праймер

Fluxtion — это встраиваемый механизм обработки потоковых событий, написанный на Java. Разработчик описывает обработку в смеси декларативных и императивных форм, чтобы Fluxtion могла генерировать механизм принятия решений. Движок сериализован как Java-код и может быть встроен в любое Java-приложение. Приложение передает события в механизм для обработки потока.

Генерация движка может происходить в приложении или в процессе сборки с помощью подключаемого модуля maven.

Праймер Ломбок

Lombok — это утилита, которая автоматически записывает код платформы для классов Java, экономя время разработчиков и уменьшая шум кода. Выполняясь как инструмент обработки аннотаций, Lombok генерирует байт-код, представляющий код базовой платы для аннотированных классов. Неполный набор функций Lombok включает в себя:

  • Автоматический метод получения и установки стиля бобов для свойств
  • Хеш-код и сгенерированные равные для свойств
  • Автоматический метод toString
  • Автоматический конструктор для всех свойств класса

Просто добавьте Lombok в вашу сборку maven, и ваш ide должен просто работать, или он работает с netbeans и intellij.

Пример максимальной температуры

Давайте посмотрим на общий шаблон использования Fluxtion. Подписаться на поток событий, извлечь значение из события, выполнить расчет значения, отфильтровать и отправить результат в пользовательский объект. В этом простом примере у нас есть следующие требования:

  • Прослушивание температурных событий
  • Извлеките температуру
  • Поддерживать максимальную температуру
  • Ввод температуры в пользовательский экземпляр, когда есть новый максимум

Клонируйте репозиторий из github и используйте помеченную версию этой статьи. Проект здесь .

1
git clone --branch  article_lombok_july2019 https://github.com/gregv12/articles.git<br><br>cd articles/2019/june/lombok/<br><br>mvn clean install

Код Fluxtion для выполнения требований обработки:

1
2
3
select(TempEvent::getTemp)
  .map(max()).notifyOnChange(true)
  .push(new MyTempProcessor()::setMaxTemp);

Это дает высокий SNR кода и низкое количество строк, весь код ориентирован на бизнес-логику. Для достижения этого Fluxtion использует ссылки на методы и вывод типов. Ссылки на методы позволяют Fluxtion определять желаемое поведение, какие функции строить, исходный и целевой типы и как передавать данные от одного узла к другому в графе выполнения. Ссылки на метод дают нам приятный тип безопасного способа выражения произвольной логики. Это умозаключение, используемое инструментом, которое снимает с разработчика нагрузку для явного выражения каждого шага обработки, предоставляя нам среду с низким кодом для работы.

После генерации Fluxtion обработчик событий сериализованного потокового
здесь , представлен в виде кода Java. Тест для примера здесь .

01
02
03
04
05
06
07
08
09
10
11
12
@Test
    public void testTemp() throws Exception{
        EventHandler handler = new InlineLombok().handler();
        ((Lifecycle)handler).init();
        handler.onEvent(new InlineLombok.TempEvent(10));
        handler.onEvent(new InlineLombok.TempEvent(9));
        handler.onEvent(new InlineLombok.TempEvent(17));
        handler.onEvent(new InlineLombok.TempEvent(16));
        handler.onEvent(new InlineLombok.TempEvent(14));
        handler.onEvent(new InlineLombok.TempEvent(24));
        Assert.assertEquals(3, MyTempProcessor.count);
    }

выход:

1
2
3
4
5
08:08:42.921 [main] INFO  c.f.generator.compiler.SepCompiler - generated sep: D:\projects\fluxtion\articles\2019\june\lombok\target\generated-sources\fluxtion\com\fluxtion\articles\lombok\temperature\generated\lombok\TempMonitor.java
new max temp:10.0
new max temp:17.0
new max temp:24.0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.79 sec

Обработка графического изображения:

Посмотрев ближе на первую строку в примере выше, выберите (TempEvent :: getTemp) , мы можем проверить вывод, который делает Fluxtion. Логика подразумевается здесь:

  • Создать подписку на события типа TempEvent,
  • Добавить узел, который извлекает значение getTemp из входящего события
  • Сделайте значение temp доступным как свойство Number узла
  • Сообщите детям об изменении значения температуры при получении события входящей температуры.

Функции map, notifyOnChange и push — это шаги, добавленные в цепочку выполнения. Подробнее см. Интерфейс Wrapper потокового модуля Fluxtion. Из-за высокого SNR легко понять их назначение и эффект, но для полноты:

  • map (max ()) извлекает свойство число из предыдущего узла (температура). Примените значение к функции max с сохранением состояния при получении нового значения. Сохраните текущее максимальное значение в узле со свойством Number. Уведомляет любые дочерние узлы о значении текущего максимума при получении события.
  • notifyOnChange Функция с сохранением состояния, которая срабатывает, когда отслеживаемое значение обновилось и отличается от предыдущего значения. Только новые максимальные значения распространяются на дочерние узлы.
  • push (new MyTempProcessor () :: setMaxTemp) Добавляет пользовательский узел MyTempProcessor в цепочку выполнения. Когда инициируется новым максимальным темпом, значение узла помещается в setMaxTemp MyTempProcessor. Выполните все преобразования типов для примитивных типов без создания мусора.

Чтобы использовать ссылки на методы в TempEvent, нам сначала нужно определить пару методов доступа в стиле getter / setter. Конечно, ide могут генерировать требуемые методы, но SNR все равно будет падать после генерации. Расширьте это до большего домена, и проблема умножится. Ломбок может прийти нам на помощь, удалив ненужный код и восстановив SNR.

Перед Ломбоком:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class InlineNoLombok {
     
    public EventHandler handler() throws Exception {
        return sepInstance(c
                -> select(TempEvent::getTemp)
                        .map(max()).notifyOnChange(true)
                        .push(new MyTempProcessor()::setMaxTemp),
                "com.fluxtion.articles.lombok.temperature.generated.nolombok", "TempMonitor");
    }
     
 
    public static class TempEvent extends Event {
 
        private double temp;
 
        public TempEvent(double temp) {
            this.temp = temp;
        }
 
        public double getTemp() {
            return temp;
        }
 
        public void setTemp(double temp) {
            this.temp = temp;
        }
         
    }
 
}

После Ломбок:

Добавление одной аннотации @Data удаляет метод получения / установки, а @AllArgsConstructor удаляет конструктор:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class InlineLombok {
 
    public EventHandler handler() throws Exception {
        return sepInstance(c
                -> select(TempEvent::getTemp)
                        .map(max()).notifyOnChange(true)
                        .push(new MyTempProcessor()::setMaxTemp),
                "com.fluxtion.articles.lombok.temperature.generated.nolombok", "TempMonitor");
    }
 
    @Data
    @AllArgsConstructor
    public static class TempEvent extends Event {
        private double temp;
    }
}

Даже с этими наименьшими примерами использования Lombok и Fluxtion реальную бизнес-логику гораздо легче прочитать. Более качественный SNR кода делает приложение более эффективным для построения и более легким для понимания.

Пример данных полета

Давайте расширим это на более сложный пример, где значение высокого SNR становится очевидным. В этом примере мы обрабатываем данные о рейсах за целый год. Пример был вдохновлен этим блогом , и код для потокового решения akka находится здесь . Краткое изложение требований:

Обработка годовой стоимости всех записей о посадках в США, хранящихся в формате CSV
здесь

  • Группировать перевозчиков по имени
  • Фильтрация записей с задержкой> 0
  • Название перевозчика: столбец 8, задержка: столбец 14
  • Для группировки перевозчиков рассчитайте:
    • Накопленная сумма общей задержки
    • Общее количество задержанных рейсов
    • Средняя задержка рейса, если уже поздно
  • Рассчитать общее количество рейсов независимо от задержки

Нам нужно определить типы данных и логику обработки, чтобы решить эту проблему. Было бы легко быть пораженным шумом в решении. Но Fluxtion позволяет нам сосредоточиться на бизнес-логике, а Lombok упрощает работу с типами данных, причем оба инструмента используют логический вывод, чтобы сократить объем кода для записи:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class FlightAnalyser {
 
  @SepBuilder(
          name = "FlightDelayAnalyser",
          packageName = "com.fluxtion.articles.lombok.flight.generated"
  )
  public void buildFlightProcessor(SEPConfig cfg) {
    var flightDetails = csvMarshaller(FlightDetails.class, 1)
            .map(14, FlightDetails::setDelay).converter(14, defaultInt(-1))
            .map(8, FlightDetails::setCarrier).converter(8, Converters::intern).build();
    //filter and group by
    var delayedFlight = flightDetails.filter(FlightDetails::getDelay, positive());
    var carrierDelay = groupBy(delayedFlight, FlightDetails::getCarrier, CarrierDelay.class);
    //derived values for a group
    carrierDelay.init(FlightDetails::getCarrier, CarrierDelay::setCarrierId);
    carrierDelay.avg(FlightDetails::getDelay, CarrierDelay::setAvgDelay);
    carrierDelay.count(CarrierDelay::setTotalFlights);
    carrierDelay.sum(FlightDetails::getDelay, CarrierDelay::setTotalDelayMins);
    //make public for testing
    var delayByGroup = cfg.addPublicNode(carrierDelay.build(), "delayMap");
    //dump to console, triggers on EofEvent
    printValues("\nFlight delay analysis\n========================",
            delayByGroup, eofTrigger());
  }
 
  @Data //input data from CSV
  public static class FlightDetails {
    private String carrier;
    private int delay;
  }
 
  @Data //derived data
  public static class CarrierDelay {
    private String carrierId;
    private int avgDelay;
    private int totalFlights;
    private int totalDelayMins;
  }
 
}

Анализ реализации

Lombok позволяет нам иметь дело с классами данных и типами полей, игнорируя леса геттеров / сеттеров. Мы определяем входную запись FlightDetails и сводную запись группировки CarrierDelay.

Использование ключевого слова var для назначения промежуточного экземпляра упрощает чтение и написание кода.

  • строка 8 Fluxtion отображает CSV в тип FlightDetails, 1 указывает на начальную строку заголовка, которую следует игнорировать.
  • строка 9 отображает столбец 14 в значение задержки. Дополнительная функция преобразователя отображает отсутствующую или нечисловую задержку в значение -1. Вывод типа с помощью Fluxtion обеспечивает преобразование char в int с нулевым значением gc
  • строка 10 сопоставляет столбец 8 с именем перевозчика. Имя носителя встроено, чтобы уменьшить ненужное выделение объектов String, поскольку мы ожидаем, что одни и те же имена носителей будут появляться много раз. Принимая во внимание, что существует 7 миллионов записей, это значительно снизит давление gc.
  • В строке 12 функция фильтра Positive () применяется к полю FlightDetails :: getDelay. только задержанные рейсы обрабатываются дочерними узлами.
  • В строке 13 отфильтрованные записи delayedFlight сгруппированы по ключу FlightDetails :: getCarrier, целью группы является CarrierDelay.
  • строка 15 определяет функцию инициализации для записи нового оператора в группу, вызываемую только тогда, когда в группе назначен новый ключ.
  • строка 16 применяет функцию усреднения к задержке и устанавливает значение CarrierDelay: setAvgDelay
  • строка 17 применяет функцию отсчета для задержки и устанавливает значение CarrierDelay: setTotalFlights
  • строка 18 применяет функцию суммы к задержке и устанавливает значение CarrierDelay: setTotalDelayMinutes

Расчеты выполняются с учетом состояния и имеют уникальные значения для каждой несущей, каждый раз, когда запись FlightDelay получает обновление вычислений для соответствующей несущей.

  • строка 21 назначает delayMap в качестве публичной конечной переменной, чтобы помочь тестированию
  • строка 22 печатает значения карты при получении события конца файла

Производительность

Выполняя анализ полета за 2008 год, разархивируйте данные CSV для полета и передайте расположение файла в исполняемый файл jar в дистрибутиве.

1
java.exe -jar dist\flightanalyser.jar [FLIGHT_CSV_DATA]
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Flight delay analysis
========================
FlightAnalyser.CarrierDelay(carrierId=OO, avgDelay=31, totalFlights=219367, totalDelayMins=6884487)
FlightAnalyser.CarrierDelay(carrierId=AA, avgDelay=35, totalFlights=293277, totalDelayMins=10414936)
FlightAnalyser.CarrierDelay(carrierId=MQ, avgDelay=35, totalFlights=205765, totalDelayMins=7255602)
FlightAnalyser.CarrierDelay(carrierId=FL, avgDelay=31, totalFlights=117632, totalDelayMins=3661868)
FlightAnalyser.CarrierDelay(carrierId=DL, avgDelay=27, totalFlights=209018, totalDelayMins=5839658)
FlightAnalyser.CarrierDelay(carrierId=NW, avgDelay=28, totalFlights=158797, totalDelayMins=4482112)
FlightAnalyser.CarrierDelay(carrierId=UA, avgDelay=38, totalFlights=200470, totalDelayMins=7763908)
FlightAnalyser.CarrierDelay(carrierId=9E, avgDelay=32, totalFlights=90601, totalDelayMins=2907848)
FlightAnalyser.CarrierDelay(carrierId=CO, avgDelay=34, totalFlights=141680, totalDelayMins=4818397)
FlightAnalyser.CarrierDelay(carrierId=XE, avgDelay=36, totalFlights=162602, totalDelayMins=5989016)
FlightAnalyser.CarrierDelay(carrierId=AQ, avgDelay=12, totalFlights=1908, totalDelayMins=23174)
FlightAnalyser.CarrierDelay(carrierId=EV, avgDelay=35, totalFlights=122751, totalDelayMins=4402397)
FlightAnalyser.CarrierDelay(carrierId=AS, avgDelay=27, totalFlights=62241, totalDelayMins=1714954)
FlightAnalyser.CarrierDelay(carrierId=F9, avgDelay=21, totalFlights=46836, totalDelayMins=992044)
FlightAnalyser.CarrierDelay(carrierId=B6, avgDelay=42, totalFlights=83202, totalDelayMins=3559212)
FlightAnalyser.CarrierDelay(carrierId=WN, avgDelay=26, totalFlights=469518, totalDelayMins=12633319)
FlightAnalyser.CarrierDelay(carrierId=OH, avgDelay=34, totalFlights=96154, totalDelayMins=3291908)
FlightAnalyser.CarrierDelay(carrierId=HA, avgDelay=18, totalFlights=18736, totalDelayMins=342715)
FlightAnalyser.CarrierDelay(carrierId=YV, avgDelay=37, totalFlights=111004, totalDelayMins=4159465)
FlightAnalyser.CarrierDelay(carrierId=US, avgDelay=28, totalFlights=167945, totalDelayMins=4715728)
 
millis:2682

Анализ производительности обработки:

  размер файла = 673 Мб 

количество записей = 7 009 728

время обработки = 2,689 секунды

скорость обработки байтов = 250 Мб / с

время процесса записи = 383 нано на запись

скорость обработки записей = 2,6 миллиона записей в секунду

Сравнивая два решения, мы видим следующее:

  • Версия Java использует меньше кода, чем версия Scala
  • Fluxtion избавляет от необходимости определять граф, просто бизнес-логику
  • Построение графика вручную — источник ошибок
  • Lombok делает типы данных такими же краткими, как и классы Scala.
  • var уменьшает раздувание кода
  • Соотношение сигнал / шум высокое, что облегчает поддержку и понимание кода
  • Fluxtion намного легче запустить, он не требует настройки сервера, просто скомпилируйте и работайте.

Трудно сравнить показатели производительности, версия Akka говорит о минуте, чтобы запустить пример, но у меня недостаточно опыта Akka, чтобы это проверить. Кроме того, это старый блог, поэтому ситуация, вероятно, пошла дальше.

Вывод

Мы намерены продемонстрировать, что java может быть кратким языком для потоковой передачи событий, если мы выберем хороший набор инструментов для использования. Lombok и Fluxtion элегантно сочетаются, что позволяет декларативному определению логики обработки быть простым и безопасным для типов. Использование var делает код еще более читабельным и простым для написания. Ключом ко всему этому является вывод, каждый инструмент выводит свой тип поведения, и все они избавляют кодировщика от необходимости явно указывать его:

  • var — вывод типа
  • Ломбок — выводим котловую плиту
  • Fluxtion — выводит график обработки

В случае Fluxtion мы сравниваем, как версия Akka требует, чтобы разработчик явно определил граф обработки. Это не масштабируется для более крупных и сложных ситуаций и будет источником ошибок. Еще хуже то, что бизнес-логика скрывается за технической инфраструктурой, что делает обслуживание в будущем еще более дорогостоящим.

В заключение отметим, что производительность этого решения превосходна: он обрабатывает 2,6 миллиона записей в секунду с нулевым разрешением. Надеюсь, вам понравилась работа, и вы захотите попробовать Fluxtion и Lombok.

Подтверждения

AllSimon на github , его комментарии, в то же время помогая Fluxtion, привели меня к экспериментам с Lombok

Опубликовано на Java Code Geeks с разрешения Грега Хиггинса, партнера нашей программы JCG . Смотрите оригинальную статью здесь: Простая обработка событий с помощью var, Lombok и Fluxtion

Мнения, высказанные участниками Java Code Geeks, являются их собственными.