
Spark: Generating CSV files to import into Neo4j

About a year ago Ian pointed me at the Chicago crime data set, which seemed like a good fit for Neo4j, and after much procrastination I've finally got around to importing it.

The data set covers crimes committed from 2001 until the present day. It contains around 4 million crimes along with metadata about each one, such as its location, crime type and year, to name a few.

The contents of the file have the following structure:

$ head -n 10 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,"(41.729576153145636, -87.57568059471686)"
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,"(41.884543798701515, -87.72803579358926)"
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,"(41.785633535413176, -87.74148516669783)"
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,"(41.766348042591375, -87.64702037047671)"
9461140,HX113909,01/14/2014 03:17:00 AM,016XX W HUBBARD ST,0610,BURGLARY,FORCIBLE ENTRY,COMMERCIAL / BUSINESS OFFICE,false,false,1215,012,27,24,05,1165029,1903111,2014,01/16/2014 12:40:00 AM,41.889741146006095,-87.66939334853973,"(41.889741146006095, -87.66939334853973)"
9460361,HX113731,01/14/2014 03:12:00 AM,022XX S WENTWORTH AVE,0820,THEFT,$500 AND UNDER,CTA TRAIN,false,false,0914,009,25,34,06,1175363,1889525,2014,01/20/2014 12:40:05 AM,41.85223460427207,-87.63185047834335,"(41.85223460427207, -87.63185047834335)"
9461691,HX114506,01/14/2014 03:00:00 AM,087XX S COLFAX AVE,0650,BURGLARY,HOME INVASION,RESIDENCE,false,false,0423,004,7,46,05,1195052,1847362,2014,01/17/2014 12:40:17 AM,41.73607283858007,-87.56097809501115,"(41.73607283858007, -87.56097809501115)"
9461792,HX114824,01/14/2014 03:00:00 AM,012XX S CALIFORNIA BLVD,0810,THEFT,OVER $500,STREET,false,false,1023,010,28,29,06,1157929,1894034,2014,01/17/2014 12:40:17 AM,41.86498077118534,-87.69571529596696,"(41.86498077118534, -87.69571529596696)"

Since I wanted to import this into Neo4j I needed to do some massaging of the data, as the neo4j-import tool expects to receive CSV files containing the nodes and relationships we want to create.


I'd been looking at Spark towards the end of last year, and pre-processing the big initial file into smaller CSV files containing nodes and relationships seemed like a good fit.

I therefore needed to write a Spark job to do this. We'll then submit that job to a Spark executor running locally and it will spit out the CSV files.


We'll start by creating a Scala object with a main method that will contain our processing code. Inside that main method we'll create a Spark context:

import org.apache.spark.{SparkConf, SparkContext}

object GenerateCSVFiles {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
    val sc = new SparkContext(conf)
  }
}

Easy enough. Next we’ll read in the CSV file. I found the easiest way to reference this was with an environment variable but perhaps there’s a more idiomatic way:

import java.io.File
import org.apache.spark.{SparkConf, SparkContext}

object GenerateCSVFiles {
  def main(args: Array[String]) {
    val crimeFile = System.getenv("CSV_FILE")

    if(crimeFile == null || !new File(crimeFile).exists()) {
      throw new RuntimeException("Cannot find CSV file [" + crimeFile + "]")
    }

    println("Using %s".format(crimeFile))

    val conf = new SparkConf().setAppName("Chicago Crime Dataset")

    val sc = new SparkContext(conf)
    val crimeData = sc.textFile(crimeFile).cache()
  }
}

The type of crimeData is RDD[String] – Spark’s way of representing the (lazily evaluated) lines of the CSV file. This also includes the header of the file so let’s write a function to get rid of that since we’ll be generating our own headers for the different files:

import org.apache.spark.rdd.RDD

// http://mail-archives.apache.org/mod_mbox/spark-user/201404.mbox/%3CCAEYYnxYuEaie518ODdn-fR7VvD39d71=CgB_Dxw_4COVXgmYYQ@mail.gmail.com%3E
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
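
For reference, the full job presumably wires these two pieces together along the following lines before generating any files (withoutHeader is the name used in the call further down, the rest is my own sketch):

// strip the header row so we can emit our own headers per output file
val withoutHeader: RDD[String] = dropHeader(crimeData)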

Now we’re ready to start generating our new CSV files so we’ll write a function which parses each line and extracts the appropriate columns. I’m using Open CSV for this:

import java.io.File

import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.fs.FileUtil
import org.apache.spark.rdd.RDD

def generateFile(file: String, withoutHeader: RDD[String], fn: Array[String] => Array[String], header: String, distinct: Boolean = true, separator: String = ",") = {
  FileUtil.fullyDelete(new File(file))

  val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file
  val rows: RDD[String] = withoutHeader.mapPartitions(lines => {
    // create one parser per partition rather than one per line
    val parser = new CSVParser(',')
    lines.map(line => {
      val columns = parser.parseLine(line)
      fn(columns).mkString(separator)
    })
  })

  if (distinct) rows.distinct().saveAsTextFile(tmpFile) else rows.saveAsTextFile(tmpFile)
}

We then call this function like this:

generateFile("/tmp/crimes.csv", withoutHeader, columns => Array(columns(0),"Crime", columns(2), columns(6)), "id:ID(Crime),:LABEL,date,description", false)

The output into ‘tmpFile’ is actually 32 ‘part files’ but I wanted to be able to merge those together into individual CSV files that were easier to work with.
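
One way to collapse a directory of part files into a single file is Hadoop's FileUtil.copyMerge. A minimal sketch, assuming the default Hadoop configuration (the mergeParts helper is mine rather than something lifted from the job):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// merge Spark's part-* files from srcDir into a single destFile
def mergeParts(srcDir: String, destFile: String) = {
  val hadoopConfig = new Configuration()
  val fs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(fs, new Path(srcDir), fs, new Path(destFile), false, hadoopConfig, null)
}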

I won’t paste the full job here but if you want to take a look it’s on github.

Now we need to submit the job to Spark. I’ve wrapped this in a script if you want to follow along but these are the contents:

./spark-1.1.0-bin-hadoop1/bin/spark-submit \
--driver-memory 5g \
--class GenerateCSVFiles \
--master local[8] \
target/scala-2.10/playground_2.10-1.0.jar \
$@

If we execute that we’ll see the following output…

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Crimes_-_2001_to_present.csv
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/15 00:31:44 INFO SparkContext: Running Spark version 1.3.0
...
15/04/15 00:47:26 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool
15/04/15 00:47:26 INFO DAGScheduler: Stage 8 (saveAsTextFile at GenerateCSVFiles.scala:51) finished in 2.702 s
15/04/15 00:47:26 INFO DAGScheduler: Job 4 finished: saveAsTextFile at GenerateCSVFiles.scala:51, took 8.715588 s

real    0m44.935s
user    4m2.259s
sys     0m14.159s

and these CSV files will be generated:

$ ls -alh /tmp/*.csv
-rwxrwxrwx  1 markneedham  wheel   3.0K 14 Apr 07:37 /tmp/beats.csv
-rwxrwxrwx  1 markneedham  wheel   217M 14 Apr 07:37 /tmp/crimes.csv
-rwxrwxrwx  1 markneedham  wheel    84M 14 Apr 07:37 /tmp/crimesBeats.csv
-rwxrwxrwx  1 markneedham  wheel   120M 14 Apr 07:37 /tmp/crimesPrimaryTypes.csv
-rwxrwxrwx  1 markneedham  wheel   912B 14 Apr 07:37 /tmp/primaryTypes.csv

Let’s have a quick look at what they contain:

$ head -n 10 /tmp/beats.csv
id:ID(Beat),:LABEL
1135,Beat
1421,Beat
2312,Beat
1113,Beat
1014,Beat
2411,Beat
1333,Beat
2521,Beat
1652,Beat

$ head -n 10 /tmp/crimes.csv
id:ID(Crime),:LABEL,date,description
9464711,Crime,01/14/2014 05:00:00 AM,SIMPLE
9460704,Crime,01/14/2014 04:55:00 AM,ARMED: HANDGUN
9460339,Crime,01/14/2014 04:44:00 AM,TO PROPERTY
9461467,Crime,01/14/2014 04:43:00 AM,$500 AND UNDER
9460355,Crime,01/14/2014 04:21:00 AM,$500 AND UNDER
9461140,Crime,01/14/2014 03:17:00 AM,FORCIBLE ENTRY
9460361,Crime,01/14/2014 03:12:00 AM,$500 AND UNDER
9461691,Crime,01/14/2014 03:00:00 AM,HOME INVASION
9461792,Crime,01/14/2014 03:00:00 AM,OVER $500

$ head -n 10 /tmp/crimesBeats.csv
:START_ID(Crime),:END_ID(Beat),:TYPE
5896915,0733,ON_BEAT
9208776,2232,ON_BEAT
8237555,0111,ON_BEAT
6464775,0322,ON_BEAT
6468868,0411,ON_BEAT
4189649,0524,ON_BEAT
7620897,0421,ON_BEAT
7720402,0321,ON_BEAT
5053025,1115,ON_BEAT

Looking good. Let’s get them imported into Neo4j:

$ ./neo4j-community-2.2.0/bin/neo4j-import --into /tmp/my-neo --nodes /tmp/crimes.csv --nodes /tmp/beats.csv --nodes /tmp/primaryTypes.csv --relationships /tmp/crimesBeats.csv --relationships /tmp/crimesPrimaryTypes.csv
Nodes
[*>:45.76 MB/s----------------------------------|PROPERTIES(2)=============|NODE:3|v:118.05 MB/]  4M
Done in 5s 605ms
Prepare node index
[*RESOLVE:64.85 MB-----------------------------------------------------------------------------]  4M
Done in 4s 930ms
Calculate dense nodes
[>:42.33 MB/s-------------------|*PREPARE(7)===================================|CALCULATOR-----]  8M
Done in 5s 417ms
Relationships
[>:42.33 MB/s-------------|*PREPARE(7)==========================|RELATIONSHIP------------|v:44.]  8M
Done in 6s 62ms
Node --> Relationship
[*>:??-----------------------------------------------------------------------------------------]  4M
Done in 324ms
Relationship --> Relationship
[*LINK-----------------------------------------------------------------------------------------]  8M
Done in 1s 984ms
Node counts
[*>:??-----------------------------------------------------------------------------------------]  4M
Done in 360ms
Relationship counts
[*>:??-----------------------------------------------------------------------------------------]  8M
Done in 653ms

IMPORT DONE in 26s 517ms

Next I updated conf/neo4j-server.properties to point to my new database:

#***************************************************************
# Server configuration
#***************************************************************

# location of the database directory
#org.neo4j.server.database.location=data/graph.db
org.neo4j.server.database.location=/tmp/my-neo

Now I can start up Neo and start exploring the data:

$ ./neo4j-community-2.2.0/bin/neo4j start

MATCH (:Crime)-[r:CRIME_TYPE]->() 
RETURN r 
LIMIT 10

[Graph visualization of the query results]

There are lots more relationships and entities that we could pull out of this data set – what I’ve done is just a start. So if you’re up for some more Chicago crime exploration, the code and instructions explaining how to run it are on github.