Статьи

Руководство для начинающих по Hazelcast, часть 3

Это продолжение серии сообщений о том, как использовать Hazelcast с точки зрения новичка. Если вы не читали последние два, я рекомендую прочитать их:

Примитивы идут

Во время моего последнего поста я упоминал об использовании ILock с IList и ISet, потому что они не являются поточно-ориентированными. Меня поразило, что я не рассмотрел основную часть Hazelcast, распределенных примитивов. Они решают проблему синхронизации использования ресурсов распределенным способом. Те, кто занимается многопоточным программированием, сразу узнают их. Для тех из вас, кто плохо знаком с программированием в потоках, я объясню, что делает каждый примитив, и приведу пример.

IAtomicLong

Это распределенная атомная длинна. Это означает, что каждая операция происходит одновременно. Например, можно добавить число и получить полученное значение за одну операцию. Можно получить значение, а затем добавить значение. Это верно для каждой операции, выполняемой с этим примитивом. Как можно себе представить, это потокобезопасно, но нельзя сделать это, и это потокобезопасно.

1
atomicLong.addAndGet(2 * atomicLong.get());

Строка выше создает состояние гонки, потому что есть три операции: чтение содержимого атомарного long, умножение на два и добавление этого к экземпляру. Поток безопасно существует только в том случае, если операция гарантированно произойдет за один шаг. Для этого в IAtomicLong есть метод alterAndGet. AlterAndGet принимает объект IFunction. Это делает многошаговые операции одним шагом. Всегда есть одна синхронная резервная копия IAtomicLong, и она не настраивается.

IdGenerator

IAtomicLongs отлично подходит для отслеживания того, сколько у вас есть. Проблема заключается в том, что, поскольку вызов, скорее всего, является удаленным, IAtomicLongs для некоторых ситуаций не является идеальным решением. Одна из таких ситуаций — генерация уникальных идентификаторов. IdGenerator был сделан именно для этой цели. Это работает так, что каждый участник запрашивает миллион идентификаторов. После того, как все эти заявленные цифры взяты, сегмент требует еще один миллион. Таким образом, поскольку у каждого участника есть миллион спрятанных идентификаторов, вероятность того, что вызов IdGenerator является удаленным, составляет один на миллион. Это позволяет очень быстро создавать уникальные идентификаторы. Если возникнут какие-либо дубликаты, это может быть потому, что участники не присоединились. Если член выходит из строя до того, как его сегмент будет использован, в идентификаторах будут пробелы. Для генерации уникального идентификатора пропущенные номера не являются проблемой. Я чувствую, что участники не подключаются к кластеру, это проблема, но если это происходит, есть более важные вещи, о которых нужно беспокоиться. Если кластер перезапускается, идентификаторы снова начинаются с нуля. Это потому, что идентификатор не сохранился. Это база данных в памяти, каждый рискует. Чтобы противостоять этому, IdGenerators можно настроить на запуск с определенного номера, если он не заявлен кем-то еще и идентификаторы еще не созданы. Альтернативой является создание собственного генератора идентификаторов или использование класса java.util.UUID. Это может занять больше места, но каждый проект имеет свои собственные требования для удовлетворения. IdGenerators всегда имеют одну синхронную резервную копию и не могут быть настроены.

ILock

Вот классический метод синхронизации с поворотом. Это эксклюзивный замок, который распространяется. Один просто вызывает метод блокировки, и поток либо ждет, либо получает блокировку. Как только замок установлен, критическая секция может быть предварительно сформирована. Когда работа завершена, используется метод разблокировки. Ветераны этой техники поместят критическую секцию в блок try finally, установив блокировку сразу за блоком try и разблокировку в секции finally. Это неоценимо для выполнения действий над структурами, которые не являются потокобезопасными. Процесс, который получает блокировку, владеет блокировкой и должен вызывать разблокировку, чтобы другие процессы могли устанавливать блокировки. Это может быть проблематично, если у вас есть потоки в нескольких местах в сети. Hazelcast подумал об этой проблеме и снял блокировку, когда член отключается. Другая особенность заключается в том, что метод блокировки имеет время ожидания 300 секунд. Это предотвращает голодные темы. ILocks имеет одну синхронную резервную копию и не настраивается.

Небольшой совет от кого-то, у кого есть опыт, держите критические секции как можно меньше; это помогает производительности и предотвращает взаимные блокировки. Взаимные блокировки затрудняют отладку, и их сложнее тестировать из-за неизвестного порядка выполнения потоков. Один раз ошибка проявляется, тогда это не так. Это может продолжаться в течение недели или более из-за неуместной блокировки. Тогда нужно быть уверенным, что это больше не повторится. Это трудно доказать из-за неизвестного выполнения потоков. К тому времени, когда все это сделано, босс разочарован из-за времени, которое потребовалось, и никто не знает, исправлена ​​ли ошибка или нет.

ICondition

Вы когда-нибудь хотели дождаться события, но не хотели, чтобы другие люди тоже его ждали? Это именно то, что условия для многопоточного программирования. До Java 1.5 это было достигнуто с помощью техники synchronized-wait-notify. Это может быть выполнено методом блокировки. Отправляйся в путешествие со мной, и я покажу, как это работает. Представьте себе ситуацию, когда существует не безопасный поток список, в котором производитель и потребитель пишут и читают из него. Очевидно, что есть критические разделы, которые необходимо защитить. Это падает на колени замка. После того, как блокировка установлена, критическая работа может начаться. Единственная проблема заключается в том, что ресурс в состоянии, которое бесполезно для потока. Например, потребитель не может извлекать записи из пустого списка. Производитель не может помещать записи в полный список. Вот где наступает условие. Производитель или потребитель войдет в цикл while, который проверяет условие, которое является благоприятным, и вызывает условие.await (). Когда вызывается await, поток снимает свою блокировку и позволяет другим потокам обращаться к своим критическим разделам. Ожидающий поток вернет блокировку для проверки ее состояния и может подождать еще несколько, либо условие будет выполнено и начнет выполнять работу. Когда критическая секция завершена, поток может вызвать signal () или signalAll (), чтобы другие потоки проснулись и проверили их состояние. Условия создаются блокировкой вместо экземпляра Hazelcast. Другое дело, что если кто-то хочет, чтобы условие распространялось, он должен использовать метод lock.newCondition (String name). IConditions имеют одну синхронную резервную копию и не могут быть настроены.

Я не могу сказать, сколько тупиков может возникнуть при использовании этой техники. Иногда сигнал приходит, когда поток ждет, и все хорошо. Другая сторона заключается в том, что сигнал отправляется, когда поток не ожидает, переходит в состояние ожидания и ожидает бесконечно. По этой причине я рекомендую использовать тайм-аут во время ожидания, чтобы поток мог время от времени проверять, выполнено ли условие. Таким образом, если сигнал пропадает, самое худшее, что может случиться, — это небольшое время ожидания, а не вечное ожидание. Я использовал технику тайм-аута в моем примере. Скопируйте и вставьте код столько, сколько хотите. Я бы предпочел проверить используемые методы, а не непроверенный код, вторгающийся в Интернет.

ICountDownLatch

ICountDownLatch — это инструмент синхронизации, который срабатывает, когда его счетчик обнуляется. Это не обычный способ сделать координаты, но он есть там, где это необходимо. Раздел примера, я думаю, дает гораздо лучшее объяснение того, как это работает. Защелка может быть сброшена после того, как она обнуляется, чтобы ее можно было использовать снова. Если принадлежащий элемент удаляется, все потоки, ожидающие защелки, чтобы достичь нуля, сигнализируются так, как если бы он достиг нуля. Резервное копирование ICountDownLatch выполняется синхронно в другом месте и не может быть настроено.

ISemaphore

Да, есть распределенная версия классического семафора. Это меня волнует, потому что в прошлый раз, когда я перешел в класс «Операционная система», семафорам требовалась небольшая аппаратная поддержка. Может быть, я только что встречался, ну, все равно, это круто (снова встречаюсь с собой). Семафоры работают, ограничивая количество потоков, которые могут получить доступ к ресурсу. В отличие от блокировок, у семафоров нет чувства владения, поэтому разные потоки могут освободить заявку на ресурс. В отличие от остальных примитивов, ISemaphore может быть настроен. Я настраиваю один в моем примере. Он находится в файле hazelcast.xml в пакете по умолчанию для моего проекта.

