Около года назад Ян указал мне на набор данных по преступности в Чикаго, который показался мне подходящим для Neo4j, и после долгих проволочек я наконец-то нашел способ импортировать его.
Набор данных охватывает преступления, совершенные с 2001 года по настоящее время. Он содержит около 4 миллионов преступлений и метаданных о таких преступлениях, как местонахождение, тип преступления и год, чтобы назвать несколько.
Содержимое файла имеет следующую структуру:
01
02
03
04
05
06
07
08
09
10
11
|
$ head -n 10 ~ /Downloads/Crimes_-_2001_to_present .csv ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location 9464711,HX114160,01 /14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT, false , true ,0422,004,7,46,08A,1196652,1852516,2014,01 /20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228, "(41.75017626412204, -87.55494559131228)" 9460704,HX113741,01 /14/2014 04:55:00 AM,091XX S JEFFERY AVE,031A,ROBBERY,ARMED: HANDGUN,SIDEWALK, false , false ,0413,004,8,48,03,1191060,1844959,2014,01 /18/2014 12:39:56 AM,41.729576153145636,-87.57568059471686, "(41.729576153145636, -87.57568059471686)" 9460339,HX113740,01 /14/2014 04:44:00 AM,040XX W MAYPOLE AVE,1310,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE, false , true ,1114,011,28,26,14,1149075,1901099,2014,01 /16/2014 12:40:00 AM,41.884543798701515,-87.72803579358926, "(41.884543798701515, -87.72803579358926)" 9461467,HX114463,01 /14/2014 04:43:00 AM,059XX S CICERO AVE,0820,THEFT,$500 AND UNDER,PARKING LOT /GARAGE (NON.RESID.), false , false ,0813,008,13,64,06,1145661,1865031,2014,01 /16/2014 12:40:00 AM,41.785633535413176,-87.74148516669783, "(41.785633535413176, -87.74148516669783)" 9460355,HX113738,01 /14/2014 04:21:00 AM,070XX S PEORIA ST,0820,THEFT,$500 AND UNDER,STREET, true , false ,0733,007,17,68,06,1171480,1858195,2014,01 /16/2014 12:40:00 AM,41.766348042591375,-87.64702037047671, "(41.766348042591375, -87.64702037047671)" 9461140,HX113909,01 /14/2014 03:17:00 AM,016XX W HUBBARD ST,0610,BURGLARY,FORCIBLE ENTRY,COMMERCIAL / BUSINESS OFFICE, false , false ,1215,012,27,24,05,1165029,1903111,2014,01 /16/2014 12:40:00 AM,41.889741146006095,-87.66939334853973, "(41.889741146006095, -87.66939334853973)" 9460361,HX113731,01 /14/2014 03:12:00 AM,022XX S WENTWORTH AVE,0820,THEFT,$500 AND UNDER,CTA TRAIN, false , false ,0914,009,25,34,06,1175363,1889525,2014,01 /20/2014 12:40:05 AM,41.85223460427207,-87.63185047834335, "(41.85223460427207, -87.63185047834335)" 9461691,HX114506,01 /14/2014 03:00:00 AM,087XX S COLFAX AVE,0650,BURGLARY,HOME INVASION,RESIDENCE, false , false ,0423,004,7,46,05,1195052,1847362,2014,01 /17/2014 12:40:17 AM,41.73607283858007,-87.56097809501115, "(41.73607283858007, -87.56097809501115)" 9461792,HX114824,01 /14/2014 03:00:00 AM,012XX S CALIFORNIA BLVD,0810,THEFT,OVER $500,STREET, false , false ,1023,010,28,29,06,1157929,1894034,2014,01 /17/2014 12:40:17 AM,41.86498077118534,-87.69571529596696, "(41.86498077118534, -87.69571529596696)" |
Поскольку я хотел импортировать это в Neo4j, мне нужно было выполнить некоторую обработку данных, поскольку инструмент neo4j-import ожидает получить файлы CSV, содержащие узлы и связи, которые мы хотим создать.
Я смотрел на Spark в конце прошлого года, и предварительная обработка большого исходного файла в меньшие CSV-файлы, содержащие узлы и взаимосвязи, казалась подходящей.
Поэтому мне нужно было создать работу Spark, чтобы сделать это. Затем мы передадим это задание исполнителю Spark, работающему локально, и он выплюнет файлы CSV.
Мы начнем с создания объекта Scala с методом main, который будет содержать наш код обработки. Внутри этого основного метода мы создадим контекст Spark:
1
2
3
4
5
6
7
8
|
import org.apache.spark.{SparkConf, SparkContext} object GenerateCSVFiles { def main(args : Array[String]) { val conf = new SparkConf().setAppName( "Chicago Crime Dataset" ) val sc = new SparkContext(conf) } } |
Достаточно просто. Далее мы будем читать в файле CSV. Я нашел самый простой способ сослаться на это с помощью переменной среды, но, возможно, есть более идиоматический способ:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
import java.io.File import org.apache.spark.{SparkConf, SparkContext} object GenerateCSVFiles { def main(args : Array[String]) { var crimeFile = System.getenv( "CSV_FILE" ) if (crimeFile == null || ! new File(crimeFile).exists()) { throw new RuntimeException( "Cannot find CSV file [" + crimeFile + "]" ) } println( "Using %s" .format(crimeFile)) val conf = new SparkConf().setAppName( "Chicago Crime Dataset" ) val sc = new SparkContext(conf) val crimeData = sc.textFile(crimeFile).cache() } |
Тип crimeData — это RDD [String] — способ Spark для представления (лениво оцененных) строк файла CSV. Это также включает заголовок файла, поэтому давайте напишем функцию, чтобы избавиться от этого, так как мы будем генерировать наши собственные заголовки для разных файлов:
01
02
03
04
05
06
07
08
09
10
11
|
import org.apache.spark.rdd.RDD def dropHeader(data : RDD[String]) : RDD[String] = { data.mapPartitionsWithIndex((idx, lines) = > { if (idx == 0 ) { lines.drop( 1 ) } lines }) } |
Теперь мы готовы начать генерировать наши новые файлы CSV, поэтому мы напишем функцию, которая анализирует каждую строку и извлекает соответствующие столбцы. Я использую Open CSV для этого:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
import au.com.bytecode.opencsv.CSVParser def generateFile(file : String, withoutHeader : RDD[String], fn : Array[String] = > Array[String], header : String , distinct : Boolean = true , separator : String = "," ) = { FileUtil.fullyDelete( new File(file)) val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file val rows : RDD[String] = withoutHeader.mapPartitions(lines = > { val parser = new CSVParser( ',' ) lines.map(line = > { val columns = parser.parseLine(line) fn(columns).mkString(separator) }) }) if (distinct) rows.distinct() saveAsTextFile tmpFile else rows.saveAsTextFile(tmpFile) } |
Затем мы вызываем эту функцию так:
1
|
generateFile( "/tmp/crimes.csv" , withoutHeader, columns = > Array(columns( 0 ), "Crime" , columns( 2 ), columns( 6 )), "id:ID(Crime),:LABEL,date,description" , false ) |
Вывод в ‘tmpFile’ на самом деле представляет собой 32 ‘файла детали’, но я хотел иметь возможность объединить их вместе в отдельные файлы CSV, с которыми было бы легче работать.
Полная работа выглядит так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
import java.io.File import au.com.bytecode.opencsv.CSVParser import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs. _ import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object GenerateCSVFiles { def merge(srcPath : String, dstPath : String, header : String) : Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false , hadoopConfig, header) } def main(args : Array[String]) { var crimeFile = System.getenv( "CSV_FILE" ) if (crimeFile == null || ! new File(crimeFile).exists()) { throw new RuntimeException( "Cannot find CSV file [" + crimeFile + "]" ) } println( "Using %s" .format(crimeFile)) val conf = new SparkConf().setAppName( "Chicago Crime Dataset" ) val sc = new SparkContext(conf) val crimeData = sc.textFile(crimeFile).cache() val withoutHeader : RDD[String] = dropHeader(crimeData) generateFile( "/tmp/primaryTypes.csv" , withoutHeader, columns = > Array(columns( 5 ).trim(), "CrimeType" ), "crimeType:ID(CrimeType),:LABEL" ) generateFile( "/tmp/beats.csv" , withoutHeader, columns = > Array(columns( 10 ), "Beat" ), "id:ID(Beat),:LABEL" ) generateFile( "/tmp/crimes.csv" , withoutHeader, columns = > Array(columns( 0 ), "Crime" , columns( 2 ), columns( 6 )), "id:ID(Crime),:LABEL,date,description" , false ) generateFile( "/tmp/crimesPrimaryTypes.csv" , withoutHeader, columns = > Array(columns( 0 ),columns( 5 ).trim(), "CRIME_TYPE" ), ":START_ID(Crime),:END_ID(CrimeType),:TYPE" ) generateFile( "/tmp/crimesBeats.csv" , withoutHeader, columns = > Array(columns( 0 ),columns( 10 ), "ON_BEAT" ), ":START_ID(Crime),:END_ID(Beat),:TYPE" ) } def generateFile(file : String, withoutHeader : RDD[String], fn : Array[String] = > Array[String], header : String , distinct : Boolean = true , separator : String = "," ) = { FileUtil.fullyDelete( new File(file)) val tmpFile = "/tmp/" + System.currentTimeMillis() + "-" + file val rows : RDD[String] = withoutHeader.mapPartitions(lines = > { val parser = new CSVParser( ',' ) lines.map(line = > { val columns = parser.parseLine(line) fn(columns).mkString(separator) }) }) if (distinct) rows.distinct() saveAsTextFile tmpFile else rows.saveAsTextFile(tmpFile) merge(tmpFile, file, header) } def dropHeader(data : RDD[String]) : RDD[String] = { data.mapPartitionsWithIndex((idx, lines) = > { if (idx == 0 ) { lines.drop( 1 ) } lines }) } } |
Теперь нам нужно отправить работу в Spark. Я обернул это в сценарий, если вы хотите следовать, но это содержание:
1
2
3
4
5
6
|
. /spark-1 .1.0-bin-hadoop1 /bin/spark-submit \ --driver-memory 5g \ --class GenerateCSVFiles \ --master local [8] \ target /scala-2 .10 /playground_2 .10-1.0.jar \ $@ |
Если мы выполним это, мы получим следующие CSV-файлы:
1
2
3
4
5
6
|
$ ls -alh /tmp/ *.csv -rwxrwxrwx 1 markneedham wheel 3.0K 14 Apr 07:37 /tmp/beats .csv -rwxrwxrwx 1 markneedham wheel 217M 14 Apr 07:37 /tmp/crimes .csv -rwxrwxrwx 1 markneedham wheel 84M 14 Apr 07:37 /tmp/crimesBeats .csv -rwxrwxrwx 1 markneedham wheel 120M 14 Apr 07:37 /tmp/crimesPrimaryTypes .csv -rwxrwxrwx 1 markneedham wheel 912B 14 Apr 07:37 /tmp/primaryTypes .csv |
Давайте быстро проверим, что они содержат:
01
02
03
04
05
06
07
08
09
10
11
|
$ head -n 10 /tmp/beats .csv id :ID(Beat),:LABEL 1135,Beat 1421,Beat 2312,Beat 1113,Beat 1014,Beat 2411,Beat 1333,Beat 2521,Beat 1652,Beat |
01
02
03
04
05
06
07
08
09
10
11
|
$ head -n 10 /tmp/crimes .csv id :ID(Crime),:LABEL, date ,description 9464711,Crime,01 /14/2014 05:00:00 AM,SIMPLE 9460704,Crime,01 /14/2014 04:55:00 AM,ARMED: HANDGUN 9460339,Crime,01 /14/2014 04:44:00 AM,TO PROPERTY 9461467,Crime,01 /14/2014 04:43:00 AM,$500 AND UNDER 9460355,Crime,01 /14/2014 04:21:00 AM,$500 AND UNDER 9461140,Crime,01 /14/2014 03:17:00 AM,FORCIBLE ENTRY 9460361,Crime,01 /14/2014 03:12:00 AM,$500 AND UNDER 9461691,Crime,01 /14/2014 03:00:00 AM,HOME INVASION 9461792,Crime,01 /14/2014 03:00:00 AM,OVER $500 |
01
02
03
04
05
06
07
08
09
10
11
|
$ head -n 10 /tmp/crimesBeats .csv :START_ID(Crime),:END_ID(Beat),:TYPE 5896915,0733,ON_BEAT 9208776,2232,ON_BEAT 8237555,0111,ON_BEAT 6464775,0322,ON_BEAT 6468868,0411,ON_BEAT 4189649,0524,ON_BEAT 7620897,0421,ON_BEAT 7720402,0321,ON_BEAT 5053025,1115,ON_BEAT |
Хорошо смотритесь. Давайте импортируем их в Neo4j:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
$ . /neo4j-community-2 .2.0 /bin/neo4j-import --into /tmp/my-neo --nodes /tmp/crimes .csv --nodes /tmp/beats .csv --nodes /tmp/primaryTypes .csv --relationships /tmp/crimesBeats .csv --relationships /tmp/crimesPrimaryTypes .csv Nodes [*>:45.76 MB /s---------------------------------- |PROPERTIES(2)=============|NODE:3| v :118.05 MB/] 4M Done in 5s 605ms Prepare node index [*RESOLVE:64.85 MB-----------------------------------------------------------------------------] 4M Done in 4s 930ms Calculate dense nodes [>:42.33 MB /s------------------- |*PREPARE(7)===================================|CALCULATOR-----] 8M Done in 5s 417ms Relationships [>:42.33 MB /s------------- |*PREPARE(7)==========================|RELATIONSHIP------------| v :44.] 8M Done in 6s 62ms Node --> Relationship [*>:??-----------------------------------------------------------------------------------------] 4M Done in 324ms Relationship --> Relationship [*LINK-----------------------------------------------------------------------------------------] 8M Done in 1s 984ms Node counts [*>:??-----------------------------------------------------------------------------------------] 4M Done in 360ms Relationship counts [*>:??-----------------------------------------------------------------------------------------] 8M Done in 653ms IMPORT DONE in 26s 517ms |
Затем я обновил conf / neo4j-server.properties, чтобы он указывал на мою новую базу данных:
1
2
3
4
5
6
7
|
#*************************************************************** # Server configuration #*************************************************************** # location of the database directory #org.neo4j.server.database.location=data/graph.db org.neo4j.server.database.location=/tmp/my-neo |
Теперь я могу запустить Neo и начать изучать данные:
1
|
$ . /neo4j-community-2 .2.0 /bin/neo4j start |
1
2
3
|
MATCH (:Crime)-[r:CRIME_TYPE]->() RETURN r LIMIT 10 |
Есть намного больше связей и сущностей, которые мы могли бы извлечь из этого набора данных — то, что я сделал, это только начало. Так что, если вы хотите больше узнать о преступлениях в Чикаго, код и инструкции, объясняющие, как его использовать, находятся на github .
Ссылка: | Spark: создание CSV-файлов для импорта в Neo4j от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма . |