Несколько недель назад я написал блог о начале работы с Hazelcast, в котором рассказывал, как нелепо просто создавать распределенные карты, списки и очереди. В то время я упоминал, что Hazelcast, помимо всего прочего, делает еще много вещей. В этом блоге кратко рассматриваются другие функции Hazelcast: система вещательных сообщений, основанная на шаблоне публикации / подписки . Это принимает обычный формат, в котором приложение отправителя сообщений публикует сообщения по определенной теме. Сообщения не адресованы какому-либо конкретному клиенту, но могут быть прочитаны любым клиентом, который заинтересован в данной теме.
Очевидный сценарий публикации и подписки исходит из мира высоких финансов и маркет-мейкеров . Маркет-мейкер покупает и продает финансовые инструменты, такие как акции, и конкурирует за бизнес, рекламируя как цены покупки, так и продажи на, как правило, электронном рынке. Чтобы реализовать очень простой сценарий маркет-мейкера с использованием Hazelcast, нам нужны три класса: компонент StockPrice , MarketMaker и Client .
Следующий код был добавлен в мой существующий проект Hazelcast, который доступен на Github. Не нужно беспокоиться о дополнительных зависимостях POM.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
public class StockPrice implements Serializable { private static final long serialVersionUID = 1L; private final BigDecimal bid; private final BigDecimal ask; private final String code; private final String description; private final long timestamp; /** * Create a StockPrice for the given stock at a given moment */ public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) { super(); this.bid = bid; this.ask = ask; this.code = code; this.description = description; this.timestamp = timestamp; } public BigDecimal getBid() { return bid; } public BigDecimal getAsk() { return ask; } public String getCode() { return code; } public String getDescription() { return description; } public long getTimestamp() { return timestamp; } @Override public String toString() { StringBuilder sb = new StringBuilder("Stock - "); sb.append(code); sb.append(" - "); sb.append(description); sb.append(" - "); sb.append(description); sb.append(" - Bid: "); sb.append(bid); sb.append(" - Ask: "); sb.append(ask); sb.append(" - "); SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS"); sb.append(df.format(new Date(timestamp))); return sb.toString(); } } |
StockPrice , со всеми обычными StockPrice получения и установки, моделирует цену спроса и предложения акций (продавать и покупать на обычном языке) в любой момент времени, и класс MarketMaker публикует эти компоненты с помощью Hazelcast.
Обычно маркет-мейкер публикует цены по нескольким финансовым инструментам; однако для простоты в этой демонстрации MarketMaker публикует только одну цену.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
public class MarketMaker implements Runnable { private static Random random = new Random(); private final String stockCode; private final String description; private final ITopic<StockPrice> topic; private volatile boolean running; public MarketMaker(String topicName, String stockCode, String description) { this.stockCode = stockCode; this.description = description; this.topic = createTopic(topicName); running = true; } @VisibleForTesting ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } public void publishPrices() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { do { publish(); sleep(); } while (running); } private void publish() { StockPrice price = createStockPrice(); System.out.println(price.toString()); topic.publish(price); } @VisibleForTesting StockPrice createStockPrice() { double price = createPrice(); DecimalFormat df = new DecimalFormat("#.##"); BigDecimal bid = new BigDecimal(df.format(price - variance(price))); BigDecimal ask = new BigDecimal(df.format(price + variance(price))); StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis()); return stockPrice; } private double createPrice() { int val = random.nextInt(2010 - 1520) + 1520; double retVal = (double) val / 100; return retVal; } private double variance(double price) { return (price * 0.01); } private void sleep() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } public void stop() { running = false; } public static void main(String[] args) throws InterruptedException { MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom"); MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys"); MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium"); bt.publishPrices(); cbry.publishPrices(); bp.publishPrices(); } } |
Как обычно, настройка Hazelcast довольно проста, и большая часть кода в классе MarketMaker выше не имеет ничего общего с Hazelcast. Класс делится на две части: строительные и издательские цены. Конструктор принимает три аргумента, которые он сохраняет для дальнейшего использования. Он также создает экземпляр Hazelcast и регистрирует простой createTopic() "STOCKS" помощью закрытого createTopic() . Как и следовало ожидать, создание экземпляра Hazelcast и регистрация темы занимает две строки кода, как показано ниже:
|
1
2
3
4
|
ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } |
Остальная часть класса запускает механизм публикации цен, используя поток для вызова MarketMaker run() MarketMaker . Этот метод генерирует случайную ставку, запрашивает цену за связанный код акции и публикует ее с помощью Hazelcast. Публикация осуществляется с использованием следующей строки кода:
|
1
|
topic.publish(price); |
Последняя часть класса MarketMaker — это метод main() и все, что он делает, — это создает несколько экземпляров MarketMaker и запускает их.
Теперь, когда Hazelcast знает о наших постоянно меняющихся ценах на акции, следующее, что нужно сделать, это разобраться в коде клиента.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class Client implements MessageListener<StockPrice> { public Client(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); } /** * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */ @Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } public static void main(String[] args) { new Client("STOCKS"); } } |
Как и в любой системе обмена сообщениями, код отправителя сообщения должен знать, кому и как звонить. «Что звонить» достигается тем, что клиент создает экземпляр Hazelcast и регистрирует интерес к теме "STOCKS" же, как издатель, как показано ниже:
|
1
2
3
|
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); |
«Что вызывать» достигается клиентом, реализующим интерфейс MessageListener MessageListener и его единственный метод onMessage()
|
1
2
3
4
|
@Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } |
Последняя часть клиентского кода — это метод main() который создает экземпляр клиента.
Последнее, что нужно сделать, это запустить код. Для этого я просто поместил все необходимые файлы JAR в один каталог, и есть только два, которые нужно учитывать: hazel cast-3.1.jar и guava-13.0.1.jar.
Как только это было сделано, я перешел в каталог классов проекта:
|
1
|
cd /Users/Roger/git/captaindebug/hazelcast/target/classes |
… и уволил издателя
|
1
|
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker |
… а потом клиент.
|
1
|
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client |
Конечно, если вы используете это на своей машине, используя эту грубую и готовую технику, то не забудьте заменить
/Users/Roger/tmp/mm с указанием пути к месту, куда вы положили свои копии этих файлов JAR.
Если вы запустите издателя MarketMaker в одном терминале, а пару клиентов — в двух других терминалах, то вы получите что-то вроде этого, где вы сможете увидеть публикуемые цены и клиентов, получающих обновления.
В Hazelcast следует обратить внимание на то, что « кластер » относится к кластеру экземпляров Hazelcast, а не к кластеру JVM. Это не очевидно, пока вы не запросите более одного экземпляра Hazelcast для одного приложения. Когда к кластеру присоединятся дополнительные клиенты, вы увидите что-то вроде этого:
|
1
2
3
4
5
6
7
|
Members [5] {Member [192.168.0.7]:5701Member [192.168.0.7]:5702Member [192.168.0.7]:5703Member [192.168.0.7]:5704 thisMember [192.168.0.7]:5705} |
В приведенном выше журнале есть две записи прослушивателя, по одной для каждого клиентского процесса, и три записи издателя, по одной для каждого экземпляра MarketMaker запущенного в MarketMaker main() MarketMaker .

Здесь необходимо рассмотреть вопрос о том, является ли хорошей практикой создание экземпляра Hazelcast для каждого экземпляра объекта (как я это делал в примере кода), или лучше иметь в своем коде один static экземпляр Hazelcast. Я не уверен в ответе на этот вопрос, поэтому, если кто-нибудь из гуру Hazelcast читает это, пожалуйста, дайте мне знать.
Вот и все: Hazelcast успешно работает в режиме публикации и подписки, но я не охватил все возможности Hazelcast; возможно, об этом позже …
- Этот исходный код доступен на Github: https://github.com/roghughe/captaindebug/tree/master/hazelcast

