Статьи

Практическое введение в потоковую обработку MapReduce

В этой статье я представлю концепцию потоковой обработки MapReduce с использованием GridGain и Scala. Выбор Scala объясняется просто тем, что он обеспечивает очень краткую запись, а GridGain предоставляет очень эффективный DSL для Scala. Будьте уверены, вы также можете следить за этим постом на Java или Groovy.

Концепция потоковой обработки (и потокового MapReduce в частности) может быть в основном определена как непрерывная распределенная обработка непрерывно входящих потоков данных . Очевидное различие между другими формами распределенной обработки заключается в том, что входные данные не могут быть полностью измерены (или известны) до начала обработки, и входные данные представляются «бесконечными» с точки зрения приложения обработки. Типичными примерами потоковой обработки могут быть обработка входящих веб-журналов на уровне событий, твиттер, информация об уровне торговли в финансовых системах, обновления в Facebook, обновления RFID-чипов и т. Д.

Другое интересное наблюдение заключается в том, что потоковая обработка почти всегда выполняется в реальном времени . Важным моментом здесь является то, что потоковая природа входных данных требует характеристики обработки в реальном времени. Если ваша обработка отстает от объема входящих живых данных — вам неизбежно не хватит места для буферизации входящих данных, и система выйдет из строя.

Я приведу два примера кода, чтобы выделить потоковую обработку MapReduce с помощью GridGain:

  • Во-первых, это очень простое каноническое приложение MapReduce, которое я буду использовать для иллюстрации основ GridGain.
  • Второе немного сложнее и покажет, как вы можете написать потоковое приложение MapReduce от начала до конца (от приема до запроса).

Пример 1

Давайте начнем с GridGain. GridGain — это промежуточное программное обеспечение на основе Java для обработки больших данных в памяти в распределенной среде. Он основан на высокопроизводительной платформе данных в памяти, которая объединяет самую быструю в мире реализацию MapReduce с технологией In-Memory Data Grid, обеспечивая простое в использовании и легко масштабируемое программное обеспечение.

Для первого примера мы разработаем приложение, которое будет принимать строку в качестве аргумента и вычислять количество непробельных символов в ней. Это будет достигнуто путем разделения строки аргумента на отдельные слова и вычисления количества символов в каждом слове на удаленных узлах, которые в настоящее время доступны в сетке. В конце — он объединит длины всех слов в конечный результат.

Это стандартный пример «HelloWorld» в слове распределенного программирования.

Прежде всего, нам нужно создать кластер для работы. Если вы загружаете GridGain и распаковываете его — все, что вам нужно сделать, это запустить скрипт запуска узла, передав ему путь к файлу конфигурации XML ‘bin / ggstart.sh examples / config / spring-cache-Popularcounts.xml’, чтобы запустить узел :

Обратите внимание, что вы можете запустить столько локальных узлов, сколько вам нужно — просто запустите этот скрипт столько раз. Также обратите внимание, что вы можете запускать автономные узлы из Visor — GridGain DevOps Console (обсуждение этого вопроса находится за пределами этого блога).

Как только вы запустили все узлы (скажем, 2), вы заметите, что все узлы запустились и обнаружили друг друга автоматически без драмы. У вас абсолютно нулевая конфигурация, и все работает полностью из коробки.

Теперь, когда у нас работает кластер, давайте напишем код. Откройте ваш любимый IDEA или текстовый редактор и введите это:

import org.gridgain.scalar.scalar
import scalar._

object Main extends App {
  scalar("examples/config/spring-cache-popularcounts.xml") {
    println("Non-space chars: " + grid$.spreadReduce(
      for (w <- input.split(" ")) yield () => w.length)(_.sum))
  }
}

В зависимости от того, какую систему сборки вы используете (SBT, Ant, IDEA, Eclipse и т. Д.), Вам просто нужно включить библиотеки из GridGain (основные JAR + JAR в подпапке ‘/ libs’) — и скомпилировать.

Если все компилируется — просто запустите его, передав ему некоторую строку ввода Вот и все, что нужно сделать:

 

Позвольте мне быстро объяснить, что здесь происходит (это будет относиться и к следующему примеру):

  • First we use scalar “keyword” passing it a path to configuration XML file to startup a node from within our Scala app.
  • grid$ denotes a global projection on all node in the cluster (GridGain employes functional API in its core). Projection provides a monadic set of operations available on any arbitraty set of GridGain nodes.
  • We use method spreadReduce(…) on projection that takes two curried arguments:
    • set of closures to spread-execute on the cluster, and
    • reduction function that will be used to aggregate the remote results.
  • When spreadReduce(…) completes (and it’s a synch call among synch and async options) – it returns the non-space count of characters.

Now – let me ask you a question… Did you notice any deployment steps, any Ant, Maven, any copying of JAR or any redeploying after we’ve changed the code?

