В этом примере мы перенаправим текст в Apache Ignite и посчитаем каждое отдельное слово. Мы также будем периодически выдавать SQL-запросы в поток, чтобы запросить 10 самых популярных слов.
Пример будет работать следующим образом:
- Мы настроим кеш для хранения слов по мере их поступления из потока.
- Мы установим 1-секундное скользящее окно, чтобы слова сохранялись только в течение 1-й секунды.
- Программа StreamWords будет транслировать текстовые данные в Ignite.
- Программа QueryWords будет запрашивать 10 лучших слов из потока.
Конфигурация кэша
Мы определяем класс CacheConfig, который будет предоставлять конфигурацию для использования в обеих программах, StreamWords и QueryWords. Кеш будет секционированным, который будет хранить слова как значения. Чтобы гарантировать, что идентичные слова кэшируются на одном узле данных, мы используем тип AffinityUuid для уникальных ключей кэша.
Обратите внимание, что в этом примере мы используем скользящее окно длительностью 1 секунда для нашего кэша. Это означает, что слова исчезнут из кэша через 1 секунду, так как они были впервые введены в кэш.
public class CacheConfig {
public static CacheConfiguration<String, Long> wordCache() {
CacheConfiguration<String, Long> cfg = new CacheConfiguration<>("words");
// Index individual words.
cfg.setIndexedTypes(AffinityUuid.class, /*word type*/String.class);
// Sliding window of 1 seconds.
cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
new CreatedExpiryPolicy(new Duration(SECONDS, 1))));
return cfg;
}
}
Поток слов
Мы определяем класс StreamWords, который будет отвечать за непрерывное чтение слов, формируя локальный текстовый файл (в нашем случае «alice-in-wonderland.txt»), и направляем их в Ignite « словесный » кэш.
пример
public class StreamWords {
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
// The cache is configured with sliding window holding 1 second of the streaming data.
IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
// Stream words from "alice-in-wonderland" book.
while (true) {
InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");
try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
for (String word : line.split(" "))
if (!word.isEmpty())
// Stream words into Ignite.
// By using AffinityUuid as a key, we ensure that identical
// words are processed on the same cluster node.
stmr.addData(new AffinityUuid(word), word);
}
}
}
}
}
}
}
Запрашивать слова
Мы определяем класс QueryWords, который будет периодически запрашивать количество слов из кэша.
SQL-запрос
- Мы используем стандартный SQL для запроса популярных слов.
- Ignite SQL обрабатывает классы Java как таблицы SQL. Поскольку наши слова хранятся как простой тип String, SQL-запрос ниже запрашивает таблицу String.
- Ignite всегда хранит ключи и значения кэша в виде полей «_key» и «_val». В нашем случае слово «_val» является словом, поэтому мы используем этот синтаксис в нашем запросе SQL.
пример
public class QueryWords {
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
try (Ignite ignite = Ignition.start()) {
IgniteCache<String, Long> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
// Select top 10 words.
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
"select _val, count(_val) as cnt from String " +
"group by _val " +
"order by cnt desc " +
"limit 10",
true /*collocated*/
);
// Query top 10 popular numbers every 5 seconds.
while (true) {
// Execute queries.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();
// Print top 10 words.
ExamplesUtils.printQueryResults(top10);
Thread.sleep(5000);
}
}
}
}
Начальные узлы сервера
Для запуска примера необходимо запустить узлы данных. В Ignite узлы данных называются узлами сервера. Вы можете запустить столько серверных узлов, сколько захотите, но у вас должно быть как минимум 1, чтобы запустить пример.
Узлы сервера могут быть запущены из командной строки следующим образом:
bin/ignite.sh
Вы также можете запускать узлы сервера программно, например:
public class ExampleNodeStartup {
public static void main(String[] args) throws IgniteException {
Ignition.start();
}
}
Вот как выглядит вывод программы QueryWords на моем ноутбуке MacBook Pro (я также запустил 2 серверных узла и одну программу StreamWords )
...
Query results:
(the,2890)
(and,1355)
(to,1298)
(a,1139)
(of,1029)
(said,1002)
(in,912)
(she,820)
(was,766)
(you,711)
Query results:
(the,1679)
(to,830)
(and,810)
(a,680)
(of,629)
(she,491)
(it,357)
(in,330)
(said,315)
(was,274)
...