Пару недель назад я написал, как я использовал Spark для исследования набора данных о преступности в городе Чикаго и выяснив, сколько из каждого преступления было совершено, я хотел записать это в файл CSV.
Spark предоставляет функцию saveAsTextFile, которая позволяет нам сохранять RDD, поэтому я реорганизовал свой код в следующий формат, чтобы позволить мне использовать это:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import au.com.bytecode.opencsv.CSVParser import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext. _ def dropHeader(data : RDD[String]) : RDD[String] = { data.mapPartitionsWithIndex((idx, lines) = > { if (idx == 0 ) { lines.drop( 1 ) } lines }) } val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv" val crimeData = sc.textFile(crimeFile).cache() val withoutHeader : RDD[String] = dropHeader(crimeData) val file = "/tmp/primaryTypes.csv" FileUtil.fullyDelete( new File(file)) val partitions : RDD[(String, Int)] = withoutHeader.mapPartitions(lines = > { val parser = new CSVParser( ',' ) lines.map(line = > { val columns = parser.parseLine(line) (columns( 5 ), 1 ) }) }) val counts = partitions. reduceByKey { case (x,y) = > x + y}. sortBy { case (key, value) = > -value}. map { case (key, value) = > Array(key, value).mkString( "," ) } counts.saveAsTextFile(file) |
Если мы запустим этот код из оболочки Spark, мы получим папку с именем /tmp/primaryTypes.csv, содержащую несколько файлов деталей :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
$ ls -lah /tmp/primaryTypes .csv/ total 496 drwxr-xr-x 66 markneedham wheel 2.2K 30 Nov 07:17 . drwxrwxrwt 80 root wheel 2.7K 30 Nov 07:16 .. -rw-r--r-- 1 markneedham wheel 8B 30 Nov 07:16 ._SUCCESS.crc -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00000.crc -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00001.crc -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00002.crc -rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00003.crc ... -rwxrwxrwx 1 markneedham wheel 0B 30 Nov 07:16 _SUCCESS -rwxrwxrwx 1 markneedham wheel 28B 30 Nov 07:16 part-00000 -rwxrwxrwx 1 markneedham wheel 17B 30 Nov 07:16 part-00001 -rwxrwxrwx 1 markneedham wheel 23B 30 Nov 07:16 part-00002 -rwxrwxrwx 1 markneedham wheel 16B 30 Nov 07:16 part-00003 ... |
Если мы посмотрим на некоторые из этих файлов с деталями, то увидим, что в них записаны типы преступлений и рассчитывается, как и ожидалось:
1
2
3
4
5
6
|
$ cat /tmp/primaryTypes .csv /part-00000 THEFT,859197 BATTERY,757530 $ cat /tmp/primaryTypes .csv /part-00003 BURGLARY,257310 |
Это хорошо, если мы собираемся передать эти CSV-файлы в другое задание на основе Hadoop, но я на самом деле хочу один CSV-файл, так что это не совсем то, что я хочу.
Один из способов добиться этого — заставить все рассчитываться на одном разделе, что будет означать, что мы генерируем только один файл детали:
1
2
3
4
5
6
7
|
val counts = partitions.repartition( 1 ). reduceByKey { case (x,y) = > x + y}. sortBy { case (key, value) = > -value}. map { case (key, value) = > Array(key, value).mkString( "," ) } counts.saveAsTextFile(file) |
part-00000 теперь выглядит так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
$ cat !$ cat /tmp/primaryTypes .csv /part-00000 THEFT,859197 BATTERY,757530 NARCOTICS,489528 CRIMINAL DAMAGE,488209 BURGLARY,257310 OTHER OFFENSE,253964 ASSAULT,247386 MOTOR VEHICLE THEFT,197404 ROBBERY,157706 DECEPTIVE PRACTICE,137538 CRIMINAL TRESPASS,124974 PROSTITUTION,47245 WEAPONS VIOLATION,40361 PUBLIC PEACE VIOLATION,31585 OFFENSE INVOLVING CHILDREN,26524 CRIM SEXUAL ASSAULT,14788 SEX OFFENSE,14283 GAMBLING,10632 LIQUOR LAW VIOLATION,8847 ARSON,6443 INTERFERE WITH PUBLIC OFFICER,5178 HOMICIDE,4846 KIDNAPPING,3585 INTERFERENCE WITH PUBLIC OFFICER,3147 INTIMIDATION,2471 STALKING,1985 OFFENSES INVOLVING CHILDREN,355 OBSCENITY,219 PUBLIC INDECENCY,86 OTHER NARCOTIC VIOLATION,80 NON-CRIMINAL,12 RITUALISM,12 OTHER OFFENSE ,6 NON - CRIMINAL,2 NON-CRIMINAL (SUBJECT SPECIFIED),2 |
Это работает, но это немного медленнее, чем когда мы делали агрегацию по разделам, так что это не идеально.
Вместо этого мы можем использовать одну из функций слияния Hadoop, которая объединяет файлы деталей в один файл.
Сначала мы импортируем Hadoop в наш файл SBT:
1
|
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2" |
Теперь давайте перенесем нашу функцию слияния в оболочку Spark:
1
2
3
4
5
6
7
8
|
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs. _ def merge(srcPath : String, dstPath : String) : Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false , hadoopConfig, null ) } |
А теперь давайте воспользуемся этим:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
val file = "/tmp/primaryTypes.csv" FileUtil.fullyDelete( new File(file)) val destinationFile = "/tmp/singlePrimaryTypes.csv" FileUtil.fullyDelete( new File(destinationFile)) val counts = partitions. reduceByKey { case (x,y) = > x + y}. sortBy { case (key, value) = > -value}. map { case (key, value) = > Array(key, value).mkString( "," ) } counts.saveAsTextFile(file) merge(file, destinationFile) |
И теперь у нас есть лучшее из обоих миров:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
$ cat /tmp/singlePrimaryTypes .csv THEFT,859197 BATTERY,757530 NARCOTICS,489528 CRIMINAL DAMAGE,488209 BURGLARY,257310 OTHER OFFENSE,253964 ASSAULT,247386 MOTOR VEHICLE THEFT,197404 ROBBERY,157706 DECEPTIVE PRACTICE,137538 CRIMINAL TRESPASS,124974 PROSTITUTION,47245 WEAPONS VIOLATION,40361 PUBLIC PEACE VIOLATION,31585 OFFENSE INVOLVING CHILDREN,26524 CRIM SEXUAL ASSAULT,14788 SEX OFFENSE,14283 GAMBLING,10632 LIQUOR LAW VIOLATION,8847 ARSON,6443 INTERFERE WITH PUBLIC OFFICER,5178 HOMICIDE,4846 KIDNAPPING,3585 INTERFERENCE WITH PUBLIC OFFICER,3147 INTIMIDATION,2471 STALKING,1985 OFFENSES INVOLVING CHILDREN,355 OBSCENITY,219 PUBLIC INDECENCY,86 OTHER NARCOTIC VIOLATION,80 RITUALISM,12 NON-CRIMINAL,12 OTHER OFFENSE ,6 NON - CRIMINAL,2 NON-CRIMINAL (SUBJECT SPECIFIED),2 |
- Полный код доступен как суть, если вы хотите поиграть с ним.
Ссылка: | Spark: напишите в CSV-файл от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма . |