Все мы знаем, что составление отчетов о данных, хранящихся в репозиториях NoSQL, может быть громоздким. Либо вы встроили запросы в модель данных, либо нет. Если вам повезет, вы связали Cassandra с индексатором, таким как SOLR или Elastic Search, но иногда индекса недостаточно для выполнения сложной аналитики ваших данных. В качестве альтернативы, возможно, вы просто хотите сделать простое преобразование данных. Это часто легче сказать, чем сделать.
Нам всем нужен общий способ запуска функций над данными, хранящимися в Cassandra. Конечно, вы могли бы взять Hadoop и использовать аналитику / преобразования в виде конструкций MapReduce. Но это просто расстраивает людей. Вместо этого я бы порекомендовал Spark. Это делает людей счастливыми.
Однако когда я решил запустить Spark против Кассандры, я нашел относительно мало информации. Это моя попытка исправить это. Если вы нетерпеливы, вы можете просто скопировать репо, который я сделал:
https://github.com/boneill42/spark-on-cassandra-quickstart
Получить Каллиопу
Первая остановка, Каллиопа.
http://tuplejump.github.io/calliope/
Тогда иди сюда, чтобы вы знали, как это произносится. знак равно
Опять же, по причинам, которые я упоминал ранее , я хотел получить доступ к Cassandra через CQL. К сожалению, на момент написания этой статьи CQL-версия Calliope была вообще недоступна. Вы должны представить для раннего доступа . К счастью, Рохит и команда очень отзывчивы. И когда у вас есть доступ, вы можете создать новый проект, который использует его. Отбросьте зависимость в вашем пом.
<dependency> <groupId>com.tuplejump</groupId> <artifactId>calliope_2.9.3</artifactId> <version>0.8.1-EA</version> </dependency>
Получить Scala’d Up
Теперь, если вы хотите полностью погрузиться в опыт Spark, вы захотите развиваться в Scala. Для меня это означало переход на IntelliJ, потому что у меня были некоторые проблемы с использованием Eclipse с конкретными (более старыми) версиями Scala . Calliope 0.8.1 раннего доступа был скомпилирован с Scala 2.9.3. Так что вам понадобится IDE, которая поддерживает эту версию. Чтобы получить поддержку Scala от maven, добавьте в свой pom следующее:
<pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories>
<dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_2.9.3</artifactId> <version>2.0.M5b</version> </dependency>
<plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>2.9.3</scalaVersion> <launchers> <launcher> <id>run-scalatest</id> <mainClass>org.scalatest.tools.Runner</mainClass> <args> <arg>-p</arg> <arg>${project.build.testOutputDirectory}</arg> </args> <jvmArgs> <jvmArg>-Xmx512m</jvmArg> </jvmArgs> </launcher> </launchers> <jvmArgs> <jvmArg>-Xms64m</jvmArg> <jvmArg>-Xmx1024m</jvmArg> </jvmArgs> </configuration> </plugin>
Получить искру
Теперь простая часть. Добавьте Spark. знак равно
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.9.3</artifactId> <version>0.8.1-incubating</version> </dependency>
Слинг Код
Теперь, когда у нас наш проект встал. Давайте поспешим по семейству колонок и немного обработаем!
Весь код для выполнения задания Spark содержится в FindChildrenTest . Есть два компонента: трансформатор и работа. Трансформатор очень похож на концепцию Mapper, которую мы используем в storm-cassandra-cql . Преобразователь переводит строки CQL в объекты, которые можно использовать в работе. Вот код для трансформатора:
private object Cql3CRDDTransformers { import RichByteBuffer._ implicit def row2String(key: ThriftRowKey, row: ThriftRowMap): List[String] = { row.keys.toList } implicit def cql3Row2Child(keys: CQLRowKeyMap, values: CQLRowMap): Child = { Child(keys.get("child_id").get, values.get("first_name").get, values.get("last_name").get, values.get("country").get, values.get("state").get, values.get("zip").get) } }
Единственная действительно важная часть — это функция, которая переводит строку (ключи и значения) в дочерний объект.
При наличии трансформатора создать работу довольно просто:
class FindChildrenTest extends FunSpec { import Cql3CRDDTransformers._ val sc = new SparkContext("local[1]", "castest") describe("Find All Children in IRL") { it("should be able find children in IRL") { val cas = CasBuilder.cql3.withColumnFamily("northpole", "children") val cqlRdd = sc.cql3Cassandra[Child](cas) val children = cqlRdd.collect().toList children.filter((child) => child.country.equals("IRL")).map((child) => println(child)) sc.stop() } } }
Первая строка соединяется с клавиатурой, таблицей. Для этого примера я повторно использовал схему из моего вебинара несколько лет назад. Вы можете найти CQL здесь . Вторая строка создает Resilient Distributed Dataset (RDD), содержащий дочерние объекты. СДР является основной абстракцией набора данных в Spark. Получив СДР, вы можете работать с этим СДР, как если бы это была любая другая карта. (довольно мощный материал)
В приведенном выше коде мы фильтруем СДР для детей в Ирландии. Затем мы мчимся за результатом и распечатываем детей. Если все идет хорошо, вы должны получить следующий вывод:
Child(collin.oneill,Collin,O'Neill,IRL,D,EI33) Child(owen.oneill,Owen,O'Neill,IRL,D,EI33)
ОК — Этого должно быть достаточно, чтобы сделать тебя опасным. Я должен отдать * ОГРОМНУЮ кучу благодарностей Рохиту Раю и его команде из TupleJump за разработку проекта Calliope. Они делают отличные вещи в TupleJump. Я также слежу за Stargate и Cash . На самом деле, в следующий раз я сделаю еще один шаг и покажу Акуле против Кассандры с помощью Кэша.