Статьи

Распределенное выполнение Hazelcast с Spring

Функция ExecutorService поставляется с Java 5 и находится в пакете java.util.concurrent . Он расширяет интерфейс Executor и предоставляет функциональность пула потоков для выполнения асинхронных коротких задач. Java Executor Service Types предлагается рассмотреть базовую реализацию ExecutorService.

Также ThreadPoolExecutor является очень полезной реализацией интерфейса ExecutorService. Он расширяет AbstractExecutorService, предоставляя реализации по умолчанию методов выполнения ExecutorService. Он обеспечивает повышенную производительность при выполнении большого количества асинхронных задач и поддерживает базовую статистику, например количество выполненных задач.

Также предлагается изучить, как разрабатывать и контролировать сервисы Thread Pool с использованием Spring .

До сих пор мы только что говорили о реализации службы нераспределенного исполнителя. Позвольте нам также исследовать Распределенную Службу Исполнителя.

Функция Hazelcast Distributed Executor Service — это распределенная реализация java.util.concurrent.ExecutorService . Это позволяет выполнять бизнес-логику в кластере. Есть четыре альтернативных способа реализовать это:

  • Логика может быть выполнена на конкретном элементе кластера, который выбран.
  • Логика может быть выполнена на члене, владеющем выбранным ключом.
  • Логика может быть выполнена на члене, который выберет Hazelcast.
  • Логика может быть выполнена для всех или подмножества элементов кластера.

В этой статье показано, как разработать распределенную службу Executor Service с помощью Hazelcast и Spring.

Используемые технологии:

  • JDK 1.7.0_09
  • Весна 3.1.3
  • Hazelcast 2.4
  • Maven 3.0.4

ШАГ 1: СОЗДАТЬ MAVEN ПРОЕКТ

Maven проект создается как показано ниже. (Его можно создать с помощью Maven или IDE Plug-in).

ШАГ 2: БИБЛИОТЕКИ

Во-первых, Spring-зависимости добавляются в pom.xml Maven.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<properties>
    <spring.version>3.1.3.RELEASE</spring.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
 
<dependencies>
    <!-- Spring 3 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
 
    <!-- Hazelcast library -->
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-all</artifactId>
        <version>2.4</version>
    </dependency>
 
    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>

maven-compiler-plugin (Maven Plugin) используется для компиляции проекта с JDK 1.7

1
2
3
4
5
6
7
8
9
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.0</version>
    <configuration>
      <source>1.7</source>
      <target>1.7</target>
    </configuration>
</plugin>

maven-shade-plugin ( плагин Maven) может быть использован для создания runnable-jar

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.0</version>
 
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers>
                        <transformer
                            implementation='org.apache.maven.plugins.shade.resource.
ManifestResourceTransformer'>
                            <mainClass>com.onlinetechvision.exe.Application</mainClass>
                        </transformer>
                        <transformer
                            implementation='org.apache.maven.plugins.shade.resource.
AppendingTransformer'>
                            <resource>META-INF/spring.handlers</resource>
                        </transformer>
                        <transformer
                            implementation='org.apache.maven.plugins.shade.resource.
AppendingTransformer'>
                            <resource>META-INF/spring.schemas</resource>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>

ШАГ 3: СОЗДАЙТЕ БИАН клиента

Новый компонент Customer создан. Этот компонент будет распределен между двумя узлами в кластере OTV. В следующем примере все определенные типы свойств (id, name и фамилия) имеют тип String, и для сериализации был реализован стандартный интерфейс java.io.Serializable . Если используются пользовательские или сторонние типы объектов, интерфейс com.hazelcast.nio.DataSerializable может быть реализован для повышения производительности сериализации.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.onlinetechvision.customer;
 
import java.io.Serializable;
 
