Я недавно писал о том, как настроить кластер EMR с помощью CLI. В этом посте я покажу вам, как настроить кластер с помощью Java SDK для AWS . На мой взгляд, лучший способ показать, как это сделать с помощью Java AWS SDK, — показать полный пример, так что давайте начнем.
- Создать новый проект Maven
Для этой задачи я создал новый проект Maven по умолчанию . Основной класс в этом проекте — тот, который вы можете запустить, чтобы инициировать кластер EMR и выполнить задание MapReduce, которое я создал в этом посте :
package net.pascalalma.aws.emr; import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.PropertiesCredentials; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.ec2.model.InstanceType; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient; import com.amazonaws.services.elasticmapreduce.model.*; import com.amazonaws.services.elasticmapreduce.util.StepFactory; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.UUID; /** * Created with IntelliJ IDEA. * User: pascal * Date: 22-07-13 * Time: 20:45 */ public class MyClient { private static final String HADOOP_VERSION = "1.0.3"; private static final int INSTANCE_COUNT = 1; private static final String INSTANCE_TYPE = InstanceType.M1Small.toString(); private static final UUID RANDOM_UUID = UUID.randomUUID(); private static final String FLOW_NAME = "dictionary-" + RANDOM_UUID.toString(); private static final String BUCKET_NAME = "map-reduce-intro"; private static final String S3N_HADOOP_JAR = "s3n://" + BUCKET_NAME + "/job/MapReduce-1.0-SNAPSHOT.jar"; private static final String S3N_LOG_URI = "s3n://" + BUCKET_NAME + "/log/"; private static final String[] JOB_ARGS = new String[]{"s3n://" + BUCKET_NAME + "/input/input.txt", "s3n://" + BUCKET_NAME + "/result/" + FLOW_NAME}; private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS); private static final List<JobFlowExecutionState> DONE_STATES = Arrays .asList(new JobFlowExecutionState[]{JobFlowExecutionState.COMPLETED, JobFlowExecutionState.FAILED, JobFlowExecutionState.TERMINATED}); static AmazonS3 s3; static AmazonElasticMapReduceClient emr; private static void init() throws Exception { AWSCredentials credentials = new PropertiesCredentials( MyClient.class.getClassLoader().getResourceAsStream("AwsCredentials.properties")); s3 = new AmazonS3Client(credentials); emr = new AmazonElasticMapReduceClient(credentials); emr.setRegion(Region.getRegion(Regions.EU_WEST_1)); } private static JobFlowInstancesConfig configInstance() throws Exception { // Configure instances to use JobFlowInstancesConfig instance = new JobFlowInstancesConfig(); instance.setHadoopVersion(HADOOP_VERSION); instance.setInstanceCount(INSTANCE_COUNT); instance.setMasterInstanceType(INSTANCE_TYPE); instance.setSlaveInstanceType(INSTANCE_TYPE); // instance.setKeepJobFlowAliveWhenNoSteps(true); // instance.setEc2KeyName("4synergy_palma"); return instance; } private static void runCluster() throws Exception { // Configure the job flow RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, configInstance()); request.setLogUri(S3N_LOG_URI); // Configure the Hadoop jar to use HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR); jarConfig.setArgs(ARGS_AS_LIST); try { StepConfig enableDebugging = new StepConfig() .withName("Enable debugging") .withActionOnFailure("TERMINATE_JOB_FLOW") .withHadoopJarStep(new StepFactory().newEnableDebuggingStep()); StepConfig runJar = new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.indexOf('/') + 1), jarConfig); request.setSteps(Arrays.asList(new StepConfig[]{enableDebugging, runJar})); //Run the job flow RunJobFlowResult result = emr.runJobFlow(request); //Check the status of the running job String lastState = ""; STATUS_LOOP: while (true) { DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest( Arrays.asList(new String[]{result.getJobFlowId()})); DescribeJobFlowsResult descResult = emr.describeJobFlows(desc); for (JobFlowDetail detail : descResult.getJobFlows()) { String state = detail.getExecutionStatusDetail().getState(); if (isDone(state)) { System.out.println("Job " + state + ": " + detail.toString()); break STATUS_LOOP; } else if (!lastState.equals(state)) { lastState = state; System.out.println("Job " + state + " at " + new Date().toString()); } } Thread.sleep(10000); } } catch (AmazonServiceException ase) { System.out.println("Caught Exception: " + ase.getMessage()); System.out.println("Reponse Status Code: " + ase.getStatusCode()); System.out.println("Error Code: " + ase.getErrorCode()); System.out.println("Request ID: " + ase.getRequestId()); } } public static boolean isDone(String value) { JobFlowExecutionState state = JobFlowExecutionState.fromValue(value); return DONE_STATES.contains(state); } public static void main(String[] args) { try { init(); runCluster(); } catch (Exception e) { e.printStackTrace(); } } }
- Подготовьте вход
- s3: // Карта-свертка-интро
- s3: // Карта-свертка-интро / вход
- s3: //map-reduce-intro/input/input.txt
- s3: // Карта-свертка-интро / работа
- s3: //map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar
- s3: // Карта-свертка-интро / журнал
- s3: // Карта-свертка-интро / результат
В этом классе я сначала объявляю некоторые константы, я предполагаю, что они будут очевидны. В методе init () я использую файл свойств учетных данных, который я добавил в проект. Я добавил этот файл в папку / main / resources моего проекта Maven. Он содержит мой ключ доступа и секретный ключ.
Также для региона EMR я установил регион EU-WEST.
Следующий метод — configInstance () . В этом методе я создаю и настраиваю JobFlowInstance , устанавливая версию Hadoop, количество экземпляров, размер экземпляров и т. Д. Кроме того, вы можете настроить keepAlive.настройка, чтобы сохранить кластер живым после завершения заданий. Это может быть полезно в некоторых случаях. Если вы хотите использовать эту опцию, может быть полезно также задать пару ключей, которую вы хотите использовать для доступа к кластеру, потому что я не смог получить доступ к кластеру без установки этого ключа.
Метод runCluster () — это то место, где фактически запускается кластер. Создает запрос на запуск кластера. В этом запросе добавляются шаги, которые должны быть выполнены. В нашем случае одним из шагов является запуск файла JAR, который мы создали на предыдущих шагах. Я также добавил шаг отладки, чтобы у нас был доступ к журналу отладки после завершения и завершения кластера. Мы можем просто получить доступ к файлам журнала в корзине S3, которую я установил с помощью константы S3N_LOG_URI .
Когда этот запрос создан, мы запускаем кластер на его основе. Затем мы проводим каждые 10 секунд, чтобы увидеть, завершилось ли задание, и показать сообщение в консоли, указывающее текущее состояние задания.
Чтобы выполнить первый запуск, мы должны подготовить ввод.
В качестве входных данных для задания (прочитайте это для получения дополнительной информации об этом примере задания) мы должны сделать содержимое словаря доступным для кластера EMR. Кроме того, мы должны сделать JAR-файл доступным и убедиться, что каталог вывода и журнала существует в наших контейнерах S3. Есть несколько способов сделать это. Вы можете сделать это программно, используя SDK, используя S3cmd для этого из командной строки или используя Консоль управления AWS . Какой бы метод ни находился, пока вы в конечном итоге будете иметь подобную настройку:
Или, при использовании S3cmd, вот так:
s3cmd-1.5.0-alpha1$ s3cmd ls --recursive s3://map-reduce-intro/ 2013-07-20 13:06 469941 s3://map-reduce-intro/input/input.txt 2013-07-20 14:12 5491 s3://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar 2013-08-06 14:30 0 s3://map-reduce-intro/log/ 2013-08-06 14:27 0 s3://map-reduce-intro/result/
- Запустить кластер
В приведенном выше примере я уже представил клиента S3 в коде. Вы также можете использовать это, чтобы подготовить ввод или получить вывод как часть работы клиента.
Когда все на месте, мы можем запустить работу. Я просто запускаю основной метод MyClient в IntelliJ и получаю следующий вывод в моей консоли:
Job STARTING at Tue Aug 06 16:31:55 CEST 2013 Job RUNNING at Tue Aug 06 16:36:18 CEST 2013 Job SHUTTING_DOWN at Tue Aug 06 16:38:40 CEST 2013 Job COMPLETED: { JobFlowId: j-JDB14HVTRC1L ,Name: dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43 ,LogUri: s3n://map-reduce-intro/log/,AmiVersion: 2.4.0 ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013 ,StartDateTime: Tue Aug 06 16:36:14 CEST 2013 ,ReadyDateTime: Tue Aug 06 16:36:14 CEST 2013 ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013 ,LastStateChangeReason: Steps completed} ,Instances: {MasterInstanceType: m1.small ,MasterPublicDnsName: ec2-54-216-104-11.eu-west-1.compute.amazonaws.com ,MasterInstanceId: i-93268ddf ,InstanceCount: 1 ,InstanceGroups: [{InstanceGroupId: ig-2LURHNAK5NVKZ ,Name: master ,Market: ON_DEMAND ,InstanceRole: MASTER ,InstanceType: m1.small ,InstanceRequestCount: 1 ,InstanceRunningCount: 0 ,State: ENDED ,LastStateChangeReason: Job flow terminated ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013 ,StartDateTime: Tue Aug 06 16:34:28 CEST 2013 ,ReadyDateTime: Tue Aug 06 16:36:10 CEST 2013 ,EndDateTime: Tue Aug 06 16:39:02 CEST 2013}] ,NormalizedInstanceHours: 1 ,Ec2KeyName: 4synergy_palma ,Placement: {AvailabilityZone: eu-west-1a} ,KeepJobFlowAliveWhenNoSteps: false ,TerminationProtected: false ,HadoopVersion: 1.0.3} ,Steps: [ {StepConfig: {Name: Enable debugging ,ActionOnFailure: TERMINATE_JOB_FLOW ,HadoopJarStep: {Properties: [] ,Jar: s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar ,Args: [s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch]} } ,ExecutionStatusDetail: {State: COMPLETED,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013 ,StartDateTime: Tue Aug 06 16:36:12 CEST 2013 ,EndDateTime: Tue Aug 06 16:36:40 CEST 2013 ,} } , {StepConfig: {Name: /map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar ,ActionOnFailure: TERMINATE_JOB_FLOW ,HadoopJarStep: {Properties: [] ,Jar: s3n://map-reduce-intro/job/MapReduce-1.0-SNAPSHOT.jar ,Args: [s3n://map-reduce-intro/input/input.txt , s3n://map-reduce-intro/result/dictionary-8288df47-8aef-4ad3-badf-ee352a4a7c43]} } ,ExecutionStatusDetail: {State: COMPLETED ,CreationDateTime: Tue Aug 06 16:31:58 CEST 2013 ,StartDateTime: Tue Aug 06 16:36:40 CEST 2013 ,EndDateTime: Tue Aug 06 16:38:10 CEST 2013 ,} }] ,BootstrapActions: [] ,SupportedProducts: [] ,VisibleToAllUsers: false ,} Process finished with exit code 0
И, конечно, у нас есть результат в папке результатов, которую мы настроили в нашей корзине S3: я передаю результат на свою локальную машину и смотрю на него:
На этом мы завершаем этот простой, но, на мой взгляд, довольно завершенный пример создания задания Hadoop и его запуска в кластере после тестирования модуля (как мы это делаем со всем нашим программным обеспечением).
С этой настройкой в качестве основы довольно легко придумать более сложные бизнес-кейсы и протестировать их и настроить для работы на AWS EMR .