Статьи

Обработка ошибок в топологиях Storm Trident

Эта статья обобщает мой текущий подход к обработке ошибок при разработке топологий Storm Trident. Здесь я сосредоточен на разработке кода, а не на передовых практиках развертывания, таких как надзор и избыточность.

Из-за потоковой природы Storm в реальном времени при возникновении большинства видов ошибок нам в конечном итоге придется перейти к следующему фрагменту данных. Обработка ошибок в этом контексте сводится к сообщению об этой ошибке (или нет) и повторной попытке обработать ошибочные входные данные позже (или нет). Часть 1 этого поста об этом аспекте.

Это подразумевает, что при обработке кортежа, как правило, трудно быть уверенным, встречаемся ли мы впервые с ним или его содержимое уже частично применено к персистентности. Поэтому нам нужно сделать наши операции обновления состояния идемпотентными, что является предметом второй части этого поста.

Не удивляйтесь размеру этого поста, Storm фактически делает большую часть работы за нас. Все, что требуется на самом деле, — это понять, как все это подходит, чтобы объединить вещи таким образом, чтобы это имело смысл.

Этот пост основан на Storm 0.9, Cassandra 2.0.4 и Kafka 0.7. Я поместил игрушечный проект на github, чтобы проиллюстрировать несколько моментов, обсуждаемых ниже. Этот проект фактически адаптирован из примера «присутствия в комнате», который я представил в предыдущем посте .

Часть 1: обработка ошибочной ситуации

Решая, когда попросить повторить

Первая простая стратегия обработки ошибок — это просто принять ухудшение качества вычислений, вызванное ошибками во время выполнения. Это может иметь место, например, если топология вычисляет некоторую оценку тренда в реальном времени на скользящем окне за очень недавнее прошлое, или если мы уже работаем с выборочными данными, такими как общедоступный поток Twitter . Если мы решили игнорировать такие ошибки, реализация будет тривиально простой, просто оберните логику топологии большой жирной попыткой / отловом, сообщите как-нибудь об ошибках и не позволяйте ничего всплывать в Storm.

Однако в большинстве случаев мы заботимся о согласованности и должны принять осторожное решение о попытке повторить неудачные данные или нет.

Типичный пример ошибки времени выполнения — проблемы с форматом входящих данных. В этом случае повторная попытка, конечно, не имеет смысла, так как она не улучшится во второй раз. Вместо этого мы должны регистрировать ошибочные данные и, возможно, попросить некоторых людей провести расследование. Вот упрощенный пример из функции BytesToString Storm из моего игрушечного проекта:

01
02
03
04
05
06
07
08
09
10
public class BytesToString extends BaseFunction {
@Override
     public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
     try {
         String asString = new String((byte[]) tuple.getValueByField("bytes"), "UTF-8");
         tridentCollector.emit(new Values(asString));
     } catch (UnsupportedEncodingException e) {
         logger.err("ERROR: lost data: unable to parse inbound message from Kafka (expecting UTF-8 string)", e);
     }
 }

С другой стороны, если ошибка связана с некоторыми недоступными внешними источниками данных, вызванными, например, сетевым разделом, мы должны инициировать повторную попытку, как описано в следующем разделе.

Есть много других видов ошибок, отличных от двух, упомянутых выше, но остается один момент: важно отличать повторяющиеся ошибки от не повторяющихся ошибок и реагировать соответствующим образом.

В заключение, будьте очень осторожны, когда решите не сообщать об ошибке, которая произошла в мультигете IBackingMap , потому что эта функция должна возвращать список того же размера, что и список ввода ключей. Таким образом, в случае отсутствия повторных ошибок, мы должны вернуть какой-то результат тем или иным способом. В большинстве случаев, если мы решим не повторять ошибки в этом случае, это потому, что что-то уже сломано в постоянстве из-за какой-то прошлой ошибки, и уже слишком поздно ее исправлять. В приведенном ниже примере ошибка возникает из-за неудачного анализа некоторых данных, считанных из БД, и вместо этого код просто возвращает нулевые значения, что эквивалентно рассмотрению того, что в постоянстве нет ничего (по крайней мере, ничего полезного). Смотрите также часть 3 ниже для возможного решения для этого случая.

1
2
3
4
5
6
7
8
9
@Override
 public List<OpaqueValue> multiGet(List<List<Object>> keys) {
    try {
        return Utils.opaqueStringToOpaqueValues(opaqueStrings, HourlyTimeline.class);
    } catch (IOException e) {
        logger.err("error while trying to deserialize data from json => giving up (data is lost!)", e);
        return Utils.listOfNulls(keys.size()); // this assumes previous state does not exist => destroys data!
    }
}

(ну, этот код из TimelineBackingMap фактически заменяет все данные нулями, что еще хуже, но это игрушечный проект …)

Вызывая кортеж Trident для воспроизведения …

Как только мы решили, что имеет смысл запустить воспроизведение кортежа, мы просто должны попросить об этом, и Storm сделает все остальное (просто подключите правильный носик, см. Следующий раздел). Технически это очень просто: запуск повторной попытки из примитива Trident, такого как Function или Filter , так же прост, как и выброс FailedException, как в TimeLineBackingMap из моего игрушечного проекта, который включает в себя пример повторных и неповторяющихся ошибок (обратите внимание, что код ниже из TimelineBackingMap предполагается, что любая ошибка БД является повторяемой, что является чрезмерным упрощением):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
@Override
 public void multiPut(List<List<Object>> keys, List<OpaqueValue> timelines) {;
  List<OpaqueValue> jsonOpaqueTimelines;
  try {
      jsonOpaqueTimelines = Utils.opaqueValuesToOpaqueJson(timelines);
  } catch (IOException e) {
      System.err.println("error while trying to serialize data to json => giving up (data is lost!)");
      return;
  }
  if (jsonOpaqueTimelines != null) {
      try {
          DB.put("room_timelines", toSingleKeys(keys), jsonOpaqueTimelines);
      } catch (Exception e) {
          logger.err("error while storing timelines to cassandra, triggering a retry...", e);
          throw new FailedException("could not store data into Cassandra, triggering a retry...", e);
      }
   }
};

Затем Storm распространит ошибку обратно в носик, чтобы принудительно воспроизвести кортеж. Если мы хотим, чтобы ошибка сообщалась в Storm UI, мы можем вместо этого выдать ReportedFailedException.

Другой способ, который я настоятельно не рекомендую, это позволить любому другому виду RuntimeException всплыть в Storm. По сути, это дает тот же результат при гораздо более высоких затратах производительности: он вызывает сбой рабочего узла с автоматическим перезапуском Nimbus, и все носики возобновляют чтение из последнего известного успешного индекса (реализация носика, такая как носик Кафки, сохраняет свое последнее успешно обработанное смещение в zookeeper. для этой цели). Эта стратегия быстрого отказа является частью проекта Storm (см. Документацию по надзору за работниками и отказоустойчивости ). По сути, это обеспечивает те же гарантии согласованности, что и при разрешении spout воспроизводить некоторые кортежи, но влияние на производительность, конечно, больше, поскольку мы имеем полный перезапуск JVM со сбросом всех запущенных в данный момент экземпляров топологии. Так что никогда не делайте это нарочно. Тем не менее, отрадно осознавать, что в случае сбоя наших узлов данные не будут повреждены, и поток, естественно, продолжится.

Третья ситуация, когда Storm решает воспроизвести кортежи, — это если они не достигают конца топологии до заданного времени ожидания. Точнее, этот механизм на самом деле запускается носиком, который испускает кортеж, если ACK не получен вовремя, поэтому эти повторы также могут быть запущены в случае успешной обработки кортежей, но ACK не может достичь носика из-за некоторого сетевого раздела. , Параметры Storm для управления этим являются topology.enable.message.timeouts и topology.message.timeout.secs , а их значения по умолчанию в соответствии с defaults.yaml равны «true» и 30 секунд. Это еще одна причина, почему идемпотентность в наших топологиях так важна.

… и фактически воспроизводит кортежи

Как только уведомление о сбое достигнет носика (или будет сгенерировано им в случае тайм-аута), мы должны убедиться, что неудачные кортежи будут воспроизведены. Если вы не разрабатываете свой собственный носик, это просто сводится к выбору правильного вкуса носика . Этот выбор влияет на способ воспроизведения кортежей (или нет), поэтому он должен быть согласован со стратегией на месте для обработки переигранных кортежей в топологии, которая является предметом следующего раздела. Существует 3 вида носиков:

  • не транзакционный: нет гарантии, но в некоторых случаях он все еще может быть полезен, если выбранная вами реализация предлагает гарантию «хотя бы один раз»
  • транзакционный: не рекомендуется, потому что они могут блокировать топологию в некоторых случаях разбиения
  • непрозрачный: предлагает слабую гарантию на воспроизведение в том смысле, что они достигают кортежа, который будет воспроизведен хотя бы один раз, но в случае воспроизведения выданные партии могут быть не идентичны. На практике все, что имеет значение при использовании тех, которые я рекомендую, состоит в том, чтобы убедиться, что топология устойчива для такого рода гибких повторов, что обсуждается в следующем разделе.

Последнее замечание о кортеже и повторении партии

Я обсуждаю выше на уровне кортежей, потому что это упрощает проектные решения. В действительности, запрос Storm на воспроизведение одного кортежа вызовет воспроизведение многих других кортежей, содержащихся в том же пакете, некоторые из которых потенциально безошибочны.

Часть 2: идемпотентная обработка переигранных кортежей

Другая сторона истории заключается в том, что теперь, когда мы знаем, что кортежи могут обрабатываться несколько раз, необходимо убедиться, что топология идемпотентна, то есть отправка нескольких раз одних и тех же кортежей не сделает состояние несовместимым. Части топологии, которые не имеют побочных эффектов, конечно же, не подвержены влиянию повторов кортежей.

Документация Storm Trident о согласованности состояний довольно ясна, поэтому я просто добавляю немного соли здесь.

Если наши операции обновления состояния уже идемпотентны

Если операция обновления состояния уже идемпотентна по своей природе, то она уже устойчива к повторам кортежей и никакой из специальных механизмов Storm не требуется.

Это случай любой операции «сохранение по идентификатору», если значение идентификатора полностью основано на содержимом входящего кортежа. Например, в моем игрушечном проекте я храню сеансы занятости, первичный ключ которых получен из идентификатора корреляции, найденного во входящих событиях, поэтому в этом случае операция записи уже готова к воспроизведению, поскольку любое воспроизведение просто перезапишет существующие данные с такими же информация без какого-либо уничтожения данных (при условии, что у нас есть гарантия заказа, что в данном случае верно).

1
2
3
public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {
    DB.upsertPeriods(newOrUpdatedPeriods);
}

и в CassandraDB.java:

1
2
3
4
5
6
7
try {
    PreparedStatement statement = getSession().prepare("INSERT INTO presence (id, payload) values (?,?)");
    execute(new BoundStatement(statement).bind(rpp.getId(), periodJson));
 } catch (Exception e) {
    logger.error("error while contacting Cassandra, triggering a retry...", e);
    new FailedException("error while trying to record room presence in Cassandra ", e);
 }

Делая операцию чтения-обновления-записи также идемпотентной

В предыдущем сообщении в блоге я описал, как Storm позволяет нам реализовывать операции, которые выполняют следующее, не требуя блокировок DB и все же избегая условий гонки:

  • читать предыдущее состояние из БД,
  • обновить состояние в памяти в соответствии с новыми данными кортежей,
  • сохранить новое состояние в БД

Одна из прелестей шторма заключается в том, что для обработки воспроизведенных кортежей без разрушения состояния нам нужно только адаптировать шаги 1 и 3. Это очень важно: теперь мы можем реализовать всю нашу логику обработки на шаге 2, как если бы каждый кортеж воспроизводился только один раз и наплевать на повторы (если мы «чисты», см. замечание ниже…). Это то, что они имеют в виду под «штормом точно семантическим».

Более того, если у нас есть внутренняя реализация 1 и 3, то для их готовности к воспроизведению достаточно просто обернуть их существующими классами Storm. Самый надежный способ сделать это — использовать непрозрачную логику, за счет сохранения дважды каждого состояния, как объяснено в документации Trident о выпуске транзакции .

Более того, уже существует множество реализаций Opaque BackingMap для многих бэкэндов, таких как Cassandra или Mysql, в storm-contrib , поэтому в большинстве случаев действительно нечего делать, кроме выбора правильного.

Наиболее важный момент, который необходимо убрать, заключается в том, что для использования непрозрачной фоновой карты, которая обрабатывает повторяющиеся кортежи, мы должны использовать носик, который учитывает непрозрачные предварительные условия, как показано в этой матрице .

