Все мы знаем, что составление отчетов о данных, хранящихся в репозиториях NoSQL, может быть громоздким. Либо вы встроили запросы в модель данных, либо нет. Если вам повезет, вы связали Cassandra с индексатором, таким как SOLR или Elastic Search, но иногда индекса недостаточно для выполнения сложной аналитики ваших данных. В качестве альтернативы, возможно, вы просто хотите сделать простое преобразование данных. Это часто легче сказать, чем сделать.
Нам всем нужен общий способ запуска функций над данными, хранящимися в Cassandra. Конечно, вы могли бы взять Hadoop и использовать аналитику / преобразования в виде конструкций MapReduce. Но это просто расстраивает людей. Вместо этого я бы порекомендовал Spark. Это делает людей счастливыми.
Однако когда я решил запустить Spark против Кассандры, я нашел относительно мало информации. Это моя попытка исправить это. Если вы нетерпеливы, вы можете просто скопировать репо, который я сделал:
https://github.com/boneill42/spark-on-cassandra-quickstart
Получить Каллиопу
Первая остановка, Каллиопа.
http://tuplejump.github.io/calliope/
Тогда иди сюда, чтобы вы знали, как это произносится. знак равно
Опять же, по причинам, которые я упоминал ранее , я хотел получить доступ к Cassandra через CQL. К сожалению, на момент написания этой статьи CQL-версия Calliope была вообще недоступна. Вы должны представить для раннего доступа . К счастью, Рохит и команда очень отзывчивы. И когда у вас есть доступ, вы можете создать новый проект, который использует его. Отбросьте зависимость в вашем пом.
<dependency>
<groupId>com.tuplejump</groupId>
<artifactId>calliope_2.9.3</artifactId>
<version>0.8.1-EA</version>
</dependency>
Получить Scala’d Up
Теперь, если вы хотите полностью погрузиться в опыт Spark, вы захотите развиваться в Scala. Для меня это означало переход на IntelliJ, потому что у меня были некоторые проблемы с использованием Eclipse с конкретными (более старыми) версиями Scala . Calliope 0.8.1 раннего доступа был скомпилирован с Scala 2.9.3. Так что вам понадобится IDE, которая поддерживает эту версию. Чтобы получить поддержку Scala от maven, добавьте в свой pom следующее:
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.9.3</artifactId> <version>2.0.M5b</version> </dependency>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.9.3</scalaVersion>
<launchers>
<launcher>
<id>run-scalatest</id>
<mainClass>org.scalatest.tools.Runner</mainClass>
<args>
<arg>-p</arg>
<arg>${project.build.testOutputDirectory}</arg>
</args>
<jvmArgs>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
</launcher>
</launchers>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
</configuration>
</plugin>
Получить искру
Теперь простая часть. Добавьте Spark. знак равно
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.9.3</artifactId> <version>0.8.1-incubating</version> </dependency>
Слинг Код
Теперь, когда у нас наш проект встал. Давайте поспешим по семейству колонок и немного обработаем!
Весь код для выполнения задания Spark содержится в FindChildrenTest . Есть два компонента: трансформатор и работа. Трансформатор очень похож на концепцию Mapper, которую мы используем в storm-cassandra-cql . Преобразователь переводит строки CQL в объекты, которые можно использовать в работе. Вот код для трансформатора:
private object Cql3CRDDTransformers {
import RichByteBuffer._
implicit def row2String(key: ThriftRowKey, row: ThriftRowMap): List[String] = {
row.keys.toList
}
implicit def cql3Row2Child(keys: CQLRowKeyMap, values: CQLRowMap): Child = {
Child(keys.get("child_id").get, values.get("first_name").get, values.get("last_name").get, values.get("country").get, values.get("state").get, values.get("zip").get)
}
}
Единственная действительно важная часть — это функция, которая переводит строку (ключи и значения) в дочерний объект.
При наличии трансформатора создать работу довольно просто:
class FindChildrenTest extends FunSpec {
import Cql3CRDDTransformers._
val sc = new SparkContext("local[1]", "castest")
describe("Find All Children in IRL") {
it("should be able find children in IRL") {
val cas = CasBuilder.cql3.withColumnFamily("northpole", "children")
val cqlRdd = sc.cql3Cassandra[Child](cas)
val children = cqlRdd.collect().toList
children.filter((child) => child.country.equals("IRL")).map((child) => println(child))
sc.stop()
}
}
}
Первая строка соединяется с клавиатурой, таблицей. Для этого примера я повторно использовал схему из моего вебинара несколько лет назад. Вы можете найти CQL здесь . Вторая строка создает Resilient Distributed Dataset (RDD), содержащий дочерние объекты. СДР является основной абстракцией набора данных в Spark. Получив СДР, вы можете работать с этим СДР, как если бы это была любая другая карта. (довольно мощный материал)
В приведенном выше коде мы фильтруем СДР для детей в Ирландии. Затем мы мчимся за результатом и распечатываем детей. Если все идет хорошо, вы должны получить следующий вывод:
Child(collin.oneill,Collin,O'Neill,IRL,D,EI33) Child(owen.oneill,Owen,O'Neill,IRL,D,EI33)
ОК — Этого должно быть достаточно, чтобы сделать тебя опасным. Я должен отдать * ОГРОМНУЮ кучу благодарностей Рохиту Раю и его команде из TupleJump за разработку проекта Calliope. Они делают отличные вещи в TupleJump. Я также слежу за Stargate и Cash . На самом деле, в следующий раз я сделаю еще один шаг и покажу Акуле против Кассандры с помощью Кэша.