/**
 * Customer Bean.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Customer implements Serializable {
 
    private static final long serialVersionUID = 1856862670651243395L;
 
    private String id;
    private String name;
    private String surname;
 
    public String getId() {
        return id;
    }
 
    public void setId(String id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
    public String getSurname() {
        return surname;
    }
 
    public void setSurname(String surname) {
        this.surname = surname;
    }
 
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((surname == null) ? 0 : surname.hashCode());
        return result;
    }
 
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Customer other = (Customer) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        if (surname == null) {
            if (other.surname != null)
                return false;
        } else if (!surname.equals(other.surname))
            return false;
        return true;
    }
 
    @Override
    public String toString() {
        return 'Customer [id=' + id + ', name=' + name + ', surname=' + surname + ']';
    }
 
}

ШАГ 4: СОЗДАЙТЕ ИНТЕРФЕЙС ICacheService

Новый интерфейс ICacheService создан для уровня обслуживания, чтобы показать функциональность кэша.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.onlinetechvision.cache.srv;
 
import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
 
/**
 * A new ICacheService Interface is created for service layer to expose cache functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface ICacheService {
 
    /**
     * Adds Customer entries to cache
     *
     * @param String key
     * @param Customer customer
     *
     */
    void addToCache(String key, Customer customer);
 
    /**
     * Deletes Customer entries from cache
     *
     * @param String key
     *
     */
    void deleteFromCache(String key);
 
    /**
     * Gets Customer cache
     *
     * @return IMap Coherence named cache
     */
    IMap<String, Customer> getCache();
}

ШАГ 5: СОЗДАЙТЕ РЕАЛИЗАЦИЮ CacheService

CacheService является реализацией интерфейса ICacheService .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.onlinetechvision.cache.srv;
 
import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.test.listener.CustomerEntryListener;
 
/**
 * CacheService Class is implementation of ICacheService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class CacheService implements ICacheService {
 
    private IMap<String, Customer> customerMap;
 
    /**
     * Constructor of CacheService
     *
     * @param IMap customerMap
     *
     */
    @SuppressWarnings('unchecked')
    public CacheService(IMap<String, Customer> customerMap) {
        setCustomerMap(customerMap);
        getCustomerMap().addEntryListener(new CustomerEntryListener(), true);
    }
 
    /**
     * Adds Customer entries to cache
     *
     * @param String key
     * @param Customer customer
     *
     */
    @Override
    public void addToCache(String key, Customer customer) {
        getCustomerMap().put(key, customer);
    }
 
    /**
     * Deletes Customer entries from cache
     *
     * @param String key
     *
     */
    @Override
    public void deleteFromCache(String key) {
        getCustomerMap().remove(key);
    }
 
    /**
     * Gets Customer cache
     *
     * @return IMap Coherence named cache
     */
    @Override
    public IMap<String, Customer> getCache() {
        return getCustomerMap();
    }
 
    public IMap<String, Customer> getCustomerMap() {
        return customerMap;
    }
 
    public void setCustomerMap(IMap<String, Customer> customerMap) {
        this.customerMap = customerMap;
    }
 
}

ШАГ 6: СОЗДАТЬ IDistributedExecutorService ИНТЕРФЕЙС

Новый интерфейс IDistributedExecutorService создан для уровня обслуживания, чтобы предоставить функциональность распределенного выполнения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.onlinetechvision.executor.srv;
 
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
 
import com.hazelcast.core.Member;
 