В случае, если нам необходимо по какой-то причине реализовать наш собственный BackingMap, единственное, что нам нужно сделать, это заставить его хранить текущую и предыдущую версию данных и идентификатор транзакции. Вот упрощенный пример из моего игрушечного проекта (но на самом деле, рассмотрите шторм-вклад, прежде чем кодировать что-то подобное):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
public void put(String table, List<String> keys, List<OpaqueValue> opaqueStrings) {;
    // this should be optimized with C* batches...
    for (Pair<String, OpaqueValue> keyValue : Utils.zip(keys, opaqueStrings)) {
         PreparedStatement statement = getSession().prepare(format("INSERT INTO %s (id, txid, prev, curr) values (?, ?, ?, ?)", table));
         OpaqueValue opaqueVal = keyValue.getValue();
         execute(new BoundStatement(statement).bind(keyValue.getKey(), opaqueVal.getCurrTxid(), opaqueVal.getCurr(), opaqueVal.getPrev()));
     }
}
public List<OpaqueValue> get(String table, List<String> keys) {;
    List<OpaqueValue> vals = new ArrayList<>(keys.size());
    ResultSet rs = execute(format("select id, txid, prev, curr from %s where id in ( %s ) ", table, toCsv(keys) ));
    Map<String, OpaqueValue> data = toMapOfOpaque(rs);
    for (String key: keys){
        vals.add(data.get(key));
    }
    return vals;
 }

Тогда единственное, что нужно сделать, чтобы фактически получить семантическую форму Trident, — это обернуть ее в OpaqueMap, например так:

1
2
3
4
5
public static StateFactory FACTORY = new StateFactory() {
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         return OpaqueMap.build(new TimelineBackingMap(new CassandraDB(conf)));
    }
 }

За кулисами происходит то, что OpaqueMap будет выбирать, какое ранее сохраненное состояние («curr» или «prev») отображать в нашей логике обновления, в зависимости от идентификатора транзакции, связанного с текущим пакетом кортежей, и идентификатора, найденного в постоянстве. Этот идентификатор транзакции предоставляется изливом, поэтому это является причиной того, что поддержание выровненного излива и выбора состояний так важно: состояние делает предположение о значении каждого идентификатора транзакции.

Не ломай предыдущий экземпляр!

Давайте вернемся на шаг 2 к последовательности чтения-обновления-записи, упомянутой выше. Теперь, когда мы знаем, что непрозрачная логика должна хранить как новую, так и старую версию любого состояния, посмотрите на следующий код Редуктора и попытайтесь определить, почему он неисправен:

1
2
3
4
5
6
7
8
9
public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) {
     LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;
     if (ENTER == event.getEventType()) {
         curr.setStartTime(event.getTime());            // buggy code
     } else {
         curr.setEndTme(event.getTime());              // buggy code
     }
     return curr;
 }

Адепты функционального программирования называют это «нечистым» методом, потому что он изменяет свои входные параметры. Причина, по которой он нарушает непрозрачную логику Storm, заключается в том, что теперь и «текущая», и «предыдущая» ссылки Java фактически ссылаются на один и тот же экземпляр в памяти. Таким образом, когда непрозрачная логика сохраняет как предыдущую, так и текущую версию некоторого фрагмента состояния, она действительно дважды сохраняет новую версию, и поэтому предыдущая версия теряется.

Лучшая реализация может быть что-то вроде этого

01
02
03
04
05
06
07
08
09
10
public RoomPresencePeriod reduce(RoomPresencePeriod curr, TridentTuple tuple) {
     LocationChangedEvent event = (LocationChangedEvent) tuple.getValueByField("occupancyEvent");;
     RoomPresencePeriod updated = new RoomPresencePeriod(curr);  // copy constructor
     if (ENTER == event.getEventType()) {
        updated.setStartTime(event.getTime());
     } else {
         updated.setEndTme(event.getTime());
     }
     return updated;
 }

Часть 3: человеческие ошибки: переиграть все

В заключение следует со смирением осознать, что независимо от того, сколько усилий и мер предосторожности, подобных тем, что мы описали выше, мы приложим, мы все равно будем внедрять ошибки в производство (и я клянусь об этом, клянусь!). В случае платформы обработки данных, ошибки потенциально означают ошибки, разрушающие данные, что плохо, когда данные являются нашим бизнесом. В некоторых случаях мы обнаружим, что данные повреждены только после факта, как описано выше в примечании к multiget.

В своей книге «Большие данные» Натан Марц описывает простую идею воспроизведения всего, основанную на лямбда-архитектуре, чтобы обойти эту идею. Краткое резюме этой книги также доступно здесь .

Ссылка: Обработка ошибок в топологиях Storm Trident от нашего партнера JCG Свенда Вандервекена в блоге Svend .