Статьи

Распространение Spring Beans в Oracle Coherence

В этой статье показано, как распространять компоненты Spring с помощью функций EntryProcessor и Portable Object Format (POF) в Oracle Coherence.

Coherence поддерживает модель программирования без блокировки через API EntryProcessor. Эта функция повышает производительность системы за счет сокращения доступа к сети и выполнения неявной низкоуровневой блокировки записей. Эта неявная функциональность низкоуровневой блокировки отличается от явной блокировки (ключа), предоставляемой API ConcurrentMap.

Явная блокировка, Transaction Framework API и Coherence Resource Adapter — это другие параметры транзакции Coherence в качестве процессоров ввода. Для получения подробной информации о параметрах Coherence Transaction, пожалуйста, просмотрите раздел ссылок. Кроме того, для реализации Coherence Explicit Lock можно предложить распределенное управление данными в статье Oracle Coherence .

Portable Object Format (POF) — это независимый от платформы формат сериализации. Это позволяет кодировать эквивалентные объекты Java, .NET и C ++ в одинаковую последовательность байтов. POF рекомендуется для производительности системы, так как характеристики POF для сериализации и десериализации лучше, чем стандартная сериализация Java (согласно справочному документу Coherence, в простом тестовом классе со строкой String, длинной и тремя целочисленными значениями (de) сериализации было семь в разы быстрее, чем стандартная сериализация Java).

Coherence предлагает много видов типов кэша, таких как распределенный (или разделенный), реплицированный, оптимистический, ближний, локальный и удаленный кэш. Распределенный кеш определяется как набор данных, которые распределены (или разделены) по любому количеству узлов кластера, так что ровно один узел в кластере отвечает за каждый фрагмент данных в кеше, и ответственность распределяется (или, балансировка нагрузки) среди узлов кластера. Обратите внимание, что в этой статье использовался тип распределенного кэша. Поскольку другие типы кэша не входят в сферу применения этой статьи, ознакомьтесь с разделом «Ссылки» или документом Coherence Reference. Их конфигурации очень похожи на конфигурацию распределенного кэша.

Как распространять Spring Beans с помощью статьи Coherence, в которой рассматривается явная блокировка — стандартная сериализация Java , предлагается сравнить две разные реализации ( EntryProcessor — переносимый объектный формат (POF) и явная блокировка — стандартная сериализация Java ).

В этой статье был создан новый кластер с именем OTV и распределенный bean-компонент с использованием объекта кэша с именем user-cache . Он был распределен между двумя членами кластера.

Давайте рассмотрим реализацию AbsctractProcessor, реализующую интерфейс EntryProcessor и интерфейс PortableObject для распределения Spring Beans между JVM в кластере.

Используемые технологии:

JDK 1.6.0_31
Весна 3.1.1
Согласованность 3.7.0
SolarisOS 5.10
Maven 3.0.2

ШАГ 1: СОЗДАТЬ MAVEN ПРОЕКТ

Maven проект создается как показано ниже. (Его можно создать с помощью Maven или IDE Plug-in).

ШАГ 2: КОГЕРЕНТНЫЙ ПАКЕТ

Coherence загружается через пакет Coherence

ШАГ 3: БИБЛИОТЕКИ

Во-первых, Spring-зависимости добавляются в pom.xml Maven. Обратите внимание, что библиотека Coherence установлена ​​в локальный репозиторий Maven, а ее описание добавляется в pom.xml следующим образом. Также, если maven не используется, файл coherence.jar можно добавить в classpath.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<properties>
 <spring.version>3.1.1.RELEASE</spring.version>
</properties>
 
<dependencies>
 
 <!-- Spring 3 dependencies -->
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>${spring.version}</version>
 </dependency>
 <dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>${spring.version}</version>
 </dependency>
 
 <!-- Coherence library(from local repository) -->
 <dependency>
  <groupId>com.tangosol</groupId>
  <artifactId>coherence</artifactId>
  <version>3.7.0</version>
 </dependency>
 
 <!-- Log4j library -->
 <dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.16</version>
 </dependency>
 
</dependencies>

