Статьи

Ringbuffer: новая структура данных в Hazelcast

Hazelcast Ringbuffer — это новая структура данных, добавленная в Hazelcast 3.5, которая в некоторых случаях может быть более практичной альтернативой очередям. Думайте о Ringbuffer как о круглом массиве с фиксированной емкостью. Как и в случае с массивом, каждый элемент в кольцевом буфере уникально идентифицируется с помощью идентификатора последовательности (long).

Ringbuffer является структурой данных только для добавления; поэтому невозможно удалить элемент. Хвост — это место, куда добавляются предметы, а голова — место, где находятся самые старые предметы в кольцевом буфере. Создать Ringbuffer и добавить элементы очень просто:

Ringbuffer<String>rb = hazelcastInstance.getRingbuffer();
long sequence = rb.add("someitem");

Круто то, что возвращаемую последовательность можно использовать и для считывания элемента:

String item = rb.readOne(sequence);

Поскольку каждый элемент уникально идентифицируется по его идентификатору последовательности, возвращаемый идентификатор последовательности является уникальным и может использоваться как дешевый генератор идентификаторов, если вы используете Ringbuffer.

Кольцевой буфер по сравнению с очередью

Хорошая вещь в Ringbuffer, по сравнению с очередью, состоит в том, что с очередью получение является разрушительной операцией; таким образом, только один поток может взять определенный элемент из очереди. Как только это взято, это ушло. Это может быть проблематично по двум причинам:

  1. Что происходит, когда происходит сбой системы после того, как предмет был взят, но до того, как он был полностью обработан?
  2. Что произойдет, если вы хотите, чтобы несколько читателей прочитали один и тот же элемент? Один из подходов заключается в создании очереди для каждого читателя и внесении в каждую очередь. Проблема в том, что он делает путы очень дорогими, потому что с N читателями нужно делать N путов.

Поскольку чтение в кольцевом буфере не является разрушительной операцией, и читатель контролирует, какие элементы он хочет прочитать, читателю легко реализовать гарантии доставки, сохраняя идентификатор последовательности.

  • По крайней мере, один раз : сохранить идентификатор последовательности после того, как элемент был полностью обработан. Если происходит сбой системы до того, как элемент был полностью обработан, тот же элемент будет прочитан снова, поскольку сохраненный идентификатор последовательности все еще содержит старое значение.
  • Самое большее один раз : сохранить идентификатор последовательности до начала обработки элемента. Если происходит сбой системы до полной обработки элемента, загружается идентификатор последовательности элемента, который мы потенциально не смогли обработать, и система может продолжить со следующего элемента.

Другое большое преимущество операции чтения, которая не является разрушительной, заключается в том, что она очень быстрая, поскольку ее не нужно тиражировать — в отличие от очереди.

Вместимость

Каждый Ringbuffer создается с определенной емкостью — по умолчанию 10 тыс. Элементов. Ringbuffer не может вырасти выше этой емкости, поэтому самые старые предметы в конечном итоге перезаписываются (подробнее об этом ниже). Кольцевой буфер может быть настроен с использованием XML или нашего программного API. Если мы хотим установить емкость:

RingbufferConfig rbConfig = new RingbufferConfig("rb")
    .setCapacity(50 * 1000);
Config config = new Config();
config.addRingbufferConfig(rbConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
Ringbuffer<String&gr; rb = hz.getRingbuffer("rb");

Время жить

По умолчанию элементы в кольцевом буфере остаются в кольцевом буфере до тех пор, пока они не будут перезаписаны. Обратите внимание, что они никогда не истекают. Это точно такое же поведение, как если бы вы использовали обычный массив; если элемент записан в массив, он никогда не будет автоматически удален.

На практике вы часто хотите контролировать, как долго элементы остаются доступными (например, 30 секунд). С помощью Ringbuffer это можно сделать, установив время жизни на RingbufferConfig:

RingbufferConfig rbConfig = new RingbufferConfig("rb")
    .setCapacity(50 * 1000)
    .setTimeToLiveSeconds(30);

Со временем жизни 30 секунд потребитель имеет 30-секундное временное окно для обработки элемента. Если элемент записан и прошло 31 секунду, чтение завершено, и элемент больше не будет доступен.

Время жизни может помочь предотвратить чрезмерное использование памяти и предотвратить устаревшие данные; но его реальное значение в том случае, если оно объединено с OverflowPolicy. OverflowPolicy определяет, что делать, когда Ringbuffer заполнен и нет элементов, срок действия которых истекает. На данный момент есть два варианта:

  • ПЕРЕЗАПИСЬ: самый старый элемент в кольцевом буфере перезаписывается, даже если он недостаточно стар, чтобы истечь. В этом случае вы будете отдавать предпочтение производителю, а не потребителю, поскольку потребитель может столкнуться с тем StaleSequenceException, что данных, которые он хочет прочитать, больше не существует.
  • FAIL: ничего не перезаписывается, и вызывающий абонент получает сигнал о том, что запись не удалась. Затем звонящий решает, что делать.

Следующий код показывает, как настроить экспоненциальный откат в сочетании с OverflowPolicy.FAIL:

long sleepMs = 100;
for (; ; ) {
    long result = ringbuffer.addAsync(item, OverflowPolicy.FAIL).get();
    if (result != -1) {
        break;
    }

    TimeUnit.MILLISECONDS.sleep(sleepMs);
    sleepMs = min(5000, sleepMs * 2);
}

дозирующий

Приведенные примеры кода вставлены и читают по одному элементу за раз. Проблема с этим подходом состоит в том, что существует огромное количество служебных данных из-за планирования операций, сетевого взаимодействия и т. Д. Гораздо более эффективно пакетное чтение и запись для амортизации служебных данных.

Добавить партию предметов очень просто:

List<String> items = Arrays.asList("1","2","3");
ICompletableFuture<Long> f = rb.addAllAsync(items, OverflowPolicy.OVERWRITE);
f.get()

Помимо предоставления пакетной функциональности, вы также можете решить, хотите ли вы сделать синхронизирующий вызов, вызвав get или сделать это асинхронным вызовом, используя andThenметод и предоставив обратный вызов.

Чтение партии предметов немного сложнее:

long sequence = rb.headSequence();
for(;;) {

    ICompletableFuture<ReadResultSet<String>> f = rb.readManyAsync(sequence, 1, 100, null);
    ReadResultSet<String> rs = f.get();
    for (String s : rs) {
        System.out.println(s);
    }
    sequence+=rs.readCount();
}

В этом примере мы хотим прочитать не менее 1 элемента и не более 100 элементов. Это может быть очень эффективно, если доступно 1000 элементов, поскольку нужно выполнить только 10 операций.

Вы можете бродить по поводу nullспора в конце. Это где фильтр может быть предоставлен. Представьте, что существует единственный Ringbuffer с объектами-сотрудниками, и вы хотите получить только инженеров; Вы можете предоставить фильтр, который выбирает инженеров.

public class EngineerFilter<Employee, Boolean> implements Filter {
    Boolean apply(Employee e){
    return e instanceof Engineer;
    }
}

Хорошая особенность фильтра в том, что он выполняется в источнике, поэтому не относящиеся к делу элементы не отправляются вызывающей стороне.

Одна из вещей, которую можно сделать с фильтрами, — распараллелить рабочую нагрузку (например, один читатель имеет дело со всеми инженерами, используя инженерный фильтр, и один читатель имеет дело со всеми продавцами с помощью торгового фильтра).

Ознакомьтесь с документацией Ringbuffer »
Готовы попробовать сами? Скачайте Hazelcast и начните сегодня!