/**
 * A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface IDistributedExecutorService {
 
    /**
     * Executes the callable object on stated member
     *
     * @param Callable callable
     * @param Member member
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on member owning the key
     *
     * @param Callable callable
     * @param Object key
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on any member
     *
     * @param Callable callable
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException;
 
    /**
     * Executes the callable object on all members
     *
     * @param Callable callable
     * @param Set all members
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws InterruptedException, ExecutionException;
}

ШАГ 7: СОЗДАЙТЕ РЕАЛИЗАЦИЮ DistributedExecutorService

DistributedExecutorService является реализацией интерфейса IDistributedExecutorService .

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.onlinetechvision.executor.srv;
 
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
 
import org.apache.log4j.Logger;
 
import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiTask;
 
/**
 * DistributedExecutorService Class is implementation of IDistributedExecutorService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class DistributedExecutorService implements IDistributedExecutorService {
 
    private static final Logger logger = Logger.getLogger(DistributedExecutorService.class);
 
    private ExecutorService hazelcastDistributedExecutorService;
 
    /**
     * Executes the callable object on stated member
     *
     * @param Callable callable
     * @param Member member
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings('unchecked')
    public String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException {
        logger.debug('Method executeOnStatedMember is called...');
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit( new DistributedTask<String>(callable, member));
        String result = task.get();
        logger.debug('Result of method executeOnStatedMember is : ' + result);
        return result;
    }
 
    /**
     * Executes the callable object on member owning the key
     *
     * @param Callable callable
     * @param Object key
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings('unchecked')
    public String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException {
        logger.debug('Method executeOnTheMemberOwningTheKey is called...');
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit(new DistributedTask<String>(callable, key));
        String result = task.get();
        logger.debug('Result of method executeOnTheMemberOwningTheKey is : ' + result);
        return result;
    }
 
    /**
     * Executes the callable object on any member
     *
     * @param Callable callable
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException {
        logger.debug('Method executeOnAnyMember is called...');
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        Future<String> task = executorService.submit(callable);
        String result = task.get();
        logger.debug('Result of method executeOnAnyMember is : ' + result);
        return result;
    }
 
    /**
     * Executes the callable object on all members
     *
     * @param Callable callable
     * @param Set all members
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws ExecutionException, InterruptedException {
        logger.debug('Method executeOnMembers is called...');
        MultiTask<String> task = new MultiTask<String>(callable, members);
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        executorService.execute(task);
        Collection<String> results = task.get();
        logger.debug('Result of method executeOnMembers is : ' + results.toString());
        return results;
    }
 
    public ExecutorService getHazelcastDistributedExecutorService() {
        return hazelcastDistributedExecutorService;
    }
 
    public void setHazelcastDistributedExecutorService(ExecutorService hazelcastDistributedExecutorService) {
        this.hazelcastDistributedExecutorService = hazelcastDistributedExecutorService;
    }
 
}

ШАГ 8: СОЗДАЙТЕ TestCallable CLASS

Класс TestCallable показывает бизнес-логику для выполнения.

Задача TestCallable для первого члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.onlinetechvision.task;
 
import java.io.Serializable;
import java.util.concurrent.Callable;
 
/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable{
 
    private static final long serialVersionUID = -1839169907337151877L;
 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return 'First Member' s TestCallable Task is called...';
    }
 
}

Задача TestCallable для второго члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.onlinetechvision.task;
 
import java.io.Serializable;
import java.util.concurrent.Callable;
 
/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable{
 
    private static final long serialVersionUID = -1839169907337151877L;
 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return 'Second Member' s TestCallable Task is called...';
    }
 
}

ШАГ 9: СОЗДАЙТЕ КЛАСС другогоAvailableMemberNotFoundException

AnotherAvailableMemberNotFoundException генерируется, когда другой доступный элемент не найден. Чтобы избежать этого исключения, первый узел должен быть запущен перед вторым узлом.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.onlinetechvision.exception;
 
/**
 * AnotherAvailableMemberNotFoundException is thrown when another available member is not found.
 * To avoid this exception, first node should be started before the second node.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class AnotherAvailableMemberNotFoundException extends Exception {
 
    private static final long serialVersionUID = -3954360266393077645L;
 
    /**
     * Constructor of AnotherAvailableMemberNotFoundException
     *
     * @param  String Exception message
     *
     */
    public AnotherAvailableMemberNotFoundException(String message) {
        super(message);
    }
 
}

ШАГ 10: СОЗДАНИЕ КЛАССА CustomerEntryListener

Класс CustomerEntryListener прослушивает изменения записи в именованном объекте кэша.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.onlinetechvision.test.listener;
 
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
 
/**
 * CustomerEntryListener Class listens entry changes on named cache object.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
@SuppressWarnings('rawtypes')
public class CustomerEntryListener implements EntryListener {
 
    /**
     * Invoked when an entry is added.
     *
     * @param EntryEvent
     *
     */
    public void entryAdded(EntryEvent ee) {
        System.out.println('EntryAdded... Member : ' + ee.getMember() + ', Key : '+ee.getKey()+', OldValue : '+ee.getOldValue()+', NewValue : '+ee.getValue());
    }
 
    /**
     * Invoked when an entry is removed.
     *
     * @param EntryEvent
     *
     */
    public void entryRemoved(EntryEvent ee) {
        System.out.println('EntryRemoved... Member : ' + ee.getMember() + ', Key : '+ee.getKey()+', OldValue : '+ee.getOldValue()+', NewValue : '+ee.getValue());
    }
 
    /**
     * Invoked when an entry is evicted.
     *
     * @param EntryEvent
     *
     */
    public void entryEvicted(EntryEvent ee) {
 
    }  
 
    /**
     * Invoked when an entry is updated.
     *
     * @param EntryEvent
     *
     */
    public void entryUpdated(EntryEvent ee) {
 
    }
 
}

ШАГ 11: СОЗДАЙТЕ СТАРТ КЛАСС

Начальный класс загружает клиентов в кэш и выполняет распределенные задачи.

Стартовый класс первого члена кластера:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.onlinetechvision.exe;
 
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
 
/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {
 
    private ICacheService cacheService;
 
    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCacheForFirstMember();
    }
 
    /**
     * Loads Customers to cache
     *
     */
    public void loadCacheForFirstMember() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId('1');
        firstCustomer.setName('Jodie');
        firstCustomer.setSurname('Foster');
 
        Customer secondCustomer = new Customer();
        secondCustomer.setId('2');
        secondCustomer.setName('Kate');
        secondCustomer.setSurname('Winslet');
 
        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }
 
    public ICacheService getCacheService() {
        return cacheService;
    }
 
    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }
 
}

Стартовый класс второго члена кластера:

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.onlinetechvision.exe;
 
import java.util.Set;
import java.util.concurrent.ExecutionException;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.exception.AnotherAvailableMemberNotFoundException;
import com.onlinetechvision.executor.srv.IDistributedExecutorService;
import com.onlinetechvision.task.TestCallable;
 
/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {
 
    private String hazelcastInstanceName;
    private Hazelcast hazelcast;
    private IDistributedExecutorService distributedExecutorService;
    private ICacheService cacheService;
 
    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCache();
        executeTasks();
    }
 
    /**
     * Loads Customers to cache
     *
     */
    public void loadCache() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId('3');
        firstCustomer.setName('Bruce');
        firstCustomer.setSurname('Willis');
 
        Customer secondCustomer = new Customer();
        secondCustomer.setId('4');
        secondCustomer.setName('Colin');
        secondCustomer.setSurname('Farrell');
 
        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }
 
    /**
     * Executes Tasks
     *
     */
    public void executeTasks() {
        try {
            getDistributedExecutorService().executeOnStatedMember(new TestCallable(), getAnotherMember());
            getDistributedExecutorService().executeOnTheMemberOwningTheKey(new TestCallable(), '3');
            getDistributedExecutorService().executeOnAnyMember(new TestCallable());
            getDistributedExecutorService().executeOnMembers(new TestCallable(), getAllMembers());
        } catch (InterruptedException | ExecutionException | AnotherAvailableMemberNotFoundException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * Gets cluster members
     *
     * @return Set<Member> Set of Cluster Members
     *
     */
    private Set<Member> getAllMembers() {
        Set<Member> members = getHazelcastLocalInstance().getCluster().getMembers();
 
        return members;
    }
 
    /**
     * Gets an another member of cluster
     *
     * @return Member Another Member of Cluster
     * @throws AnotherAvailableMemberNotFoundException An Another Available Member can not found exception
     */
    private Member getAnotherMember() throws AnotherAvailableMemberNotFoundException {
        Set<Member> members = getAllMembers();
        for(Member member : members) {
            if(!member.localMember()) {
                return member;
            }
        }
 
        throw new AnotherAvailableMemberNotFoundException('No Other Available Member on the cluster. Please be aware that all members are active on the cluster');
    }
 
    /**
     * Gets Hazelcast local instance
     *
     * @return HazelcastInstance Hazelcast local instance
     */
    @SuppressWarnings('static-access')
    private HazelcastInstance getHazelcastLocalInstance() {
        HazelcastInstance instance = getHazelcast().getHazelcastInstanceByName(getHazelcastInstanceName());
        return instance;
    }
 
    public String getHazelcastInstanceName() {
        return hazelcastInstanceName;
    }
 
    public void setHazelcastInstanceName(String hazelcastInstanceName) {
        this.hazelcastInstanceName = hazelcastInstanceName;
    }
 
    public Hazelcast getHazelcast() {
        return hazelcast;
    }
 
    public void setHazelcast(Hazelcast hazelcast) {
        this.hazelcast = hazelcast;
    }
 
    public IDistributedExecutorService getDistributedExecutorService() {
        return distributedExecutorService;
    }
 
    public void setDistributedExecutorService(IDistributedExecutorService distributedExecutorService) {
        this.distributedExecutorService = distributedExecutorService;
    }
 
    public ICacheService getCacheService() {
        return cacheService;
    }
 
    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }
 
}

ШАГ 12: СОЗДАТЬ ФАЙЛ hazelcast-config.properties

Файл hazelcast-config.properties показывает свойства элементов кластера.

Свойства первого члена:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
hz.instance.name = OTVInstance1
 
hz.group.name = dev
hz.group.password = dev
 
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
 
hz.network.port = 5701
hz.network.port.auto.increment = false
 
hz.tcp.ip.enabled = true
 
hz.members = 192.168.1.32
 
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
 
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY

Свойства второго члена:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
hz.instance.name = OTVInstance2
 
hz.group.name = dev
hz.group.password = dev
 
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
 
hz.network.port = 5702
hz.network.port.auto.increment = false
 
hz.tcp.ip.enabled = true
 
hz.members = 192.168.1.32
 
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
 
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY

ШАГ 13: СОЗДАЙТЕ applicationContext-hazelcast.xml

Файл конфигурации Spring Hazelcast, applicationContext-hazelcast.xml , создан, и настроены служба распределенного выполнения Hazelcast и экземпляр Hazelcast.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
 
 
 
 
    <hz:map id='customerMap' name='customerMap' instance-ref='instance'/>
 
    <!-- Hazelcast Distributed Executor Service definition -->
    <hz:executorService id='hazelcastDistributedExecutorService' instance-ref='instance' name='hazelcastDistributedExecutorService' />
 
    <!-- Hazelcast Instance configuration -->
    <hz:hazelcast id='instance'>
        <hz:config>
 
            <!-- Hazelcast Instance Name -->
            <hz:instance-name>${hz.instance.name}</hz:instance-name>
 
        <!-- Hazelcast Group Name and Password -->
        <hz:group name='${hz.group.name}' password='${hz.group.password}'/>
 
                <!-- Hazelcast Management Center URL -->
            <hz:management-center  enabled='${hz.management.center.enabled}' url='${hz.management.center.url}'/>
 
            <!-- Hazelcast Tcp based network configuration -->
            <hz:network port='${hz.network.port}' port-auto-increment='${hz.network.port.auto.increment}'>
                <hz:join>
                    <hz:tcp-ip enabled='${hz.tcp.ip.enabled}'>
                        <hz:members>${hz.members}</hz:members>
                    </hz:tcp-ip>
                </hz:join>
            </hz:network>
 
            <!-- Hazelcast Distributed Executor Service configuration -->
            <hz:executor-service name='executorService'
                                 core-pool-size='${hz.executor.service.core.pool.size}'
                                 max-pool-size='${hz.executor.service.max.pool.size}'
                                 keep-alive-seconds='${hz.executor.service.keep.alive.seconds}'/>
 
            <!-- Hazelcast Distributed Map configuration -->
            <hz:map name='map'
                backup-count='${hz.map.backup.count}'
                max-size='${hz.map.max.size}'
                eviction-percentage='${hz.map.eviction.percentage}'
                read-backup-data='${hz.map.read.backup.data}'
                cache-value='${hz.map.cache.value}'
                eviction-policy='${hz.map.eviction.policy}'
                merge-policy='${hz.map.merge.policy}'  />
 
        </hz:config>
 
    </hz:hazelcast>  
 
</beans>

ШАГ 14: СОЗДАТЬ applicationContext.xml

Файл конфигурации Spring, applicationContext.xml , создан.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
 
 
    <import resource='classpath:applicationContext-hazelcast.xml' />
 
    <!-- Beans Declaration -->
    <bean id='propertyConfigurer' class='org.springframework.beans.factory.config.PropertyPlaceholderConfigurer'>
        <property name='locations'>
            <list>
                <value>classpath:/hazelcast-config.properties</value>
            </list>
        </property>
    </bean>
 
    <bean id='cacheService' class='com.onlinetechvision.cache.srv.CacheService'>
        <constructor-arg ref='customerMap'/>
    </bean>
 
    <bean id='distributedExecutorService' class='com.onlinetechvision.executor.srv.DistributedExecutorService'>
        <property name='hazelcastDistributedExecutorService' ref='hazelcastDistributedExecutorService' />
    </bean>
 
    <bean id='hazelcast' class='com.hazelcast.core.Hazelcast'/>
 
    <bean id='starter' class='com.onlinetechvision.exe.Starter'>
        <property name='hazelcastInstanceName' value='${hz.instance.name}' />
        <property name='hazelcast' ref='hazelcast' />
        <property name='distributedExecutorService' ref='distributedExecutorService' />
        <property name='cacheService' ref='cacheService' />
    </bean>
</beans>

ШАГ 15: СОЗДАЙТЕ КЛАСС ПРИЛОЖЕНИЙ

Класс приложения создан для запуска приложения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.onlinetechvision.exe;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Application {
 
    /**
     * Starts the application
     *
     * @param  String[] args
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext('applicationContext.xml');
        Starter starter = (Starter) context.getBean('starter');
        starter.start();
    }
 
}

ШАГ 16: СТРОИМ ПРОЕКТ

После создания проекта OTV_Spring_Hazelcast_DistributedExecution будет создан файл OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar .

Важное примечание: Члены кластера имеют различную конфигурацию для Coherence, поэтому проект должен быть построен отдельно для каждого участника.

ШАГ 17: ИНТЕГРАЦИЯ С ЦЕНТРОМ УПРАВЛЕНИЯ HAZELCAST

Центр управления Hazelcast позволяет осуществлять мониторинг и управление узлами в кластере.

Количество объектов и резервных копий, принадлежащих customerMap , можно увидеть через таблицу данных карты памяти. Мы распределили 4 записи через customerMap, как показано ниже:

Примеры ключей и значений можно увидеть в браузере карт:

Добавлена ​​первая запись:

Добавлена ​​третья запись:

Детали hazelcastDistributedExecutorService можно увидеть на вкладке Executors. Мы выполнили 3 задачи для первого члена и 2 задачи для второго, как показано ниже:

ШАГ 18: ЗАПУСТИТЕ ПРОЕКТ, НАЧИНАЯ ЧЛЕНА КЛАСТЕРА

После запуска созданного файла OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar на элементах кластера будут показаны следующие журналы вывода консоли:

Первый вывод консоли участника:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5701
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:21 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5701 [dev] Address[x.y.z.t]:5701 is STARTING
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.TcpIpJoiner
INFO: [x.y.z.t]:5701 [dev]
--A new cluster is created and First Member joins the cluster.
Members [1] {
    Member [x.y.z.t]:5701 this
}
 
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5701 [dev]
 
Members [1] {
    Member [x.y.z.t]:5701 this
}
 
...
-- First member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 1, OldValue : null, NewValue : Customer [id=1, name=Jodie, surname=Foster]
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 2, OldValue : null, NewValue : Customer [id=2, name=Kate, surname=Winslet]
 
...
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701 this
    Member [x.y.z.t]:5702
}
 
...
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]

Второй вывод консоли участника:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5702
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTING
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.Node
INFO: [x.y.z.t]:5702 [dev] ** setting master address to Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5702 [dev] Connecting to master node: Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.nio.ConnectionManager
INFO: [x.y.z.t]:5702 [dev] 55715 accepted socket connection from /x.y.z.t:5701
Kas 25, 2012 4:07:55 PM com.hazelcast.cluster.ClusterManager
INFO: [x.y.z.t]:5702 [dev]
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701
    Member [x.y.z.t]:5702 this
}
 
Kas 25, 2012 4:07:56 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTED
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:42) - Method executeOnStatedMember is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:46) - Result of method executeOnStatedMember is : First Member' s TestCallable Task is called...
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:61) - Method executeOnTheMemberOwningTheKey is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:65) - Result of method executeOnTheMemberOwningTheKey is : First Member' s TestCallable Task is called...
 
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:78) - Method executeOnAnyMember is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:82) - Result of method executeOnAnyMember is : Second Member' s TestCallable Task is called...
 
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:96) - Method executeOnMembers is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:101) - Result of method executeOnMembers is : [First Member' s TestCallable Task is called..., Second Member' s TestCallable Task is called...]

ШАГ 19: СКАЧАТЬ

https://github.com/erenavsarogullari/OTV_Spring_Hazelcast_DistributedExecution

Ссылки по теме :

Java ExecutorService Interface
Hazelcast Распределенная Служба Исполнителя

Ссылка: Распределенное выполнение Hazelcast с Spring от нашего партнера JCG Эрен Авсарогуллари в блоге Online Technology Vision .