Статьи

Подсчет слов в реальном времени на GridGain

Некоторое
время назад я пообещал опубликовать код из
презентации
GridGain, которую я сделал в QCon London. Поскольку презентация была в Scala, код, который я буду публиковать здесь, находится в Scala, но я также опубликую версию Java в течение нескольких дней.

Сначала краткое вступление. Все мы знаем пример подсчета слов в Hadoop, который берет файл со словами, а затем создает другой файл с числом вхождений рядом с каждым словом. Hadoop очень хорошо справляется с этим примером, однако главное предостережение с примером Hadoop заключается в том, что это не реальное время.

Пример подсчета слов, который я сделал на QCon, фактически подсчитывал слова в реальном времени. Программа была разделена на две части. Первая часть отвечает за загрузку слов в реальном времени в сетку данных GridGain, а вторая часть запрашивала сетку каждые 3 секунды, чтобы непрерывно распечатывать 10 лучших слов, хранящихся до сих пор.

Пример был сделан с использованием ‘
Scalar ‘ — GridGain DSL для Scala, но это могло быть сделано и в Java с использованием API-интерфейсов GridGain Java.

Постоянно заполнять слова в реальном времени

Начнем с постоянной загрузки сетки данных новыми словами. Для этого я скачал несколько книг в текстовом формате и начал одновременно читать их из  метода populate (…) , по одному потоку на книгу. Для каждого прочитанного слова я сохраняю его в кеше, в котором само слово является ключом, а число текущих вхождений — значением. Также обратите внимание на то, как мы позволяем грид-асинхронно обновлять кэш с использованием асинхронного запуска при чтении следующей строки из файла книги (на самом деле, скорее всего, у вас будет более одного асинхронного задания или если функция загрузки данных GridGain сделает это за вас).

def populate(threadPool: CompletionService, dir: File) {
  val bookFileNames = dir.list()
 
  // For every book, start a new thread and start populating cache
  // with words and their counts.
  for (bookFileName <- bookFileNames) {
    threadPool.submit(new Callable {
      def call() = {
        val cache = grid$.cache[String, JInt]
 
        var fut: GridFuture[_] = null;
 
        Source.fromFile(new File(dir, name)).getLines().foreach(line => {
          line.split("[^a-zA-Z0-9]").foreach(word => {
            if (!word.isEmpty) {
              if (fut != null)
                fut.get()
 
              fut = grid$.affinityRunAsync(null, word, () => {
                // Increment word counter and store it in cache.
                // We use cache transaction to make sure that
                // gets and puts are consistent and atomic.
                cache.inTx(
                  () => cache += (word -> (cache.getOrElse(word, 0) + 1))
                )
 
                ()
              })
            }
          })
        })
 
        None // Return nothing.
      }
    })
  }
 
  // Wait for all threads to finish.
  books.foreach(_ => threadPool.take().get())
}

Распределенный SQL-запрос

Теперь давайте реализуем наш распределенный запрос к сетке данных GridGain, который будет выполняться каждые 3 секунды. Обратите внимание, что мы используем стандартный синтаксис SQL для запроса удаленных узлов сетки. Интересно, что сетка данных GridGain позволяет использовать SQL практически без каких-либо ограничений. Вы можете использовать любую встроенную функцию SQL и даже соединения SQL между различными классами. Здесь, например, мы используем функцию длины SQL (…), чтобы запрашивать только слова длиной более 3 букв, чтобы избавиться от частых коротких статей, таких как «a» или «the» в наших поисках. Мы также используем ключевое слово desc для сортировки количества слов в порядке убывания и ограничиваем ключевое слово, чтобы ограничить наш выбор только 10 словами.

def queryPopularWords(cnt: Int) {
  // Type alias for sequences of strings (for readability only).
  type SSeq = Seq[String]
 
  grid$.cache[String, JInt].sqlReduce(
    // PROJECTION (where to run):
    grid$.projectionForCaches(null),
    // SQL QUERY (what to run):
    "length(_key) > 3 order by _val desc limit " + cnt,
    // REMOTE REDUCER (how to reduce on remote nodes):
    (it: Iterable[(String, JInt)]) =>
      // Pre-reduce by converting
      // Seq[(String, JInt)] to Map[JInt, Seq[String]].
      (it :\ Map.empty[JInt, SSeq])((e, m) =>
        m + (e._2 -> (m.getOrElse(e._2, Seq.empty[String]) :+ e._1))),
    // LOCAL REDUCER (how to finally reduce on local node):
    (it: Iterable[Map[JInt, SSeq]]) => {
      // Print 'cnt' of most popular words collected from all remote nodes.
      (new TreeMap()(implicitly[Ordering[JInt]].reverse) ++ it.flatten)
        .take(cnt).foreach(println _)
 
      println("------------") // Formatting.
    }
  )
}

Пример запуска

И , наконец , давайте реализуем наш основной (…) метод наших Заселите (…) и queryPopularWords (…) методы , которые мы только что определили.

def main(args: Array[String]) {
  // Initialize book directory
  val bookDir = new File(BOOK_PATH);
 
  // Start GridGain with specified configuration file.
  scalar("examples/config/spring-cache-popularwords.xml") {
    // Create as many threads as we have book, so we can use
    // thread per book to load data grid concurrently.
    val threadPool = Executors.newFixedThreadPool(bookDir.list.length);
 
    val popWordsQryTimer = new Timer("words-query-worker");
 
    try {
      // Schedule word queries to run every 3 seconds.
      popWordsQryTimer.schedule(new TimerTask {
        def run() {
          queryPopularWords(10) // Query top 10 words from data grid.
        }
      }, 3000, 3000)
 
      // Populate cache with word counts.
      populate(new ExecutorCompletionService(threadPool), bookDir)
 
      // Force one more run to print final counts.
      queryPopularWords(POPULAR_WORDS_CNT)
    }
    finally {
      popWordsQryTimer.cancel() // Cancel timer.
 
      threadPool.shutdownNow() // Graceful shutdown.
    }
  }
}

Чтобы выполнить пример, запустите несколько автономных узлов  GridGain с помощью файла конфигурации examples / config / spring-cache-Popularwords.xml, а затем запустите пример, который мы только что создали из IDE. Вы можете добавить больше распечаток, чтобы лучше видеть происходящее.

Этот пример также поставляется с GridGain 4.0 и также доступен в GridGain GitHub Repository .