Статьи

Искра на Кассандре (с Каллиопой)

Все мы знаем, что составление отчетов о данных, хранящихся в репозиториях NoSQL, может быть громоздким. Либо вы встроили запросы в модель данных, либо нет. Если вам повезет, вы связали Cassandra с индексатором, таким как SOLR или Elastic Search, но иногда индекса недостаточно для выполнения сложной аналитики ваших данных. В качестве альтернативы, возможно, вы просто хотите сделать простое преобразование данных. Это часто легче сказать, чем сделать.

Нам всем нужен общий способ запуска функций над данными, хранящимися в Cassandra. Конечно, вы могли бы взять Hadoop и использовать аналитику / преобразования в виде конструкций MapReduce. Но это просто расстраивает людей. Вместо этого я бы порекомендовал Spark. Это делает людей счастливыми.

Однако когда я решил запустить Spark против Кассандры, я нашел относительно мало информации. Это моя попытка исправить это. Если вы нетерпеливы, вы можете просто скопировать репо, который я сделал:
https://github.com/boneill42/spark-on-cassandra-quickstart

Получить Каллиопу

Первая остановка, Каллиопа.
http://tuplejump.github.io/calliope/

Тогда иди сюда, чтобы вы знали, как это произносится. знак равно

Опять же, по причинам, которые я упоминал ранее , я хотел получить доступ к Cassandra через CQL. К сожалению, на момент написания этой статьи CQL-версия Calliope была вообще недоступна. Вы должны представить для раннего доступа . К счастью, Рохит и команда очень отзывчивы. И когда у вас есть доступ, вы можете создать новый проект, который использует его. Отбросьте зависимость в вашем пом.

<dependency>
    <groupId>com.tuplejump</groupId>
    <artifactId>calliope_2.9.3</artifactId>
    <version>0.8.1-EA</version>
</dependency>

Получить Scala’d Up

Теперь, если вы хотите полностью погрузиться в опыт Spark, вы захотите развиваться в Scala. Для меня это означало переход на IntelliJ, потому что у меня были некоторые проблемы с использованием Eclipse с конкретными (более старыми) версиями Scala . Calliope 0.8.1 раннего доступа был скомпилирован с Scala 2.9.3. Так что вам понадобится IDE, которая поддерживает эту версию. Чтобы получить поддержку Scala от maven, добавьте в свой pom следующее:

<pluginRepositories>
   <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
   </pluginRepository>
</pluginRepositories>
<dependency>
   <groupId>org.scalatest</groupId>
   <artifactId>scalatest_2.9.3</artifactId>
   <version>2.0.M5b</version>
</dependency>
<plugin>
   <groupId>org.scala-tools</groupId>
   <artifactId>maven-scala-plugin</artifactId>
   <version>2.15.2</version>
   <executions>
      <execution>
         <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <scalaVersion>2.9.3</scalaVersion>
      <launchers>
         <launcher>
            <id>run-scalatest</id>
            <mainClass>org.scalatest.tools.Runner</mainClass>
            <args>
               <arg>-p</arg>
               <arg>${project.build.testOutputDirectory}</arg>
            </args>
            <jvmArgs>
               <jvmArg>-Xmx512m</jvmArg>
            </jvmArgs>
         </launcher>
      </launchers>
      <jvmArgs>
         <jvmArg>-Xms64m</jvmArg>
         <jvmArg>-Xmx1024m</jvmArg>
      </jvmArgs>
   </configuration>
</plugin>

Получить искру

Теперь простая часть. Добавьте Spark. знак равно

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.9.3</artifactId>
   <version>0.8.1-incubating</version>
</dependency>

Слинг Код

Теперь, когда у нас наш проект встал. Давайте поспешим по семейству колонок и немного обработаем!

Весь код для выполнения задания Spark содержится в FindChildrenTest . Есть два компонента: трансформатор и работа. Трансформатор очень похож на концепцию Mapper, которую мы используем в storm-cassandra-cql . Преобразователь переводит строки CQL в объекты, которые можно использовать в работе. Вот код для трансформатора:

private object Cql3CRDDTransformers {
  import RichByteBuffer._
  implicit def row2String(key: ThriftRowKey, row: ThriftRowMap): List[String] = {
    row.keys.toList
  }
  implicit def cql3Row2Child(keys: CQLRowKeyMap, values: CQLRowMap): Child = {
    Child(keys.get("child_id").get, values.get("first_name").get, values.get("last_name").get, values.get("country").get, values.get("state").get, values.get("zip").get)
  }
}

Единственная действительно важная часть — это функция, которая переводит строку (ключи и значения) в дочерний объект.

При наличии трансформатора создать работу довольно просто:

class FindChildrenTest extends FunSpec {
  import Cql3CRDDTransformers._
  val sc = new SparkContext("local[1]", "castest")
  describe("Find All Children in IRL") {
    it("should be able find children in IRL") {
      val cas = CasBuilder.cql3.withColumnFamily("northpole", "children")
      val cqlRdd = sc.cql3Cassandra[Child](cas)
      val children = cqlRdd.collect().toList
      children.filter((child) => child.country.equals("IRL")).map((child) => println(child))
      sc.stop()
    }
  }
}

Первая строка соединяется с клавиатурой, таблицей. Для этого примера я повторно использовал схему из моего вебинара несколько лет назад. Вы можете найти CQL здесь . Вторая строка создает Resilient Distributed Dataset (RDD), содержащий дочерние объекты. СДР является основной абстракцией набора данных в Spark. Получив СДР, вы можете работать с этим СДР, как если бы это была любая другая карта. (довольно мощный материал)

В приведенном выше коде мы фильтруем СДР для детей в Ирландии. Затем мы мчимся за результатом и распечатываем детей. Если все идет хорошо, вы должны получить следующий вывод:

Child(collin.oneill,Collin,O'Neill,IRL,D,EI33)
Child(owen.oneill,Owen,O'Neill,IRL,D,EI33)

ОК — Этого должно быть достаточно, чтобы сделать тебя опасным. Я должен отдать * ОГРОМНУЮ кучу благодарностей Рохиту Раю и его команде из TupleJump за разработку проекта Calliope. Они делают отличные вещи в TupleJump. Я также слежу за Stargate и Cash . На самом деле, в следующий раз я сделаю еще один шаг и покажу Акуле против Кассандры с помощью Кэша.