Статьи

Индексирование данных в Solr из разнородных источников с использованием Camel

Apache Solr — это «популярная, молниеносная платформа для корпоративного поиска с открытым исходным кодом», созданная на основе Lucene Для того, чтобы выполнить поиск (и найти результаты), существует первоначальное требование приема данных, как правило, из разнородных источников, таких как системы управления контентом, реляционные базы данных, унаследованные системы, как вы их называете … Затем существует также проблема поддержания индекса на уровне до дата добавления новых данных, обновления существующих записей, удаления устаревших данных. Новые источники данных могут быть такими же, как и исходные, но также могут быть источниками, такими как twitter, AWS или остальные конечные точки.

Solr может понимать различные форматы файлов и предоставляет достаточное количество опций для данных
индексация :

  1. Прямой HTTP и удаленная потоковая передача — позволяет взаимодействовать с Solr через HTTP, публикуя файл для прямой индексации или путь к файлу для удаленной потоковой передачи.
  2. DataImportHandler — это модуль, который обеспечивает как полный, так и инкрементный импорт дельты из реляционных баз данных или файловой системы.
  3. SolrJ — Java-клиент для доступа к Solr с помощью HTTP-клиента Apache Commons.

Но в реальной жизни индексация данных из разных источников с помощью миллионов документов, десятков преобразований, фильтрации, обогащения контента, репликации, параллельной обработки требует гораздо большего. Один из способов справиться с такой проблемой — это заново изобрести колесо: написать несколько пользовательских приложений, объединить их с некоторыми сценариями или запустить cronjobs. Другой подход заключается в использовании гибкого инструмента, разработанного для настройки и подключения, который поможет вам с легкостью масштабировать и распределять нагрузку. Таким инструментом является Apache Camel, который теперь также имеет разъем Solr.

Все началось несколько месяцев назад, в дни базового лагеря в Sourcesense , где я и мой коллега Алекс экспериментировали с различными проектами по реализации конвейера для индексации данных в Solr. Как и ожидалось, мы обнаружили Camel, и после нескольких дней сопряжения мы были готовы к первоначальной версии компонента Solr, которая была передана Camel и расширена Беном Одеем . На данный момент это полнофункциональный коннектор Solr, который использует SolrJ за сценой и позволяет: настраивать все параметры SolrServer и StreamingUpdateSolrServer; поддерживает операции: вставка, add_bean, delete_by_id, delete_by_query, фиксация, откат, оптимизация; индексные файлы, экземпляры SolrInputDocument, компоненты с аннотациями или отдельные заголовки сообщений.

Создать маршрут Camel для индексации всех данных из таблицы реляционной базы данных и локальной файловой системы очень просто:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void configure() {
from("timer://clear?repeatCount=1")
        .to("direct:clearIndex");
 
from("file:src/data?noop=true")
        .to("direct:insert");
 
from("timer://database?repeatCount=1")
        .to("sql:select * from products?dataSourceRef=productDataSource")
        .split(body())
        .process(new SqlToSolrMapper())
        .to("direct:insert");
 
from("direct:insert")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_INSERT))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
 
from("direct:clearIndex")
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_DELETE_BY_QUERY))
        .setBody(constant("*:*"))
        .to(SOLR_URL)
        .setHeader(SolrConstants.OPERATION, constant(SolrConstants.OPERATION_COMMIT))
        .to(SOLR_URL);
}

Приведенный выше маршрут сначала очистит индекс, удалив все документы с последующей фиксацией. Затем он начнет опрос файлов из папки src / data, прочитает каждый файл и отправит его в конечную точку Solr. Предполагая, что файлы в формате, понятном Solr, они будут проиндексированы и зафиксированы. Третий маршрут извлечет все продукты из базы данных (в памяти), разделит их на отдельные записи, сопоставит каждую запись с полями Solr и дайджест. К счастью, в 2012 году жизнь разработчика программного обеспечения не так скучна. Вместо этого в настоящее время более реалистичное требование к индексации будет состоять из чего-то вроде этого:

1. Получить файлы резервных копий с Amazon S3 и индекс. Если документ утвержден, зафиксируйте его как можно скорее, иначе зафиксируйте каждые 10 минут.

Как верблюд может помочь вам с этим требованием? Camel поддерживает большинство популярных API Amazon, включая S3. Используя компонент aws-s3, можно читать файлы из корзины S3, затем применять фильтр для утвержденных документов, чтобы отправлять их по отдельному маршруту для мгновенной фиксации.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
<route>
  <from uri="aws-s3://MyBucket?delay=5000&maxMessagesPerPoll=5"/>
  <choice>
    <when>
      <xpath>/add/doc[@status='approved']</xpath>
      <to uri="direct:indexAndCommit"/>
    </when>
    <otherwise>
      <to uri="direct:index"/>
    </otherwise>
  </choice>
