Статьи

Spark: используйте Cassandra как устойчивый распределенный источник данных

В этой статье мы увидим, как мы можем использовать Cassandra в качестве источника эластичного распределенного набора данных (RDD) для Spark для выполнения операций RDD.

Мы будем использовать библиотеку Stratio для выполнения операции Group By, которую Cassandra изначально не поддерживает.

В частности, мы будем использовать результаты матчей Кубка мира, «группировать» эти результаты «по командам-победителям» и отображать для каждой команды, сколько раз она выиграла во время чемпионата мира (до сих пор).

Вот шаги, чтобы выполнить это:
— Получить статистику совпадений кубка мира из API worldcup.sfg.io json, используя Retrofit . Вот данные JSON, которые мы будем использовать: http://worldcup.sfg.io/matches .
— Сохранять полученную статистику на встроенном сервере Cassandra (используя Cassandra Unit и драйвер Datastax Java ).
— Выполните операцию страны «Победитель группы» во всех матчах, используя Stratio , позволяя использовать базу данных Cassandra в качестве источника упругого распределенного набора данных (RDD) для Spark.

Далее весь код написан на языке Java.

Предпосылки

Для запуска этого теста вам необходимо:
— JDK 7 или выше
— Git
— Maven

Получение статистики Кубка мира

Модификация позволяет выполнять HTTP-запросы и автоматически отображать результаты JSON. Нам просто нужно создать интерфейс, отображающий последующий HTTP-запрос:

public interface WorldCupService {
    @GET("/matches")
    List<Match> getMatchs();
}

Класс Match содержит поля, отображающие точно возвращаемые поля JSON:

public class Match {
    private int match_number=-1;
    private String location=null;
    private String status=null;
    private String datetime=null;
    private String winner=null;
    private Team home_team = null;
    private Team away_team = null;
 ...
}

с классом Команды, являющимся:

public class Team {
    private String country=null;
    private String code=null;
    private int goals=-1;
...
}

Finally, we create an adapter based on the previous interface:

RestAdapter restAdapter = new RestAdapter.Builder().setEndpoint(
                "http://worldcup.sfg.io").build();
WorldCupService service = restAdapter.create(WorldCupService.class);
return service.getMatchs();

