Статьи

Стек Spark / Cassandra: выполнение операций RDD с использованием соединителя Datastax

В этой статье мы будем использовать новый соединитель Datastax Cassandra / Spark, чтобы иметь возможность загружать данные из таблицы Cassandra и выполнять операции RDD с этими данными с помощью Spark .

Из конкретных наборов проезжей части мы рассчитаем среднее расстояние проезжей части, сгруппированное по городу происхождения.

Вот шаги, чтобы выполнить это:

  •  Вставьте информацию о roadtrips (из roadtrips.zip в / src / main / resources ) во встроенный сервер Cassandra (используя Cassandra Unit и драйвер Datastax Java )
  • Затем подсчитайте количество поездок, сгруппированных по городам происхождения, используя соединитель Datastax Cassandra / Spark и Spark .
  • Наконец, вычислите среднее расстояние до дороги, все еще сгруппированное по городу происхождения.

Далее весь код написан на языке Java.

Предпосылки

Для запуска этого теста вам необходимо:

  • JDK 7 или выше
  • Гит
  • специалист

Хранение информации о поездках

Мы импортируем информацию о трехсторонних поездках из файла CSV и сохраняем ее в таблице Cassandra:

CREATE TABLE RoadTrip (
    id int PRIMARY KEY,
    origin_city_name varchar,
    origin_state_abr varchar,
    destination_city_name varchar,
    destination_state_abr varchar,
    elapsed_time int,
    distance int
);

Класс RoadTrip выглядит следующим образом:

public class RoadTrip {
    private Integer id = null;
    private String originCityName = null;
    private String originStateAbr = null;
    private String destinationCityName = null;
    private String destinationStateAbr = null;
    private Integer elapsedTime = null;
    private Integer distance = null;
 ...
}

Мы будем хранить компоненты Roadtrip с драйвером Datastax Java, используя оператор Batch:

Insert insertStatement = QueryBuilder.insertInto("RoadTrip");
insertStatement.value("id", QueryBuilder.bindMarker())
    .value("origin_city_name", QueryBuilder.bindMarker())
    .value("origin_state_abr", QueryBuilder.bindMarker())
    .value("destination_city_name", QueryBuilder.bindMarker())
    .value("destination_state_abr", QueryBuilder.bindMarker())
    .value("elapsed_time", QueryBuilder.bindMarker())
    .value("distance", QueryBuilder.bindMarker())
;
PreparedStatement ps = session.prepare(insertStatement.toString());
...
BatchStatement batch = new BatchStatement();
for (RoadTrip roadtrip : roadtrips) {
 batch.add(ps.bind(roadtrip.getId(),
    roadtrip.getOriginCityName(),
    roadtrip.getOriginStateAbr(),
    roadtrip.getDestinationCityName(),
    roadtrip.getDestinationStateAbr(),
    roadtrip.getElapsedTime(),
    roadtrip.getDistance()
 ));
}
session.execute(batch);
...

Spark Context

Вы можете увидеть обзор того, что Spark делает здесь . По сути, он предоставляет способ выполнения операций (группирование, объединение и т. Д.) Над разделенными наборами данных, и здесь мы увидим, как мы можем использовать Cassandra, как устойчивый источник распределенного набора данных (RDD).

Для этого нам нужно использовать соединитель Datastax Cassandra / Spark .

Сначала нам нужно инициализировать SparkContext и создать абстракцию Resilient Distributed Dataset, используя таблицу Cassandra в качестве источника этого набора данных:

SparkConf conf = new SparkConf(true)
    .setMaster("local")
    .setAppName("DatastaxTests")
    .set("spark.executor.memory", "1g")
    .set("spark.cassandra.connection.host", "localhost")
    .set("spark.cassandra.connection.native.port", "9142")
    .set("spark.cassandra.connection.rpc.port", "9171");
SparkContext ctx = new SparkContext(conf);
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ctx);

JavaRDD<CassandraRow> rdd = functions.cassandraTable("roadtrips", "roadtrip").toJavaRDD();

