Статьи

Как распространять Spring Bean с использованием EntryProcessor и PortableObject

В этой статье показано, как распространять компоненты Spring с помощью функций EntryProcessor и Portable Object Format (POF) в Oracle Coherence.

Coherence поддерживает модель программирования без блокировки через API EntryProcessor. Эта функция повышает производительность системы за счет сокращения доступа к сети и выполнения неявной низкоуровневой блокировки записей. Эта неявная функциональность низкоуровневой блокировки отличается от явной блокировки (ключа), предоставляемой API ConcurrentMap.

Явная блокировка, Transaction Framework API и Coherence Resource Adapter — это другие параметры транзакции Coherence в качестве процессоров ввода. Для получения подробной информации о параметрах Coherence Transaction, пожалуйста, просмотрите раздел ссылок. Кроме того, для реализации Coherence Explicit Lock можно предложить распределенное управление данными в статье Oracle Coherence .

Portable Object Format (POF) — это независимый от платформы формат сериализации. Это позволяет кодировать эквивалентные объекты Java, .NET и C ++ в одинаковую последовательность байтов. POF рекомендуется для производительности системы, так как характеристики POF для сериализации и десериализации лучше, чем стандартная сериализация Java (согласно справочному документу Coherence, в простом тестовом классе со строкой String, long и Three Ints, (de) сериализации было семь в разы быстрее, чем стандартная сериализация Java).

Coherence предлагает много видов типов кэша, таких как распределенный (или разделенный), реплицированный, оптимистический, ближний, локальный и удаленный кэш. Распределенный кеш определяется как набор данных, который распределен (или разделен) по любому количеству узлов кластера, так что ровно один узел в кластере отвечает за каждый фрагмент данных в кеше, а ответственность распределяется (или, балансировка нагрузки) среди узлов кластера. Обратите внимание, что в этой статье использовался тип распределенного кэша. Поскольку другие типы кэша не входят в сферу применения этой статьи, ознакомьтесь с разделом «Ссылки» или документом Coherence Reference. Их конфигурации очень похожи на конфигурацию распределенного кэша.

Как распространять Spring Beans с помощью статьи Coherence, в которой рассматривается явная блокировка — стандартная сериализация Java , предлагается сравнить две разные реализации ( EntryProcessor — переносимый объектный формат (POF) и явная блокировка — стандартная сериализация Java ).

В этой статье был создан новый кластер с именем OTV и распределенный bean-компонент с использованием объекта кэша с именем user-cache . Он был распределен между двумя членами кластера.

Давайте посмотрим на реализации AbsctractProcessor реализующего EntryProcessor интерфейс и PortableObject интерфейс для распределения Весны — бобов между виртуальными машинами в кластере.

Используемые технологии:

JDK 1.6.0_31
Spring 3.1.1
Coherence 3.7.0
SolarisOS 5.10
Maven 3.0.2

ШАГ 1: СОЗДАТЬ MAVEN ПРОЕКТ

Maven проект создается как показано ниже. (Его можно создать с помощью Maven или IDE Plug-in).

ШАГ 2: КОГЕРЕНТНЫЙ ПАКЕТ

Coherence загружается через пакет Coherence

ШАГ 3: БИБЛИОТЕКИ

Во-первых, Spring-зависимости добавляются в pom.xml Maven. Обратите внимание, что библиотека Coherence установлена ​​в локальный репозиторий Maven, а ее описание добавляется в pom.xml следующим образом. Также, если maven не используется, файл coherence.jar можно добавить в classpath.

<properties>
    <spring.version>3.1.1.RELEASE</spring.version>
</properties>
 
<dependencies>
 
    <!-- Spring 3 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <!-- Coherence library(from local repository) -->
    <dependency>
        <groupId>com.tangosol</groupId>
        <artifactId>coherence</artifactId>
        <version>3.7.0</version>
    </dependency>
 
    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
 
</dependencies>

Следующий maven-плагин можно использовать для создания runnable-jar.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
 
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

ШАГ 4: СОЗДАНИЕ otv-pof-config.xml
otv-pof-config.xml охватывает классы, использующиефункцию Portable Object Format (POF) для сериализации. В этом примере; Классы user , UpdateUserProcessor и DeleteUserProcessor реализуютинтерфейс com.tangosol.io.pof.PortableObject .

Аргумент -Dtangosol.pof.config может использоваться для определения пути otv-pof -config.xml в сценарии запуска.

<?xml version="1.0"?>
<!DOCTYPE pof-config SYSTEM "pof-config.dtd">
<pof-config>
    <user-type-list>
        <!-- coherence POF user types -->
        <include>coherence-pof-config.xml</include>
        <!-- The definition of classes which use Portable Object Format -->
        <user-type>
            <type-id>1001</type-id>
            <class-name>com.otv.user.User</class-name>
        </user-type>
        <user-type>
            <type-id>1002</type-id>
            <class-name>com.otv.user.processor.UpdateUserProcessor</class-name>
        </user-type>
        <user-type>
            <type-id>1003</type-id>
            <class-name>com.otv.user.processor.DeleteUserProcessor</class-name>
        </user-type>
    </user-type-list>
    <allow-interfaces>true</allow-interfaces>
    <allow-subclasses>true</allow-subclasses>
</pof-config>

ШАГ 5: СОЗДАТЬ otv-coherence-cache-config.xml

otv-coherence-cache-config.xml содержит конфигурацию кэширования (распределенную или реплицированную) и конфигурацию отображения схемы кэширования. Созданная конфигурация кэша должна быть добавлена ​​в coherence-cache-config.xml .

<?xml version="1.0"?>
 
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
                     coherence-cache-config.xsd">
 
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>user-cache</cache-name>
            <scheme-name>UserDistributedCacheWithPof</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>
 
    <caching-schemes>
 
        <distributed-scheme>
            <scheme-name>UserDistributedCacheWithPof</scheme-name>
            <service-name>UserDistributedCacheWithPof</service-name>
 
            <serializer>
                <instance>
                    <class-name>com.tangosol.io.pof.SafeConfigurablePofContext
                    </class-name>
                    <init-params>
                        <init-param>
                            <param-type>String</param-type>
                            <param-value>
                                <!-- pof-config.xml path should be set-->
                                otv-pof-config.xml
                            </param-value>
                        </init-param>
                    </init-params>
                </instance>
            </serializer>
            <backing-map-scheme>
                <local-scheme />
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>
    </caching-schemes>
 
</cache-config>

ШАГ 6: СОЗДАЙТЕ tangosol-coherence-override.xml

tangosol-coherence-override.xml охватывает конфигурацию кластера, идентификатора участника и настраиваемую фабрику кэша. Также в следующем XML-файле конфигурации показан первый член кластера. Аргумент -Dtangosol.coherence.override может использоваться для определения пути tangosol-coherence-override.xml в сценарии запуска.

tangosol-coherence-override.xml для первого члена кластера:

<?xml version='1.0'?>
 
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
         <!-- Name of the first member of the cluster -->
         <role-name>OTV1</role-name>
      </member-identity>
 
      <unicast-listener>
          <well-known-addresses>
            <socket-address id="1">
              <!-- IP Address of the first member of the cluster -->
              <address>x.x.x.x</address>
              <port>8089</port>
            </socket-address>
            <socket-address id="2">
              <!-- IP Address of the second member of the cluster -->
              <address>y.y.y.y</address>
              <port>8089</port>
            </socket-address>
          </well-known-addresses>
 
          <!-- Name of the first member of the cluster -->
          <machine-id>OTV1</machine-id>
          <!-- IP Address of the first member of the cluster -->
          <address>x.x.x.x</address>
          <port>8089</port>
          <port-auto-adjust>true</port-auto-adjust>
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property="tangosol.coherence.cacheconfig">
              <!-- coherence-cache-config.xml path should be set-->
              otv-coherence-cache-config.xml
            </param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

tangosol-coherence-override.xml для второго члена кластера:

<?xml version='1.0'?>
 
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
         <!-- Name of the second member of the cluster -->
         <role-name>OTV2</role-name>
      </member-identity>
 
      <unicast-listener>     
 
          <well-known-addresses>
            <socket-address id="1">
              <!-- IP Address of the first member of the cluster -->
              <address>x.x.x.x</address>
              <port>8089</port>
            </socket-address>
            <socket-address id="2">
              <!-- IP Address of the second member of the cluster -->
              <address>y.y.y.y</address>
              <port>8089</port>
            </socket-address>
          </well-known-addresses>
 
          <!-- Name of the second member of the cluster -->
          <machine-id>OTV2</machine-id>
          <!-- IP Address of the second member of the cluster -->
          <address>y.y.y.y</address>
          <port>8089</port>
          <port-auto-adjust>true</port-auto-adjust>
 
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property="tangosol.coherence.cacheconfig">
              <!-- coherence-cache-config.xml path should be set-->
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

ШАГ 7: СОЗДАТЬ applicationContext.xml

applicationContext.xml создан.

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
 
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
 
    <!-- Beans Declaration -->
    <bean id="User" class="com.otv.user.User" scope="prototype" />
    <bean id="UserCacheService" class="com.otv.user.cache.srv.UserCacheService" />
    <bean id="CacheUpdaterTask" class="com.otv.cache.updater.task.CacheUpdaterTask">
        <property name="userCacheService" ref="UserCacheService" />
    </bean>
</beans>

ШАГ 8: СОЗДАЙТЕ SystemConstants CLASS

Класс SystemConstants создан. Этот класс охватывает все системные константы.

package com.otv.common;
 
/**
 * System Constants
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class SystemConstants {
 
    public static final String APPLICATION_CONTEXT_FILE_NAME = "applicationContext.xml";
 
    //Named Cache Definition...
    public static final String USER_CACHE = "user-cache";
 
    //Bean Names...
    public static final String BEAN_NAME_CACHE_UPDATER_TASK = "CacheUpdaterTask";
    public static final String BEAN_NAME_USER = "User";
 
}

ШАГ 9: СОЗДАНИЕ БИНА ПОЛЬЗОВАТЕЛЯ

Новый бин User Spring создан. Этот компонент будет распределен между двумя узлами в кластере OTV . PortableObject может быть реализован для сериализации. Интерфейс PortableObject имеет два не реализованных метода, как readExternal и writeExternal . Свойства, которые только сериализуются, должны быть определены. В этом примере все свойства (идентификатор, имя и фамилия пользователя) сериализуются.

package com.otv.user;
 
import java.io.IOException;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
 
/**
 * User Bean
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class User implements PortableObject {
 
    private String id;
    private String name;
    private String surname;
 
    /**
     * Gets User Id
     *
     * @return String id
     */
    public String getId() {
        return id;
    }
 
    /**
     * Sets User Id
     *
     * @param String id
     */
    public void setId(String id) {
        this.id = id;
    }
 
    /**
     * Gets User Name
     *
     * @return String name
     */
    public String getName() {
        return name;
    }
 
    /**
     * Sets User Name
     *
     * @param String name
     */
    public void setName(String name) {
        this.name = name;
    }
 
    /**
     * Gets User Surname
     *
     * @return String surname
     */
    public String getSurname() {
        return surname;
    }
 
    /**
     * Sets User Surname
     *
     * @param String surname
     */
    public void setSurname(String surname) {
        this.surname = surname;
    }
 
    @Override
    public String toString() {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("Id : ").append(id);
        strBuilder.append(", Name : ").append(name);
        strBuilder.append(", Surname : ").append(surname);
        return strBuilder.toString();
    }
 
    /**
     * Restore the contents of a user type instance by reading its state
     * using the specified PofReader object.
     *
     * @param PofReader in
     */
    public void readExternal(PofReader in) throws IOException {
        this.id = in.readString(0);
        this.name = in.readString(1);
        this.surname = in.readString(2);
    }
 
    /**
     * Save the contents of a POF user type instance by writing its state
     * using the specified PofWriter object.
     *
     * @param PofWriter out
     */
    public void writeExternal(PofWriter out) throws IOException {
        out.writeString(0, id);
        out.writeString(1, name);
        out.writeString(2, surname);
    }
}

ШАГ 10: СОЗДАЙТЕ IUserCacheService ИНТЕРФЕЙС

Новый интерфейс IUserCacheService создан для выполнения операций кэширования.

package com.otv.user.cache.srv;
 
import com.otv.user.User;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
import com.tangosol.net.NamedCache;
 
/**
 * User Cache Service Interface
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public interface IUserCacheService {
 
    /**
     * Gets Distributed User Cache
     *
     * @return NamedCache User Cache
     */
    NamedCache getUserCache();
 
    /**
     * Adds user to cache
     *
     * @param User user
     */
    void addUser(User user);
 
    /**
     * Updates user on the cache
     *
     * @param String userId
     * @param UpdateUserProcessor processor
     *
     */
    void updateUser(String userId, UpdateUserProcessor processor);
 
    /**
     * Deletes user from the cache
     *
     * @param String userId
     * @param DeleteUserProcessor processor
     *
     */
    void deleteUser(String userId, DeleteUserProcessor processor);
 
}

ШАГ 11: СОЗДАНИЕ КЛАССА UserCacheService

Класс UserCacheService создается путем реализации интерфейса IUserCacheService .

package com.otv.user.cache.srv;
 
import com.otv.cache.listener.UserMapListener;
import com.otv.common.SystemConstants;
import com.otv.user.User;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
/**
 * User Cache Service
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UserCacheService implements IUserCacheService {
 
    private NamedCache userCache = null;   
 
    public UserCacheService() {
        setUserCache(CacheFactory.getCache(SystemConstants.USER_CACHE));
        //UserMap Listener is registered to listen user-cache operations
        getUserCache().addMapListener(new UserMapListener());
    }  
 
    /**
     * Adds user to cache
     *
     * @param User user
     */
    public void addUser(User user) {
        getUserCache().put(user.getId(), user);
    }
 
    /**
     * Deletes user from the cache
     *
     * @param String userId
     * @param DeleteUserProcessor processor
     *
     */
    public void deleteUser(String userId, DeleteUserProcessor processor) {
        getUserCache().invoke(userId, processor);
    }
 
    /**
     * Updates user on the cache
     *
     * @param String userId
     * @param UpdateUserProcessor processor
     *
     */
    public void updateUser(String userId, UpdateUserProcessor processor) {
        getUserCache().invoke(userId, processor);
    }
 
    /**
     * Gets Distributed User Cache
     *
     * @return NamedCache User Cache
     */
    public NamedCache getUserCache() {
        return userCache;
    }
 
    /**
     * Sets User Cache
     *
     * @param NamedCache userCache
     */
    public void setUserCache(NamedCache userCache) {
        this.userCache = userCache;
    }
}

ШАГ 12: СОЗДАТЬ КЛАСС USERMAPLISTENER

Новый класс UserMapListener создан. Этот слушатель получает распределенные события пользовательского кэша .

package com.otv.cache.listener;
 
import org.apache.log4j.Logger;
 
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
 
/**
 * User Map Listener
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UserMapListener implements MapListener {
 
    private static Logger logger = Logger.getLogger(UserMapListener.class);
 
    /**
     * This method is invoked when an entry is deleted from the cache...
     *
     * @param MapEvent me
     */
    public void entryDeleted(MapEvent me) {
         logger.debug("Deleted Key = " + me.getKey() + ", Value = " + me.getOldValue());
    }
 
    /**
     * This method is invoked when an entry is inserted to the cache...
     *
     * @param MapEvent me
     */
    public void entryInserted(MapEvent me) {
        logger.debug("Inserted Key = " + me.getKey() + ", Value = " + me.getNewValue());
    }
 
    /**
     * This method is invoked when an entry is updated on the cache...
     *
     * @param MapEvent me
     */
    public void entryUpdated(MapEvent me) {
        logger.debug("Updated Key = " + me.getKey() + ", New_Value = " + me.getNewValue() + ", Old Value = " + me.getOldValue());
    }
}

ШАГ 13: СОЗДАТЬ UpdateUserProcessor CLASS

AbstractProcessor — это абстрактный класс в пакете com.tangosol.util.processor . Он реализует интерфейс EntryProcessor .

Класс UpdateUserProcessor создан для обработки операции обновления пользователя в кеше. Когда UpdateUserProcessor вызывается для ключа, сначала в кластере обнаруживается элемент, содержащий ключ. После этого UpdateUserProcessor вызывается из члена, который содержит связанный ключ, и его значение (объект пользователя) обновляется. Поэтому сетевой трафик уменьшается.

package com.otv.user.processor;
 
import java.io.IOException;
 
import org.apache.log4j.Logger;
 
import com.otv.user.User;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
/**
 * Update User Processor
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UpdateUserProcessor extends AbstractProcessor implements PortableObject {
 
    private static Logger logger = Logger.getLogger(UpdateUserProcessor.class);
    private User newUser;
 
    /**
     * This empty constructor is added for Portable Object Format(POF).
     *
     */
    public UpdateUserProcessor() {
 
    }
 
    public UpdateUserProcessor(User newUser) {
        this.newUser = newUser;
    }
 
    /**
     * Processes a Map.Entry object.
     *
     * @param Entry entry
     * @return Object newUser
     */
    public Object process(Entry entry) {
        Object newValue = null;
        try {
            newValue = getNewUser();
            entry.setValue(newValue);
        } catch (Exception e) {
            logger.error("Error occured when entry was being processed!", e);
        }
 
        return newValue;
    }
 
    /**
     * Gets new user
     *
     * @return User newUser
     */
    public User getNewUser() {
        return newUser;
    }
 
    /**
     * Sets new user
     *
     * @param User newUser
     */
    public void setNewUser(User newUser) {
        this.newUser = newUser;
    }
 
    /**
     * Restore the contents of a user type instance by reading its state
     * using the specified PofReader object.
     *
     * @param PofReader in
     */
    public void readExternal(PofReader in) throws IOException {
        setNewUser((User) in.readObject(0));
    }
 
    /**
     * Save the contents of a POF user type instance by writing its state
     * using the specified PofWriter object.
     *
     * @param PofWriter out
     */
    public void writeExternal(PofWriter out) throws IOException {
        out.writeObject(0, getNewUser());
    }
}

ШАГ 14: СОЗДАЙТЕ DeleteUserProcessor CLASS

Класс DeleteUserProcessor создан для обработки операции удаления пользователя в кеше. Когда DeleteUserProcessor вызывается для ключа, сначала в кластере обнаруживается элемент, содержащий ключ. После этого DeleteUserProcessor вызывается из члена, который содержит связанный ключ. Поэтому сетевой трафик уменьшается.

package com.otv.user.processor;
 
import java.io.IOException;
 
import org.apache.log4j.Logger;
 
import com.otv.user.User;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
/**
 * Delete User Processor
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class DeleteUserProcessor extends AbstractProcessor implements PortableObject {
 
    private static Logger logger = Logger.getLogger(DeleteUserProcessor.class);
 
    /**
     * Processes a Map.Entry object.
     *
     * @param Entry entry
     * @return Object user
     */
    public Object process(Entry entry) {
        User user = null;
        try {
            user = (User) entry.getValue();
            entry.remove(true);
        } catch (Exception e) {
            logger.error("Error occured when entry was being processed!", e);
        }
 
        return user;
    }
 
    /**
     * Restore the contents of a user type instance by reading its state
     * using the specified PofReader object.
     *
     * @param PofReader in
     */
    public void readExternal(PofReader in) throws IOException {
 
    }
 
    /**
     * Save the contents of a POF user type instance by writing its state
     * using the specified PofWriter object.
     *
     * @param PofWriter out
     */
    public void writeExternal(PofWriter out) throws IOException {
 
    }
}

ШАГ 15: СОЗДАЙТЕ CacheUpdaterTask CLASS

CacheUpdaterTask Class создан для выполнения операций кэширования (добавления, обновления и удаления) и мониторинга содержимого кэша.

package com.otv.cache.updater.task;
 
import java.util.Collection;
 
import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
 
import com.otv.common.SystemConstants;
import com.otv.user.User;
import com.otv.user.cache.srv.IUserCacheService;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
 
/**
 * Cache Updater Task
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class CacheUpdaterTask implements BeanFactoryAware, Runnable {
 
    private static Logger log = Logger.getLogger(CacheUpdaterTask.class);
    private IUserCacheService userCacheService;
    private BeanFactory beanFactory;
 
    public void run() {
        try {
            while(true) {
                /**
                 * Before the project is built for the first member,
                 * this code block should be used instead of
                 * method processRequestsOnSecondMemberOfCluster.
                 */
                processRequestsOnFirstMemberOfCluster();
 
                /**
                 * Before the project is built for the second member,
                 * this code block should be used instead of
                 * method processRequestsOnFirstMemberOfCluster.
                 */
//              processRequestsOnSecondMemberOfCluster();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    /**
      * Processes the cache requests on the first member of cluster...
      *
      * @throws InterruptedException
      */
    private void processRequestsOnFirstMemberOfCluster() throws InterruptedException {
        //Entry is added to cache...
        getUserCacheService().addUser(getUser("1", "Bruce", "Willis"));
 
        //Cache Entries are being printed...
        printCacheEntries();
 
        Thread.sleep(10000);
 
        User newUser = getUser("1", "Client", "Eastwood");
        //Existent Entry is updated on the cache...
        getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));
 
        //Cache Entries are being printed...
        printCacheEntries();
 
        Thread.sleep(10000);
 
        //Entry is deleted from cache...
        getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());
 
        //Cache Entries are being printed...
        printCacheEntries();
 
        Thread.sleep(10000);
    }
 
    /**
      * Processes the cache requests on the second member of cluster...
      *
      * @throws InterruptedException
      */
    private void processRequestsOnSecondMemberOfCluster() throws InterruptedException {
        //Entry is added to cache...
        getUserCacheService().addUser(getUser("2", "Nathalie", "Portman"));
 
        Thread.sleep(15000);
 
        User newUser = getUser("2", "Sharon", "Stone");
        //Existent Entry is updated on the cache...
        getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));
 
        User newUser2 = getUser("1", "Maria", "Sharapova");
        //Existent Entry is updated on the cache...
        getUserCacheService().updateUser(newUser2.getId(), new UpdateUserProcessor(newUser2));
 
        Thread.sleep(15000);
 
        //Entry is deleted from cache...
        getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());
 
        Thread.sleep(15000);
    }
 
    /**
     * Prints cache entries
     *
     */
    private void printCacheEntries() {
        Collection<User> userCollection = (Collection<User>)getUserCacheService().getUserCache().values();
        for(User user : userCollection) {
            log.debug("Cache Content : "+user);
        }
    }
 
    /**
     * Gets new user instance
     *
     * @param String user id
     * @param String user name
     * @param String user surname
     * @return User user
     */
    private User getUser(String id, String name, String surname) {
        User user = getNewUserInstance();
        user.setId(id);
        user.setName(name);
        user.setSurname(surname);
 
        return user;
    }
 
    /**
     * Gets user cache service...
     *
     * @return IUserCacheService userCacheService
     */
    public IUserCacheService getUserCacheService() {
        return userCacheService;
    }
 
    /**
     * Sets user cache service...
     *
     * @param IUserCacheService userCacheService
     */
    public void setUserCacheService(IUserCacheService userCacheService) {
        this.userCacheService = userCacheService;
    }
 
    /**
     * Gets a new instance of User Bean
     *
     * @return User
     */
    public User getNewUserInstance() {
        return  (User) getBeanFactory().getBean(SystemConstants.BEAN_NAME_USER);
    }
 
    /**
     * Gets bean factory
     *
     * @return BeanFactory
     */
    public BeanFactory getBeanFactory() {
        return beanFactory;
    }
 
    /**
     * Sets bean factory
     *
     * @param BeanFactory beanFactory
     * @throws BeansException
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }
}

ШАГ 16: СОЗДАЙТЕ КЛАСС ПРИЛОЖЕНИЙ

Класс приложения создан для запуска приложения.

package com.otv.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
import com.otv.cache.updater.task.CacheUpdaterTask;
import com.otv.common.SystemConstants;
 
/**
 * Application Class
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class Application {
 
    /**
     * Starts the application
     *
     * @param  String[] args
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext(SystemConstants.APPLICATION_CONTEXT_FILE_NAME);
 
        CacheUpdaterTask cacheUpdaterTask = (CacheUpdaterTask) context.getBean(SystemConstants.BEAN_NAME_CACHE_UPDATER_TASK);
        Thread cacheUpdater = new Thread(cacheUpdaterTask);
        cacheUpdater.start();
    }
}

ШАГ 17: СТРОИМ ПРОЕКТ

После сборки проекта OTV_Spring_Coherence_With_Processor_and_POF будет создан файл OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar .
Обратите внимание, что члены кластера имеют различную конфигурацию для Coherence, поэтому проект должен быть построен отдельно для каждого участника.

ШАГ 18: ЗАПУСК ПРОЕКТА ПЕРВОГО ЧЛЕНА КЛАСТЕРА

После запуска созданного файла OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar на элементах кластера на консоли первого участника будут показаны следующие выходные журналы:

--After A new cluster is created and First Member joins the cluster, a new entry is added to the cache.
02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
02.06.2012 14:21:45 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis
.......
--After Second Member joins the cluster, a new entry is added to the cache.
02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 2, Value = Id : 2, Name : Nathalie, Surname : Portman
.......
--Cache operations go on both first and second members of the cluster:
02.06.2012 14:21:55 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Client, Surname : Eastwood,
                                                                       Old Value = Id : 1, Name : Bruce, Surname : Willis
 
02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Nathalie, Surname : Portman
02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Client, Surname : Eastwood
 
02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 2, New_Value = Id : 2, Name : Sharon, Surname : Stone,
                                                                       Old Value = Id : 2, Name : Nathalie, Surname : Portman
 
02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Maria, Surname : Sharapova,
                                                                       Old Value = Id : 1, Name : Client, Surname : Eastwood
 
02.06.2012 14:22:05 DEBUG (UserMapListener.java:24) - Deleted Key = 1, Value = Id : 1, Name : Maria, Surname : Sharapova
02.06.2012 14:22:05 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Sharon, Surname : Stone
02.06.2012 14:22:15 DEBUG (UserMapListener.java:24) - Deleted Key = 2, Value = Id : 2, Name : Sharon, Surname : Stone
02.06.2012 14:22:15 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
02.06.2012 14:22:15 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis

ШАГ 19: СКАЧАТЬ

OTV_Spring_Coherence_With_Processor_and_POF

РЕКОМЕНДАЦИИ :

Выполнение транзакций в когерентности с
использованием формата переносимых объектов в Coherence
Spring Framework Reference 3.x