Статьи

Мелкозернистая репликация в Infinispan

Иногда у нас есть большой объект, возможно, с множеством атрибутов или с некоторыми двоичными данными, и мы хотели бы сказать Infinispan, что он должен реплицировать только определенную часть объекта в кластере. Как правило, мы хотим воспроизвести только ту часть, которую мы только что обновили. Именно здесь вступают в игру интерфейсы DeltaAware и Delta. Предоставляя реализации этих интерфейсов, мы можем определить точную репликацию. Когда мы прилагаем некоторые усилия для такого улучшения, мы также хотели бы ускорить маршалинг объектов и демаршаллинг. Поэтому мы собираемся определить наши собственные экстернализаторы — чтобы избежать медленной сериализации Java по умолчанию.

Следующие фрагменты кода собраны в полный пример по адресу
https://github.com/mgencur/infinispan-examples/tree/master/partial-state-transfer.Этот проект содержит файл readme с инструкциями о том, как создать и запустить пример. Он основан на быстром запуске кластерного кэша в Infinispan.

Реализация интерфейса DeltaAware

Итак, давайте посмотрим на наш основной объект. Для этого упражнения я определил класс Bicycle, который состоит из множества компонентов, таких как frame, fork, tailShock и т. Д. Этот объект хранится в кэше как значение под определенным (не важным) ключом. В нашем сценарии может случиться так, что мы обновляем только определенные компоненты велосипеда, и в этом случае мы хотим повторить только эти изменения компонентов.

Здесь важны следующие методы (описание взято из javadocs):

commit () — указывает, что все собранные на сегодняшний день дельты были извлечены (посредством

                 вызова delta ()) и могут быть отброшены. Часто используется в качестве оптимизации, если

                 дельта не нужна, но
                 желательна очистка и сброс       
внутреннего состояния.

delta () — извлекает изменения, внесенные в реализации, в эффективном формате, который

             можно легко и дешево сериализовать и десериализовать. Этот метод будет

             вызываться только один раз для каждого набора изменений, поскольку предполагается, что
             внутренний журнал изменений любой
реализации стирается и сбрасывается после генерации

             и отправки дельты вызывающей стороне.

         

Мы также должны определить сеттеры и геттеры для наших членов. Методы установки, помимо прочего, отвечают за регистрацию изменений в журнале изменений, который впоследствии будет использоваться для восстановления состояния объекта. Externalizer для этого класса необходим только при использовании хранилищ кэша. Ради простоты я не упоминаю об этом здесь.

public class Bicycle implements DeltaAware, Cloneable {

    //bicycle attributes
    String frame, fork, rearShock, crank, bottomBracket, shifters, cogSet, chain,
           frontDerailleur, rearDerailleur, rims, hubs, tires, pedals, brake, handlebar, stem;

    private BicycleDelta delta;

    @Override
    public void commit() {
        delta = null;
    }

    @Override
    public Delta delta() {
        Delta toReturn = getDelta();
        delta = null; // reset
        return toReturn;
    }
    
    ...
    
    public void setFrame(String frame) {
        getDelta().registerComponentChange("frame", frame);
        this.frame = frame;
    }
    
    public String getFrame() {
        return frame;
    }
    
    ...
}

Реализация интерфейса Delta

Фактический объект, который будет реплицирован по всему кластеру, является реализацией интерфейса Delta. Давайте посмотрим на класс. Во-первых, нам нужно поле, которое будет содержать изменения — changeLog. Во-вторых, нам нужно определить метод merge (). Этот метод должен быть реализован так, чтобы Infinispan знал, как объединить существующий объект с входящими изменениями. Параметр этого метода представляет объект, который уже хранится в кэше, входящие изменения будут применены к этому объекту. Мы используем отражение здесь, чтобы применить изменения к реальному объекту, но это не обязательно. Мы могли бы легко вызвать методы установки. Преимущество использования отражения в том, что мы можем установить эти поля в цикле.

Другой частью является метод registerComponentChange (). Это вызывается объектом класса Bicycle — для записи изменений этого объекта. Название этого метода не важно.

Определение нашего собственного экстерьера


Хорошо, так что остаётся определение экстерьера для реализации Delta. Мы реализуем интерфейс AdvancedExternalizer и говорим, что только объект changeLog должен быть маршалирован и демаршаллирован при передаче данных по проводам. Полная (почти) реализация интерфейса Delta заключается в следующем.

