Статьи

Spark: запись в CSV-файл с заголовком с помощью saveAsFile

В своем последнем сообщении в блоге я показал, как писать в один CSV-файл, используя Spark и Hadoop, и следующее, что я хотел сделать, это добавить строку заголовка в результирующую строку.

Функция Hadoop FileUtil # copyMerge принимает параметр String, но добавляет этот текст в конец каждого файла раздела, что не совсем то, что мы хотим.

Однако, если мы скопируем эту функцию в наш собственный класс FileUtil, мы сможем реструктурировать его так, чтобы он делал то, что нам нужно

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
  
public class MyFileUtil {
    public static boolean copyMergeWithHeader(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String header) throws IOException {
        dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
        if(!srcFS.getFileStatus(srcDir).isDir()) {
            return false;
        } else {
            FSDataOutputStream out = dstFS.create(dstFile);
            if(header != null) {
                out.write((header + "\n").getBytes("UTF-8"));
            }
  
            try {
                FileStatus[] contents = srcFS.listStatus(srcDir);
  
                for(int i = 0; i < contents.length; ++i) {
                    if(!contents[i].isDir()) {
                        FSDataInputStream in = srcFS.open(contents[i].getPath());
  
                        try {
                            IOUtils.copyBytes(in, out, conf, false);
  
                        } finally {
                            in.close();
                        }
                    }
                }
            } finally {
                out.close();
            }
  
            return deleteSource?srcFS.delete(srcDir, true):true;
        }
    }
  
    private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
        if(dstFS.exists(dst)) {
            FileStatus sdst = dstFS.getFileStatus(dst);
            if(sdst.isDir()) {
                if(null == srcName) {
                    throw new IOException("Target " + dst + " is a directory");
                }
  
                return checkDest((String)null, dstFS, new Path(dst, srcName), overwrite);
            }
  
            if(!overwrite) {
                throw new IOException("Target " + dst + " already exists");
            }
        }
        return dst;
    }
}

Затем мы можем обновить нашу функцию слияния для вызова этого вместо:

1
2
3
4
5
def merge(srcPath: String, dstPath: String, header:String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  MyFileUtil.copyMergeWithHeader(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, header)
}

Мы называем слияние из нашего кода следующим образом:

1
merge(file, destinationFile, "type,count")

Я не был уверен, как импортировать мой класс на основе Java в оболочку Spark, поэтому я скомпилировал код в JAR и вместо этого отправил его в виде задания:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
$ sbt package
[info] Loading global plugins from /Users/markneedham/.sbt/0.13/plugins
[info] Loading project definition from /Users/markneedham/projects/spark-play/playground/project
[info] Set current project to playground (in build file:/Users/markneedham/projects/spark-play/playground/)
[info] Compiling 3 Scala sources to /Users/markneedham/projects/spark-play/playground/target/scala-2.10/classes...
[info] Packaging /Users/markneedham/projects/spark-play/playground/target/scala-2.10/playground_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed 30-Nov-2014 08:12:26
  
$ time ./bin/spark-submit --class "WriteToCsvWithHeader" --master local[4] /path/to/playground/target/scala-2.10/playground_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertie
...
14/11/30 08:16:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/11/30 08:16:15 INFO SparkContext: Job finished: saveAsTextFile at WriteToCsvWithHeader.scala:49, took 0.589036 s
  
real    0m13.061s
user    0m38.977s
sys 0m3.393s

И если мы посмотрим на наш файл назначения:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
$ cat /tmp/singlePrimaryTypes.csv
type,count
THEFT,859197
BATTERY,757530
NARCOTICS,489528
CRIMINAL DAMAGE,488209
BURGLARY,257310
OTHER OFFENSE,253964
ASSAULT,247386
MOTOR VEHICLE THEFT,197404
ROBBERY,157706
DECEPTIVE PRACTICE,137538
CRIMINAL TRESPASS,124974
PROSTITUTION,47245
WEAPONS VIOLATION,40361
PUBLIC PEACE VIOLATION,31585
OFFENSE INVOLVING CHILDREN,26524
CRIM SEXUAL ASSAULT,14788
SEX OFFENSE,14283
GAMBLING,10632
LIQUOR LAW VIOLATION,8847
ARSON,6443
INTERFERE WITH PUBLIC OFFICER,5178
HOMICIDE,4846
KIDNAPPING,3585
INTERFERENCE WITH PUBLIC OFFICER,3147
INTIMIDATION,2471
STALKING,1985
OFFENSES INVOLVING CHILDREN,355
OBSCENITY,219
PUBLIC INDECENCY,86
OTHER NARCOTIC VIOLATION,80
RITUALISM,12
NON-CRIMINAL,12
OTHER OFFENSE ,6
NON - CRIMINAL,2
NON-CRIMINAL (SUBJECT SPECIFIED),2

Счастливые дни!

  • Код доступен как суть, если вы хотите увидеть все детали.