Следующий maven-плагин может быть использован для создания runnable-jar.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>1.3.1</version>
 
 <executions>
  <execution>
   <phase>package</phase>
   <goals>
    <goal>shade</goal>
   </goals>
   <configuration>
    <transformers>
     <transformer
      implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
      <mainClass>com.otv.exe.Application</mainClass>
     </transformer>
     <transformer
      implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
      <resource>META-INF/spring.handlers</resource>
     </transformer>
     <transformer
      implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
      <resource>META-INF/spring.schemas</resource>
     </transformer>
    </transformers>
   </configuration>
  </execution>
 </executions>
</plugin>

ШАГ 4: СОЗДАТЬ otv-pof-config.xml
otv-pof-config.xml охватывает классы, использующие функцию Portable Object Format (POF) для сериализации. В этом примере; Классы user , UpdateUserProcessor и DeleteUserProcessor реализуют интерфейс com.tangosol.io.pof.PortableObject .

Аргумент -Dtangosol.pof.config может использоваться для определения пути otv-pof -config.xml в сценарии запуска.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?xml version="1.0"?>
<!DOCTYPE pof-config SYSTEM "pof-config.dtd">
<pof-config>
 <user-type-list>
  <!-- coherence POF user types -->
  <include>coherence-pof-config.xml</include>
  <!-- The definition of classes which use Portable Object Format -->
  <user-type>
   <type-id>1001</type-id>
   <class-name>com.otv.user.User</class-name>
  </user-type>
  <user-type>
   <type-id>1002</type-id>
   <class-name>com.otv.user.processor.UpdateUserProcessor</class-name>
  </user-type>
  <user-type>
   <type-id>1003</type-id>
   <class-name>com.otv.user.processor.DeleteUserProcessor</class-name>
  </user-type>
 </user-type-list>
 <allow-interfaces>true</allow-interfaces>
 <allow-subclasses>true</allow-subclasses>
</pof-config>

ШАГ 5: СОЗДАТЬ otv-coherence-cache-config.xml

otv-coherence-cache-config.xml содержит конфигурацию кэширования (распределенную или реплицированную) и конфигурацию отображения схемы кэширования. Созданная конфигурация кэша должна быть добавлена ​​в coherence-cache-config.xml .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0"?>
 
           xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
                     coherence-cache-config.xsd">
 
 <caching-scheme-mapping>
  <cache-mapping>
   <cache-name>user-cache</cache-name>
   <scheme-name>UserDistributedCacheWithPof</scheme-name>
  </cache-mapping>
 </caching-scheme-mapping>
 
    <caching-schemes>
 
  <distributed-scheme>
   <scheme-name>UserDistributedCacheWithPof</scheme-name>
   <service-name>UserDistributedCacheWithPof</service-name>
 
   <serializer>
    <instance>
     <class-name>com.tangosol.io.pof.SafeConfigurablePofContext
     </class-name>
     <init-params>
      <init-param>
       <param-type>String</param-type>
       <param-value>
           <!-- pof-config.xml path should be set-->
        otv-pof-config.xml
       </param-value>
      </init-param>
     </init-params>
    </instance>
   </serializer>
   <backing-map-scheme>
    <local-scheme />
   </backing-map-scheme>
   <autostart>true</autostart>
  </distributed-scheme>
 </caching-schemes>
 
</cache-config>

ШАГ 6: СОЗДАЙТЕ tangosol-coherence-override.xml

tangosol-coherence-override.xml охватывает конфигурацию кластера, идентификатора участника и настраиваемую фабрику кэша. Также в следующем XML-файле конфигурации показан первый член кластера. Аргумент -Dtangosol.coherence.override может использоваться для определения пути tangosol-coherence-override.xml в сценарии запуска.