Retrofit will automatically populate Match and Team fields from JSON fields (http://worldcup.sfg.io/matches.) using reflection.

Persisting Data to Cassandra

The keyspace and table are created using CQL:

CREATE KEYSPACE WorldCup WITH replication={'class' : 'SimpleStrategy', 'replication_factor':1};
 
use WorldCup;
 
CREATE TABLE Match (
    number int PRIMARY KEY,
    status varchar,
    location varchar,
    datetime  varchar,
    winner varchar,
    home_team_code varchar,
    home_team_country varchar,
    home_team_goals int,
    away_team_code varchar,
    away_team_country varchar,
    away_team_goals int
 
);

Then we will use the Datastax Java driver to store our Matchs to Cassandra, using a Batch Statement:

Insert insertStatement = QueryBuilder.insertInto("Match");
insertStatement.value("number", QueryBuilder.bindMarker())
 .value("status", QueryBuilder.bindMarker())
 .value("location", QueryBuilder.bindMarker())
 .value("datetime", QueryBuilder.bindMarker())
 .value("winner", QueryBuilder.bindMarker())
 .value("home_team_code", QueryBuilder.bindMarker())
 .value("home_team_country", QueryBuilder.bindMarker())
 .value("home_team_goals", QueryBuilder.bindMarker())
 .value("away_team_code", QueryBuilder.bindMarker())
 .value("away_team_country", QueryBuilder.bindMarker())
 .value("away_team_goals", QueryBuilder.bindMarker());
PreparedStatement ps = session.prepare(insertStatement.toString());
BatchStatement batch = new BatchStatement();
for(Match match : matchs){
 batch.add(ps.bind(match.getMatch_number(),
 match.getStatus(),
 match.getLocation(),
 match.getDatetime(),
 match.getWinner(),
 match.getHome_team().getCode(),
 match.getHome_team().getCountry(),
 match.getHome_team().getGoals(),
 match.getAway_team().getCode(),
 match.getAway_team().getCountry(),
 match.getAway_team().getGoals()
));
}
session.execute(batch);
session.close();

Spark Context

You can see a overview of what Spark does here. Basically, it provides a way to perform operations (group by, join, etc) in a distributed way on a data set, that should also be able to be retrieved in a distributed way. Thus, Spark manages the Hadoop filesystem (HDFS), distributed by nature, and we will see here how can use Cassandra, as a resilient distributed dataset (RDD) source.

To do so, we need to use the Stratio API.

First, we need to initialize a DeepSparkContext, inheriting from JavaSparkContext:

String cluster = "local";
String job = "myJobName";
String sparkHome = "";
 
DeepSparkContext deepContext =
   new DeepSparkContext(cluster, job, sparkHome, new String[]{});

Here, the Spark cluster will be embedded in the Java Virtual Machine, therefore we set the cluster parameter as “local”.

We can set any job name.

The sparkHome parameter is not needed here, since we will not rely on an existing Spark installation.

The String[] parameter indicates jar files, that we need if we use Entity RDD to map a cql table with a Java Object (like Hibernate). In that case only, jar files should contain classes related to these objects. Here, we will use the more generic Cell RDD, where columns are bound to generic cells that include metadata along with the values. Therefore no need to specify jar files.

Job Config

We need a way to tell Stratio where and what data we need to retrieve from Cassandra. This can be done by initializing an ICassandraDeepJobConfig bean:

ICassandraDeepJobConfig<Cells> config = DeepJobConfigFactory
                .create().host("localhost").cqlPort(9142)
                .keyspace("worldCup").table("match")
                .inputColumns("winner")
                .initialize();

Here, we’re telling Stratio how to connect to Cassandra, and which keyspace and table will contain data that will be used as a source of RDD. To optimize further access, we can also specify which columns we need to retrieve, as well as filtering with secondary indexes with the filterByField(filterColumnName, filterValue) method (not used here).

Creating a RDD and planning operations

Once done, we can create our RDD abstraction, and plan operations (plan because operations are lazy) on this data set:

CassandraJavaRDD rdd = deepContext.cassandraJavaRDD(config);
         
JavaPairRDD<String, Iterable<Cells>> groups = rdd.groupBy(new Function<Cells, String>() {
    @Override
    public String call(Cells cells) throws Exception {
        Object cellValue = cells.getCellByName("winner").getCellValue();
        return cellValue!=null ? cellValue.toString() : null;
    }
});
JavaPairRDD<String,Integer> counts = groups.mapToPair(new PairFunction<Tuple2<String, Iterable<Cells>>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<String, Iterable<Cells>> t) throws Exception {
        return new Tuple2<String,Integer>(t._1(), Lists.newArrayList(t._2()).size());
    }
});

Here, we’re telling that we will “group” match results “by” winner team. Then, we will map each team with the number of times it won.

Collecting results

Finally, we can collect and display results:

List<Tuple2<String,Integer>> results = counts.collect();
LOGGER.info("GroupBy Results:");
for(Tuple2<String,Integer> tuple : results){
    LOGGER.info(tuple.toString());
}

This code will output the following (as of 29 June 2014):

GroupBy Results:
(null,14)
(Portugal,1)
(Brazil,3)
(Netherlands,3)
(Draw,9)
(Uruguay,2)
(Colombia,4)
(Argentina,3)
(Chile,2)
(Croatia,1)
(Belgium,3)
(Mexico,2)
(Ecuador,1)
(Greece,1)
(France,2)
(Italy,1)
(Ivory Coast,1)
(Algeria,1)
(Switzerland,2)
(Spain,1)
(Germany,2)
(USA,1)
(Costa Rica,2)
(Bosnia and Herzegovina,1)
(Nigeria,1)

Conclusion

Now, you will know how to perform distributed operations using Spark and Cassandra.
By the way, Databricks and Datastax, companies supporting Spark and Cassandra, recently announced Partnership centring around a supported open-source integration of their products.

Hope you enjoyed this post, and don’t forget to share!

Get the code

The code and details on how to run this test are available on GitHub.

About the author

Julien Sebrien, passionate developer, follow me here.

Links

Spark home page : https://spark.apache.org/
Cassandra home page : http://cassandra.apache.org/
Stratio home page : http://www.openstratio.org/
Retrofit home page : http://square.github.io/retrofit/
Cassandra Unit Github Repository : https://github.com/jsevellec/cassandra-unit
Datastax Java driver : http://www.datastax.com/documentation/developer/java-driver/2.0
WorldCup Api home page : http://worldcup.sfg.io/