Пару недель назад я написал, как я использовал Spark для исследования набора данных о преступности в городе Чикаго и выяснив, сколько из каждого преступления было совершено, я хотел записать это в файл CSV.
Spark предоставляет функцию saveAsTextFile, которая позволяет нам сохранять RDD, поэтому я реорганизовал свой код в следующий формат, чтобы позволить мне использовать это:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import au.com.bytecode.opencsv.CSVParserimport org.apache.spark.rdd.RDDimport org.apache.spark.SparkContext._ def dropHeader(data: RDD[String]): RDD[String] = { data.mapPartitionsWithIndex((idx, lines) => { if (idx == 0) { lines.drop(1) } lines })} val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv" val crimeData = sc.textFile(crimeFile).cache()val withoutHeader: RDD[String] = dropHeader(crimeData) val file = "/tmp/primaryTypes.csv"FileUtil.fullyDelete(new File(file)) val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => { val parser = new CSVParser(',') lines.map(line => { val columns = parser.parseLine(line) (columns(5), 1) })}) val counts = partitions. reduceByKey {case (x,y) => x + y}. sortBy {case (key, value) => -value}. map { case (key, value) => Array(key, value).mkString(",") } counts.saveAsTextFile(file) |
Если мы запустим этот код из оболочки Spark, мы получим папку с именем /tmp/primaryTypes.csv, содержащую несколько файлов деталей :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
$ ls -lah /tmp/primaryTypes.csv/total 496drwxr-xr-x 66 markneedham wheel 2.2K 30 Nov 07:17 .drwxrwxrwt 80 root wheel 2.7K 30 Nov 07:16 ..-rw-r--r-- 1 markneedham wheel 8B 30 Nov 07:16 ._SUCCESS.crc-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00000.crc-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00001.crc-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00002.crc-rw-r--r-- 1 markneedham wheel 12B 30 Nov 07:16 .part-00003.crc...-rwxrwxrwx 1 markneedham wheel 0B 30 Nov 07:16 _SUCCESS-rwxrwxrwx 1 markneedham wheel 28B 30 Nov 07:16 part-00000-rwxrwxrwx 1 markneedham wheel 17B 30 Nov 07:16 part-00001-rwxrwxrwx 1 markneedham wheel 23B 30 Nov 07:16 part-00002-rwxrwxrwx 1 markneedham wheel 16B 30 Nov 07:16 part-00003... |
Если мы посмотрим на некоторые из этих файлов с деталями, то увидим, что в них записаны типы преступлений и рассчитывается, как и ожидалось:
|
1
2
3
4
5
6
|
$ cat /tmp/primaryTypes.csv/part-00000THEFT,859197BATTERY,757530 $ cat /tmp/primaryTypes.csv/part-00003BURGLARY,257310 |
Это хорошо, если мы собираемся передать эти CSV-файлы в другое задание на основе Hadoop, но я на самом деле хочу один CSV-файл, так что это не совсем то, что я хочу.
Один из способов добиться этого — заставить все рассчитываться на одном разделе, что будет означать, что мы генерируем только один файл детали:
|
1
2
3
4
5
6
7
|
val counts = partitions.repartition(1). reduceByKey {case (x,y) => x + y}. sortBy {case (key, value) => -value}. map { case (key, value) => Array(key, value).mkString(",") } counts.saveAsTextFile(file) |
part-00000 теперь выглядит так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
$ cat !$cat /tmp/primaryTypes.csv/part-00000THEFT,859197BATTERY,757530NARCOTICS,489528CRIMINAL DAMAGE,488209BURGLARY,257310OTHER OFFENSE,253964ASSAULT,247386MOTOR VEHICLE THEFT,197404ROBBERY,157706DECEPTIVE PRACTICE,137538CRIMINAL TRESPASS,124974PROSTITUTION,47245WEAPONS VIOLATION,40361PUBLIC PEACE VIOLATION,31585OFFENSE INVOLVING CHILDREN,26524CRIM SEXUAL ASSAULT,14788SEX OFFENSE,14283GAMBLING,10632LIQUOR LAW VIOLATION,8847ARSON,6443INTERFERE WITH PUBLIC OFFICER,5178HOMICIDE,4846KIDNAPPING,3585INTERFERENCE WITH PUBLIC OFFICER,3147INTIMIDATION,2471STALKING,1985OFFENSES INVOLVING CHILDREN,355OBSCENITY,219PUBLIC INDECENCY,86OTHER NARCOTIC VIOLATION,80NON-CRIMINAL,12RITUALISM,12OTHER OFFENSE ,6NON - CRIMINAL,2NON-CRIMINAL (SUBJECT SPECIFIED),2 |
Это работает, но это немного медленнее, чем когда мы делали агрегацию по разделам, так что это не идеально.
Вместо этого мы можем использовать одну из функций слияния Hadoop, которая объединяет файлы деталей в один файл.
Сначала мы импортируем Hadoop в наш файл SBT:
|
1
|
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2" |
Теперь давайте перенесем нашу функцию слияния в оболочку Spark:
|
1
2
3
4
5
6
7
8
|
import org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)} |
А теперь давайте воспользуемся этим:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
val file = "/tmp/primaryTypes.csv"FileUtil.fullyDelete(new File(file)) val destinationFile= "/tmp/singlePrimaryTypes.csv"FileUtil.fullyDelete(new File(destinationFile)) val counts = partitions.reduceByKey {case (x,y) => x + y}.sortBy {case (key, value) => -value}.map { case (key, value) => Array(key, value).mkString(",") } counts.saveAsTextFile(file) merge(file, destinationFile) |
И теперь у нас есть лучшее из обоих миров:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
$ cat /tmp/singlePrimaryTypes.csvTHEFT,859197BATTERY,757530NARCOTICS,489528CRIMINAL DAMAGE,488209BURGLARY,257310OTHER OFFENSE,253964ASSAULT,247386MOTOR VEHICLE THEFT,197404ROBBERY,157706DECEPTIVE PRACTICE,137538CRIMINAL TRESPASS,124974PROSTITUTION,47245WEAPONS VIOLATION,40361PUBLIC PEACE VIOLATION,31585OFFENSE INVOLVING CHILDREN,26524CRIM SEXUAL ASSAULT,14788SEX OFFENSE,14283GAMBLING,10632LIQUOR LAW VIOLATION,8847ARSON,6443INTERFERE WITH PUBLIC OFFICER,5178HOMICIDE,4846KIDNAPPING,3585INTERFERENCE WITH PUBLIC OFFICER,3147INTIMIDATION,2471STALKING,1985OFFENSES INVOLVING CHILDREN,355OBSCENITY,219PUBLIC INDECENCY,86OTHER NARCOTIC VIOLATION,80RITUALISM,12NON-CRIMINAL,12OTHER OFFENSE ,6NON - CRIMINAL,2NON-CRIMINAL (SUBJECT SPECIFIED),2 |
- Полный код доступен как суть, если вы хотите поиграть с ним.
| Ссылка: | Spark: напишите в CSV-файл от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма . |