Статьи

Обработка событий когерентности с использованием функции запуска по карте

В этой статье показано, как обрабатывать события Coherence с помощью триггеров карты. В основном, распределенное управление данными в Oracle Coherence предлагается рассмотреть базовую конфигурацию и реализацию Oracle Coherence API
Триггеры карт — одна из важнейших функций Oracle Coherence, обеспечивающая настраиваемую систему управления кэшем. MapTrigger представляет собой функциональный агент, который позволяет проверять, отклонять или изменять операции мутации с базовой картой. Кроме того, они могут предотвращать недопустимые транзакции, обеспечивать безопасность, обеспечивать ведение журнала событий и аудит, а также собирать статистику по изменениям данных.

Например, у нас есть код, который работает с NamedCache, и мы хотим изменить поведение или содержимое записи, прежде чем запись будет вставлена ​​в карту. Это изменение может быть сделано без изменения всего существующего кода путем включения триггера карты.

Есть два способа добавить функцию Map Trigger в приложение:

1) MapTriggerListener может использоваться для регистрации MapTrigger с Именованным Кешем
2) Механизм фабрики классов можно использовать в файле конфигурации coherence-cache-config.xml

В следующем примере приложения функциональность MapTrigger реализована следующим образом. Создается новый кластер, называемый OTV, и пользовательский компонент распределяется по объекту NamedCache карты пользователя, который используется среди двух членов кластера.

Используемые технологии:

JDK 1.6.0_35
Весна 3.1.2
Согласованность 3.7.1
Maven 3.0.2

ШАГ 1: СОЗДАТЬ MAVEN ПРОЕКТ

Maven проект создается как показано ниже. (Его можно создать с помощью Maven или IDE Plug-in).

ШАГ 2: КОГЕРЕНТНЫЙ ПАКЕТ

Coherence загружается через пакет Coherence

ШАГ 3: БИБЛИОТЕКИ

Во-первых, Spring-зависимости добавляются в pom.xml Maven.

01
02
03
04
05
06
07
08
09
10
11
<!-- Spring 3.1.2 dependencies -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>

Библиотека Coherence устанавливается в Local Maven Repository вручную, а ее описание добавляется в pom.xml, как показано ниже. Также, если Maven не используется для управления проектом, файл coherence.jar может быть добавлен в classpath.

1
2
3
4
5
6
<!-- Coherence library(from local repository) -->
<dependency>
    <groupId>com.tangosol</groupId>
    <artifactId>coherence</artifactId>
    <version>3.7.1</version>
</dependency>

Для создания runnable-jar можно использовать следующий плагин Maven.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
 
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                        implementation='org.apache.maven.plugins.shade.resource.ManifestResourceTransformer'>
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <transformer
                        implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer
                        implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

ШАГ 4: СОЗДАТЬ otv-coherence-cache-config.xml

Первый файл конфигурации Coherence — это otv-coherence-cache-config.xml . Он содержит схемы кэширования (распределенные или реплицированные) и конфигурацию отображения схемы кэширования. Созданная конфигурация кэша должна быть добавлена ​​в coherence-cache-config.xml .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?xml version='1.0'?>
 
   coherence-cache-config.xsd'>
 
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>user-map</cache-name>
            <scheme-name>MapDistCache</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>
 
    <caching-schemes>
        <distributed-scheme>
            <scheme-name>MapDistCache</scheme-name>
            <service-name>MapDistCache</service-name>
            <backing-map-scheme>
                <local-scheme>
                    <unit-calculator>BINARY</unit-calculator>
                </local-scheme>
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>
    </caching-schemes>
 
</cache-config>

ШАГ 5: СОЗДАЙТЕ tangosol-coherence-override.xml

Второй файл конфигурации Coherence — tangosol-coherence-override.xml . Он содержит конфигурацию кластера, идентификатора члена и настраиваемой фабрики кэша.

tangosol-coherence-override.xml для первого члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<?xml version='1.0'?>
 
   xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'>
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
         <role-name>OTV1</role-name>
      </member-identity>
 
      <unicast-listener>
          <well-known-addresses>
            <socket-address id='1'>
              <address>x.x.x.x</address>
              <port>8089</port>
            </socket-address>
            <socket-address id='2'>
              <address>x.x.x.x</address>
              <port>8090</port>
            </socket-address>
          </well-known-addresses>
 
          <machine-id>1001</machine-id>
          <address>x.x.x.x</address>
          <port>8089</port>
          <port-auto-adjust>true</port-auto-adjust>
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property='tangosol.coherence.cacheconfig'>
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

tangosol-coherence-override.xml для второго члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?xml version='1.0'?>
 
   xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'>
 
   <cluster-config>
 
      <member-identity>
         <cluster-name>OTV</cluster-name>
         <role-name>OTV2</role-name>
      </member-identity>
 
      <unicast-listener>     
 
          <well-known-addresses>
            <socket-address id='1'>
              <address>x.x.x.x</address>
              <port>8090</port>
            </socket-address>
            <socket-address id='2'>
              <address>x.x.x.x</address>
              <port>8089</port>
            </socket-address>
          </well-known-addresses>
 
          <machine-id>1002</machine-id>
          <address>x.x.x.x</address>
          <port>8090</port>
          <port-auto-adjust>true</port-auto-adjust>
 
      </unicast-listener>
 
   </cluster-config>
 
   <configurable-cache-factory-config>
      <init-params>
         <init-param>
            <param-type>java.lang.String</param-type>
            <param-value system-property='tangosol.coherence.cacheconfig'>
              otv-coherence-cache-config.xml</param-value>
         </init-param>
      </init-params>
   </configurable-cache-factory-config>
 
</coherence>

ШАГ 6: СОЗДАНИЕ applicationContext.xml

Файл конфигурации Spring, applicationContext.xml , создан.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
 
 
    <!-- Beans Declaration -->
    <bean id='userCacheService' class='com.otv.srv.UserCacheService'></bean>
 
    <bean id='userCacheUpdater' class='com.otv.exe.UserCacheUpdater'>
        <property name='userCacheService' ref='userCacheService' />
    </bean>
 
</beans>

ШАГ 7: СОЗДАТЬ КЛАСС Пользователя

Новый бин User Spring создан. Этот компонент будет распределен между двумя узлами в кластере OTV. Для сериализации был реализован интерфейс java.io.Serializable , но PortableObject может быть реализован для повышения производительности.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.otv.user;
 
import java.io.Serializable;
 
/**
 * User Bean
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class User implements Serializable {
 
    private static final long serialVersionUID = -1963764656789800896L;
 
    private String id;
    private String name;
    private String surname;
 
    public String getId() {
        return id;
    }
 
    public void setId(String id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
    public String getSurname() {
        return surname;
    }
 
    public void setSurname(String surname) {
        this.surname = surname;
    }
 
    @Override
    public String toString() {
        StringBuilder strBuff = new StringBuilder();
        strBuff.append('id : ').append(id);
        strBuff.append(', name : ').append(name);
        strBuff.append(', surname : ').append(surname);
        return strBuff.toString();
    }
}

ШАГ 8: СОЗДАТЬ IUserCacheService ИНТЕРФЕЙС

Новый интерфейс IUserCacheService создан для уровня обслуживания, чтобы показать функциональность кэша.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.otv.srv;
 
import com.tangosol.net.NamedCache;
 
/**
 * IUserCacheService Interface exposes User Cache operations
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public interface IUserCacheService {
 
    /**
     * Adds user entries to cache
     *
     * @param Object key
     * @param Object value
     *
     */
    void addToUserCache(Object key, Object value);
 
    /**
     * Deletes user entries from cache
     *
     * @param Object key
     *
     */
    void deleteFromUserCache(Object key);
 
    /**
     * Gets user cache
     *
     * @retun NamedCache Coherence named cache
     */
    NamedCache getUserCache();
 
}

ШАГ 9: СОЗДАНИЕ UserCacheService IMPL CLASS

UserCacheService создается путем реализации IUserCacheService .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.otv.srv;
 
import com.otv.listener.UserMapListener;
import com.otv.trigger.UserMapTrigger;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapTriggerListener;
 
