Статьи

Обнаружение члена Hazelcast с использованием кураторов и ZooKeeper

В одном проекте я настраивал кластер Hazelcast в частном облаке. Внутри кластера все узлы должны видеть друг друга, поэтому во время начальной загрузки Hazelcast попытается найти других членов кластера. Нет сервера и все узлы сделаны равными. Есть несколько методов обнаружения участников, реализованных в Hazelcast; к сожалению, это был не AWS, поэтому мы не могли использовать автообнаружение EC2, и многоадресная рассылка была заблокирована, поэтому встроенная поддержка многоадресной рассылки оказалась бесполезной. Последним средством был кластер TCP / IP, где адреса всех узлов должны быть жестко заданы в конфигурации XML:

1
2
3
4
5
6
7
<tcp-ip enabled="true">
    <member>machine1</member>
    <member>machine2</member>
    <member>machine3:5799</member>
    <member>192.168.1.0-7</member>
    <member>192.168.1.21</member>
</tcp-ip>

Это не очень хорошо масштабируется, также узлы в нашем облаке назначались динамически, поэтому было невозможно определить адреса до выполнения. Здесь я представляю доказательство концепции, основанное на Curator Service Discovery и ZooKeeper . Прежде всего давайте пропустим конфигурацию hazelcast.xml и загрузочный кластер в простом старом коде Java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Configuration
public class HazelcastConfiguration {
  
    @Bean(destroyMethod = "shutdown")
    HazelcastInstance hazelcast(Config config) {
        return Hazelcast.newHazelcastInstance(config);
    }
  
    @Bean
    Config config(ApplicationContext applicationContext, NetworkConfig networkConfig) {
        final Config config = new Config();
        config.setNetworkConfig(networkConfig);
        config.getGroupConfig().setName(applicationContext.getId());
        return config;
    }
  
    @Bean
    NetworkConfig networkConfig(@Value("${hazelcast.port:5701}") int port, JoinConfig joinConfig) {
        final NetworkConfig networkConfig = new NetworkConfig();
        networkConfig.setJoin(joinConfig);
        networkConfig.setPort(port);
        return networkConfig;
    }
  
    @Bean
    JoinConfig joinConfig(TcpIpConfig tcpIpConfig) {
        final JoinConfig joinConfig = disabledMulticast();
        joinConfig.setTcpIpConfig(tcpIpConfig);
        return joinConfig;
    }
  
    private JoinConfig disabledMulticast() {
        JoinConfig join = new JoinConfig();
        final MulticastConfig multicastConfig = new MulticastConfig();
        multicastConfig.setEnabled(false);
        join.setMulticastConfig(multicastConfig);
        return join;
    }
  
    @Bean
    TcpIpConfig tcpIpConfig(ApplicationContext applicationContext, ServiceDiscovery<Void> serviceDiscovery) throws Exception {
        final TcpIpConfig tcpIpConfig = new TcpIpConfig();
        final List<String> instances = queryOtherInstancesInZk(applicationContext.getId(), serviceDiscovery);
        tcpIpConfig.setMembers(instances);
        tcpIpConfig.setEnabled(true);
        return tcpIpConfig;
    }
  
    private List<String> queryOtherInstancesInZk(String name, ServiceDiscovery<Void> serviceDiscovery) throws Exception {
        return serviceDiscovery
                .queryForInstances(name)
                .stream()
                .map(ServiceInstance::buildUriSpec)
                .collect(toList());
    }
  
}

Я использую applicationContext.getId() чтобы избежать жесткого кодирования имени приложения. В Spring Boot его можно заменить на --spring.application.name=... Также полезно назначить имя для кластера config.getGroupConfig().setName(...) — это позволит нам запускать несколько кластеров. в той же сети, даже с включенной многоадресной рассылкой. Последний метод queryOtherInstancesInZk() наиболее интересен. При создании TcpIpConfig мы вручную предоставляем список адресов TCP / IP, где находятся другие члены кластера. Вместо того, чтобы жестко кодировать этот список (как в примере с XML выше), мы запрашиваем ServiceDiscovery от куратора. Мы запрашиваем все экземпляры нашего приложения и передаем его в TcpIpConfig . Прежде чем перейти к настройке куратора, несколько слов о том, как Hazelcast использует конфигурацию TCP / IP. Очевидно, что все узлы не запускаются одновременно. Когда запускается первый узел, куратор едва вернет один экземпляр (себя), поэтому в кластере будет только один член. Когда запускается второй узел, он увидит уже запущенный узел и попытается сформировать из него кластер. Очевидно, что первый узел обнаружит второй, просто подключающийся к нему. Индукция продолжается — когда запускается больше узлов, они получают существующие узлы из службы обнаружения кураторов и присоединяются к ним. Hazelcast позаботится о ложных сбоях участников, удалив их из кластера и перебалансировав данные. Куратор, с другой стороны, удалит их из ZooKeeper.

Хорошо, теперь откуда ServiceDiscovery<Void> ? Вот полная конфигурация:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class CuratorConfiguration {
  
    @BeanWithLifecycle
    ServiceDiscovery<Void> serviceDiscovery(CuratorFramework curatorFramework, ServiceInstance<Void> serviceInstance) throws Exception {
        return ServiceDiscoveryBuilder
                .builder(Void.class)
                .basePath("hazelcast")
                .client(curatorFramework)
                .thisInstance(serviceInstance)
                .build();
    }
  
    @BeanWithLifecycle
    CuratorFramework curatorFramework(@Value("${zooKeeper.url:localhost:2181}") String zooKeeperUrl) {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        return CuratorFrameworkFactory.newClient(zooKeeperUrl, retryPolicy);
    }
  
    @Bean
    ServiceInstance<Void> serviceInstance(@Value("${hazelcast.port:5701}") int port, ApplicationContext applicationContext) throws Exception {
        final String hostName = InetAddress.getLocalHost().getHostName();
        return ServiceInstance
                .<Void>builder()
                .name(applicationContext.getId())
                .uriSpec(new UriSpec("{address}:{port}"))
                .address(hostName)
                .port(port)
                .build();
    }
  
}

Hazelcast по умолчанию прослушивает 5701, но если указанный порт занят, он будет пытаться использовать последующие. При запуске мы регистрируемся в кураторе, предоставляя имя нашего хоста и порт Hazelcast. При запуске других экземпляров нашего приложения они увидят ранее зарегистрированные экземпляры. Когда приложение закрывается, куратор отменяет регистрацию, используя механизм эфемерного узла в ZooKeeper. Кстати, @BeanWithLifecycle не из Spring или Spring Boot, я создал его сам, чтобы избежать повторений:

1
2
3
4
@Target({METHOD, ANNOTATION_TYPE})
@Retention(RUNTIME)
@Bean(initMethod = "start", destroyMethod = "close")
@interface BeanWithLifecycle { }

Запустив ZooKeeper (по умолчанию на localhost:2181 ), мы можем запустить произвольное количество узлов, и они быстро найдут друг друга. Единственная общая информация — это URL ZooKeeper.