Если вы читали статью, опубликованную Google Джеффри Дином и Санджаем Гемаватом (MapReduce: упрощенная обработка данных на больших кластерах ), они показали, что их работа была вдохновлена концепцией функциональных языков: «Наша абстракция основана на карте и сокращает примитивы. присутствует в Лиспе и многих других функциональных языках…. Наше использование функциональной модели с заданной пользователем картой и операциями сокращения позволяет нам легко распараллеливать большие вычисления и использовать повторное выполнение в качестве основного механизма обеспечения отказоустойчивости ».
Учитывая тот факт, что Scala является языком программирования, который сочетает в себе объективно-ориентированное и функциональное программирование и работает на JVM, это довольно естественная эволюция — внедрение Scala в среду Hadoop. Это то, что сделали инженеры Twitter. (Подробнее о том, как Scala используется в Twitter: « Twitter on Scala » и « Почему и как Scala в Twitter »). Scala имеет мощную поддержку отображения, фильтрации, сопоставления с образцом (регулярные выражения), поэтому он очень хорошо подходит для заданий MapReduce.
обжигающий
Twitter Реализованная в Scala реализация MapReduce — Scalding — основана на Cascading Java API (отсюда и название — по сути, это библиотека Scala, построенная на основе Cascading API) и была открыта в этом году. Код можно найти в github .
Предпосылки
Чтобы использовать ожоги, вам нужно установить Scala, вы можете скачать последнюю версию здесь . На момент написания этой статьи последней стабильной версией является Scala 2.9.2, которая также использовалась в моих тестах.
$ scala Welcome to Scala version 2.9.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_31). Type in expressions to have them evaluated. Type :help for more information. scala>
Вам также нужен sbt, инструмент для сборки scala. Это можно скачать здесь . Если вы не хотите собирать код из исходного кода, вы можете просто получить файл jar и создать файл sbt, как описано в вики sbt Getting Started . Файл sbt является однострочным, но требует довольно много памяти, поэтому, если вы находитесь в облачной среде, дополнительная небольшая виртуальная машина может не соответствовать вашим потребностям (это была стена, которую я использовал в начале).
java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch.jar "$@"
Кроме того, вам понадобится ruby, потому что есть оболочка ruby для простого запуска задания Hadoop. У меня был ruby 1.8.6 для тестов.
ruby --version ruby 1.8.6 (2007-09-24 patchlevel 111) [x86_64-linux]
Погружение в ошпаривание (Дань Марку Пилигриму)
После того, как вы загрузили код из хранилища Scalding с помощью команды git clone ( git clone [email protected]: twitter / scalding.git ) или получили zip-файл (на момент написания этой статьи он назывался twitter-scalding-0.4.1-15 -ga54a5a3.zip), вам нужно выполнить следующие команды:
$ sbt update
$ sbt test
$ sbt assembly
Последний создает файл jar, который используется сценарием оболочки ruby для запуска заданий hadoop — я вернулся к этому позже.
Так как примеры «Приступая к работе» полны WordCounts, я собирался попробовать что-то другое — я хотел реализовать
$ hadoop jar hadoop-examples — *. jar grep input output Привет
Код — не то, что этого никогда не видели, просто на вкус. В приложении вы увидите исходный код Grep Hadoop из дистрибутива и можете оценить сокращенный код Scala. Не поймите меня неправильно, эта статья не предназначена для сравнения Java со Scala в любой форме или форме, но стиль Scala все же замечательный.
GrepJob.scala
import com.twitter.scalding._ import scala.util.matching.Regex class GrepJob(args : Args) extends Job(args) { val pattern = new Regex(args("regexp")) TextLine(args("input")) .flatMap('line -> 'word) { line : String => line.split("\\s+") } .flatMap('word -> 'match) { word : String => pattern.findAllIn(word).toList } .groupBy('match) { _.size } .write(Tsv(args("output"))) }
Вы можете запустить код с помощью вышеупомянутой оболочки rud scald.rb, которая находится в каталоге скриптов. Код может быть запущен локально или в формате hdf (аналогичная концепция решения Python Yelp для mrjob )
Во-первых, стандартный пример Hadoop grep:
$ hadoop jar /opt/hadoop/hadoop-examples-0.20.203.0.jar grep input output Hello $ hadoop fs -cat output/part* 3 Hello
Тогда Scalding в локальном режиме выглядит следующим образом (но прежде чем запускать его с помощью скрипта scald.rb, вам нужно указать хост, на котором вы хотите запустить задание Hadoop. Для этого необходимо изменить переменную HOST в файле scald.rb:
HOST = ”my.remote.host” #, где задание выполняется и выполняется
В моем случае это было
HOST =»hadoopmaster»
Также я рекомендую изменить переменную REDUCERS в файле scald.rb, по умолчанию она равна 100.)
scripts/scald.rb --local GrepJob.scala --input input.txt --output output.txt --regexp Hello /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar Hello java -Xmx3g -cp /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar:/tmp/GrepJob.jar com.twitter.scalding.Tool GrepJob --local --input input.txt --output output.txt --regexp Hello12/04/28 15:58:05 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+] 12/04/28 15:58:05 INFO flow.Flow: [] starting 12/04/28 15:58:05 INFO flow.Flow: [] source: FileTap["TextLine[['num', 'line']->[ALL]]"]["input.txt"]"] 12/04/28 15:58:05 INFO flow.Flow: [] sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["output.txt"]"] 12/04/28 15:58:05 INFO flow.Flow: [] parallel execution is enabled: true 12/04/28 15:58:05 INFO flow.Flow: [] starting jobs: 1 12/04/28 15:58:05 INFO flow.Flow: [] allocating threads: 1 12/04/28 15:58:05 INFO planner.FlowStep: [] starting step: (1/1) local 12/04/28 15:58:05 INFO assembly.AggregateBy: using threshold value: 100000 $ cat output.txt Hello 3
И, наконец, Масштабирование по умолчанию, режим hdfs:
scripts/scald.rb --hdfs GrepJob.scala --input input/input.txt --output output --regexp Hello /root/scalding/twitter-scalding/target/scalding-assembly-0.4.1.jar ssh -C hadoopmaster HADOOP_CLASSPATH=/usr/share/java/hadoop-lzo-0.4.14.jar:scalding-assembly-0.4.1.jar:job-jars/GrepJob.jar hadoop jar scalding-assembly-0.4.1.jar -libjars job-jars/GrepJob.jar -Dmapred.reduce.tasks=1 GrepJob --hdfs --input input/input.txt --output output --regexp Hello12/04/28 15:55:53 INFO hadoop.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$ 12/04/28 15:55:53 INFO hadoop.HadoopPlanner: using application jar: /root/scalding-assembly-0.4.1.jar 12/04/28 15:55:53 INFO util.Version: Concurrent, Inc - Cascading 2.0.0 [hadoop-0.20.2+] 12/04/28 15:55:53 INFO flow.Flow: [] starting 12/04/28 15:55:53 INFO flow.Flow: [] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/input.txt"]"] 12/04/28 15:55:53 INFO flow.Flow: [] sink: Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"] 12/04/28 15:55:53 INFO flow.Flow: [] parallel execution is enabled: true 12/04/28 15:55:53 INFO flow.Flow: [] starting jobs: 1 12/04/28 15:55:53 INFO flow.Flow: [] allocating threads: 1 12/04/28 15:55:53 INFO planner.FlowStep: [] starting step: (1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["output"]"] 12/04/28 15:56:02 INFO mapred.FileInputFormat: Total input paths to process : 1 12/04/28 15:56:02 INFO planner.FlowStep: [] submitted hadoop job: job_201204241302_0034 $ hadoop fs -cat output/part* Hello 3
Ключевая команда, которая выполняется на сервере hadoopmaster, — это, по сути, команда joop hadoop с файлом jar отжигаемой сборки (той, которая была создана командой сборки sbt выше):
ssh -C hadoopmaster HADOOP_CLASSPATH = / usr / share / java / hadoop-lzo-0.4.14.jar: scalding-assembly-0.4.1.jar: job-jars / GrepJob.jar hadoop jar scalding-Assembly-0.4.1. jar -libjars job-jars / GrepJob.jar -Dmapred.reduce.tasks = 1 GrepJob –hdfs –input input / input.txt –output output –regexp Hello
Вот и все. Надеюсь, у вас сложилось впечатление, что Scala и Hadoop действительно могут идти рука об руку, они могут очень хорошо дополнять друг друга.
аппендикс
Grep.java
package org.apache.hadoop.examples; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.*; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* Extracts matching regexs from input files and counts them. */ public class Grep extends Configured implements Tool { private Grep() {} // singleton public int run(String[] args) throws Exception { if (args.length < 3) { System.out.println("Grep []"); ToolRunner.printGenericCommandUsage(System.out); return -1; } Path tempDir = new Path("grep-temp-"+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); JobConf grepJob = new JobConf(getConf(), Grep.class); try { grepJob.setJobName("grep-search"); FileInputFormat.setInputPaths(grepJob, args[0]); grepJob.setMapperClass(RegexMapper.class); grepJob.set("mapred.mapper.regex", args[2]); if (args.length == 4) grepJob.set("mapred.mapper.regex.group", args[3]); grepJob.setCombinerClass(LongSumReducer.class); grepJob.setReducerClass(LongSumReducer.class); FileOutputFormat.setOutputPath(grepJob, tempDir); grepJob.setOutputFormat(SequenceFileOutputFormat.class); grepJob.setOutputKeyClass(Text.class); grepJob.setOutputValueClass(LongWritable.class); JobClient.runJob(grepJob); JobConf sortJob = new JobConf(Grep.class); sortJob.setJobName("grep-sort"); FileInputFormat.setInputPaths(sortJob, tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); // write a single file FileOutputFormat.setOutputPath(sortJob, new Path(args[1])); sortJob.setOutputKeyComparatorClass // sort by decreasing freq (LongWritable.DecreasingComparator.class); JobClient.runJob(sortJob); } finally { FileSystem.get(grepJob).delete(tempDir, true); } return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Grep(), args); System.exit(res); } }