Примеры

Вот примеры. У меня был комментарий о моем последнем посте с просьбой сделать отступ для моего кода, чтобы он был более читабельным. Я сделаю это наверняка на этот раз из-за объема кода, который я публикую. Можно увидеть пару вещей, которые я раньше не обсуждал. Одним из них является IExecutorService. Это распределенная версия ExecutorService. Можно фактически отослать рабочие места, чтобы быть законченным различными участниками. Другое дело, что все определенные классы Runnable / Callable реализуют Serializable. Это необходимо в распределенной среде, поскольку объект может быть отправлен различным членам. Последнее — это интерфейс HazelcastInstanceAware. Это позволяет классу получить доступ к локальному экземпляру Hazelcast. Затем класс может получить экземпляры ресурсов, которые ему нужны (например, ILists). Без дальнейших церемоний, здесь мы идем.

IAtomicLong

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package hazelcastprimitives.iatomiclong;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IFunction;
import java.io.Serializable;
 
/**
 *
 * @author Daryl
 */
public class IAtomicLongExample {
    public static class MultiplyByTwoAndSubtractOne
        implements IFunction, Serializable {
 
        @Override
        public Long apply(Long t) {
            return (long)(2 * t - 1);
        }
         
    }
     
    public static final void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        final String NAME = "atomic";
        IAtomicLong aLong = instance.getAtomicLong(NAME);
        IAtomicLong bLong = instance.getAtomicLong(NAME);
        aLong.getAndSet(1L);
        System.out.println("bLong is now: " + bLong.getAndAdd(2));
        System.out.println("aLong is now: " + aLong.getAndAdd(0L));
         
        MultiplyByTwoAndSubtractOne alter = new MultiplyByTwoAndSubtractOne();
        aLong.alter(alter);
        System.out.println("bLong is now: " + bLong.getAndAdd(0L));
        bLong.alter(alter);
        System.out.println("aLong is now: " + aLong.getAndAdd(0L));
         
        System.exit(0);
    }
}

Обратите внимание, что даже класс MutilpyAndSubtractOne реализует Serializable.

IdGenerator

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package hazelcastprimitives.idgenerator;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IdGenerator;
 
/**
 *
 * @author Daryl
 */
public class IdGeneratorExample {
  
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
 
        IdGenerator generator = instance.getIdGenerator("generator");
         
        for(int i = 0; i < 10; i++) {
            System.out.println("The generated value is " + generator.newId());
        }
         
        instance.shutdown();
        System.exit(0);
    }
}

ILock

Этот пример ILock также можно считать примером условия ICon. Мне пришлось использовать условие, потому что ListConsumer всегда работал до ListProducer, поэтому я заставил ListConsumer подождать, пока IList будет что-то потреблять.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
package hazelcastprimitives.ilock;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICondition;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
 
/**
 *
 * @author Daryl
 */
public class ILockExample {
 
    static final String LIST_NAME = "to be locked";
    static final String LOCK_NAME = "to lock with";
    static final String CONDITION_NAME = "to signal with";
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IExecutorService service = instance.getExecutorService("service");
        ListConsumer consumer = new ListConsumer();
        ListProducer producer = new ListProducer();
         
        try {
            service.submit(producer);
            service.submit(consumer);
            Thread.sleep(10000);
        } catch(InterruptedException ie){
            System.out.println("Got interrupted");
        } finally {
            instance.shutdown();
        }
    }
     
    public static class ListConsumer implements Runnable, Serializable, HazelcastInstanceAware {
 
        private transient HazelcastInstance instance;
         
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
            lock.lock();
            try {
                while(list.isEmpty()) {
                    condition.await(2, TimeUnit.SECONDS);
                }
                while(!list.isEmpty()) {
                    System.out.println("value is " + list.get(0));
                    list.remove(0);
                }
            } catch(InterruptedException ie) {
                System.out.println("Consumer got interrupted");
            } finally {
                lock.unlock();
            }
            System.out.println("Consumer leaving");
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
         
    }
     
    public static class ListProducer implements Runnable, Serializable, HazelcastInstanceAware {
        private transient HazelcastInstance instance;
 
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
            lock.lock();
            try {
                for(int i = 1; i <= 10; i++){
                    list.add(i);
                }
                condition.signalAll();
            } finally {
                lock.unlock();
            }
            System.out.println("Producer leaving");
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
         
    }
}

ICondition

Вот реальный пример ICondition. Обратите внимание, как SpunProducer и SpunConsumer совместно используют одно и то же условие ICON и сигнализируют друг другу. Обратите внимание, что я использую таймауты для предотвращения взаимных блокировок.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package hazelcastprimitives.icondition;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICondition;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
 
/**
 *
 * @author Daryl
 */
public class IConditionExample {
     
    static final String LOCK_NAME = "lock";
    static final String CONDITION_NAME = "condition";
    static final String SERVICE_NAME = "spinderella";
    static final String LIST_NAME = "list";
     
    public static final void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
         
        IExecutorService service  = instance.getExecutorService(SERVICE_NAME);
        service.execute(new SpunConsumer());
        service.execute(new SpunProducer());
         
         
         
        try {
            Thread.sleep(10000);
 
        } catch(InterruptedException ie) {
            System.out.println("Hey we got out sooner than I expected");
        } finally {
            instance.shutdown();
            System.exit(0);
        }
    }
     
    public static class SpunProducer implements Serializable, Runnable, HazelcastInstanceAware {
 
        private transient HazelcastInstance instance;
        private long counter = 0;
         
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
             
            lock.lock();           
            try {
                if(list.isEmpty()) {
                    populate(list);
                    System.out.println("telling the consumers");
                    condition.signalAll();
                }
                for(int i = 0; i < 2; i++) {
                    while(!list.isEmpty()) {
                        System.out.println("Waiting for the list to be empty");
                        System.out.println("list size: " + list.size() );
                        condition.await(2, TimeUnit.SECONDS);
                    
                    populate(list);
                    System.out.println("Telling the consumers");
                    condition.signalAll();
                }
            } catch(InterruptedException ie) {
                System.out.println("We have a found an interuption");
            } finally {
                condition.signalAll();
                System.out.println("Producer exiting stage left");
                lock.unlock();
            }
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
         
        private void populate(IList list) {
            System.out.println("Populating list");
            long currentCounter = counter;
            for(; counter < currentCounter + 10; counter++) {
                list.add(counter);
            }
        }
    }
     
    public static class SpunConsumer implements Serializable, Runnable, HazelcastInstanceAware {
 
        private transient HazelcastInstance instance;
         
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICondition condition = lock.newCondition(CONDITION_NAME);
            IList list = instance.getList(LIST_NAME);
             
            lock.lock();           
            try {
                for(int i = 0; i < 3; i++) {
                    while(list.isEmpty()) {
                        System.out.println("Waiting for the list to be filled");
                        condition.await(1, TimeUnit.SECONDS);
                    }
                    System.out.println("removing values");
                    while(!list.isEmpty()){
                        System.out.println("value is " + list.get(0));
                        list.remove(0);
                    }
                    System.out.println("Signaling the producer");
                    condition.signalAll();
                }
            } catch(InterruptedException ie) {
                System.out.println("We had an interrupt");
            } finally {
                System.out.println("Consumer exiting stage right");
                condition.signalAll();
                lock.unlock();
            }
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
    }
 
}

ICountDownLatch

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
package hazelcastprimitives.icountdownlatch;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICountDownLatch;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
 
/**
 *
 * @author Daryl
 */
public class ICountDownLatchExample {
    static final String LOCK_NAME = "lock";
    static final String LATCH_NAME = "condition";
    static final String SERVICE_NAME = "spinderella";
    static final String LIST_NAME = "list";
     
    public static final void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
         
        IExecutorService service  = instance.getExecutorService(SERVICE_NAME);
        service.execute(new SpunMaster());
        service.execute(new SpunSlave());
         
         
         
        try {
            Thread.sleep(10000);
 
        } catch(InterruptedException ie) {
            System.out.println("Hey we got out sooner than I expected");
        } finally {
            instance.shutdown();
            System.exit(0);
        }
    }
     
    public static class SpunMaster implements Serializable, Runnable, HazelcastInstanceAware {
 
        private transient HazelcastInstance instance;
        private long counter = 0;
         
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);
            IList list = instance.getList(LIST_NAME);
             
            lock.lock();           
            try {
                latch.trySetCount(10);
                populate(list, latch);
            } finally {
                System.out.println("Master exiting stage left");
                lock.unlock();
            }
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
         
        private void populate(IList list, ICountDownLatch latch) {
            System.out.println("Populating list");
            long currentCounter = counter;
            for(; counter < currentCounter + 10; counter++) {
                list.add(counter);
                latch.countDown();
            }
        }
    }
     
    public static class SpunSlave implements Serializable, Runnable, HazelcastInstanceAware {
 
        private transient HazelcastInstance instance;
         
        @Override
        public void run() {
            ILock lock = instance.getLock(LOCK_NAME);
            ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);
            IList list = instance.getList(LIST_NAME);
             
            lock.lock();           
            try {
                if(latch.await(2, TimeUnit.SECONDS)) {
                    while(!list.isEmpty()){
                        System.out.println("value is " + list.get(0));
                        list.remove(0);
                    }
 
                }
            } catch(InterruptedException ie) {
                System.out.println("We had an interrupt");
            } finally {
                System.out.println("Slave exiting stage right");
                lock.unlock();
            }
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
    }
 
}

ISemaphore

конфигурация

Вот конфигурация ISemaphore:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
<?xml version="1.0" encoding="UTF-8"?>
<hazelcast
    <network>
        <join><multicast enabled="true"/></join>
    </network>
     
    <semaphore name="to reduce access">
        <initial-permits>3</initial-permits>
    </semaphore>
</hazelcast>

Пример кода

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package hazelcastprimitives.isemaphore;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ISemaphore;
import com.hazelcast.core.IdGenerator;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
 
/**
 *
 * @author Daryl
 */
public class ISemaphoreExample {
    static final String SEMAPHORE_NAME = "to reduce access";
    static final String GENERATOR_NAME = "to use";
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IExecutorService service = instance.getExecutorService("service");
        List<Future> futures = new ArrayList(10);
        try {
            for(int i = 0; i < 10; i++) {
                futures.add(service.submit(new GeneratorUser(i)));
            }
            // so I wait til the last man.  No this may not be scalable.
            for(Future future: futures) {
                future.get();
            }
        } catch(InterruptedException ie){
            System.out.printf("Got interrupted.");
        } catch(ExecutionException ee) {
            System.out.printf("Cannot execute on Future. reason: %s\n", ee.toString());
        } finally {
            service.shutdown();
            instance.shutdown();
        }
 
    }
     
    static class GeneratorUser implements Callable, Serializable, HazelcastInstanceAware {
        private transient HazelcastInstance instance;
        private final int number;
         
        public GeneratorUser(int number) {
            this.number = number;
        }
         
        @Override
        public Long call() {
            ISemaphore semaphore = instance.getSemaphore(SEMAPHORE_NAME);
            IdGenerator gen = instance.getIdGenerator(GENERATOR_NAME);
            long lastId = -1;
            try {
                semaphore.acquire();
                try {
                    for(int i = 0; i < 10; i++){
                        lastId = gen.newId();
                        System.out.printf("current value of generator on %d is %d\n", number, lastId);
                        Thread.sleep(1000);
                    }
                } catch(InterruptedException ie) {
                    System.out.printf("User %d was Interrupted\n", number);
                } finally {
                    semaphore.release();
                }
            } catch(InterruptedException ie) {
                System.out.printf("User %d Got interrupted\n", number);
            }
            System.out.printf("User %d is leaving\n", number);
            return lastId;
        }
 
        @Override
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            instance = hazelcastInstance;
        }
         
    }
 
}

Вывод

Примитивы Hazelcast обсуждались в этом посте. Большинство, если не все, вращались вокруг координации потоков. Объяснения примитивного и личного опыта были поделены. В примерах были показаны различные типы координации. Примеры можно скачать с помощью subversion по адресу http://darylmathisonblog.googlecode.com/svn/trunk/HazelcastPrimitives .

использованная литература

  • Книга Hazelcast: найдена на www.hazelcast.com
  • Документация Hazelcast: найдена в загрузке Hazelcast, найденной на www.hazelcast.org