Статьи

Начало работы с «Blur» — поиск в верхней части Hadoop и Lucene.

Blur — это новый проект лицензионного программного обеспечения Apache 2.0, который предоставляет возможности поиска, построенные на основе Hadoop и Lucene. Elastic Search и Solr уже существуют, так зачем создавать что-то новое? Хотя эти проекты работают хорошо, у них не было прочной интеграции с экосистемой Hadoop. Blur был создан специально для больших данных, принимая во внимание масштабируемость, избыточность и производительность с самого начала, используя при этом все преимущества, которые уже существуют в стеке Hadoop.

Полтора года назад мой проект начал использовать Hadoop для обработки данных. Очень рано у нас возникли проблемы с сетью, из-за которых подключение к кластеру HDFS в лучшем случае становилось ненадежным. В частности, в течение одного уик-энда мы постоянно теряли сетевое соединение с 47 из 90 узлов данных в кластере. Когда мы пришли в понедельник утром, я заметил, что система MapReduce была немного вялой, но все еще работала. Когда я проверил HDFS, я увидел, что наша емкость упала примерно на 50%. После запуска fsck в кластере я был поражен, обнаружив, что то, что казалось катастрофическим сбоем в выходные, привело к еще здоровой файловой системе. Этот опыт оставил неизгладимое впечатление на меня.Именно тогда у меня появилась идея каким-то образом использовать избыточность и отказоустойчивость HDFS для следующей версии поисковой системы, которую я только начинал (пере) писать.

Я уже написал специальный сервер Lucene, который находился в производственной системе пару лет. Lucene работал очень хорошо и делал все, что нам нужно для поиска. Проблема, с которой мы столкнулись, заключалась в том, что он работал на большом железе, которое не было избыточным и не могло быть легко расширено. Увидев гибкие характеристики Hadoop из первых рук, я решил рассмотреть возможность сочетания уже зрелого и впечатляющего набора функций Lucene со встроенной избыточностью и масштабируемостью платформы Hadoop. Из этого эксперимента была создана Blur.

Самые большие технические проблемы / особенности, которые решает Blur:

  • Быстрая индексация массы целых наборов данных
  • Автоматический отказоустойчивый сервер
  • Совместимость обновлений в режиме реального времени через Lucene NRT
  • Сжатие файлов Lucene FDT при сохранении производительности произвольного доступа
  • Lucene WAL (Write Ahead Log) для обеспечения надежности данных
  • Lucene R / W непосредственно в HDFS (проблема поиска при записи)
  • Производительность произвольного доступа с блочным кэшированием каталога Lucene

Модель данных

Данные в Blur хранятся в таблицах, которые содержат строки. Строки должны иметь уникальный идентификатор строки и содержать одну или несколько записей. Записи имеют уникальный идентификатор записи (уникальный внутри строки) и семейство столбцов для группировки столбцов, которые логически составляют одну запись. Столбцы содержат имя и значение, а запись может содержать несколько столбцов с одинаковым именем.

{
  rowid:"user_a@server.com",
  records:[
    {
      recordid:"324182347",
      family:"messages",
      columns:[
        {name:"to",value:"user_b@server.com"},
        {name:"to",value:"user_c@server.com"},
        {name:"subject",value:"important!"},
        {name:"body",value:"This is a very important email...."}
      ]
    }, {
      recordid:"234387219",
      family:"messages",
      columns:[
        {name:"to",value:"user_b@server.com"},
        {name:"subject",value:"This is cool!"},
        {name:"body",value:"Check this out....."}
      ]
    }, {
      recordid:"234123412",
      family:"contacts",
      columns:[
        {name:"name",value:"Jon Doe"},
        {name:"email",value:"user_d@server.com"}
      ]
    }
  ]
}

Архитектура

Blur использует инфраструктуру Hadoop MapReduce для индексации данных и файловую систему HDFS Hadoop для хранения индексов. Thrift используется для всех межпроцессных взаимодействий, а Zookeeper используется для определения состояния системы и хранения метаданных. Архитектура Blur состоит из двух типов серверных процессов:

  • Blur Controller Server
  • Blur Shard Server

Сервер шардов, обслуживает 0 или более шардов со всех онлайн-таблиц. Расчет количества сегментов в сети на каждом сервере сегментов выполняется с помощью информации о состоянии в Zookeeper. Если сервер сегментов отключается, посредством взаимодействия с Zookeeper остальные серверы сегментов обнаруживают сбой и определяют, какой из отсутствующих сегментов им нужно обслуживать из HDFS.

Сервер контроллера предоставляет единую точку входа (логически) в кластер для распыления запросов, сбора ответов и предоставления одного ответа. И серверы контроллера, и серверы шардов предоставляют один и тот же Thrift API, который помогает упростить отладку. Это также позволяет разработчикам запускать один сервер сегмента и взаимодействовать с ним так же, как с большим кластером. Многие серверы контроллеров могут (и должны быть) работать на избыточность. Контроллеры действуют как шлюзы ко всем данным, которые обслуживаются серверами сегмента.