tangosol-coherence-override.xml для первого члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<?xml version='1.0'?>
 
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
   <!-- Name of the first member of the cluster -->
         <role-name>OTV1</role-name>
      </member-identity>
 
      <unicast-listener>
       <well-known-addresses>
         <socket-address id="1">
     <!-- IP Address of the first member of the cluster -->
           <address>x.x.x.x</address>
           <port>8089</port>
         </socket-address>
         <socket-address id="2">
     <!-- IP Address of the second member of the cluster -->
           <address>y.y.y.y</address>
           <port>8089</port>
         </socket-address>
       </well-known-addresses>
 
    <!-- Name of the first member of the cluster -->
       <machine-id>OTV1</machine-id>
    <!-- IP Address of the first member of the cluster -->
        <address>x.x.x.x</address>
        <port>8089</port>
        <port-auto-adjust>true</port-auto-adjust>
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property="tangosol.coherence.cacheconfig">
     <!-- coherence-cache-config.xml path should be set-->
              otv-coherence-cache-config.xml
   </param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

tangosol-coherence-override.xml для второго члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version='1.0'?>
 
   xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
   <!-- Name of the second member of the cluster -->
         <role-name>OTV2</role-name>
      </member-identity>
 
      <unicast-listener>     
 
       <well-known-addresses>
         <socket-address id="1">
     <!-- IP Address of the first member of the cluster -->
           <address>x.x.x.x</address>
           <port>8089</port>
         </socket-address>
         <socket-address id="2">
     <!-- IP Address of the second member of the cluster -->
           <address>y.y.y.y</address>
           <port>8089</port>
         </socket-address>
       </well-known-addresses>
 
    <!-- Name of the second member of the cluster -->
       <machine-id>OTV2</machine-id>
    <!-- IP Address of the second member of the cluster -->
        <address>y.y.y.y</address>
        <port>8089</port>
        <port-auto-adjust>true</port-auto-adjust>
 
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property="tangosol.coherence.cacheconfig">
     <!-- coherence-cache-config.xml path should be set-->
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

ШАГ 7: СОЗДАТЬ applicationContext.xml

applicationContext.xml создан.

01
02
03
04
05
06
07
08
09
10
11
12
13
 
 
 <!-- Beans Declaration -->
 <bean id="User" class="com.otv.user.User" scope="prototype" />
    <bean id="UserCacheService" class="com.otv.user.cache.srv.UserCacheService" />
    <bean id="CacheUpdaterTask" class="com.otv.cache.updater.task.CacheUpdaterTask">
     <property name="userCacheService" ref="UserCacheService" />
    </bean>
</beans>

ШАГ 8: СОЗДАЙТЕ SystemConstants CLASS

Класс SystemConstants создан. Этот класс охватывает все системные константы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.otv.common;
 
/**
 * System Constants
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class SystemConstants {
 
 public static final String APPLICATION_CONTEXT_FILE_NAME = "applicationContext.xml";
 
 //Named Cache Definition...
 public static final String USER_CACHE = "user-cache";
 
 //Bean Names...
 public static final String BEAN_NAME_CACHE_UPDATER_TASK = "CacheUpdaterTask";
 public static final String BEAN_NAME_USER = "User";
 
}

ШАГ 9: СОЗДАНИЕ БИНА ПОЛЬЗОВАТЕЛЯ

Новый бин User Spring создан. Этот компонент будет распределен между двумя узлами в кластере OTV . PortableObject может быть реализован для сериализации. Интерфейс PortableObject имеет два не реализованных метода, как readExternal и writeExternal . Свойства, которые только сериализуются, должны быть определены. В этом примере все свойства (идентификатор, имя и фамилия пользователя) сериализуются.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
package com.otv.user;
 
import java.io.IOException;
 
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
 
/**
 * User Bean
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class User implements PortableObject {
 
 private String id;
 private String name;
 private String surname;
 
 /**
     * Gets User Id
     *
     * @return String id
     */
 public String getId() {
  return id;
 }
 
 /**
     * Sets User Id
     *
     * @param String id
     */
 public void setId(String id) {
  this.id = id;
 }
 
 /**
     * Gets User Name
     *
     * @return String name
     */
 public String getName() {
  return name;
 }
 
 /**
     * Sets User Name
     *
     * @param String name
     */
 public void setName(String name) {
  this.name = name;
 }
 
 /**
     * Gets User Surname
     *
     * @return String surname
     */
 public String getSurname() {
  return surname;
 }
 
 /**
     * Sets User Surname
     *
     * @param String surname
     */
 public void setSurname(String surname) {
  this.surname = surname;
 }
 
 @Override
 public String toString() {
  StringBuilder strBuilder = new StringBuilder();
  strBuilder.append("Id : ").append(id);
  strBuilder.append(", Name : ").append(name);
  strBuilder.append(", Surname : ").append(surname);
  return strBuilder.toString();
 }
 
 /**
  * Restore the contents of a user type instance by reading its state
  * using the specified PofReader object.
  *
  * @param PofReader in
  */
 public void readExternal(PofReader in) throws IOException {
  this.id = in.readString(0);
  this.name = in.readString(1);
  this.surname = in.readString(2);
 }
 
 /**
  * Save the contents of a POF user type instance by writing its state
  * using the specified PofWriter object.
  *
  * @param PofWriter out
  */
 public void writeExternal(PofWriter out) throws IOException {
  out.writeString(0, id);
  out.writeString(1, name);
  out.writeString(2, surname);
 }
}

ШАГ 10: СОЗДАЙТЕ IUserCacheService ИНТЕРФЕЙС

Новый интерфейс IUserCacheService создан для выполнения операций кэширования.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.otv.user.cache.srv;
 
import com.otv.user.User;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
import com.tangosol.net.NamedCache;
 
/**
 * User Cache Service Interface
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public interface IUserCacheService {
 
 /**
     * Gets Distributed User Cache
     *
     * @return NamedCache User Cache
     */
 NamedCache getUserCache();
 
 /**
     * Adds user to cache
     *
     * @param User user
     */
 void addUser(User user);
 
 /**
     * Updates user on the cache
     *
     * @param String userId
     * @param UpdateUserProcessor processor
     *
     */
 void updateUser(String userId, UpdateUserProcessor processor);
 
 /**
     * Deletes user from the cache
     *
     * @param String userId
     * @param DeleteUserProcessor processor
     *
     */
 void deleteUser(String userId, DeleteUserProcessor processor);
 
}

ШАГ 11: СОЗДАНИЕ КЛАССА UserCacheService

Класс UserCacheService создается путем реализации интерфейса IUserCacheService .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.otv.user.cache.srv;
 
import com.otv.cache.listener.UserMapListener;
import com.otv.common.SystemConstants;
import com.otv.user.User;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
 
/**
 * User Cache Service
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UserCacheService implements IUserCacheService {
 
 private NamedCache userCache = null;
 
 public UserCacheService() {
  setUserCache(CacheFactory.getCache(SystemConstants.USER_CACHE));
  //UserMap Listener is registered to listen user-cache operations
  getUserCache().addMapListener(new UserMapListener());
 }
 
 /**
     * Adds user to cache
     *
     * @param User user
     */
 public void addUser(User user) {
  getUserCache().put(user.getId(), user);
 }
 
 /**
     * Deletes user from the cache
     *
     * @param String userId
     * @param DeleteUserProcessor processor
     *
     */
 public void deleteUser(String userId, DeleteUserProcessor processor) {
  getUserCache().invoke(userId, processor);
 }
 
 /**
     * Updates user on the cache
     *
     * @param String userId
     * @param UpdateUserProcessor processor
     *
     */
 public void updateUser(String userId, UpdateUserProcessor processor) {
  getUserCache().invoke(userId, processor);
 }
 
 /**
     * Gets Distributed User Cache
     *
     * @return NamedCache User Cache
     */
 public NamedCache getUserCache() {
  return userCache;
 }
 
 /**
     * Sets User Cache
     *
     * @param NamedCache userCache
     */
 public void setUserCache(NamedCache userCache) {
  this.userCache = userCache;
 }
}

ШАГ 12: СОЗДАТЬ КЛАСС USERMAPLISTENER

Новый класс UserMapListener создан. Этот слушатель получает распределенные события пользовательского кэша .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.otv.cache.listener;
 
import org.apache.log4j.Logger;
 
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
 