The answer is no. GridGain provides pretty unique zero deployment technology that allows for complete on-demand class deployment throughout the cluster – leaving you the developer to simply write the code and run your applications as you would do locally. Pretty nifty, isn’t it?

Example 2

Ok, now that we tried something very simple and trivial let’s develop a full featured streaming MapReduce app using what we’ve learned so far. We’ll adopt a canonical example from Hadoop: we’ll ingest number of books into in-memory data grid, and will find 10 most frequent words from those books.

The way we’ll be doing it is via streaming MapReduce:

while we are loading books into memory we will be continuously querying the data grid for 10 most frequent words. As data gets loaded the results will change, and when all books are fully loaded we’ll get our correct (and final) tally of 10 most frequent words.

Unlike Hadoop example:

  • We’ll show both programmatic ingestion and querying in one application (no need to pre-copy any stuff into anything like HDFS), and
  • We’ll develop this application in true streaming fashion, i.e. we won’t wait until all data is loaded and we’ll start querying concurrently before all data is loaded

Here’s the full source code:

import org.gridgain.scalar.scalar
import scalar._
import org.gridgain.grid.typedef.X
import java.io.File
import io.Source
import java.util.Timer
import actors.threadpool._

object ScalarPopularWordsRealTimeExample extends App {
  private final val WORDS_CNT = 10
  private final val BOOK_PATH = 
    "examples/java/org/gridgain/examples/realtime/books"

  type JINT = java.lang.Integer

  val dir = new File(X.getSystemOrEnv("GRIDGAIN_HOME"), BOOK_PATH)

  if (!dir.exists)
    println("Input directory does not exist: " + dir.getAbsolutePath)
  else
    scalar("examples/config/spring-cache-popularcounts.xml") {
      val pool = Executors.newFixedThreadPool(dir.list.length)
      val timer = new Timer("words-query-worker")

      try {
        timer.schedule(timerTask(() => query(WORDS_CNT)), 3000, 3000)

        // Populate cache & force one more run to get the final counts.
        ingest(pool, dir)
        query(WORDS_CNT)

        // Clean up after ourselves.
        grid$.projectionForCaches(null).bcastRun(
          () => grid$.cache().clearAll())
      }
      finally {
        timer.cancel()
        pool.shutdownNow()
      }
    }

  def ingest(pool: ExecutorService, dir: File) {
    val ldr = dataLoader$[String, Int](null, 2048, 8, 128)

    // For every book, allocate a new thread from the pool and start 
    // populating cache with words and their counts.
    try {
      (for (book <- dir.list()) yield
        pool.submit(() => Source.fromFile(new File(dir, book), "ISO-8859-1").
          getLines().foreach(
            line => for (w <- line.split("[^a-zA-Z0-9]") if !w.isEmpty)
              ldr.addData(w, (i: Int) => if (i == null) 1 else i + 1)
      ))).foreach(_.get)
    }
    finally
      ldr.close(false) // Wait for data loader to complete.
  }

  def query(cnt: Int) {
    cache$[String, JINT].get.sql(grid$.projectionForCaches(null),
      "length(_key) > 3 order by _val desc limit " + cnt).
        toIndexedSeq.sortBy[JINT](_._2).reverse.take(cnt).foreach(println _)

    println("------------------")
  }
}

Few notes about the code in general:

  • We use the books that are shipped with GridGain’s examples
  • We are passing specific XML configuration file for ‘scalar’ keyword (it configures TCP discovery and partitioned cache with one backup)
  • We use a simple timer to run a query every 3 seconds while we are loading the books
  • After everything is done – we are cleaning after ourselves (so that you can run this app multiple times without leaving garbage in the data grid)

Notes about ingest(…) and query(…) method:

  • We use GridGain’s data loader in ingest(…) method that provides advanced back-pressure management for asynchronous bulk load distributed operations
  • We use method sql(…) on cache projection (cache projections provide monadic set of data grid operations) to issue a simple distributed SQL query
  • In GridGain you can omit “select * from table” in most cases, and just supply a where clause

That’s all there is to it. Compile it (as always, no deployment or redeployment is necessary) and run it. You will see print out of 10 most frequent words every 3 second while books are being read on and put into the data grid:

Final Thoughts

In about 50 lines of code we’ve put together both ingestion and querying streaming MapReduce app. We’ve run it on the local cluster – and it will run just the same way on 3, 10, 100s or 1000s of nodes deployed anywhere in the world (as long as we have some way to connect to them).

Keep in mind that this is obviously a very simply (almost trivialized) example of streaming MapReduce. Yet with additional few lines of code you can replace book with, let’s say, Twitter firehose keyed by hashtags, and print outs with updates to your social dashboard – and you get a pretty useful system tracking most popular Twitter hashtags in real time in a few hundred lines of code – while automatically scaling to 100s terabytes of data being processed on 1000s of nodes.