Статьи

Потоковое MapReduce: примеры кода

В этой статье я представлю концепцию потоковой обработки MapReduce с использованием GridGain и Scala. Выбор Scala объясняется просто тем, что он обеспечивает очень краткую запись, а GridGain предоставляет очень эффективный DSL для Scala. Будьте уверены, вы также можете следить за этим постом на Java или Groovy.

Концепция потоковой обработки (и потокового MapReduce в частности) может быть в основном определена как непрерывная распределенная обработка непрерывно входящих потоков данных . Очевидное различие между другими формами распределенной обработки заключается в том, что входные данные не могут быть полностью измерены (или известны) до начала обработки, и входные данные представляются «бесконечными» с точки зрения приложения обработки. Типичными примерами потоковой обработки могут быть обработка входящих веб-журналов на уровне событий, твиттер, информация об уровне торговли в финансовых системах, обновления в Facebook, обновления RFID-чипов и т. Д.

Другое интересное наблюдение заключается в том, что потоковая обработка почти всегда выполняется в реальном времени . Важным моментом здесь является то, что потоковая природа входных данных требует характеристики обработки в реальном времени. Если ваша обработка отстает от объема входящих живых данных — вам неизбежно не хватит места для буферизации входящих данных, и система выйдет из строя.

Я приведу два примера кода, чтобы выделить потоковую обработку MapReduce с помощью GridGain:

  • Во-первых, это очень простое каноническое приложение MapReduce, которое я буду использовать для иллюстрации основ GridGain.
  • Второе немного сложнее и покажет, как вы можете написать потоковое приложение MapReduce от начала до конца (от приема до запроса).

Примеры 1

Давайте начнем с GridGain. GridGain — это промежуточное программное обеспечение на основе Java для обработки больших данных в памяти в распределенной среде. Он основан на высокопроизводительной платформе данных в памяти, которая объединяет самую быструю в мире реализацию MapReduce с технологией In-Memory Data Grid, обеспечивая простое в использовании и легко масштабируемое программное обеспечение.

Для первого примера мы разработаем приложение, которое будет принимать строку в качестве аргумента и вычислять количество непробельных символов в ней. Это будет достигнуто путем разделения строки аргумента на отдельные слова и вычисления количества символов в каждом слове на удаленных узлах, которые в настоящее время доступны в сетке. В конце — он объединит длины всех слов в конечный результат.

Это стандартный пример «HelloWorld» в слове распределенного программирования.

Прежде всего, нам нужно создать кластер для работы. Если вы загружаете GridGain и распаковываете его — все, что вам нужно сделать, это запустить скрипт запуска узла, передав ему путь к файлу конфигурации XML, 'bin/ggstart.sh examples/config/spring-cache-popularcounts.xml'чтобы запустить узел:

Обратите внимание, что вы можете запустить столько локальных узлов, сколько вам нужно — просто запустите этот скрипт столько раз. Также обратите внимание, что вы можете запускать автономные узлы из Visor — GridGain DevOps Console (обсуждение этого вопроса находится за пределами этого блога).

Как только вы запустили все узлы (скажем, 2), вы заметите, что все узлы запустились и обнаружили друг друга автоматически без драмы. У вас абсолютно нулевая конфигурация, и все работает полностью из коробки.

Теперь, когда у нас работает кластер, давайте напишем код. Откройте ваш любимый IDEA или текстовый редактор и введите это:

import org.gridgain.scalar.scalar
import scalar._

object Main extends App {
  scalar("examples/config/spring-cache-popularcounts.xml") {
    println("Non-space chars: " + grid$.spreadReduce(
      for (w <- input.split(" ")) yield () => w.length)(_.sum))
  }
}

В зависимости от используемой вами системы сборки (SBT, Ant, IDEA, Eclipse и т. Д.) Вам просто нужно включить библиотеки из GridGain (основные JAR + JAR в '/libs'подпапке) — и скомпилировать.

Если все компилируется — просто запустите его, передав ему некоторую строку ввода. Вот и все, что нужно сделать:

Позвольте мне быстро объяснить, что здесь происходит (это будет относиться и к следующему примеру):

  • Сначала мы используем scalarключевое слово, передавая ему путь к XML-файлу конфигурации, чтобы запустить узел из нашего приложения Scala.
  • grid$обозначает глобальную проекцию на все узлы в кластере (GridGain использует функциональный API в своей основе). Проекция предоставляет монадический набор операций, доступных на любом произвольном наборе узлов GridGain.
  • Мы используем метод spreadReduce(...)проекции, который принимает два карри аргумента:

    • набор замыканий для распространения-выполнения в кластере и
    • функция сокращения, которая будет использоваться для агрегирования удаленных результатов.
  • Когда spreadReduce(...)завершается (и это синхронный вызов среди параметров синхронизации и асинхронности) — он возвращает количество символов без пробелов.

Теперь — позвольте мне задать вам вопрос … Вы заметили какие-либо шаги по развертыванию, любой Ant, Maven, любое копирование JAR или любое повторное развертывание после того, как мы изменили код?

Ответ — нет. GridGain предоставляет довольно уникальную технологию нулевого развертывания, которая позволяет полностью развертывать классы по требованию во всем кластере, предоставляя разработчику возможность просто писать код и запускать приложения, как если бы вы работали локально. Довольно изящно, не правда ли?