/**
 * CacheService Class implements the ICacheService
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserCacheService implements IUserCacheService {
 
    private NamedCache userCache = null;
    private static final String USER_MAP = 'user-map';
    private static final long   LOCK_TIMEOUT = -1;
 
    public UserCacheService() {
        setUserCache(CacheFactory.getCache(USER_MAP));
        getUserCache().addMapListener(new UserMapListener());
        getUserCache().addMapListener(new MapTriggerListener(new UserMapTrigger()));
    }  
 
    /**
     * Adds user entries to cache
     *
     * @param Object key
     * @param Object value
     *
     */
    public void addToUserCache(Object key, Object value) {
        // key is locked
        getUserCache().lock(key, LOCK_TIMEOUT);
        try {
            // application logic
            getUserCache().put(key, value);
        } finally {
            // key is unlocked
            getUserCache().unlock(key);
        }
    }
 
    /**
     * Deletes user entries from cache
     *
     * @param Object key
     *
     */
    public void deleteFromUserCache(Object key) {
        // key is locked
        getUserCache().lock(key, LOCK_TIMEOUT);
        try {
            // application logic
            getUserCache().remove(key);
        } finally {
            // key is unlocked
            getUserCache().unlock(key);
        }
    }
 
    /**
     * Gets user cache
     *
     * @retun NamedCache Coherence named cache
     */
    public NamedCache getUserCache() {
        return userCache;
    }
 
    public void setUserCache(NamedCache userCache) {
        this.userCache = userCache;
    }
 
}

ШАГ 10: СОЗДАНИЕ КЛАССА UserMapTrigger

Новый класс UserMapTrigger создается путем реализации интерфейса com.tangosol.util.MapTrigger . Этот триггер обрабатывает логику до того, как запись будет вставлена ​​в пользовательскую карту .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.otv.trigger;
 
import org.apache.log4j.Logger;
 
import com.otv.listener.UserMapListener;
import com.otv.user.User;
import com.tangosol.util.MapTrigger;
 
/**
 * UserMapTrigger executes required logic before the operation is committed
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserMapTrigger implements MapTrigger {
 
    private static final long serialVersionUID = 5411263646665358790L;
    private static Logger logger = Logger.getLogger(UserMapListener.class);
 
    /**
     * Processes user cache entries
     *
     * @param MapTrigger.Entry entry
     *
     */
    public void process(MapTrigger.Entry entry) {
        User user = (User) entry.getValue();
        String id = user.getId();
        String name = user.getName();
        String updatedName = name.toUpperCase();
 
        String surname = user.getSurname();
        String updatedSurname = surname.toUpperCase();
 
        if (!updatedName.equals(name)) {
            user.setName(updatedName);
        }
 
        if (!updatedSurname.equals(surname)) {
            user.setSurname(updatedSurname);
        }
 
        user.setId(user.getName() + '_' + user.getSurname());
 
        entry.setValue(user);
 
        logger.debug('UserMapTrigger processes the entry before committing. '
                            + 'oldId : ' + id
                            + ', newId : ' + ((User)entry.getValue()).getId()
                            + ', oldName : ' + name
                            + ', newName : ' + ((User)entry.getValue()).getName()
                            + ', oldSurname : ' + surname
                            + ', newSurname : ' + ((User)entry.getValue()).getSurname()
                            );
    }
 
    public boolean equals(Object o) {
        return o != null && o.getClass() == this.getClass();
    }
 
    public int hashCode() {
        return getClass().getName().hashCode();
    }
}

ШАГ 11: СОЗДАТЬ USERMAPLISTENER IMPL CLASS

Новый класс UserMapListener создан. Этот слушатель получает распределенные события пользовательской карты .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.otv.listener;
 
import org.apache.log4j.Logger;
 
import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;
 
/**
 * UserMapListener Class listens user cache events
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserMapListener implements MapListener {
 
    private static Logger logger = Logger.getLogger(UserMapListener.class);
 
    public void entryDeleted(MapEvent me) {
        logger.debug('Deleted Key = ' + me.getKey() + ', Value = ' + me.getOldValue());
    }
 
    public void entryInserted(MapEvent me) {
        logger.debug('Inserted Key = ' + me.getKey() + ', Value = ' + me.getNewValue());
    }
 
    public void entryUpdated(MapEvent me) {
//      logger.debug('Updated Key = ' + me.getKey() + ', New_Value = ' + me.getNewValue() + ', Old Value = ' + me.getOldValue());
    }
}

ШАГ 12: СОЗДАЙТЕ CacheUpdater CLASS

Класс CacheUpdater создан для добавления новой записи в кэш и отслеживания содержимого кеша.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.otv.exe;
 
import java.util.Collection;
 
import org.apache.log4j.Logger;
 
import com.otv.srv.IUserCacheService;
import com.otv.user.User;
 
/**
 * CacheUpdater Class updates and prints user cache entries
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class UserCacheUpdater implements Runnable {
 
    private static Logger logger = Logger.getLogger(UserCacheUpdater.class);
 
    private IUserCacheService userCacheService;
 
    /**
     * Runs the UserCacheUpdater Thread
     *
     */
    public void run() {    
 
        //New User are created...
        User user = new User();
 
        //Only Name and Surname properties are set and Id property will be set at trigger level.
        user.setName('James');
        user.setSurname('Joyce');
 
        //Entries are added to cache...
        getUserCacheService().addToUserCache('user1', user);
 
//      The following code block shows the entry which will be inserted via second member of the cluster
//      so it should be opened and above code block should be commented-out before the project is built.
 
//      user.setName('Thomas');
//      user.setSurname('Moore');
//      getUserCacheService().addToUserCache('user2', user);
 
        //Cache Entries are being printed...
        printCacheEntries();
    }
 
    /**
     * Prints User Cache Entries
     *
     */
    @SuppressWarnings('unchecked')
    private void printCacheEntries() {
        Collection<User> userCollection = null;
        try {
            while(true) {
                userCollection = (Collection<User>)getUserCacheService().getUserCache().values();
 
                for(User user : userCollection) {
                    logger.debug('Cache Content : '+user);
                }
 
                Thread.sleep(60000);
            }
        } catch (InterruptedException e) {
            logger.error('CacheUpdater is interrupted!', e);
        }
    }
 
    public IUserCacheService getUserCacheService() {
        return userCacheService;
    }
 
    public void setUserCacheService(IUserCacheService userCacheService) {
        this.userCacheService = userCacheService;
    }
}

ШАГ 13: СОЗДАЙТЕ КЛАСС ПРИЛОЖЕНИЙ

Класс приложения создан для запуска приложения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.otv.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 *
 */
public class Application {
 
    /**
     * Starts the application
     *
     * @param  String[] args
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext('applicationContext.xml');
 
        UserCacheUpdater cacheUpdater = (UserCacheUpdater) context.getBean('userCacheUpdater');
        new Thread(cacheUpdater).start();
    }
}

NBSP;
ШАГ 14: СТРОИМ ПРОЕКТ

После сборки проекта OTV_Spring_Coherence_MapTrigger будет создан файл OTV_Spring_Coherence_MapTrigger-0.0.1-SNAPSHOT.jar .
Важное примечание: Члены кластера имеют различную конфигурацию для Coherence, поэтому проект должен быть построен отдельно для каждого участника.

ШАГ 15: ЗАПУСТИТЕ ПРОЕКТ, НАЧИНАЯ С ЧЛЕНА КЛАСТЕРА

После того, как созданный файл OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar запущен на элементах кластера, на консоли первого участника будут показаны следующие выходные журналы:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
--A new cluster is created and First Member joins the cluster and adds a new entry to the cache.
29.10.2012 18:26:44 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : JAMES_JOYCE
, oldName : James, newName : JAMES, oldSurname : Joyce, newSurname : JOYCE
29.10.2012 18:26:44 DEBUG (UserMapListener.java:25) - Inserted Key = user1, Value = id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:26:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
 
.......
 
--Second Member joins the cluster and adds a new entry to the cache.
29.10.2012 18:27:33 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : THOMAS_MOORE,
oldName : Thomas, newName : THOMAS, oldSurname : Moore, newSurname : MOORE
29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE
 
.......
 
--After second member adds a new entry, cache content is shown as below :
29.10.2012 18:27:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:27:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE

Консоль второго члена:

1
2
3
4
5
6
--After Second Member joins the cluster and adds a new entry to the cache, cache content is shown as below and the members has got same entries :.
29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE

ШАГ 16: СКАЧАТЬ

https://github.com/erenavsarogullari/OTV_Spring_Coherence_MapTrigger

Ссылка: согласованная обработка событий с использованием функции запуска по карте от нашего партнера JCG Эрен Авсарогуллари в блоге Online Technology Vision .