Я обнаружил, что довольно часто работаю с большими CSV-файлами и понимаю, что мой существующий набор инструментов не позволяет мне быстро их исследовать. Я подумал, что потрачу немного времени на изучение Spark, чтобы выяснить, может ли это помочь.
Я работаю с набором данных о преступности, выпущенным городом Чикаго : он имеет размер 1 ГБ и содержит сведения о 4 миллионах преступлений:
1
2
3
4
5
|
$ ls -alh ~ /Downloads/Crimes_-_2001_to_present .csv -rw-r--r--@ 1 markneedham staff 1.0G 16 Nov 12:14 /Users/markneedham/Downloads/Crimes_-_2001_to_present .csv $ wc -l ~ /Downloads/Crimes_-_2001_to_present .csv 4193441 /Users/markneedham/Downloads/Crimes_-_2001_to_present .csv |
Мы можем получить приблизительное представление о содержимом файла, посмотрев на первую строку вместе с заголовком:
1
2
3
|
$ head -n 2 ~ /Downloads/Crimes_-_2001_to_present .csv ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location 9464711,HX114160,01 /14/2014 05:00:00 AM,028XX E 80TH ST,0560,ASSAULT,SIMPLE,APARTMENT, false , true ,0422,004,7,46,08A,1196652,1852516,2014,01 /20/2014 12:40:05 AM,41.75017626412204,-87.55494559131228, "(41.75017626412204, -87.55494559131228)" |
Я хотел подсчитать столбец «Основной тип», чтобы узнать, сколько у нас преступлений. Используя только инструменты командной строки Unix, вот как мы это сделаем:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
$ time tail +2 ~ /Downloads/Crimes_-_2001_to_present .csv | cut -d, -f6 | sort | uniq -c | sort -rn 859197 THEFT 757530 BATTERY 489528 NARCOTICS 488209 CRIMINAL DAMAGE 257310 BURGLARY 253964 OTHER OFFENSE 247386 ASSAULT 197404 MOTOR VEHICLE THEFT 157706 ROBBERY 137538 DECEPTIVE PRACTICE 124974 CRIMINAL TRESPASS 47245 PROSTITUTION 40361 WEAPONS VIOLATION 31585 PUBLIC PEACE VIOLATION 26524 OFFENSE INVOLVING CHILDREN 14788 CRIM SEXUAL ASSAULT 14283 SEX OFFENSE 10632 GAMBLING 8847 LIQUOR LAW VIOLATION 6443 ARSON 5178 INTERFERE WITH PUBLIC OFFICER 4846 HOMICIDE 3585 KIDNAPPING 3147 INTERFERENCE WITH PUBLIC OFFICER 2471 INTIMIDATION 1985 STALKING 355 OFFENSES INVOLVING CHILDREN 219 OBSCENITY 86 PUBLIC INDECENCY 80 OTHER NARCOTIC VIOLATION 12 RITUALISM 12 NON-CRIMINAL 6 OTHER OFFENSE 2 NON-CRIMINAL (SUBJECT SPECIFIED) 2 NON - CRIMINAL real 2m37.495s user 3m0.337s sys 0m1.471s |
Это не так уж и плохо, но похоже на тип расчета, для которого сделан Spark, поэтому я посмотрел, как мне это сделать. Для начала я создал проект SBT со следующим файлом сборки:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
name := "playground" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" libraryDependencies += "net.sf.opencsv" % "opencsv" % "2.3" ideaExcludeFolders += ".idea" ideaExcludeFolders += ".idea_modules" |
Я скачал Spark и после распаковки запустил оболочку Spark:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
$ pwd /Users/markneedham/projects/spark-play/spark-1 .1.0 /spark-1 .1.0-bin-hadoop1 $ . /bin/spark-shell ... Welcome to ____ __ / __ /__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_ /_/ /_/ \_\ version 1.1.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51) ... Spark context available as sc. scala> |
Сначала я импортирую некоторые классы, которые мне понадобятся:
1
2
3
4
5
|
scala> import au.com.bytecode.opencsv.CSVParser import au.com.bytecode.opencsv.CSVParser scala> import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD |
Теперь, следуя примеру быстрого запуска , мы создадим Resilient Distributed Dataset (RDD) из нашего файла Crime CSV:
1
2
3
4
5
6
7
|
scala> val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv" crimeFile : String = /Users/markneedham/Downloads/Crimes _ - _ 2001 _ to _ present.csv scala> val crimeData = sc.textFile(crimeFile).cache() 14 / 11 / 16 22 : 31 : 16 INFO MemoryStore : ensureFreeSpace( 32768 ) called with curMem = 0 , maxMem = 278302556 14 / 11 / 16 22 : 31 : 16 INFO MemoryStore : Block broadcast _ 0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB) crimeData : org.apache.spark.rdd.RDD[String] = /Users/markneedham/Downloads/Crimes _ - _ 2001 _ to _ present.csv MappedRDD[ 1 ] at textFile at <console> : 17 |
Наш следующий шаг — обработка каждой строки файла с использованием нашего CSV Parser. Простой способ сделать это — создать новый CSVParser для каждой строки :
01
02
03
04
05
06
07
08
09
10
11
12
|
scala> crimeData.map(line = > { val parser = new CSVParser( ',' ) parser.parseLine(line).mkString( "," ) }).take( 5 ).foreach(println) 14 / 11 / 16 22 : 35 : 49 INFO SparkContext : Starting job : take at <console> : 23 ... 4 / 11 / 16 22 : 35 : 49 INFO SparkContext : Job finished : take at <console> : 23 , took 0.013904 s ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location 9464711 ,HX 114160 , 01 / 14 / 2014 05 : 00 : 00 AM, 028 XX E 80 TH ST, 0560 ,ASSAULT,SIMPLE,APARTMENT, false , true , 0422 , 004 , 7 , 46 , 08 A, 1196652 , 1852516 , 2014 , 01 / 20 / 2014 12 : 40 : 05 AM, 41.75017626412204 ,- 87.55494559131228 ,( 41.75017626412204 , - 87.55494559131228 ) 9460704 ,HX 113741 , 01 / 14 / 2014 04 : 55 : 00 AM, 091 XX S JEFFERY AVE, 031 A,ROBBERY,ARMED : HANDGUN,SIDEWALK, false , false , 0413 , 004 , 8 , 48 , 03 , 1191060 , 1844959 , 2014 , 01 / 18 / 2014 12 : 39 : 56 AM, 41.729576153145636 ,- 87.57568059471686 ,( 41.729576153145636 , - 87.57568059471686 ) 9460339 ,HX 113740 , 01 / 14 / 2014 04 : 44 : 00 AM, 040 XX W MAYPOLE AVE, 1310 ,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE, false , true , 1114 , 011 , 28 , 26 , 14 , 1149075 , 1901099 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.884543798701515 ,- 87.72803579358926 ,( 41.884543798701515 , - 87.72803579358926 ) 9461467 ,HX 114463 , 01 / 14 / 2014 04 : 43 : 00 AM, 059 XX S CICERO AVE, 0820 ,THEFT,$ 500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.), false , false , 0813 , 008 , 13 , 64 , 06 , 1145661 , 1865031 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.785633535413176 ,- 87.74148516669783 ,( 41.785633535413176 , - 87.74148516669783 ) |
Это работает, но немного расточительно каждый раз создавать новый CSVParser, поэтому вместо этого давайте просто создадим один для каждого раздела, на который Spark разбивает наш файл:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
scala> crimeData.mapPartitions(lines = > { val parser = new CSVParser( ',' ) lines.map(line = > { parser.parseLine(line).mkString( "," ) }) }).take( 5 ).foreach(println) 14 / 11 / 16 22 : 38 : 44 INFO SparkContext : Starting job : take at <console> : 25 ... 14 / 11 / 16 22 : 38 : 44 INFO SparkContext : Job finished : take at <console> : 25 , took 0.015216 s ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location 9464711 ,HX 114160 , 01 / 14 / 2014 05 : 00 : 00 AM, 028 XX E 80 TH ST, 0560 ,ASSAULT,SIMPLE,APARTMENT, false , true , 0422 , 004 , 7 , 46 , 08 A, 1196652 , 1852516 , 2014 , 01 / 20 / 2014 12 : 40 : 05 AM, 41.75017626412204 ,- 87.55494559131228 ,( 41.75017626412204 , - 87.55494559131228 ) 9460704 ,HX 113741 , 01 / 14 / 2014 04 : 55 : 00 AM, 091 XX S JEFFERY AVE, 031 A,ROBBERY,ARMED : HANDGUN,SIDEWALK, false , false , 0413 , 004 , 8 , 48 , 03 , 1191060 , 1844959 , 2014 , 01 / 18 / 2014 12 : 39 : 56 AM, 41.729576153145636 ,- 87.57568059471686 ,( 41.729576153145636 , - 87.57568059471686 ) 9460339 ,HX 113740 , 01 / 14 / 2014 04 : 44 : 00 AM, 040 XX W MAYPOLE AVE, 1310 ,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE, false , true , 1114 , 011 , 28 , 26 , 14 , 1149075 , 1901099 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.884543798701515 ,- 87.72803579358926 ,( 41.884543798701515 , - 87.72803579358926 ) 9461467 ,HX 114463 , 01 / 14 / 2014 04 : 43 : 00 AM, 059 XX S CICERO AVE, 0820 ,THEFT,$ 500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.), false , false , 0813 , 008 , 13 , 64 , 06 , 1145661 , 1865031 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.785633535413176 ,- 87.74148516669783 ,( 41.785633535413176 , - 87.74148516669783 ) |
Вы заметите, что мы все еще печатаем заголовок, который не идеален — давайте избавимся от него!
Я ожидал, что будет функция «drop», которая позволит мне сделать это, но на самом деле это не так. Вместо этого мы можем использовать наши знания о том, что первый раздел будет содержать первую строку и вырезать ее таким образом:
1
2
3
4
5
6
7
8
9
|
scala> def dropHeader(data : RDD[String]) : RDD[String] = { data.mapPartitionsWithIndex((idx, lines) = > { if (idx == 0 ) { lines.drop( 1 ) } lines }) } dropHeader : (data : org.apache.spark.rdd.RDD[String])org.apache.spark.rdd.RDD[String] |
Теперь давайте снова возьмем первые 5 строк и распечатаем их:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
scala> val withoutHeader : RDD[String] = dropHeader(crimeData) withoutHeader : org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[ 7 ] at mapPartitionsWithIndex at <console> : 14 scala> withoutHeader.mapPartitions(lines = > { val parser = new CSVParser( ',' ) lines.map(line = > { parser.parseLine(line).mkString( "," ) }) }).take( 5 ).foreach(println) 14 / 11 / 16 22 : 43 : 27 INFO SparkContext : Starting job : take at <console> : 29 ... 14 / 11 / 16 22 : 43 : 27 INFO SparkContext : Job finished : take at <console> : 29 , took 0.018557 s 9464711 ,HX 114160 , 01 / 14 / 2014 05 : 00 : 00 AM, 028 XX E 80 TH ST, 0560 ,ASSAULT,SIMPLE,APARTMENT, false , true , 0422 , 004 , 7 , 46 , 08 A, 1196652 , 1852516 , 2014 , 01 / 20 / 2014 12 : 40 : 05 AM, 41.75017626412204 ,- 87.55494559131228 ,( 41.75017626412204 , - 87.55494559131228 ) 9460704 ,HX 113741 , 01 / 14 / 2014 04 : 55 : 00 AM, 091 XX S JEFFERY AVE, 031 A,ROBBERY,ARMED : HANDGUN,SIDEWALK, false , false , 0413 , 004 , 8 , 48 , 03 , 1191060 , 1844959 , 2014 , 01 / 18 / 2014 12 : 39 : 56 AM, 41.729576153145636 ,- 87.57568059471686 ,( 41.729576153145636 , - 87.57568059471686 ) 9460339 ,HX 113740 , 01 / 14 / 2014 04 : 44 : 00 AM, 040 XX W MAYPOLE AVE, 1310 ,CRIMINAL DAMAGE,TO PROPERTY,RESIDENCE, false , true , 1114 , 011 , 28 , 26 , 14 , 1149075 , 1901099 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.884543798701515 ,- 87.72803579358926 ,( 41.884543798701515 , - 87.72803579358926 ) 9461467 ,HX 114463 , 01 / 14 / 2014 04 : 43 : 00 AM, 059 XX S CICERO AVE, 0820 ,THEFT,$ 500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.), false , false , 0813 , 008 , 13 , 64 , 06 , 1145661 , 1865031 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.785633535413176 ,- 87.74148516669783 ,( 41.785633535413176 , - 87.74148516669783 ) 9460355 ,HX 113738 , 01 / 14 / 2014 04 : 21 : 00 AM, 070 XX S PEORIA ST, 0820 ,THEFT,$ 500 AND UNDER,STREET, true , false , 0733 , 007 , 17 , 68 , 06 , 1171480 , 1858195 , 2014 , 01 / 16 / 2014 12 : 40 : 00 AM, 41.766348042591375 ,- 87.64702037047671 ,( 41.766348042591375 , - 87.64702037047671 ) |
Наконец-то мы в хорошей форме, чтобы извлечь значения из столбца «Основной тип» и посчитать, сколько раз каждое из них появляется в нашем наборе данных:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
scala> withoutHeader.mapPartitions(lines = > { val parser = new CSVParser( ',' ) lines.map(line = > { val columns = parser.parseLine(line) Array(columns( 5 )).mkString( "," ) }) }).countByValue().toList.sortBy(- _ . _ 2 ).foreach(println) 14 / 11 / 16 22 : 45 : 20 INFO SparkContext : Starting job : countByValue at <console> : 30 14 / 11 / 16 22 : 45 : 20 INFO DAGScheduler : Got job 7 (countByValue at <console> : 30 ) with 32 output partitions (allowLocal = false ) ... 14 / 11 / 16 22 : 45 : 30 INFO SparkContext : Job finished : countByValue at <console> : 30 , took 9.796565 s (THEFT, 859197 ) (BATTERY, 757530 ) (NARCOTICS, 489528 ) (CRIMINAL DAMAGE, 488209 ) (BURGLARY, 257310 ) (OTHER OFFENSE, 253964 ) (ASSAULT, 247386 ) (MOTOR VEHICLE THEFT, 197404 ) (ROBBERY, 157706 ) (DECEPTIVE PRACTICE, 137538 ) (CRIMINAL TRESPASS, 124974 ) (PROSTITUTION, 47245 ) (WEAPONS VIOLATION, 40361 ) (PUBLIC PEACE VIOLATION, 31585 ) (OFFENSE INVOLVING CHILDREN, 26524 ) (CRIM SEXUAL ASSAULT, 14788 ) (SEX OFFENSE, 14283 ) (GAMBLING, 10632 ) (LIQUOR LAW VIOLATION, 8847 ) (ARSON, 6443 ) (INTERFERE WITH PUBLIC OFFICER, 5178 ) (HOMICIDE, 4846 ) (KIDNAPPING, 3585 ) (INTERFERENCE WITH PUBLIC OFFICER, 3147 ) (INTIMIDATION, 2471 ) (STALKING, 1985 ) (OFFENSES INVOLVING CHILDREN, 355 ) (OBSCENITY, 219 ) (PUBLIC INDECENCY, 86 ) (OTHER NARCOTIC VIOLATION, 80 ) (NON-CRIMINAL, 12 ) (RITUALISM, 12 ) (OTHER OFFENSE , 6 ) (NON-CRIMINAL (SUBJECT SPECIFIED), 2 ) (NON - CRIMINAL, 2 ) |
Мы получаем те же результаты, что и с командами Unix, за исключением того, что на их вычисление ушло менее 10 секунд, что довольно здорово!
Ссылка: | Spark: проанализируйте CSV-файл и сгруппируйте по столбцу значение от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма |