В своем последнем сообщении в блоге я показал, как писать в один CSV-файл, используя Spark и Hadoop, и следующее, что я хотел сделать, это добавить строку заголовка в результирующую строку.
Функция Hadoop FileUtil # copyMerge принимает параметр String, но добавляет этот текст в конец каждого файла раздела, что не совсем то, что мы хотим.
Однако, если мы скопируем эту функцию в наш собственный класс FileUtil, мы сможем реструктурировать его так, чтобы он делал то, что нам нужно
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import java.io.IOException; public class MyFileUtil { public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String header) throws IOException { dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false ); if (!srcFS.getFileStatus(srcDir).isDir()) { return false ; } else { FSDataOutputStream out = dstFS.create(dstFile); if (header != null ) { out.write((header + "\n" ).getBytes( "UTF-8" )); } try { FileStatus[] contents = srcFS.listStatus(srcDir); for ( int i = 0 ; i < contents.length; ++i) { if (!contents[i].isDir()) { FSDataInputStream in = srcFS.open(contents[i].getPath()); try { IOUtils.copyBytes(in, out, conf, false ); } finally { in.close(); } } } } finally { out.close(); } return deleteSource?srcFS.delete(srcDir, true ): true ; } } private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException { if (dstFS.exists(dst)) { FileStatus sdst = dstFS.getFileStatus(dst); if (sdst.isDir()) { if ( null == srcName) { throw new IOException( "Target " + dst + " is a directory" ); } return checkDest((String) null , dstFS, new Path(dst, srcName), overwrite); } if (!overwrite) { throw new IOException( "Target " + dst + " already exists" ); } } return dst; } } |
Затем мы можем обновить нашу функцию слияния для вызова этого вместо:
1
2
3
4
5
|
def merge(srcPath : String, dstPath : String, header : String) : Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false , hadoopConfig, header) } |
Мы называем слияние из нашего кода следующим образом:
1
|
merge(file, destinationFile, "type,count" ) |
Я не был уверен, как импортировать мой класс на основе Java в оболочку Spark, поэтому я скомпилировал код в JAR и вместо этого отправил его в виде задания:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
$ sbt package [info] Loading global plugins from /Users/markneedham/ .sbt /0 .13 /plugins [info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project [info] Set current project to playground ( in build file : /Users/markneedham/projects/spark-play/playground/ ) [info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2 .10 /classes ... [info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2 .10 /playground_2 .10-1.0.jar ... [info] Done packaging. [success] Total time : 8 s, completed 30-Nov-2014 08:12:26 $ time . /bin/spark-submit --class "WriteToCsvWithHeader" --master local [4] /path/to/playground/target/scala-2 .10 /playground_2 .10-1.0.jar Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Spark's default log4j profile: org /apache/spark/log4j-defaults .propertie ... 14 /11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 14 /11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s real 0m13.061s user 0m38.977s sys 0m3.393s |
И если мы посмотрим на наш файл назначения:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
$ cat /tmp/singlePrimaryTypes .csv type ,count THEFT,859197 BATTERY,757530 NARCOTICS,489528 CRIMINAL DAMAGE,488209 BURGLARY,257310 OTHER OFFENSE,253964 ASSAULT,247386 MOTOR VEHICLE THEFT,197404 ROBBERY,157706 DECEPTIVE PRACTICE,137538 CRIMINAL TRESPASS,124974 PROSTITUTION,47245 WEAPONS VIOLATION,40361 PUBLIC PEACE VIOLATION,31585 OFFENSE INVOLVING CHILDREN,26524 CRIM SEXUAL ASSAULT,14788 SEX OFFENSE,14283 GAMBLING,10632 LIQUOR LAW VIOLATION,8847 ARSON,6443 INTERFERE WITH PUBLIC OFFICER,5178 HOMICIDE,4846 KIDNAPPING,3585 INTERFERENCE WITH PUBLIC OFFICER,3147 INTIMIDATION,2471 STALKING,1985 OFFENSES INVOLVING CHILDREN,355 OBSCENITY,219 PUBLIC INDECENCY,86 OTHER NARCOTIC VIOLATION,80 RITUALISM,12 NON-CRIMINAL,12 OTHER OFFENSE ,6 NON - CRIMINAL,2 NON-CRIMINAL (SUBJECT SPECIFIED),2 |
Счастливые дни!
- Код доступен как суть, если вы хотите увидеть все детали.
Ссылка: | Spark: запись в CSV-файл с заголовком с помощью saveAsFile от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма . |