Учебники

HCatalog — Reader Writer

HCatalog содержит API передачи данных для параллельного ввода и вывода без использования MapReduce. Этот API использует базовую абстракцию хранения таблиц и строк для чтения данных из кластера Hadoop и записи данных в него.

API передачи данных содержит в основном три класса; это —

  • HCatReader — читает данные из кластера Hadoop.

  • HCatWriter — записывает данные в кластер Hadoop.

  • DataTransferFactory — генерирует экземпляры чтения и записи.

HCatReader — читает данные из кластера Hadoop.

HCatWriter — записывает данные в кластер Hadoop.

DataTransferFactory — генерирует экземпляры чтения и записи.

Этот API подходит для настройки главного-подчиненного узла. Давайте обсудим больше о HCatReader и HCatWriter .

HCatReader

HCatReader — это абстрактный класс, внутренний для HCatalog, который абстрагируется от сложностей базовой системы, из которой должны быть получены записи.

Sr.No. Название и описание метода
1

Открытый абстрактный ReaderContext prepareRead () создает исключение HCatException

Это должно быть вызвано на главном узле для получения ReaderContext, который затем должен быть сериализован и отправлен подчиненным узлам.

2

Открытый абстрактный итератор <HCatRecorder> read () создает исключение HCaException

Это должно быть вызвано на подчиненных узлах для чтения HCatRecords.

3

Публичная конфигурация getConf ()

Он вернет объект класса конфигурации.

Открытый абстрактный ReaderContext prepareRead () создает исключение HCatException

Это должно быть вызвано на главном узле для получения ReaderContext, который затем должен быть сериализован и отправлен подчиненным узлам.

Открытый абстрактный итератор <HCatRecorder> read () создает исключение HCaException

Это должно быть вызвано на подчиненных узлах для чтения HCatRecords.

Публичная конфигурация getConf ()

Он вернет объект класса конфигурации.

Класс HCatReader используется для чтения данных из HDFS. Чтение представляет собой двухэтапный процесс, в котором первый шаг выполняется на главном узле внешней системы. Второй шаг выполняется параллельно на нескольких подчиненных узлах.

Чтения сделаны на ReadEntity . Прежде чем вы начнете читать, вам нужно определить ReadEntity для чтения. Это можно сделать через ReadEntity.Builder . Вы можете указать имя базы данных, имя таблицы, раздел и строку фильтра. Например —

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10. 

Приведенный выше фрагмент кода определяет объект ReadEntity («entity»), содержащий таблицу с именем mytbl в базе данных с именем mydb , которую можно использовать для чтения всех строк этой таблицы. Обратите внимание, что эта таблица должна существовать в HCatalog до начала этой операции.

После определения ReadEntity вы получаете экземпляр HCatReader, используя ReadEntity и конфигурацию кластера —

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

Следующим шагом является получение ReaderContext от читателя следующим образом:

ReaderContext cntxt = reader.prepareRead();

HCatWriter

Эта абстракция является внутренней для HCatalog. Это облегчает запись в HCatalog из внешних систем. Не пытайтесь создавать это напрямую. Вместо этого используйте DataTransferFactory.

Sr.No. Название и описание метода
1

Открытый абстрактный WriterContext prepareRead () генерирует исключение HCatException

Внешняя система должна вызывать этот метод ровно один раз из главного узла. Возвращает WriterContext . Это должно быть сериализовано и отправлено на подчиненные узлы для создания там HCatWriter .

2

Открытый абстрактный void write (Iterator <HCatRecord> recordItr) выдает исключение HCaException

Этот метод должен использоваться на подчиненных узлах для выполнения записи. RecordItr — это объект-итератор, содержащий коллекцию записей для записи в HCatalog.

3

Открытый абстрактный void abort (WriterContext cntxt) создает исключение HCatException

Этот метод должен вызываться на главном узле. Основная цель этого метода заключается в выполнении очистки в случае сбоев.

4

открытый абстрактный void commit (WriterContext cntxt) выдает исключение HCatException

Этот метод должен вызываться на главном узле. Целью этого метода является фиксация метаданных.

Открытый абстрактный WriterContext prepareRead () генерирует исключение HCatException

Внешняя система должна вызывать этот метод ровно один раз из главного узла. Возвращает WriterContext . Это должно быть сериализовано и отправлено на подчиненные узлы для создания там HCatWriter .

Открытый абстрактный void write (Iterator <HCatRecord> recordItr) выдает исключение HCaException

Этот метод должен использоваться на подчиненных узлах для выполнения записи. RecordItr — это объект-итератор, содержащий коллекцию записей для записи в HCatalog.

Открытый абстрактный void abort (WriterContext cntxt) создает исключение HCatException

Этот метод должен вызываться на главном узле. Основная цель этого метода заключается в выполнении очистки в случае сбоев.

открытый абстрактный void commit (WriterContext cntxt) выдает исключение HCatException

Этот метод должен вызываться на главном узле. Целью этого метода является фиксация метаданных.

Подобно чтению, запись также представляет собой двухэтапный процесс, в котором первый шаг выполняется на главном узле. Впоследствии второй шаг происходит параллельно на подчиненных узлах.

Запись выполняется в WriteEntity, который может быть сконструирован аналогично reads —

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

Приведенный выше код создает объектный объект WriteEntity, который можно использовать для записи в таблицу с именем mytbl в базе данных mydb .

После создания WriteEntity следующим шагом является получение WriterContext —

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

Все вышеперечисленные шаги выполняются на главном узле. Затем главный узел сериализует объект WriterContext и делает его доступным для всех ведомых устройств.

На подчиненных узлах вам необходимо получить HCatWriter с помощью WriterContext следующим образом:

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

Затем писатель принимает итератор в качестве аргумента для метода write

writer.write(hCatRecordItr);

Затем автор вызывает getNext () для этого итератора в цикле и записывает все записи, прикрепленные к итератору.

Файл TestReaderWriter.java используется для проверки классов HCatreader и HCatWriter. Следующая программа демонстрирует, как использовать HCatReader и HCatWriter API для чтения данных из исходного файла и последующей записи их в файл назначения.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {
		
      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");
		
      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();
		
      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }
		
      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();
		
      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();
		
      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);
		
      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();
		
      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();
		
      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }
	
   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
		
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }
	
   private ReaderContext runsInMaster(Map<String, String> config, 
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }
	
   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
		
      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);
			
         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));
			
         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));
			
         Assert.assertEquals(2, read.size());
      }
		
      //Assert.assertFalse(itr.hasNext());
   }
	
   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }
	
   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
		
      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }
	
   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }
	
   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;
		
      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }
		
      @Override
      public HCatRecord next() {
         return getRecord(i);
      }
		
      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

Вышеупомянутая программа считывает данные из HDFS в виде записей и записывает данные записи в mytable.