Статьи

Spark: запись в файл CSV

Пару недель назад я написал, как я использовал Spark для исследования набора данных о преступности в городе Чикаго и выяснив, сколько из каждого преступления было совершено, я хотел записать это в файл CSV.

Spark предоставляет функцию saveAsTextFile, которая позволяет нам сохранять RDD, поэтому я реорганизовал свой код в следующий формат, чтобы позволить мне использовать это:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import au.com.bytecode.opencsv.CSVParser
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
  
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}
  
val crimeFile = "/Users/markneedham/Downloads/Crimes_-_2001_to_present.csv"
  
val crimeData = sc.textFile(crimeFile).cache()
val withoutHeader: RDD[String] = dropHeader(crimeData)
  
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
  
val partitions: RDD[(String, Int)] = withoutHeader.mapPartitions(lines => {
  val parser = new CSVParser(',')
  lines.map(line => {
    val columns = parser.parseLine(line)
    (columns(5), 1)
  })
})
  
val counts = partitions.
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
  
counts.saveAsTextFile(file)

Если мы запустим этот код из оболочки Spark, мы получим папку с именем /tmp/primaryTypes.csv, содержащую несколько файлов деталей :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
$ ls -lah /tmp/primaryTypes.csv/
total 496
drwxr-xr-x  66 markneedham  wheel   2.2K 30 Nov 07:17 .
drwxrwxrwt  80 root         wheel   2.7K 30 Nov 07:16 ..
-rw-r--r--   1 markneedham  wheel     8B 30 Nov 07:16 ._SUCCESS.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00000.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00001.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00002.crc
-rw-r--r--   1 markneedham  wheel    12B 30 Nov 07:16 .part-00003.crc
...
-rwxrwxrwx   1 markneedham  wheel     0B 30 Nov 07:16 _SUCCESS
-rwxrwxrwx   1 markneedham  wheel    28B 30 Nov 07:16 part-00000
-rwxrwxrwx   1 markneedham  wheel    17B 30 Nov 07:16 part-00001
-rwxrwxrwx   1 markneedham  wheel    23B 30 Nov 07:16 part-00002
-rwxrwxrwx   1 markneedham  wheel    16B 30 Nov 07:16 part-00003
...

Если мы посмотрим на некоторые из этих файлов с деталями, то увидим, что в них записаны типы преступлений и рассчитывается, как и ожидалось:

1
2
3
4
5
6
$ cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
  
$ cat /tmp/primaryTypes.csv/part-00003
BURGLARY,257310

Это хорошо, если мы собираемся передать эти CSV-файлы в другое задание на основе Hadoop, но я на самом деле хочу один CSV-файл, так что это не совсем то, что я хочу.

Один из способов добиться этого — заставить все рассчитываться на одном разделе, что будет означать, что мы генерируем только один файл детали:

1
2
3
4
5
6
7
val counts = partitions.repartition(1).
  reduceByKey {case (x,y) => x + y}.
  sortBy {case (key, value) => -value}.
  map { case (key, value) => Array(key, value).mkString(",") }
  
  
counts.saveAsTextFile(file)

part-00000 теперь выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
$ cat !$
cat /tmp/primaryTypes.csv/part-00000
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
NON-CRIMINAL,12
RITUALISM,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

Это работает, но это немного медленнее, чем когда мы делали агрегацию по разделам, так что это не идеально.

Вместо этого мы можем использовать одну из функций слияния Hadoop, которая объединяет файлы деталей в один файл.

Сначала мы импортируем Hadoop в наш файл SBT:

1
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.5.2"

Теперь давайте перенесем нашу функцию слияния в оболочку Spark:

1
2
3
4
5
6
7
8
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
  
def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

А теперь давайте воспользуемся этим:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
val file = "/tmp/primaryTypes.csv"
FileUtil.fullyDelete(new File(file))
  
val destinationFile= "/tmp/singlePrimaryTypes.csv"
FileUtil.fullyDelete(new File(destinationFile))
  
val counts = partitions.
reduceByKey {case (x,y) => x + y}.
sortBy {case (key, value) => -value}.
map { case (key, value) => Array(key, value).mkString(",") }
  
counts.saveAsTextFile(file)
  
merge(file, destinationFile)

И теперь у нас есть лучшее из обоих миров:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
$ cat /tmp/singlePrimaryTypes.csv
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2
  • Полный код доступен как суть, если вы хотите поиграть с ним.
Ссылка: Spark: напишите в CSV-файл от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма .