В своем последнем сообщении в блоге я показал, как писать в один CSV-файл, используя Spark и Hadoop, и следующее, что я хотел сделать, это добавить строку заголовка в результирующую строку.
Функция Hadoop FileUtil # copyMerge принимает параметр String, но добавляет этот текст в конец каждого файла раздела, что не совсем то, что мы хотим.
Однако, если мы скопируем эту функцию в наш собственный класс FileUtil, мы сможем реструктурировать его так, чтобы он делал то, что нам нужно
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.IOUtils;import java.io.IOException; public class MyFileUtil { public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String header) throws IOException { dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false); if(!srcFS.getFileStatus(srcDir).isDir()) { return false; } else { FSDataOutputStream out = dstFS.create(dstFile); if(header != null) { out.write((header + "\n").getBytes("UTF-8")); } try { FileStatus[] contents = srcFS.listStatus(srcDir); for(int i = 0; i < contents.length; ++i) { if(!contents[i].isDir()) { FSDataInputStream in = srcFS.open(contents[i].getPath()); try { IOUtils.copyBytes(in, out, conf, false); } finally { in.close(); } } } } finally { out.close(); } return deleteSource?srcFS.delete(srcDir, true):true; } } private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException { if(dstFS.exists(dst)) { FileStatus sdst = dstFS.getFileStatus(dst); if(sdst.isDir()) { if(null == srcName) { throw new IOException("Target " + dst + " is a directory"); } return checkDest((String)null, dstFS, new Path(dst, srcName), overwrite); } if(!overwrite) { throw new IOException("Target " + dst + " already exists"); } } return dst; }} |
Затем мы можем обновить нашу функцию слияния для вызова этого вместо:
|
1
2
3
4
5
|
def merge(srcPath: String, dstPath: String, header:String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)} |
Мы называем слияние из нашего кода следующим образом:
|
1
|
merge(file, destinationFile, "type,count") |
Я не был уверен, как импортировать мой класс на основе Java в оболочку Spark, поэтому я скомпилировал код в JAR и вместо этого отправил его в виде задания:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
$ sbt package[info] Loading global plugins from /Users/markneedham/.sbt/0.13/plugins[info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project[info] Set current project to playground (in build file:/Users/markneedham/projects/spark-play/playground/)[info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2.10/classes...[info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2.10/playground_2.10-1.0.jar ...[info] Done packaging.[success] Total time: 8 s, completed 30-Nov-2014 08:12:26 $ time ./bin/spark-submit --class "WriteToCsvWithHeader" --master local[4] /path/to/playground/target/scala-2.10/playground_2.10-1.0.jarSpark assembly has been built with Hive, including Datanucleus jars on classpathUsing Spark's default log4j profile: org/apache/spark/log4j-defaults.propertie...14/11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool14/11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s real 0m13.061suser 0m38.977ssys 0m3.393s |
И если мы посмотрим на наш файл назначения:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
$ cat /tmp/singlePrimaryTypes.csvtype,countTHEFT,859197BATTERY,757530NARCOTICS,489528CRIMINAL DAMAGE,488209BURGLARY,257310OTHER OFFENSE,253964ASSAULT,247386MOTOR VEHICLE THEFT,197404ROBBERY,157706DECEPTIVE PRACTICE,137538CRIMINAL TRESPASS,124974PROSTITUTION,47245WEAPONS VIOLATION,40361PUBLIC PEACE VIOLATION,31585OFFENSE INVOLVING CHILDREN,26524CRIM SEXUAL ASSAULT,14788SEX OFFENSE,14283GAMBLING,10632LIQUOR LAW VIOLATION,8847ARSON,6443INTERFERE WITH PUBLIC OFFICER,5178HOMICIDE,4846KIDNAPPING,3585INTERFERENCE WITH PUBLIC OFFICER,3147INTIMIDATION,2471STALKING,1985OFFENSES INVOLVING CHILDREN,355OBSCENITY,219PUBLIC INDECENCY,86OTHER NARCOTIC VIOLATION,80RITUALISM,12NON-CRIMINAL,12OTHER OFFENSE ,6NON - CRIMINAL,2NON-CRIMINAL (SUBJECT SPECIFIED),2 |
Счастливые дни!
- Код доступен как суть, если вы хотите увидеть все детали.
| Ссылка: | Spark: запись в CSV-файл с заголовком с помощью saveAsFile от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма . |