Статьи

Spark: создание CSV-файлов для импорта в Neo4j

Около года назад Ян указал мне на набор данных по преступности в Чикаго, который показался мне подходящим для Neo4j, и после долгих проволочек я наконец-то нашел способ импортировать его.

Набор данных охватывает преступления, совершенные с 2001 года по настоящее время. Он содержит около 4 миллионов преступлений и метаданных о таких преступлениях, как местонахождение, тип преступления и год, чтобы назвать несколько.

Содержимое файла имеет следующую структуру:

01
02
03
04
05
06
07
08
09
10
11
$ head -n 10 ~/Downloads/Crimes_-_2001_to_present.csv
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
9464711,HX114160,01/14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT,false,true,0422,004,7,46,08A,1196652,1852516,2014,01/20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228,"(41.75017626412204, -87.55494559131228)"
9460704,HX113741,01/14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK,false,false,0413,004,8,48,03,1191060,1844959,2014,01/18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686,"(41.729576153145636, -87.57568059471686)"
9460339,HX113740,01/14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE,false,true,1114,011,28,26,14,1149075,1901099,2014,01/16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926,"(41.884543798701515, -87.72803579358926)"
9461467,HX114463,01/14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),false,false,0813,008,13,64,06,1145661,1865031,2014,01/16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783,"(41.785633535413176, -87.74148516669783)"
9460355,HX113738,01/14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET,true,false,0733,007,17,68,06,1171480,1858195,2014,01/16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671,"(41.766348042591375, -87.64702037047671)"
9461140,HX113909,01/14/2014 03:17:00 AM,016XX W HUBBARD ST,0610,BURGLARY,FORCIBLE ENTRY,COMMERCIAL / BUSINESS OFFICE,false,false,1215,012,27,24,05,1165029,1903111,2014,01/16/2014 12:40:00 AM,41.889741146006095,-87.66939334853973,"(41.889741146006095, -87.66939334853973)"
9460361,HX113731,01/14/2014 03:12:00 AM,022XX S WENTWORTH AVE,0820,THEFT,$500 AND UNDER,CTA TRAIN,false,false,0914,009,25,34,06,1175363,1889525,2014,01/20/2014 12:40:05 AM,41.85223460427207,-87.63185047834335,"(41.85223460427207, -87.63185047834335)"
9461691,HX114506,01/14/2014 03:00:00 AM,087XX S COLFAX AVE,0650,BURGLARY,HOME INVASION,RESIDENCE,false,false,0423,004,7,46,05,1195052,1847362,2014,01/17/2014 12:40:17 AM,41.73607283858007,-87.56097809501115,"(41.73607283858007, -87.56097809501115)"
9461792,HX114824,01/14/2014 03:00:00 AM,012XX S CALIFORNIA BLVD,0810,THEFT,OVER $500,STREET,false,false,1023,010,28,29,06,1157929,1894034,2014,01/17/2014 12:40:17 AM,41.86498077118534,-87.69571529596696,"(41.86498077118534, -87.69571529596696)"

Поскольку я хотел импортировать это в Neo4j, мне нужно было выполнить некоторую обработку данных, поскольку инструмент neo4j-import ожидает получить файлы CSV, содержащие узлы и связи, которые мы хотим создать.

Я смотрел на Spark в конце прошлого года, и предварительная обработка большого исходного файла в меньшие CSV-файлы, содержащие узлы и взаимосвязи, казалась подходящей.

Поэтому мне нужно было создать работу Spark, чтобы сделать это. Затем мы передадим это задание исполнителю Spark, работающему локально, и он выплюнет файлы CSV.

2015-04-15_00-51-42

Мы начнем с создания объекта Scala с методом main, который будет содержать наш код обработки. Внутри этого основного метода мы создадим контекст Spark:

1
2
3
4
5
6
7
8
import org.apache.spark.{SparkConf, SparkContext}
  
object GenerateCSVFiles { 
    def main(args: Array[String]) {   
        val conf = new SparkConf().setAppName("Chicago Crime Dataset")   
        val sc = new SparkContext(conf) 
    }
}

Достаточно просто. Далее мы будем читать в файле CSV. Я нашел самый простой способ сослаться на это с помощью переменной среды, но, возможно, есть более идиоматический способ:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
import java.io.File
import org.apache.spark.{SparkConf, SparkContext}
  
object GenerateCSVFiles {
  def main(args: Array[String]) {
    var crimeFile = System.getenv("CSV_FILE")
  
    if(crimeFile == null || !new File(crimeFile).exists()) {
      throw new RuntimeException("Cannot find CSV file [" + crimeFile + "]")
    }
  
    println("Using %s".format(crimeFile))
  
    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
  
    val sc = new SparkContext(conf)
    val crimeData = sc.textFile(crimeFile).cache()
}

Тип crimeData — это RDD [String] — способ Spark для представления (лениво оцененных) строк файла CSV. Это также включает заголовок файла, поэтому давайте напишем функцию, чтобы избавиться от этого, так как мы будем генерировать наши собственные заголовки для разных файлов:

01
02
03
04
05
06
07
08
09
10
11
import org.apache.spark.rdd.RDD
  
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}

Теперь мы готовы начать генерировать наши новые файлы CSV, поэтому мы напишем функцию, которая анализирует каждую строку и извлекает соответствующие столбцы. Я использую Open CSV для этого:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
import au.com.bytecode.opencsv.CSVParser
  
def generateFile(file: String, withoutHeader: RDD[String], fn: Array[String] => Array[String], header: String , distinct:Boolean = true, separator: String = ",") = {
  FileUtil.fullyDelete(new File(file))
  
  val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file
  val rows: RDD[String] = withoutHeader.mapPartitions(lines => {
    val parser = new CSVParser(',')
    lines.map(line => {
      val columns = parser.parseLine(line)
      fn(columns).mkString(separator)
    })
  })
  
  if (distinct) rows.distinct() saveAsTextFile tmpFile else rows.saveAsTextFile(tmpFile)
}

Затем мы вызываем эту функцию так:

1
generateFile("/tmp/crimes.csv", withoutHeader, columns => Array(columns(0),"Crime", columns(2), columns(6)), "id:ID(Crime),:LABEL,date,description", false)

Вывод в ‘tmpFile’ на самом деле представляет собой 32 ‘файла детали’, но я хотел иметь возможность объединить их вместе в отдельные файлы CSV, с которыми было бы легче работать.

Полная работа выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.io.File
  
import au.com.bytecode.opencsv.CSVParser
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
  
object GenerateCSVFiles {
  def merge(srcPath: String, dstPath: String, header: String): Unit =  {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
  }
  
  def main(args: Array[String]) {
    var crimeFile = System.getenv("CSV_FILE")
  
    if(crimeFile == null || !new File(crimeFile).exists()) {
      throw new RuntimeException("Cannot find CSV file [" + crimeFile + "]")
    }
  
    println("Using %s".format(crimeFile))
  
    val conf = new SparkConf().setAppName("Chicago Crime Dataset")
  
    val sc = new SparkContext(conf)
    val crimeData = sc.textFile(crimeFile).cache()
    val withoutHeader: RDD[String] = dropHeader(crimeData)
  
    generateFile("/tmp/primaryTypes.csv", withoutHeader, columns => Array(columns(5).trim(), "CrimeType"), "crimeType:ID(CrimeType),:LABEL")
    generateFile("/tmp/beats.csv", withoutHeader, columns => Array(columns(10), "Beat"), "id:ID(Beat),:LABEL")
    generateFile("/tmp/crimes.csv", withoutHeader, columns => Array(columns(0),"Crime", columns(2), columns(6)), "id:ID(Crime),:LABEL,date,description", false)
    generateFile("/tmp/crimesPrimaryTypes.csv", withoutHeader, columns => Array(columns(0),columns(5).trim(), "CRIME_TYPE"), ":START_ID(Crime),:END_ID(CrimeType),:TYPE")
    generateFile("/tmp/crimesBeats.csv", withoutHeader, columns => Array(columns(0),columns(10), "ON_BEAT"), ":START_ID(Crime),:END_ID(Beat),:TYPE")
  }
  
  def generateFile(file: String, withoutHeader: RDD[String], fn: Array[String] => Array[String], header: String , distinct:Boolean = true, separator: String = ",") = {
    FileUtil.fullyDelete(new File(file))
  
    val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file
    val rows: RDD[String] = withoutHeader.mapPartitions(lines => {
      val parser = new CSVParser(',')
      lines.map(line => {
        val columns = parser.parseLine(line)
        fn(columns).mkString(separator)
      })
    })
  
    if (distinct) rows.distinct() saveAsTextFile tmpFile else rows.saveAsTextFile(tmpFile)
  
    merge(tmpFile, file, header)
  }
  
  def dropHeader(data: RDD[String]): RDD[String] = {
    data.mapPartitionsWithIndex((idx, lines) => {
      if (idx == 0) {
        lines.drop(1)
      }
      lines
    })
  }
}

Теперь нам нужно отправить работу в Spark. Я обернул это в сценарий, если вы хотите следовать, но это содержание:

1
2
3
4
5
6
./spark-1.1.0-bin-hadoop1/bin/spark-submit \
--driver-memory 5g \
--class GenerateCSVFiles \
--master local[8] \
target/scala-2.10/playground_2.10-1.0.jar \
$@

Если мы выполним это, мы получим следующие CSV-файлы:

1
2
3
4
5
6
$ ls -alh /tmp/*.csv
-rwxrwxrwx  1 markneedham  wheel   3.0K 14 Apr 07:37 /tmp/beats.csv
-rwxrwxrwx  1 markneedham  wheel   217M 14 Apr 07:37 /tmp/crimes.csv
-rwxrwxrwx  1 markneedham  wheel    84M 14 Apr 07:37 /tmp/crimesBeats.csv
-rwxrwxrwx  1 markneedham  wheel   120M 14 Apr 07:37 /tmp/crimesPrimaryTypes.csv
-rwxrwxrwx  1 markneedham  wheel   912B 14 Apr 07:37 /tmp/primaryTypes.csv

Давайте быстро проверим, что они содержат:

01
02
03
04
05
06
07
08
09
10
11
$ head -n 10 /tmp/beats.csv
id:ID(Beat),:LABEL
1135,Beat
1421,Beat
2312,Beat
1113,Beat
1014,Beat
2411,Beat
1333,Beat
2521,Beat
1652,Beat
01
02
03
04
05
06
07
08
09
10
11
$ head -n 10 /tmp/crimes.csv
id:ID(Crime),:LABEL,date,description
9464711,Crime,01/14/2014 05:00:00 AM,SIMPLE
9460704,Crime,01/14/2014 04:55:00 AM,ARMED: HANDGUN
9460339,Crime,01/14/2014 04:44:00 AM,TO PROPERTY
9461467,Crime,01/14/2014 04:43:00 AM,$500 AND UNDER
9460355,Crime,01/14/2014 04:21:00 AM,$500 AND UNDER
9461140,Crime,01/14/2014 03:17:00 AM,FORCIBLE ENTRY
9460361,Crime,01/14/2014 03:12:00 AM,$500 AND UNDER
9461691,Crime,01/14/2014 03:00:00 AM,HOME INVASION
9461792,Crime,01/14/2014 03:00:00 AM,OVER $500
01
02
03
04
05
06
07
08
09
10
11
$ head -n 10 /tmp/crimesBeats.csv
:START_ID(Crime),:END_ID(Beat),:TYPE
5896915,0733,ON_BEAT
9208776,2232,ON_BEAT
8237555,0111,ON_BEAT
6464775,0322,ON_BEAT
6468868,0411,ON_BEAT
4189649,0524,ON_BEAT
7620897,0421,ON_BEAT
7720402,0321,ON_BEAT
5053025,1115,ON_BEAT

Хорошо смотритесь. Давайте импортируем их в Neo4j:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ ./neo4j-community-2.2.0/bin/neo4j-import --into /tmp/my-neo --nodes /tmp/crimes.csv --nodes /tmp/beats.csv --nodes /tmp/primaryTypes.csv --relationships /tmp/crimesBeats.csv --relationships /tmp/crimesPrimaryTypes.csv
Nodes
[*>:45.76 MB/s----------------------------------|PROPERTIES(2)=============|NODE:3|v:118.05 MB/]  4M
Done in 5s 605ms
Prepare node index
[*RESOLVE:64.85 MB-----------------------------------------------------------------------------]  4M
Done in 4s 930ms
Calculate dense nodes
[>:42.33 MB/s-------------------|*PREPARE(7)===================================|CALCULATOR-----]  8M
Done in 5s 417ms
Relationships
[>:42.33 MB/s-------------|*PREPARE(7)==========================|RELATIONSHIP------------|v:44.]  8M
Done in 6s 62ms
Node --> Relationship
[*>:??-----------------------------------------------------------------------------------------]  4M
Done in 324ms
Relationship --> Relationship
[*LINK-----------------------------------------------------------------------------------------]  8M
Done in 1s 984ms
Node counts
[*>:??-----------------------------------------------------------------------------------------]  4M
Done in 360ms
Relationship counts
[*>:??-----------------------------------------------------------------------------------------]  8M
Done in 653ms
  
IMPORT DONE in 26s 517ms

Затем я обновил conf / neo4j-server.properties, чтобы он указывал на мою новую базу данных:

1
2
3
4
5
6
7
#***************************************************************
# Server configuration
#***************************************************************
  
# location of the database directory
#org.neo4j.server.database.location=data/graph.db
org.neo4j.server.database.location=/tmp/my-neo

Теперь я могу запустить Neo и начать изучать данные:

1
$ ./neo4j-community-2.2.0/bin/neo4j start
1
2
3
MATCH (:Crime)-[r:CRIME_TYPE]->()
RETURN r
LIMIT 10

граф-15

Есть намного больше связей и сущностей, которые мы могли бы извлечь из этого набора данных — то, что я сделал, это только начало. Так что, если вы хотите больше узнать о преступлениях в Чикаго, код и инструкции, объясняющие, как его использовать, находятся на github .