Статьи

Пример потокового подсчета Apache Ignite

В этом примере мы перенаправим текст в Apache Ignite и посчитаем каждое отдельное слово. Мы также будем периодически выдавать SQL-запросы в поток, чтобы запросить 10 самых популярных слов.

Пример будет работать следующим образом:

  1. Мы настроим кеш для хранения слов по мере их поступления из потока.
  2. Мы установим 1-секундное скользящее окно, чтобы слова сохранялись только в течение 1-й секунды.
  3. Программа StreamWords будет транслировать текстовые данные в Ignite.
  4. Программа QueryWords будет запрашивать 10 лучших слов из потока.

Конфигурация кэша

Мы определяем класс CacheConfig, который будет предоставлять конфигурацию для использования в обеих программах, StreamWords и QueryWords. Кеш будет секционированным, который будет хранить слова как значения. Чтобы гарантировать, что идентичные слова кэшируются на одном узле данных, мы используем тип AffinityUuid для уникальных ключей кэша.

Обратите внимание, что в этом примере мы используем скользящее окно длительностью 1 секунда для нашего кэша. Это означает, что слова исчезнут из кэша через 1 секунду, так как они были впервые введены в кэш.

public class CacheConfig {
  public static CacheConfiguration<String, Long> wordCache() {
    CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("words");

    // Index individual words.
    cfg.setIndexedTypes(AffinityUuid.class, /*word type*/String.class);

    // Sliding window of 1 seconds.
    cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
      new CreatedExpiryPolicy(new Duration(SECONDS, 1))));

    return cfg;
  }
}

Поток слов

Мы определяем класс StreamWords, который будет отвечать за непрерывное чтение слов, формируя локальный текстовый файл (в нашем случае «alice-in-wonderland.txt»), и направляем их в Ignite « словесный » кэш.

пример

public class StreamWords {
  public static void main(String[] args) throws Exception {
    // Mark this cluster member as client.
    Ignition.setClientMode(true);

    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
      // The cache is configured with sliding window holding 1 second of the streaming data.
      IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());

      try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
        // Stream words from "alice-in-wonderland" book.
        while (true) {
          InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");

          try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
            for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
              for (String word : line.split(" "))
                if (!word.isEmpty())
                  // Stream words into Ignite.
                  // By using AffinityUuid as a key, we ensure that identical
                  // words are processed on the same cluster node.
                  stmr.addData(new AffinityUuid(word), word);
            }
          }
        }
      }
    }
  }
}

Запрашивать слова

Мы определяем класс QueryWords, который будет периодически запрашивать количество слов из кэша.

SQL-запрос

  1. Мы используем стандартный SQL для запроса популярных слов.
  2. Ignite SQL обрабатывает классы Java как таблицы SQL. Поскольку наши слова хранятся как простой тип String, SQL-запрос ниже запрашивает таблицу String.
  3. Ignite всегда хранит ключи и значения кэша в виде полей «_key» и «_val». В нашем случае слово «_val» является словом, поэтому мы используем этот синтаксис в нашем запросе SQL.

пример

public class QueryWords {
  public static void main(String[] args) throws Exception {
    // Mark this cluster member as client.
    Ignition.setClientMode(true);

    try (Ignite ignite = Ignition.start()) {
      IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());

      // Select top 10 words.
      SqlFieldsQuery top10Qry = new SqlFieldsQuery(
          "select _val, count(_val) as cnt from String " + 
            "group by _val " + 
            "order by cnt desc " + 
            "limit 10",
          true /*collocated*/
      );

      // Query top 10 popular numbers every 5 seconds.
      while (true) {
        // Execute queries.
        List<List<?>> top10 = stmCache.query(top10Qry).getAll();

        // Print top 10 words.
        ExamplesUtils.printQueryResults(top10);

        Thread.sleep(5000);
      }
    }
  }
}

Начальные узлы сервера

Для запуска примера необходимо запустить узлы данных. В Ignite узлы данных называются узлами сервера. Вы можете запустить столько серверных узлов, сколько захотите, но у вас должно быть как минимум 1, чтобы запустить пример.

Узлы сервера могут быть запущены из командной строки следующим образом:

bin/ignite.sh

Вы также можете запускать узлы сервера программно, например:

public class ExampleNodeStartup {
    public static void main(String[] args) throws IgniteException {
        Ignition.start();
    }
}

Вот как выглядит вывод программы QueryWords на моем ноутбуке MacBook Pro (я также запустил 2 серверных узла и одну программу StreamWords )

...
Query results:
(the,2890)
(and,1355)
(to,1298)
(a,1139)
(of,1029)
(said,1002)
(in,912)
(she,820)
(was,766)
(you,711)
Query results:
(the,1679)
(to,830)
(and,810)
(a,680)
(of,629)
(she,491)
(it,357)
(in,330)
(said,315)
(was,274)
...