Статьи

Индексирование данных в Solr из разных источников с использованием Camel


Apache Solr — это «популярная, молниеносная корпоративная поисковая платформа с открытым исходным кодом», созданная на основе Lucene. Для того, чтобы выполнить поиск (и найти результаты), существует первоначальное требование приема данных, как правило, из разнородных источников, таких как системы управления контентом, реляционные базы данных, устаревшие системы, вы называете это … Тогда есть также проблема сохранения индекса обновлять, добавляя новые данные, обновляя существующие записи, удаляя устаревшие данные. Новые источники данных могут быть такими же, как и исходные, но также могут быть источниками, такими как twitter, AWS или остальные конечные точки.

Solr может понимать различные форматы файлов и предоставляет достаточное количество возможностей для индексации данных
:

  1. Прямой HTTP и удаленная потоковая передача — позволяет взаимодействовать с Solr через HTTP, публикуя файл для прямой индексации или путь к файлу для удаленной потоковой передачи.
  2. DataImportHandler — это модуль, который обеспечивает как полный, так и инкрементный импорт дельты из реляционных баз данных или файловой системы.
  3. SolrJ — Java-клиент для доступа к Solr с помощью HTTP-клиента Apache Commons.

Но в реальной жизни индексация данных из разных источников с помощью миллионов документов, десятков преобразований, фильтрации, обогащения контента, репликации, параллельной обработки требует гораздо большего. Один из способов справиться с такой проблемой — это заново изобрести колесо: написать несколько пользовательских приложений, объединить их с некоторыми сценариями или запустить cronjobs. Другой подход заключается в использовании гибкого инструмента, разработанного для настройки и подключения, который поможет вам с легкостью масштабировать и распределять нагрузку. Таким инструментом является Apache Camel, который теперь также имеет разъем Solr
.

Все началось несколько месяцев назад, в дни базового лагеря в
Sourcesense.где я и мой коллега Алекс экспериментировали с различными проектами, чтобы реализовать конвейер для индексации данных в Solr. Как и ожидалось, мы обнаружили Camel, и после нескольких дней сопряжения мы были готовы к первоначальной версии компонента Solr, которая была передана Camel и расширена
Беном Одеем . На данный момент это полнофункциональный коннектор Solr, который использует SolrJ за сценой и позволяет: настраивать все параметры SolrServer и StreamingUpdateSolrServer; поддерживает операции: вставка, add_bean, delete_by_id, delete_by_query, фиксация, откат, оптимизация; индексные файлы, экземпляры SolrInputDocument, компоненты с аннотациями или отдельные заголовки сообщений.

Создать маршрут Camel для индексации всех данных из таблицы реляционной базы данных и локальной файловой системы очень просто:

public void configure() {
from("timer://clear?repeatCount=1")
        .to("direct:clearIndex");

from("file:src/data?noop=true")
        .to("direct:insert");

from("timer://database?repeatCount=1")
        .to("sql:select * from products?dataSourceRef=productDataSource")
        .split(body())
        .process(new SqlToSolrMapper())
        .to("direct:insert");

from("direct:insert")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);

from("direct:clearIndex")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
        .setBody(constant("*:*"))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
}

Приведенный выше маршрут сначала очистит индекс, удалив все документы с последующей фиксацией. Затем он начнет опрос файлов из папки src / data, прочитает каждый файл и отправит его в конечную точку Solr. Предполагая, что файлы в формате, понятном Solr, они будут проиндексированы и зафиксированы. Третий маршрут будет получать все продукты из базы данных (в памяти), разделить их на отдельные записи, отображение каждой записи на поля Solr, и переваривать ?

К счастью, в 2012 году, жизнь разработчика программного обеспечения не так

просто
скучный. Вместо этого в настоящее время более реалистичное требование к индексации будет состоять из чего-то вроде этого:

1. Получить файлы резервной копии из amazon S3 и индексировать. Если документ утвержден, зафиксируйте его как можно скорее, иначе зафиксируйте каждые 10 минут.

Как верблюд может помочь вам с этим требованием? Camel поддерживает большинство популярных API Amazon, включая S3. Используя компонент aws-s3, можно читать файлы из корзины S3, затем применять фильтр для утвержденных документов, чтобы отправлять их по отдельному маршруту для мгновенной фиксации.

