Статьи

Опубликовать и подписаться с Hazelcast

Несколько недель назад я написал блог о начале работы с Hazelcast, в котором рассказывал, как нелепо просто создавать распределенные карты, списки и очереди. В то время я упоминал, что Hazelcast, помимо всего прочего, делает еще много вещей. В этом блоге кратко рассматриваются другие функции Hazelcast: система вещательных сообщений, основанная на шаблоне публикации / подписки . Это принимает обычный формат, в котором приложение отправителя сообщений публикует сообщения по определенной теме. Сообщения не адресованы какому-либо конкретному клиенту, но могут быть прочитаны любым клиентом, который заинтересован в данной теме.

Очевидный сценарий публикации и подписки исходит из мира высоких финансов и маркет-мейкеров . Маркет-мейкер покупает и продает финансовые инструменты, такие как акции, и конкурирует за бизнес, рекламируя как цены покупки, так и продажи на, как правило, электронном рынке. Чтобы реализовать очень простой сценарий маркет-мейкера с использованием Hazelcast, нам нужны три класса: компонент StockPrice , MarketMaker и Client .

Следующий код был добавлен в мой существующий проект Hazelcast, который доступен на Github. Не нужно беспокоиться о дополнительных зависимостях POM.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class StockPrice implements Serializable {
 
  private static final long serialVersionUID = 1L;
 
  private final BigDecimal bid;
 
  private final BigDecimal ask;
 
  private final String code;
 
  private final String description;
 
  private final long timestamp;
 
  /**
   * Create a StockPrice for the given stock at a given moment
   */
  public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description,
      long timestamp) {
    super();
    this.bid = bid;
    this.ask = ask;
    this.code = code;
    this.description = description;
    this.timestamp = timestamp;
  }
 
  public BigDecimal getBid() {
    return bid;
  }
 
  public BigDecimal getAsk() {
    return ask;
  }
 
  public String getCode() {
    return code;
  }
 
  public String getDescription() {
    return description;
  }
 
  public long getTimestamp() {
    return timestamp;
  }
 
  @Override
  public String toString() {
 
    StringBuilder sb = new StringBuilder("Stock - ");
    sb.append(code);
    sb.append(" - ");
    sb.append(description);
    sb.append(" - ");
    sb.append(description);
    sb.append(" - Bid: ");
    sb.append(bid);
    sb.append(" - Ask: ");
    sb.append(ask);
    sb.append(" - ");
    SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS");
    sb.append(df.format(new Date(timestamp)));
    return sb.toString();
  }
}

StockPrice , со всеми обычными StockPrice получения и установки, моделирует цену спроса и предложения акций (продавать и покупать на обычном языке) в любой момент времени, и класс MarketMaker публикует эти компоненты с помощью Hazelcast.

Обычно маркет-мейкер публикует цены по нескольким финансовым инструментам; однако для простоты в этой демонстрации MarketMaker публикует только одну цену.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class MarketMaker implements Runnable {
 
  private static Random random = new Random();
 
  private final String stockCode;
 
  private final String description;
 
  private final ITopic<StockPrice> topic;
 
  private volatile boolean running;
 
  public MarketMaker(String topicName, String stockCode, String description) {
    this.stockCode = stockCode;
    this.description = description;
    this.topic = createTopic(topicName);
    running = true;
  }
 
  @VisibleForTesting
  ITopic<StockPrice> createTopic(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    return hzInstance.getTopic(topicName);
  }
 
  public void publishPrices() {
 
    Thread thread = new Thread(this);
    thread.start();
  }
 
  @Override
  public void run() {
 
    do {
      publish();
      sleep();
    } while (running);
  }
 
  private void publish() {
 
    StockPrice price = createStockPrice();
    System.out.println(price.toString());
    topic.publish(price);
  }
 
  @VisibleForTesting
  StockPrice createStockPrice() {
 
    double price = createPrice();
    DecimalFormat df = new DecimalFormat("#.##");
 
    BigDecimal bid = new BigDecimal(df.format(price - variance(price)));
    BigDecimal ask = new BigDecimal(df.format(price + variance(price)));
 
    StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description,
        System.currentTimeMillis());
    return stockPrice;
  }
 
  private double createPrice() {
 
    int val = random.nextInt(2010 - 1520) + 1520;
    double retVal = (double) val / 100;
    return retVal;
  }
 
  private double variance(double price) {
    return (price * 0.01);
  }
 
  private void sleep() {
    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
 
  public void stop() {
    running = false;
  }
 
  public static void main(String[] args) throws InterruptedException {
 
    MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom");
    MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys");
    MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium");
 
    bt.publishPrices();
    cbry.publishPrices();
    bp.publishPrices();
 
  }
 
}

Как обычно, настройка Hazelcast довольно проста, и большая часть кода в классе MarketMaker выше не имеет ничего общего с Hazelcast. Класс делится на две части: строительные и издательские цены. Конструктор принимает три аргумента, которые он сохраняет для дальнейшего использования. Он также создает экземпляр Hazelcast и регистрирует простой createTopic() "STOCKS" помощью закрытого createTopic() . Как и следовало ожидать, создание экземпляра Hazelcast и регистрация темы занимает две строки кода, как показано ниже:

1
2
3
4
  ITopic<StockPrice> createTopic(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    return hzInstance.getTopic(topicName);
  }

Остальная часть класса запускает механизм публикации цен, используя поток для вызова MarketMaker run() MarketMaker . Этот метод генерирует случайную ставку, запрашивает цену за связанный код акции и публикует ее с помощью Hazelcast. Публикация осуществляется с использованием следующей строки кода:

1
    topic.publish(price);

Последняя часть класса MarketMaker — это метод main() и все, что он делает, — это создает несколько экземпляров MarketMaker и запускает их.

Теперь, когда Hazelcast знает о наших постоянно меняющихся ценах на акции, следующее, что нужно сделать, это разобраться в коде клиента.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Client implements MessageListener<StockPrice> {
 
  public Client(String topicName) {
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
    topic.addMessageListener(this);
  }
 
  /**
   * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message)
   */
  @Override
  public void onMessage(Message<StockPrice> arg0) {
    System.out.println("Received: " + arg0.getMessageObject().toString());
  }
 
  public static void main(String[] args) {
 
    new Client("STOCKS");
  }
 
}

Как и в любой системе обмена сообщениями, код отправителя сообщения должен знать, кому и как звонить. «Что звонить» достигается тем, что клиент создает экземпляр Hazelcast и регистрирует интерес к теме "STOCKS" же, как издатель, как показано ниже:

1
2
3
    HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance();
    ITopic<StockPrice> topic = hzInstance.getTopic(topicName);
    topic.addMessageListener(this);

«Что вызывать» достигается клиентом, реализующим интерфейс MessageListener MessageListener и его единственный метод onMessage()

1
2
3
4
  @Override
  public void onMessage(Message<StockPrice> arg0) {
    System.out.println("Received: " + arg0.getMessageObject().toString());
  }

Последняя часть клиентского кода — это метод main() который создает экземпляр клиента.

Последнее, что нужно сделать, это запустить код. Для этого я просто поместил все необходимые файлы JAR в один каталог, и есть только два, которые нужно учитывать: hazel cast-3.1.jar и guava-13.0.1.jar.

Снимок экрана 2013-12-07 в 10.51.43

Как только это было сделано, я перешел в каталог классов проекта:

1
cd /Users/Roger/git/captaindebug/hazelcast/target/classes

… и уволил издателя

1
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

… а потом клиент.

1
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client

Конечно, если вы используете это на своей машине, используя эту грубую и готовую технику, то не забудьте заменить
/Users/Roger/tmp/mm с указанием пути к месту, куда вы положили свои копии этих файлов JAR.

Если вы запустите издателя MarketMaker в одном терминале, а пару клиентов — в двух других терминалах, то вы получите что-то вроде этого, где вы сможете увидеть публикуемые цены и клиентов, получающих обновления.

Снимок экрана 2013-12-22 в 17.40.07

В Hazelcast следует обратить внимание на то, что « кластер » относится к кластеру экземпляров Hazelcast, а не к кластеру JVM. Это не очевидно, пока вы не запросите более одного экземпляра Hazelcast для одного приложения. Когда к кластеру присоединятся дополнительные клиенты, вы увидите что-то вроде этого:

1
2
3
4
5
6
7
Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}

В приведенном выше журнале есть две записи прослушивателя, по одной для каждого клиентского процесса, и три записи издателя, по одной для каждого экземпляра MarketMaker запущенного в MarketMaker main() MarketMaker .

Снимок экрана 2013-12-07 в 11.16.21
Здесь необходимо рассмотреть вопрос о том, является ли хорошей практикой создание экземпляра Hazelcast для каждого экземпляра объекта (как я это делал в примере кода), или лучше иметь в своем коде один static экземпляр Hazelcast. Я не уверен в ответе на этот вопрос, поэтому, если кто-нибудь из гуру Hazelcast читает это, пожалуйста, дайте мне знать.

Вот и все: Hazelcast успешно работает в режиме публикации и подписки, но я не охватил все возможности Hazelcast; возможно, об этом позже …