время назад я пообещал опубликовать код из
презентации GridGain, которую я сделал в QCon London. Поскольку презентация была в Scala, код, который я буду публиковать здесь, находится в Scala, но я также опубликую версию Java в течение нескольких дней.
Сначала краткое вступление. Все мы знаем пример подсчета слов в Hadoop, который берет файл со словами, а затем создает другой файл с числом вхождений рядом с каждым словом. Hadoop очень хорошо справляется с этим примером, однако главное предостережение с примером Hadoop заключается в том, что это не реальное время.
Пример подсчета слов, который я сделал на QCon, фактически подсчитывал слова в реальном времени. Программа была разделена на две части. Первая часть отвечает за загрузку слов в реальном времени в сетку данных GridGain, а вторая часть запрашивала сетку каждые 3 секунды, чтобы непрерывно распечатывать 10 лучших слов, хранящихся до сих пор.
Пример был сделан с использованием ‘
Scalar ‘ — GridGain DSL для Scala, но это могло быть сделано и в Java с использованием API-интерфейсов GridGain Java.
Постоянно заполнять слова в реальном времени
Начнем с постоянной загрузки сетки данных новыми словами. Для этого я скачал несколько книг в текстовом формате и начал одновременно читать их из метода populate (…) , по одному потоку на книгу. Для каждого прочитанного слова я сохраняю его в кеше, в котором само слово является ключом, а число текущих вхождений — значением. Также обратите внимание на то, как мы позволяем грид-асинхронно обновлять кэш с использованием асинхронного запуска при чтении следующей строки из файла книги (на самом деле, скорее всего, у вас будет более одного асинхронного задания или если функция загрузки данных GridGain сделает это за вас).
def populate(threadPool: CompletionService, dir: File) { val bookFileNames = dir.list() // For every book, start a new thread and start populating cache // with words and their counts. for (bookFileName <- bookFileNames) { threadPool.submit(new Callable { def call() = { val cache = grid$.cache[String, JInt] var fut: GridFuture[_] = null; Source.fromFile(new File(dir, name)).getLines().foreach(line => { line.split("[^a-zA-Z0-9]").foreach(word => { if (!word.isEmpty) { if (fut != null) fut.get() fut = grid$.affinityRunAsync(null, word, () => { // Increment word counter and store it in cache. // We use cache transaction to make sure that // gets and puts are consistent and atomic. cache.inTx( () => cache += (word -> (cache.getOrElse(word, 0) + 1)) ) () }) } }) }) None // Return nothing. } }) } // Wait for all threads to finish. books.foreach(_ => threadPool.take().get()) }
Распределенный SQL-запрос
Теперь давайте реализуем наш распределенный запрос к сетке данных GridGain, который будет выполняться каждые 3 секунды. Обратите внимание, что мы используем стандартный синтаксис SQL для запроса удаленных узлов сетки. Интересно, что сетка данных GridGain позволяет использовать SQL практически без каких-либо ограничений. Вы можете использовать любую встроенную функцию SQL и даже соединения SQL между различными классами. Здесь, например, мы используем функцию длины SQL (…), чтобы запрашивать только слова длиной более 3 букв, чтобы избавиться от частых коротких статей, таких как «a» или «the» в наших поисках. Мы также используем ключевое слово desc для сортировки количества слов в порядке убывания и ограничиваем ключевое слово, чтобы ограничить наш выбор только 10 словами.
def queryPopularWords(cnt: Int) { // Type alias for sequences of strings (for readability only). type SSeq = Seq[String] grid$.cache[String, JInt].sqlReduce( // PROJECTION (where to run): grid$.projectionForCaches(null), // SQL QUERY (what to run): "length(_key) > 3 order by _val desc limit " + cnt, // REMOTE REDUCER (how to reduce on remote nodes): (it: Iterable[(String, JInt)]) => // Pre-reduce by converting // Seq[(String, JInt)] to Map[JInt, Seq[String]]. (it :\ Map.empty[JInt, SSeq])((e, m) => m + (e._2 -> (m.getOrElse(e._2, Seq.empty[String]) :+ e._1))), // LOCAL REDUCER (how to finally reduce on local node): (it: Iterable[Map[JInt, SSeq]]) => { // Print 'cnt' of most popular words collected from all remote nodes. (new TreeMap()(implicitly[Ordering[JInt]].reverse) ++ it.flatten) .take(cnt).foreach(println _) println("------------") // Formatting. } ) }
Пример запуска
И , наконец , давайте реализуем наш основной (…) метод наших Заселите (…) и queryPopularWords (…) методы , которые мы только что определили.
def main(args: Array[String]) { // Initialize book directory val bookDir = new File(BOOK_PATH); // Start GridGain with specified configuration file. scalar("examples/config/spring-cache-popularwords.xml") { // Create as many threads as we have book, so we can use // thread per book to load data grid concurrently. val threadPool = Executors.newFixedThreadPool(bookDir.list.length); val popWordsQryTimer = new Timer("words-query-worker"); try { // Schedule word queries to run every 3 seconds. popWordsQryTimer.schedule(new TimerTask { def run() { queryPopularWords(10) // Query top 10 words from data grid. } }, 3000, 3000) // Populate cache with word counts. populate(new ExecutorCompletionService(threadPool), bookDir) // Force one more run to print final counts. queryPopularWords(POPULAR_WORDS_CNT) } finally { popWordsQryTimer.cancel() // Cancel timer. threadPool.shutdownNow() // Graceful shutdown. } } }
Чтобы выполнить пример, запустите несколько автономных узлов GridGain с помощью файла конфигурации examples / config / spring-cache-Popularwords.xml, а затем запустите пример, который мы только что создали из IDE. Вы можете добавить больше распечаток, чтобы лучше видеть происходящее.
Этот пример также поставляется с GridGain 4.0 и также доступен в GridGain GitHub Repository .