Примеры 2

Хорошо, теперь, когда мы попробовали что-то очень простое и тривиальное, давайте разработаем полнофункциональное потоковое приложение MapReduce, используя то, что мы уже изучили. Мы примем канонический пример из Hadoop : мы включим количество книг в сетку данных в памяти и найдем 10 наиболее часто встречающихся слов из этих книг.

Мы сделаем это с помощью потоковой передачи MapReduce:

в то время как мы загружаем книги в память, мы будем постоянно запрашивать сетку данных для 10 самых частых слов. По мере загрузки данных результаты будут меняться, и когда все книги будут полностью загружены, мы получим правильное (и последнее) число из 10 наиболее часто встречающихся слов.

В отличие от примера Hadoop :

  • Мы покажем как программный прием, так и запросы в одном приложении (не нужно предварительно копировать какие-либо вещи во что-то вроде HDFS), и
  • Мы разработаем это приложение в режиме реального потока, то есть мы не будем ждать, пока все данные будут загружены, и мы начнем выполнять запросы одновременно, прежде чем все данные будут загружены

Вот полный исходный код:

import org.gridgain.scalar.scalar
import scalar._
import org.gridgain.grid.typedef.X
import java.io.File
import io.Source
import java.util.Timer
import actors.threadpool._

object ScalarPopularWordsRealTimeExample extends App {
  private final val WORDS_CNT = 10
  private final val BOOK_PATH = 
    "examples/java/org/gridgain/examples/realtime/books"

  type JINT = java.lang.Integer

  val dir = new File(X.getSystemOrEnv("GRIDGAIN_HOME"), BOOK_PATH)

  if (!dir.exists)
    println("Input directory does not exist: " + dir.getAbsolutePath)
  else
    scalar("examples/config/spring-cache-popularcounts.xml") {
      val pool = Executors.newFixedThreadPool(dir.list.length)
      val timer = new Timer("words-query-worker")

      try {
        timer.schedule(timerTask(() => query(WORDS_CNT)), 3000, 3000)

        // Populate cache & force one more run to get the final counts.
        ingest(pool, dir)
        query(WORDS_CNT)

        // Clean up after ourselves.
        grid$.projectionForCaches(null).bcastRun(
          () => grid$.cache().clearAll())
      }
      finally {
        timer.cancel()
        pool.shutdownNow()
      }
    }

  def ingest(pool: ExecutorService, dir: File) {
    val ldr = dataLoader$[String, Int](null, 2048, 8, 128)

    // For every book, allocate a new thread from the pool and start 
    // populating cache with words and their counts.
    try {
      (for (book <- dir.list()) yield
        pool.submit(() => Source.fromFile(new File(dir, book), "ISO-8859-1").
          getLines().foreach(
            line => for (w <- line.split("[^a-zA-Z0-9]") if !w.isEmpty)
              ldr.addData(w, (i: Int) => if (i == null) 1 else i + 1)
      ))).foreach(_.get)
    }
    finally
      ldr.close(false) // Wait for data loader to complete.
  }

  def query(cnt: Int) {
    cache$[String, JINT].get.sql(grid$.projectionForCaches(null),
      "length(_key) > 3 order by _val desc limit " + cnt).
        toIndexedSeq.sortBy[JINT](_._2).reverse.take(cnt).foreach(println _)

    println("------------------")
  }
}

Несколько замечаний о коде в целом:

  • Мы используем книги, которые поставляются с примерами GridGain
  • Мы передаем специальный файл конфигурации XML для 'scalar'ключевого слова (он настраивает обнаружение TCP и многораздельный кеш с одной резервной копией)
  • Мы используем простой таймер для запуска запроса каждые 3 секунды, пока мы загружаем книги
  • После того, как все сделано — мы убираем за собой (чтобы вы могли запустить это приложение несколько раз, не оставляя мусора в сетке данных)

Примечания ingest(...)и query(...)метод:

  • Мы используем загрузчик данных GridGain в ingest(...)методе, который обеспечивает расширенное управление противодавлением для асинхронных распределенных операций с массовой загрузкой
  • Мы используем метод sql(...)проекции кеша (проекции кеша обеспечивают монадический набор операций с сеткой данных) для выдачи простого распределенного SQL-запроса.
  • В GridGain вы можете опустить «select * from table» в большинстве случаев и просто указать предложение where

Это все, что нужно сделать . Скомпилируйте его (как всегда, развертывание или повторное развертывание не требуется ) и запустите его. Вы будете печатать 10 наиболее часто употребляемых слов каждые 3 секунды, пока книги читаются и помещаются в таблицу данных:

Последние мысли

In about 50 lines of code we’ve put together both ingestion and querying streaming MapReduce app. We’ve run it on the local cluster – and it will run just the same way on 3, 10, 100s or 1000s of nodes deployed anywhere in the world (as long as we have some way to connect to them).

Keep in mind that this is obviously a very simply (almost trivialized) example of streaming MapReduce. Yet with additional few lines of code you can replace book with, let’s say, Twitter firehose keyed by hashtags, and print outs with updates to your social dashboard – and you get a pretty useful system tracking most popular Twitter hashtags in real time in a few hundred lines of code – while automatically scaling to 100s terabytes of data being processed on 1000s of nodes.