public class BicycleDelta implements Delta {

    private HashMap<String, String> changeLog = new HashMap<String, String>();
    
    @Override
    public DeltaAware merge(DeltaAware d) {
        Bicycle other;
        if (d != null && (d instanceof Bicycle)) {
            other = (Bicycle) d;
        }
        else {
            other = new Bicycle();
        }
        Class<?> hugeClass = other.getClass();
        try {
            if (changeLog != null) {
                for (Entry<String, String> o : changeLog.entrySet()) {
                    Field x = hugeClass.getDeclaredField(o.getKey());
                    if (!x.isAccessible()) x.setAccessible(true);
                    x.set(other, o.getValue());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return other;
    }
    
    public void registerComponentChange(String compName, String compDescription) {
        changeLog.put(compName, compDescription);
    }
    
    ...
    
    public static class Externalizer implement AdvancedExternalizer<BicycleDelta> {

        @Override
        public Set<Class<? extends BicycleDelta>> getTypeClasses() {
            return Util.<Class<? extends BicycleDelta>>asSet(BicycleDelta.class);
        }

        @Override
        public void writeObject(ObjectOutput output, BicycleDelta object) throws IOException {
            output.writeObject(object.changeLog);
        }

        @Override
        public BicycleDelta readObject(ObjectInput input) throws IOException, ClassNotFoundException {
            BicycleDelta delta = new BicycleDelta();
            delta.changeLog = (HashMap<String, String>) input.readObject();
            return delta;
        }
        
        @Override
        public Integer getId() {
           return 33; //put some random value here to identify the externalizer (must not be in reserved value ranges)
        }
    }
}

Расскажите Infinispan о дополнительном экстерьере

Нам также необходимо настроить Infinispan для использования нашего специального экстерьера для маршалинга / демаршаллинга наших объектов. Мы можем сделать это, например, программно, вызвав .addAdvancedExternalizer () в построителе конфигурации сериализации.

new DefaultCacheManager(
GlobalConfigurationBuilder.defaultClusteredBuilder()
.serialization()
.addAdvancedExternalizer(new BicycleDelta.Externalizer())
.build(),
new ConfigurationBuilder()
.clustering()
.cacheMode(CacheMode.REPL_SYNC)
.transaction()
.transactionMode(TransactionMode.TRANSACTIONAL)
.autoCommit(true)
.lockingMode(LockingMode.OPTIMISTIC)
.transactionManagerLookup(new JBossStandaloneJTAManagerLookup())
.locking()
.isolationLevel(IsolationLevel.READ_COMMITTED)
.build()
);


Вы можете видеть, что мы также настраиваем транзакции здесь.
Это не обязательно, однако. Мы просто стремимся предоставить более богатый пример, исключить транзакционное поведение очень легко.

И тут начинается часть «использования». Заключите вызовы кеша транзакцией, извлеките объект велосипеда из кеша, внесите некоторые изменения и зафиксируйте их. 

try {
tm.begin();

//retrieve the bicycle from the cache, we suppose the bicycle already exists and
//only trying to apply some changes to it, only these changes will be replicated
Bicycle toChange = cache.get(BIKE1);

System.out.println("Updating components: frame, fork");
toChange.setFrame("New Frame");
toChange.setFork("New Fork");

//store the bicycle back to the cache
cache.put(BIKE1, toChange);

tm.commit();
} catch (Exception e) {
try {
if (tm != null) {
tm.rollback();
}
} catch (Exception ex) {
}
}

Вот и все. То, что в итоге передается по проводам, это просто объект changeLog. Фактический объект велосипеда реконструирован из поступающих обновлений.

Если все это кажется вам слишком сложным, у меня есть хорошие новости. Infinispan предоставляет одну реализацию интерфейса DeltaAware, которая называется AtomicHashMap (пакет org.infinispan.atomic). Если эта карта используется в качестве значения в парах ключ / значение, хранящихся в кэше, то только операции добавления / получения / удаления этой карты во время транзакции копируются на другие узлы. Такие классы, как Bicycle и BicycleDelta, тогда не нужны. Даже регистрация внешнего устройства для AtomicHashMap не требуется, это делается автоматически при регистрации внутренних внешних устройств. Однако может потребоваться класс, имитирующий объект реального мира, а не просто карту. Это тот случай, когда ваши собственные реализации интерфейсов DeltaAware и Delta являются единственным способом.