<route>
  <from uri="aws-s3://MyBucket?delay=5000&maxMessagesPerPoll=5"/>
  <choice>
    <when>
      <xpath>/add/doc[@status='approved']</xpath>
      <to uri="direct:indexAndCommit"/>
    </when>
    <otherwise>
      <to uri="direct:index"/>
    </otherwise>
  </choice>
</route>
<route>
  <from uri="timer://commit?fixedRate=true&period=600s"/>
  <from uri="direct:commit"/>
</route>

2. Извлекайте данные клиентов из базы данных каждые 5 секунд, считывая 10 записей за раз. Также ищите дельты. Пополните адресные данные с помощью latitute / longitute, вызвав внешнюю службу XXX для облегчения пространственного поиска в Solr.

     <route id="fromDB">
      <from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery= newCustomers&maximumResults=10&delay=5000"/>
      <enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
      <to uri="direct:index"/>
    </route>

    <route>
      <from uri="direct:coordinateEnricher"/>
      <setHeader headerName="CamelHttpQuery">
        <simple>address='${body.address}'&sensor=false</simple>
      </setHeader>
      <to uri="http://maps.google.com/maps/api/geocode/xml"/>
      <setHeader headerName="lat">
        <xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
      </setHeader>
      <setHeader headerName="lng">
        <xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
      </setHeader>
    </route>

Вышеупомянутый маршрут считывает записи из таблицы 10 клиентов за раз, и для каждой из них будет вызываться API карт Google, чтобы получить широту и долготу, используя поле адреса клиента. Координаты извлекаются из ответа с использованием XPath и объединяются с объектом Customer. Просто, не правда ли?

3. Индексируйте контент по этому / этому / пути в нашей системе управления контентом, а также следите за обновлениями.

<route>
  <from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&deep=true&synchronous=false"/>
  <to uri="direct:index"/>
</route> 

Верблюд имеет
разъем
jcr , который позволяет создавать контент в любом хранилище контента java. В CAMEL-5155 также представлено улучшение, 
которое позволит в скором времени читать контент из репозиториев, поддерживающих JCR v.2.

Если вам повезло, и ваша CMS поддерживает CMIS, вы можете использовать мой
разъем
camel-cmis от github для той же цели.

4. Слушайте твиты о нашем продукте / компании, проводите анализ настроений и индексируйте только положительные твиты.

<route id="fromTwitter">
  <from uri="twitter://streaming/filter?type=event&keywords=productName&consumerKey={{consumer.key}}&consumerSecret={{consumer.secret}}"/>
  <setHeader headerName="CamelHttpQuery">
    <language language="beanshell">
      "q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
    </language>
  </setHeader>
  <throttle timePeriodMillis="1500">
    <constant>1</constant>
    <to uri="http://data.tweetsentiments.com:8080/api/analyze.xml"/>
    <setHeader headerName="sentiment">
      <xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
    </setHeader>
    <filter>
      <simple>${in.header.sentiment} > 0</simple>
      <to uri="direct:index"/>
    </filter>
  </throttle>
</route>

Этот маршрут будет прослушивать твиты с использованием API Twitter в реальном времени, URL кодировать твит и вызывать API твитов для анализа настроений. Кроме того, он будет применять дросселирование, поэтому каждые 1500 миллисекунд выполняется не более одного запроса, поскольку существует ограничение на количество вызовов в секунду. Затем маршрут применяет фильтр, чтобы игнорировать все отрицательные твиты перед индексацией.

Как вы можете видеть, Camel может легко взаимодействовать со многими разрозненными системами (включая Solr), и даже если у вас очень нестандартное приложение, написание соединителя для него не составит труда. Но это только одна сторона истории. С другой стороны, есть полный список интеграции предприятия
Шаблоны, реализованные Camel, которые необходимы для любого серьезного конвейера приема данных: маршрутизаторы, переводчик, фильтр, разветвитель, агрегатор, обогащение содержимого, балансировщик нагрузки … Последнее, но не менее важное: обработка исключений, ведение журнала, мониторинг, DSL … В двух слова:
верблюжьи скалы!

PS
: Полный исходный код примеров можно найти на моем аккаунте на
github .