Статьи

Scala и Hadoop: рука об руку в Twitter

Если вы читали статью, опубликованную Google Джеффри Дином и Санджаем Гемаватом (MapReduce: упрощенная обработка данных на больших кластерах ), они показали, что их работа была вдохновлена ​​концепцией функциональных языков: «Наша абстракция основана на карте и сокращает примитивы. присутствует в Лиспе и многих других функциональных языках…. Наше использование функциональной модели с заданной пользователем картой и операциями сокращения позволяет нам легко распараллеливать большие вычисления и использовать повторное выполнение в качестве основного механизма обеспечения отказоустойчивости ».

Учитывая тот факт, что Scala является языком программирования, который сочетает в себе объективно-ориентированное и функциональное программирование и работает на JVM, это довольно естественная эволюция — внедрение Scala в среду Hadoop. Это то, что сделали инженеры Twitter. (Подробнее о том, как Scala используется в Twitter: « Twitter on Scala » и « Почему и как Scala в Twitter »). Scala имеет мощную поддержку отображения, фильтрации, сопоставления с образцом (регулярные выражения), поэтому он очень хорошо подходит для заданий MapReduce.

обжигающий

Twitter Реализованная в Scala реализация MapReduce — Scalding — основана на Cascading Java API (отсюда и название — по сути, это библиотека Scala, построенная на основе Cascading API) и была открыта в этом году. Код можно найти в github .

Предпосылки

Чтобы использовать ожоги, вам нужно установить Scala, вы можете скачать последнюю версию здесь . На момент написания этой статьи последней стабильной версией является Scala 2.9.2, которая также использовалась в моих тестах.

$ scala
Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Вам также нужен sbt, инструмент для сборки scala. Это можно скачать здесь . Если вы не хотите собирать код из исходного кода, вы можете просто получить файл jar и создать файл sbt, как описано в вики sbt Getting Started . Файл sbt является однострочным, но требует довольно много памяти, поэтому, если вы находитесь в облачной среде, дополнительная небольшая виртуальная машина может не соответствовать вашим потребностям (это была стена, которую я использовал в начале).

java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch.jar "$@"

Кроме того, вам понадобится ruby, потому что есть оболочка ruby ​​для простого запуска задания Hadoop. У меня был ruby ​​1.8.6 для тестов.

ruby --version
ruby 1.8.6 (2007-09-24 patchlevel 111) [x86_64-linux]

Погружение в ошпаривание (Дань Марку Пилигриму)

После того, как вы загрузили код из хранилища Scalding с помощью команды git clone ( git clone [email protected]: twitter / scalding.git ) или получили zip-файл (на момент написания этой статьи он назывался twitter-scalding-0.4.1-15 -ga54a5a3.zip), вам нужно выполнить следующие команды:
$ sbt update
$ sbt test
$ sbt assembly

Последний создает файл jar, который используется сценарием оболочки ruby ​​для запуска заданий hadoop — я вернулся к этому позже.

Так как примеры «Приступая к работе» полны WordCounts, я собирался попробовать что-то другое — я хотел реализовать 

$ hadoop jar hadoop-examples — *. jar grep input output Привет

Код — не то, что этого никогда не видели, просто на вкус. В приложении вы увидите исходный код Grep Hadoop из дистрибутива и можете оценить сокращенный код Scala. Не поймите меня неправильно, эта статья не предназначена для сравнения Java со Scala в любой форме или форме, но стиль Scala все же замечательный.

GrepJob.scala

import com.twitter.scalding._
import scala.util.matching.Regex

class GrepJob(args : Args) extends Job(args) {
    val pattern = new Regex(args("regexp"))

    TextLine(args("input"))
    .flatMap('line -> 'word)  { line : String => line.split("\\s+") }
    .flatMap('word -> 'match) { word : String => pattern.findAllIn(word).toList }
    .groupBy('match) { _.size }
    .write(Tsv(args("output")))
}

Вы можете запустить код с помощью вышеупомянутой оболочки rud scald.rb, которая находится в каталоге скриптов. Код может быть запущен локально или в формате hdf (аналогичная концепция решения Python Yelp для mrjob )

Во-первых, стандартный пример Hadoop grep:

$ hadoop jar /opt/hadoop/hadoop-examples-0.20.203.0.jar grep input output Hello
$ hadoop fs -cat output/part*
3       Hello

Тогда Scalding в локальном режиме выглядит следующим образом (но прежде чем запускать его с помощью скрипта scald.rb, вам нужно указать хост, на котором вы хотите запустить задание Hadoop. Для этого необходимо изменить переменную HOST в файле scald.rb:

HOST = ”my.remote.host” #, где задание выполняется и выполняется

В моем случае это было

HOST =»hadoopmaster»

Также я рекомендую изменить переменную REDUCERS в файле scald.rb, по умолчанию она равна 100.)

scripts/scald.rb --local GrepJob.scala --input input.txt  --output output.txt --regexp  Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar
Hello
java -Xmx3g -cp /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar:/tmp/GrepJob.jar com.twitter.scalding.Tool GrepJob --local --input input.txt --output output.txt --regexp Hello12/04/28 15:58:05 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:58:05 INFO flow.Flow: [] starting
12/04/28 15:58:05 INFO flow.Flow: []  source: FileTap["TextLine[['num', 'line']->[ALL]]"]["input.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["output.txt"]"]
12/04/28 15:58:05 INFO flow.Flow: []  parallel execution is enabled: true
12/04/28 15:58:05 INFO flow.Flow: []  starting jobs: 1
12/04/28 15:58:05 INFO flow.Flow: []  allocating threads: 1
12/04/28 15:58:05 INFO planner.FlowStep: [] starting step: (1/1) local
12/04/28 15:58:05 INFO assembly.AggregateBy: using threshold value: 100000

$ cat output.txt
Hello   3

И, наконец, Масштабирование по умолчанию, режим hdfs:

scripts/scald.rb --hdfs GrepJob.scala --input input/input.txt --output output --regexp Hello
/root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar

ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello12/04/28 15:55:53 INFO hadoop.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$

12/04/28 15:55:53 INFO hadoop.HadoopPlanner: using application jar: /root/scalding-assembly-0.4.1.jar
12/04/28 15:55:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+]
12/04/28 15:55:53 INFO flow.Flow: [] starting
12/04/28 15:55:53 INFO flow.Flow: [] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/input.txt"]"]
12/04/28 15:55:53 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:55:53 INFO flow.Flow: [] parallel execution is enabled: true
12/04/28 15:55:53 INFO flow.Flow: [] starting jobs: 1
12/04/28 15:55:53 INFO flow.Flow: [] allocating threads: 1
12/04/28 15:55:53 INFO planner.FlowStep: [] starting step: (1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"]
12/04/28 15:56:02 INFO mapred.FileInputFormat: Total input paths to process : 1
12/04/28 15:56:02 INFO planner.FlowStep: [] submitted hadoop job: job_201204241302_0034

$ hadoop fs -cat output/part*
Hello 3

Ключевая команда, которая выполняется на сервере hadoopmaster, — это, по сути, команда joop hadoop с файлом jar отжигаемой сборки (той, которая была создана командой сборки sbt выше):

ssh -C hadoopmaster HADOOP_CLASSPATH = / usr / share / java / hadoop-lzo-0.4.14.jar: scalding-assembly-0.4.1.jar: job-jars / GrepJob.jar hadoop jar scalding-Assembly-0.4.1. jar -libjars job-jars / GrepJob.jar -Dmapred.reduce.tasks = 1 GrepJob –hdfs –input input / input.txt –output output –regexp Hello

Вот и все. Надеюсь, у вас сложилось впечатление, что Scala и Hadoop действительно могут идти рука об руку, они могут очень хорошо дополнять друг друга.

аппендикс 

Grep.java

package org.apache.hadoop.examples;

import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Extracts matching regexs from input files and counts them. */
public class Grep extends Configured implements Tool {
  private Grep() {}                               // singleton

  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep    []");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path tempDir =
      new Path("grep-temp-"+
          Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf grepJob = new JobConf(getConf(), Grep.class);

    try {

      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);

      grepJob.setMapperClass(RegexMapper.class);
      grepJob.set("mapred.mapper.regex", args[2]);
      if (args.length == 4)
        grepJob.set("mapred.mapper.regex.group", args[3]);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);

      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormat(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      JobClient.runJob(grepJob);

      JobConf sortJob = new JobConf(Grep.class);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormat(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1);                 // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setOutputKeyComparatorClass           // sort by decreasing freq
      (LongWritable.DecreasingComparator.class);

      JobClient.runJob(sortJob);
    }
    finally {
      FileSystem.get(grepJob).delete(tempDir, true);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }

}