/**
 * User Map Listener
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UserMapListener implements MapListener {
 
 private static Logger logger = Logger.getLogger(UserMapListener.class);
 
 /**
     * This method is invoked when an entry is deleted from the cache...
     *
     * @param MapEvent me
     */
 public void entryDeleted(MapEvent me) {
   logger.debug("Deleted Key = " + me.getKey() + ", Value = " + me.getOldValue());
 }
 
 /**
     * This method is invoked when an entry is inserted to the cache...
     *
     * @param MapEvent me
     */
 public void entryInserted(MapEvent me) {
  logger.debug("Inserted Key = " + me.getKey() + ", Value = " + me.getNewValue());
 }
 
 /**
     * This method is invoked when an entry is updated on the cache...
     *
     * @param MapEvent me
     */
 public void entryUpdated(MapEvent me) {
  logger.debug("Updated Key = " + me.getKey() + ", New_Value = " + me.getNewValue() + ", Old Value = " + me.getOldValue());
 }
}

ШАГ 13: СОЗДАЙТЕ UpdateUserProcessor CLASS

AbstractProcessor — это абстрактный класс в пакете com.tangosol.util.processor . Он реализует интерфейс EntryProcessor .

Класс UpdateUserProcessor создан для обработки операции обновления пользователя в кеше. Когда UpdateUserProcessor вызывается для ключа, сначала в кластере обнаруживается элемент, содержащий ключ. После этого UpdateUserProcessor вызывается из члена, который содержит связанный ключ, и его значение (объект пользователя) обновляется. Поэтому сетевой трафик уменьшается.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.otv.user.processor;
 
import java.io.IOException;
 
import org.apache.log4j.Logger;
 
import com.otv.user.User;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
/**
 * Update User Processor
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class UpdateUserProcessor extends AbstractProcessor implements PortableObject {
 
 private static Logger logger = Logger.getLogger(UpdateUserProcessor.class);
 private User newUser;
 
 /**
     * This empty constructor is added for Portable Object Format(POF).
     *
     */
 public UpdateUserProcessor() {
 
 }
 
 public UpdateUserProcessor(User newUser) {
  this.newUser = newUser;
 }
 
 /**
     * Processes a Map.Entry object.
     *
     * @param Entry entry
     * @return Object newUser
     */
 public Object process(Entry entry) {
  Object newValue = null;
  try {
   newValue = getNewUser();
   entry.setValue(newValue);
      } catch (Exception e) {
       logger.error("Error occured when entry was being processed!", e);
  }
 
  return newValue;
 }
 
 /**
     * Gets new user
     *
     * @return User newUser
     */
 public User getNewUser() {
  return newUser;
 }
 
 /**
     * Sets new user
     *
     * @param User newUser
     */
 public void setNewUser(User newUser) {
  this.newUser = newUser;
 }
 
 /**
  * Restore the contents of a user type instance by reading its state
  * using the specified PofReader object.
  *
  * @param PofReader in
  */
 public void readExternal(PofReader in) throws IOException {
  setNewUser((User) in.readObject(0));
 }
 
 /**
  * Save the contents of a POF user type instance by writing its state
  * using the specified PofWriter object.
  *
  * @param PofWriter out
  */
 public void writeExternal(PofWriter out) throws IOException {
  out.writeObject(0, getNewUser());
 }
}

ШАГ 14: СОЗДАЙТЕ DeleteUserProcessor CLASS

Класс DeleteUserProcessor создан для обработки операции удаления пользователя в кеше. Когда DeleteUserProcessor вызывается для ключа, сначала в кластере обнаруживается элемент, содержащий ключ. После этого DeleteUserProcessor вызывается из члена, который содержит связанный ключ. Поэтому сетевой трафик уменьшается.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.otv.user.processor;
 
import java.io.IOException;
 
import org.apache.log4j.Logger;
 
import com.otv.user.User;
import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.util.InvocableMap.Entry;
import com.tangosol.util.processor.AbstractProcessor;
 
/**
 * Delete User Processor
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class DeleteUserProcessor extends AbstractProcessor implements PortableObject {
 
 private static Logger logger = Logger.getLogger(DeleteUserProcessor.class);
 
 /**
     * Processes a Map.Entry object.
     *
     * @param Entry entry
     * @return Object user
     */
 public Object process(Entry entry) {
  User user = null;
  try {
   user = (User) entry.getValue();
   entry.remove(true);
      } catch (Exception e) {
       logger.error("Error occured when entry was being processed!", e);
  }
 
  return user;
 }
 
 /**
  * Restore the contents of a user type instance by reading its state
  * using the specified PofReader object.
  *
  * @param PofReader in
  */
 public void readExternal(PofReader in) throws IOException {
 
 }
 
 /**
  * Save the contents of a POF user type instance by writing its state
  * using the specified PofWriter object.
  *
  * @param PofWriter out
  */
 public void writeExternal(PofWriter out) throws IOException {
 
 }
}

ШАГ 15: СОЗДАЙТЕ CacheUpdaterTask CLASS

CacheUpdaterTask Class создан для выполнения операций кэширования (добавления, обновления и удаления) и мониторинга содержимого кэша.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package com.otv.cache.updater.task;
 
import java.util.Collection;
 
import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
 
import com.otv.common.SystemConstants;
import com.otv.user.User;
import com.otv.user.cache.srv.IUserCacheService;
import com.otv.user.processor.DeleteUserProcessor;
import com.otv.user.processor.UpdateUserProcessor;
 
/**
 * Cache Updater Task
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class CacheUpdaterTask implements BeanFactoryAware, Runnable {
 
 private static Logger log = Logger.getLogger(CacheUpdaterTask.class);
 private IUserCacheService userCacheService;
 private BeanFactory beanFactory;
 
 public void run() {
  try {
   while(true) {
    /**
     * Before the project is built for the first member,
     * this code block should be used instead of
     * method processRequestsOnSecondMemberOfCluster.
     */
    processRequestsOnFirstMemberOfCluster();
 
    /**
     * Before the project is built for the second member,
     * this code block should be used instead of
     * method processRequestsOnFirstMemberOfCluster.
     */
//    processRequestsOnSecondMemberOfCluster();
   }
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
 
 /**
   * Processes the cache requests on the first member of cluster...
   *
   * @throws InterruptedException
   */
 private void processRequestsOnFirstMemberOfCluster() throws InterruptedException {
  //Entry is added to cache...
  getUserCacheService().addUser(getUser("1", "Bruce", "Willis"));
 
  //Cache Entries are being printed...
  printCacheEntries();
 
  Thread.sleep(10000);
 
  User newUser = getUser("1", "Client", "Eastwood");
  //Existent Entry is updated on the cache...
  getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));
 
  //Cache Entries are being printed...
  printCacheEntries();
 
  Thread.sleep(10000);
 
  //Entry is deleted from cache...
  getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());
 
  //Cache Entries are being printed...
  printCacheEntries();
 
  Thread.sleep(10000);
 }
 
 /**
   * Processes the cache requests on the second member of cluster...
   *
   * @throws InterruptedException
   */
 private void processRequestsOnSecondMemberOfCluster() throws InterruptedException {
  //Entry is added to cache...
  getUserCacheService().addUser(getUser("2", "Nathalie", "Portman"));
 
  Thread.sleep(15000);
 
  User newUser = getUser("2", "Sharon", "Stone");
  //Existent Entry is updated on the cache...
  getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));
 
  User newUser2 = getUser("1", "Maria", "Sharapova");
  //Existent Entry is updated on the cache...
  getUserCacheService().updateUser(newUser2.getId(), new UpdateUserProcessor(newUser2));
 
  Thread.sleep(15000);
 
  //Entry is deleted from cache...
  getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());
 
  Thread.sleep(15000);
 }
 
 /**
     * Prints cache entries
     *
     */
 private void printCacheEntries() {
  Collection<User> userCollection = (Collection<User>)getUserCacheService().getUserCache().values();
  for(User user : userCollection) {
   log.debug("Cache Content : "+user);
  }
 }
 
 /**
     * Gets new user instance
     *
     * @param String user id
     * @param String user name
     * @param String user surname
     * @return User user
     */
 private User getUser(String id, String name, String surname) {
  User user = getNewUserInstance();
  user.setId(id);
  user.setName(name);
  user.setSurname(surname);
 
  return user;
 }
 
 /**
     * Gets user cache service...
     *
     * @return IUserCacheService userCacheService
     */
 public IUserCacheService getUserCacheService() {
  return userCacheService;
 }
 
 /**
     * Sets user cache service...
     *
     * @param IUserCacheService userCacheService
     */
 public void setUserCacheService(IUserCacheService userCacheService) {
  this.userCacheService = userCacheService;
 }
 
 /**
     * Gets a new instance of User Bean
     *
     * @return User
     */
    public User getNewUserInstance() {
        return  (User) getBeanFactory().getBean(SystemConstants.BEAN_NAME_USER);
    }
 
    /**
     * Gets bean factory
     *
     * @return BeanFactory
     */
    public BeanFactory getBeanFactory() {
  return beanFactory;
 }
 
    /**
     * Sets bean factory
     *
     * @param BeanFactory beanFactory
     * @throws BeansException
     */
 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
  this.beanFactory = beanFactory;
 }
}

ШАГ 16: СОЗДАЙТЕ КЛАСС ПРИЛОЖЕНИЙ

Класс приложения создан для запуска приложения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.otv.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
import com.otv.cache.updater.task.CacheUpdaterTask;
import com.otv.common.SystemConstants;
 
/**
 * Application Class
 *
 * @author  onlinetechvision.com
 * @since   2 Jun 2012
 * @version 1.0.0
 *
 */
public class Application {
 
 /**
  * Starts the application
  *
  * @param  String[] args
  *
  */
 public static void main(String[] args) {
  ApplicationContext context = new ClassPathXmlApplicationContext(SystemConstants.APPLICATION_CONTEXT_FILE_NAME);
 
  CacheUpdaterTask cacheUpdaterTask = (CacheUpdaterTask) context.getBean(SystemConstants.BEAN_NAME_CACHE_UPDATER_TASK);
  Thread cacheUpdater = new Thread(cacheUpdaterTask);
  cacheUpdater.start();
 }
}

ШАГ 17: СТРОИМ ПРОЕКТ

После сборки проекта OTV_Spring_Coherence_With_Processor_and_POF будет создан файл OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar .
Обратите внимание, что члены кластера имеют различную конфигурацию для Coherence, поэтому проект должен быть построен отдельно для каждого участника.

ШАГ 18: ЗАПУСК ПРОЕКТА ПЕРВОГО ЧЛЕНА КЛАСТЕРА

После запуска созданного файла OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar на элементах кластера на консоли первого участника будут показаны следующие выходные журналы:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
--After A new cluster is created and First Member joins the cluster, a new entry is added to the cache.
02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
02.06.2012 14:21:45 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis
.......
--After Second Member joins the cluster, a new entry is added to the cache.
02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 2, Value = Id : 2, Name : Nathalie, Surname : Portman
.......
--Cache operations go on both first and second members of the cluster:
02.06.2012 14:21:55 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Client, Surname : Eastwood,
                                                                       Old Value = Id : 1, Name : Bruce, Surname : Willis
 
02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Nathalie, Surname : Portman
02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Client, Surname : Eastwood
 
02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 2, New_Value = Id : 2, Name : Sharon, Surname : Stone,
                    Old Value = Id : 2, Name : Nathalie, Surname : Portman
 
02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Maria, Surname : Sharapova,
                                                                       Old Value = Id : 1, Name : Client, Surname : Eastwood
 
02.06.2012 14:22:05 DEBUG (UserMapListener.java:24) - Deleted Key = 1, Value = Id : 1, Name : Maria, Surname : Sharapova
02.06.2012 14:22:05 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Sharon, Surname : Stone
02.06.2012 14:22:15 DEBUG (UserMapListener.java:24) - Deleted Key = 2, Value = Id : 2, Name : Sharon, Surname : Stone
02.06.2012 14:22:15 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
02.06.2012 14:22:15 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis

ШАГ 19: СКАЧАТЬ

OTV_Spring_Coherence_With_Processor_and_POF

Дальнейшее чтение :

Выполнение транзакций в согласованности
Использование формата переносимого объекта в согласованности
Spring Framework Reference 3.x

Ссылка: Как распространять Spring Beans с помощью функций EntryProcessor и PortableObject в Oracle Coherence от нашего партнера по JCG Эрена Авсарогуллари из блога Online Technology Vision .