время назад я пообещал опубликовать код из
презентации GridGain, которую я сделал в QCon London. Поскольку презентация была в Scala, код, который я буду публиковать здесь, находится в Scala, но я также опубликую версию Java в течение нескольких дней.
Сначала краткое вступление. Все мы знаем пример подсчета слов в Hadoop, который берет файл со словами, а затем создает другой файл с числом вхождений рядом с каждым словом. Hadoop очень хорошо справляется с этим примером, однако главное предостережение с примером Hadoop заключается в том, что это не реальное время.
Пример подсчета слов, который я сделал на QCon, фактически подсчитывал слова в реальном времени. Программа была разделена на две части. Первая часть отвечает за загрузку слов в реальном времени в сетку данных GridGain, а вторая часть запрашивала сетку каждые 3 секунды, чтобы непрерывно распечатывать 10 лучших слов, хранящихся до сих пор.
Пример был сделан с использованием ‘
Scalar ‘ — GridGain DSL для Scala, но это могло быть сделано и в Java с использованием API-интерфейсов GridGain Java.
Постоянно заполнять слова в реальном времени
Начнем с постоянной загрузки сетки данных новыми словами. Для этого я скачал несколько книг в текстовом формате и начал одновременно читать их из метода populate (…) , по одному потоку на книгу. Для каждого прочитанного слова я сохраняю его в кеше, в котором само слово является ключом, а число текущих вхождений — значением. Также обратите внимание на то, как мы позволяем грид-асинхронно обновлять кэш с использованием асинхронного запуска при чтении следующей строки из файла книги (на самом деле, скорее всего, у вас будет более одного асинхронного задания или если функция загрузки данных GridGain сделает это за вас).
def populate(threadPool: CompletionService, dir: File) {
val bookFileNames = dir.list()
// For every book, start a new thread and start populating cache
// with words and their counts.
for (bookFileName <- bookFileNames) {
threadPool.submit(new Callable {
def call() = {
val cache = grid$.cache[String, JInt]
var fut: GridFuture[_] = null;
Source.fromFile(new File(dir, name)).getLines().foreach(line => {
line.split("[^a-zA-Z0-9]").foreach(word => {
if (!word.isEmpty) {
if (fut != null)
fut.get()
fut = grid$.affinityRunAsync(null, word, () => {
// Increment word counter and store it in cache.
// We use cache transaction to make sure that
// gets and puts are consistent and atomic.
cache.inTx(
() => cache += (word -> (cache.getOrElse(word, 0) + 1))
)
()
})
}
})
})
None // Return nothing.
}
})
}
// Wait for all threads to finish.
books.foreach(_ => threadPool.take().get())
}
Распределенный SQL-запрос
Теперь давайте реализуем наш распределенный запрос к сетке данных GridGain, который будет выполняться каждые 3 секунды. Обратите внимание, что мы используем стандартный синтаксис SQL для запроса удаленных узлов сетки. Интересно, что сетка данных GridGain позволяет использовать SQL практически без каких-либо ограничений. Вы можете использовать любую встроенную функцию SQL и даже соединения SQL между различными классами. Здесь, например, мы используем функцию длины SQL (…), чтобы запрашивать только слова длиной более 3 букв, чтобы избавиться от частых коротких статей, таких как «a» или «the» в наших поисках. Мы также используем ключевое слово desc для сортировки количества слов в порядке убывания и ограничиваем ключевое слово, чтобы ограничить наш выбор только 10 словами.
def queryPopularWords(cnt: Int) {
// Type alias for sequences of strings (for readability only).
type SSeq = Seq[String]
grid$.cache[String, JInt].sqlReduce(
// PROJECTION (where to run):
grid$.projectionForCaches(null),
// SQL QUERY (what to run):
"length(_key) > 3 order by _val desc limit " + cnt,
// REMOTE REDUCER (how to reduce on remote nodes):
(it: Iterable[(String, JInt)]) =>
// Pre-reduce by converting
// Seq[(String, JInt)] to Map[JInt, Seq[String]].
(it :\ Map.empty[JInt, SSeq])((e, m) =>
m + (e._2 -> (m.getOrElse(e._2, Seq.empty[String]) :+ e._1))),
// LOCAL REDUCER (how to finally reduce on local node):
(it: Iterable[Map[JInt, SSeq]]) => {
// Print 'cnt' of most popular words collected from all remote nodes.
(new TreeMap()(implicitly[Ordering[JInt]].reverse) ++ it.flatten)
.take(cnt).foreach(println _)
println("------------") // Formatting.
}
)
}
Пример запуска
И , наконец , давайте реализуем наш основной (…) метод наших Заселите (…) и queryPopularWords (…) методы , которые мы только что определили.
def main(args: Array[String]) {
// Initialize book directory
val bookDir = new File(BOOK_PATH);
// Start GridGain with specified configuration file.
scalar("examples/config/spring-cache-popularwords.xml") {
// Create as many threads as we have book, so we can use
// thread per book to load data grid concurrently.
val threadPool = Executors.newFixedThreadPool(bookDir.list.length);
val popWordsQryTimer = new Timer("words-query-worker");
try {
// Schedule word queries to run every 3 seconds.
popWordsQryTimer.schedule(new TimerTask {
def run() {
queryPopularWords(10) // Query top 10 words from data grid.
}
}, 3000, 3000)
// Populate cache with word counts.
populate(new ExecutorCompletionService(threadPool), bookDir)
// Force one more run to print final counts.
queryPopularWords(POPULAR_WORDS_CNT)
}
finally {
popWordsQryTimer.cancel() // Cancel timer.
threadPool.shutdownNow() // Graceful shutdown.
}
}
}
Чтобы выполнить пример, запустите несколько автономных узлов GridGain с помощью файла конфигурации examples / config / spring-cache-Popularwords.xml, а затем запустите пример, который мы только что создали из IDE. Вы можете добавить больше распечаток, чтобы лучше видеть происходящее.
Этот пример также поставляется с GridGain 4.0 и также доступен в GridGain GitHub Repository .