Здесь кластер Spark будет встроен в виртуальную машину Java, поэтому мы устанавливаем параметр master как «локальный».

Нам необходимо предоставить основную информацию об имени приложения, объеме памяти, выделенной для контекста, а также о хосте и портах Cassandra.

Наконец, мы сообщаем Spark, какая таблица будет источником данных RDD.

Создание СДР и планирование операций

После этого мы можем создать абстракцию RDD и планировать операции (планировать, потому что операции ленивы ) на этом наборе данных.

Сначала нам нужно кэшировать этот RDD. Это скажет Spark, что не нужно пересчитывать этот набор данных и сохранять его в памяти (это настройка по умолчанию). Это также позволит нам повторно использовать этот СДР для нескольких операций, что будет иметь место позже в этой статье.

JavaRDD<CassandraRow> rdd = functions.cassandraTable("roadtrips", "roadtrip").toJavaRDD();
rdd.cache();

Теперь мы вычислим количество поездок и сгруппируем по origin_city:

JavaPairRDD<String, Integer> sizes = rdd.groupBy( new Function<CassandraRow, String>() {
        @Override
    public String call(CassandraRow row) throws Exception {
        return row.getString("origin_city_name");
    }
}).mapToPair(new PairFunction<Tuple2<String,Iterable<CassandraRow>>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<String, Iterable<CassandraRow>> t) throws Exception {
        return new Tuple2<String,Integer>(t._1(), Lists.newArrayList(t._2()).size());
    }
});
sizes.cache();

Это все! По сути, используя операцию groupBy, мы создаем кортеж Tuple2 <String, Iterable <CassandraRow >>, где String является городом происхождения, и Iterable <CassandraRow>, список строк, имеющих этот самый город происхождения.

Затем во время связанной операции mapToPair мы получаем каждый предыдущий кортеж и создаем новую абстракцию, которая будет иметь вид Tuple2 <String, Integer>, где String — город происхождения, а Integer — число Roadtrips, начиная с этот город.

Здесь вы заметите, что мы также кэшируем абстракцию RDD размеров, так как мы будем повторно использовать этот промежуточный результат для вычисления среднего расстояния, по-прежнему по городу происхождения.

Вот вывод как этот этап:

Nb RoadTrips by origin
Albuquerque : 61
Raleigh/Durham : 62
Memphis : 24
Seattle : 31
Orlando : 154
Salt Lake City : 31
Newark : 61
Hartford : 31
Miami : 773
San Antonio : 176
New York : 978
Omaha : 57
Portland : 9
San Jose : 57
Austin : 194
Charlotte : 31
Kansas City : 93
Chicago : 1108
Fort Lauderdale : 31
Dayton : 31
San Francisco : 362
Tulsa : 62
Los Angeles : 957
Atlanta : 31
Indianapolis : 1
Fayetteville : 31
Wichita : 62
Columbus : 31
Washington : 358
St. Louis : 204
Kahului : 93
El Paso : 31
Oklahoma City : 31
Ontario : 36
Phoenix : 124
Santa Ana : 33
Baltimore : 27
Burbank : 8
Kona : 31
Las Vegas : 93
Norfolk : 50
Philadelphia : 8
Minneapolis : 30
Houston : 58
Lihue : 42
Palm Springs : 31
Honolulu : 164
San Juan : 62
Louisville : 1
Tampa : 124
Fort Myers : 31
Colorado Springs : 31
San Diego : 159
Boston : 212
Mission/McAllen/Edinburg : 30
West Palm Beach/Palm Beach : 62
Dallas/Fort Worth : 2275
Charlotte Amalie : 31

Теперь нам нужно вычислить среднее расстояние по маршруту поездки, все еще сгруппированное по городу происхождения:

JavaPairRDD<String, Integer> sums = rdd.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) throws Exception {
        return new Tuple2(row.getString("origin_city_name"), row.getInt("distance"));
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer d1, Integer d2) throws Exception {
        return Integer.valueOf(d1.intValue()+d2.intValue());
    }
});