Обновление / Загрузка данных

В настоящее время существует два способа загрузки и обновления данных. Первый — через массовую загрузку в MapReduce, а второй — через вызовы мутаций в Thrift.

Массовая загрузка MapReduce Пример

public class BlurMapReduce {
  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration configuration = new Configuration();
    String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: blurindexer <in> <out>");
      System.exit(2);
    }

    AnalyzerDefinition ad = new AnalyzerDefinition();

    TableDescriptor td = new TableDescriptor();
    td.setShardCount(16);
    // Location in HDFS
    td.setTableUri("hdfs://<namenode>:<port>/blur/tables/test-table");
    td.setAnalyzerDefinition(ad);

    BlurTask blurTask = new BlurTask();
    blurTask.setTableDescriptor(td);
    blurTask.setSpinLockPath("/copy-locks");
    blurTask.setZookeeperConnectionStr("localhost");
    blurTask.setMaxNumberOfConcurrentCopies(10);

    // The copy locks are used to throttle how many concurrent
    // copies from the reducers are occuring at the same time.
    // This is normally needed because the indexing cluster is
    // typically larger in size than the blur cluster.

    Job job = blurTask.configureJob(new Configuration());
    job.setJarByClass(BlurExampleMapper.class);
    job.setMapperClass(BlurExampleMapper.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

  public static class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
    @Override
    protected void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException {
      // Reset record
      _record.clearColumns();

      // Set row id
      _record.setRowId("rowid");

      // Set record id
      _record.setRecordId("recordid");

      // Set column family
      _record.setColumnFamily("cf1");
      
      _record.addColumn("name", "value");

      // Set the key which is usual the rowid
      byte[] bs = _record.getRowId().getBytes();
      _key.set(bs, 0, bs.length);
      context.write(_key, _record);
      _recordCounter.increment(1);
      context.progress();
    }
  }
}

Пример Thutft Data Mutation

import static com.nearinfinity.blur.utils.BlurUtil.*;

public class ThriftMutationExample {
  public static void main(String[] args) throws BlurException, TException, IOException {
    final RowMutation mutation = newRowMutation("test-table", "rowid-1234",
        newRecordMutation("column-family", "recordid-5678",
            newColumn("columnname", "value")));
    
    BlurClientManager.execute("controller1:40010", new BlurCommand<Void>() {
      @Override
      public Void call(Client client) throws BlurException, TException {
        client.mutate(mutation);
        return null;
      }
    });
  }
}

Поиск данных

Любой элемент в модели данных Blur доступен для поиска с помощью обычной семантики Lucene: анализаторы. Анализаторы определяются по таблице Blur.

Стандартный синтаксис запроса Lucene является стандартным способом поиска Blur. Если требуется что-то вне стандартного синтаксиса, вы можете создать запрос Lucene непосредственно с объектами Java и отправить их через API экспертного запроса.

Группировка семейств столбцов в Rows позволяет обнаруживать результаты по семействам столбцов, аналогичные тем, которые вы получили бы при внутреннем объединении двух таблиц с одинаковым ключом (или в данном случае rowid). Для сложных моделей данных, которые имеют несколько семейств столбцов, это обеспечивает очень мощные возможности поиска.

В следующем примере поиск значения выполняется как полнотекстовый поиск. Если бы я хотел найти «значение» в одном поле, например в столбце «colA» в семействе столбцов «famB», запрос был бы похож на «famB.colA: value».

public class ThriftSearchExample {

  public static void main(String[] args) throws BlurException, TException, IOException {
    BlurResults blurResults = BlurClientManager.execute("controller1:40010", new BlurCommand<BlurResults>() {
      @Override
      public BlurResults call(Client client) throws BlurException, TException {
        BlurQuery blurQuery = new BlurQuery();
        SimpleQuery simpleQuery = new SimpleQuery();
        simpleQuery.setQueryStr("value");
        blurQuery.setSimpleQuery(simpleQuery);
        blurQuery.setSelector(new Selector());
        return client.query("test-table", blurQuery);
      }
    });
    for (BlurResult result : blurResults.getResults()) {
       // do something with the result
    }
  }
}

Получение данных

Fetches can be done by row or by record. This is done by creating a selector object in which you specify the rowid or recordid, and the specific column families or columns that you would like returned. When not specified, the entire Row or Record is returned.

public class ThriftFetchExample {
  public static void main(String[] args) throws BlurException, TException, IOException {
    Row row = BlurClientManager.execute("controller1:40010", new BlurCommand<Row>() {
      @Override
      public Row call(Client client) throws BlurException, TException {
        Selector selector = new Selector();
        selector.setRowId("rowid-1234");
        FetchResult fetchRow = client.fetchRow("test-table", selector);
        FetchRowResult rowResult = fetchRow.getRowResult();
        return rowResult.getRow();
      }
    });
  }
}

Current State

Blur is nearing it’s first release 0.1 and is relatively stable. The first release candidate should be available for download within the next few weeks. In the meantime you can check it out on github:

https://github.com/nearinfinity/blur

http://blur.io