</route>
<route>
  <from uri="timer://commit?fixedRate=true&period=600s"/>
  <from uri="direct:commit"/>
</route>

2. Извлекайте данные клиентов из базы данных каждые 5 секунд, считывая 10 записей за раз. Также ищите дельты. Пополните адресные данные с помощью latitute / longitute, вызвав внешнюю службу XXX для облегчения пространственного поиска в Solr.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<route id="fromDB">
  <from uri="jpa://com.ofbizian.pipeline.Customer?consumer.namedQuery= newCustomers&maximumResults=10&delay=5000"/>
  <enrich uri="direct:coordinateEnricher" strategyRef="latLongAggregationStrategy"/>
  <to uri="direct:index"/>
</route>
 
<route>
  <from uri="direct:coordinateEnricher"/>
  <setHeader headerName="CamelHttpQuery">
    <simple>address='${body.address}'&sensor=false</simple>
  </setHeader>
  <setHeader headerName="lat">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lat/text()</xpath>
  </setHeader>
  <setHeader headerName="lng">
    <xpath resultType="java.lang.Double">//result[1]/geometry/location/lng/text()</xpath>
  </setHeader>
</route>

Вышеупомянутый маршрут считывает записи из таблицы 10 клиентов за раз, и для каждой из них будет вызываться API карт Google, чтобы получить широту и долготу, используя поле адреса клиента. Координаты извлекаются из ответа с использованием XPath и объединяются обратно в объект Customer. Просто, не правда ли?

3. Индексируйте контент по этому / этому / пути в нашей системе управления контентом, а также следите за обновлениями.

1
2
3
4
<route>
  <from uri="jcr://user:pass@repository/import/inbox/signal?eventTypes=3&deep=true&synchronous=false"/>
  <to uri="direct:index"/>
</route>

Верблюд имеет разъем jcr , который позволяет создавать контент в любом хранилище контента java. В CAMEL-5155 также представлено улучшение, которое позволит в скором времени читать контент из репозиториев, поддерживающих JCR v.2. Если вам повезло, и ваша CMS поддерживает CMIS, вы можете использовать мой разъем camel-cmis от github для той же цели.

4. Слушайте твиты о нашем продукте / компании, проводите анализ настроений и индексируйте только положительные твиты.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<route id="fromTwitter">
  <from uri="twitter://streaming/filter?type=event&keywords=productName&consumerKey={{consumer.key}}&consumerSecret={{consumer.secret}}"/>
  <setHeader headerName="CamelHttpQuery">
    <language language="beanshell">
      "q=" + java.net.URLEncoder.encode(request.getBody().getText(), "UTF-8")
    </language>
  </setHeader>
  <throttle timePeriodMillis="1500">
    <constant>1</constant>
    <setHeader headerName="sentiment">
      <xpath resultType="java.lang.Double">/sentiment/value/text()</xpath>
    </setHeader>
    <filter>
      <simple>${in.header.sentiment} > 0</simple>
      <to uri="direct:index"/>
    </filter>
  </throttle>
</route>

Этот маршрут будет прослушивать твиты с использованием API Twitter в реальном времени, URL кодировать твит и вызывать API твитов для анализа настроений. Кроме того, он будет применять дросселирование, поэтому каждые 1500 миллисекунд выполняется не более одного запроса, поскольку существует ограничение на количество вызовов в секунду. Затем маршрут применяет фильтр, чтобы игнорировать все отрицательные твиты перед индексацией.

Как вы можете видеть, Camel может легко взаимодействовать со многими разрозненными системами (включая Solr), и даже если у вас очень нестандартное приложение, написание соединителя для него не составит труда. Но это только одна сторона истории. С другой стороны, есть полный список корпоративных интеграционных шаблонов, реализованных компанией Camel, которые необходимы для любого серьезного конвейера приема данных: маршрутизаторы, переводчик, фильтр, разветвитель, агрегатор, обогащение контента, балансировщик нагрузки … Последнее, но не менее важное: обработка исключений , Ведение журнала, мониторинг, DSL … В двух словах: Camel Rocks!

PS : Полный исходный код примеров можно найти на моем аккаунте на github .

Ссылка: индексирование данных в Solr из разнородных источников с использованием Camel от нашего партнера по JCG Билгина Ибряма в блоге OFBIZian .