Здесь для каждого CassandraRow мы создаем Tuple2 <String, Integer>, где String является городом происхождения, а Integer — расстояние соответствующей RoadTrip (т.е. CassandraRow).

Затем мы используем операцию reduByKey для вычисления суммы расстояний проезда для указанного ключа Tuple2, который здесь является городом происхождения. Эта операция пройдет все ранее найденные расстояния и для каждой пары расстояний (d1, d2) возвращает ее сумму. Эта вычисленная сумма будет использоваться для следующей итерации, пока не будет обработана вся коллекция расстояний.

Наконец, чтобы вычислить среднее расстояние для каждой RoadTrip, нам нужен способ, позволяющий объединить количество поездок и сумму расстояний, которые мы ранее вычислили. Это можно сделать с помощью операции соединения:

List<Tuple2<String,Double>> averageResults = sums.join(sizes)
       .mapValues(new Function<Tuple2<Integer,Integer>, Double>() {
        @Override
        public Double call(Tuple2<Integer, Integer> tuple) throws Exception {
            return Double.valueOf((double)tuple._1() / tuple._2());
        }
    }).collect();

sums.join (размеры) создает новую абстракцию RDD, которая будет иметь вид JavaRDD <String, Tuple2 <Integer, Integer >> где String является городом происхождения, а Tuple2 <Integer, Integer> содержит соответственно суммы расстояния и количество RoadTrips.

Обратите внимание, что мы повторно используем здесь ранее вычисленные размеры RDD абстракции.

Наконец, операция mapValues ​​преобразует эту абстракцию JavaRDD <String, Tuple2 <Integer, Integer >> в абстракцию JavaRDD <String, Double>, где Double представляет среднее расстояние.

Вызов метода collect () выдаст следующее:

Average distance by origin
Albuquerque : 569.0
Raleigh/Durham : 880.5
Memphis : 432.0
Seattle : 2428.8387096774195
Orlando : 1313.7662337662337
Salt Lake City : 989.0
Newark : 1904.1311475409836
Hartford : 1471.0
Miami : 1404.1875808538164
San Antonio : 247.0
New York : 1639.402862985685
Omaha : 583.0
Portland : 1616.0
San Jose : 1643.7894736842106
Austin : 520.7835051546392
Charlotte : 936.0
Kansas City : 441.0
Chicago : 906.5361010830325
Fort Lauderdale : 1182.0
Dayton : 861.0
San Francisco : 2099.5552486187844
Tulsa : 448.61290322580646
Los Angeles : 2424.0010449320794
Atlanta : 731.0
Indianapolis : 761.0
Fayetteville : 280.0
Wichita : 328.0
Columbus : 926.0
Washington : 1322.2067039106146
St. Louis : 752.1764705882352
Kahului : 2881.043010752688
El Paso : 551.0
Oklahoma City : 175.0
Ontario : 1188.0
Phoenix : 1154.0
Santa Ana : 1315.5151515151515
Baltimore : 1217.0
Burbank : 1231.0
Kona : 2504.0
Las Vegas : 1605.6666666666667
Norfolk : 1212.0
Philadelphia : 1303.0
Minneapolis : 852.0
Houston : 619.5172413793103
Lihue : 2615.0
Palm Springs : 1126.0
Honolulu : 3112.8231707317073
San Juan : 1045.0
Louisville : 733.0
Tampa : 789.25
Fort Myers : 1120.0
Colorado Springs : 592.0
San Diego : 1558.4528301886792
Boston : 1871.1462264150944
Mission/McAllen/Edinburg : 469.0
West Palm Beach/Palm Beach : 1123.0
Dallas/Fort Worth : 1040.072087912088
Charlotte Amalie : 1623.0

Вывод

Теперь вы узнаете, как выполнять распределенные операции, используя Spark и Cassandra и Datastax Connector.

Надеюсь, вам понравился этот пост, и не забудьте поделиться!

Получить код

Код и подробности о том, как запустить этот тест, доступны на GitHub .

Об авторе

Julien Sebrien, страстный разработчик, следуйте за мной здесь .

связи