Статьи

Руководство для начинающих по Hazelcast, часть 4

Это четвертая часть моей серии Hazelcast. Если один не видел других 3, я предлагаю перейти к части 1 , части 2 и части 3 .

логирование

Ведение журнала является важной особенностью любого приложения, и мои примеры ничем не отличаются. System.out.println может быть полезным инструментом для информирования пользователя о том, что происходит в консольных приложениях. Но давайте посмотрим правде в глаза, если кто-то читает, как использовать инструмент для распределенных приложений, этот человек на самом деле не новичок. Просмотр серии сообщений не должен никого пугать. На самом деле, для примеров в этом посте, они должны знать, кем и что происходит. В конце концов, мы будем говорить о многопоточном программировании.

Хорошие люди из Hazelcast, похоже, согласились с тем, что ведение журнала важно, и поэтому у них есть много разных способов определить, какая библиотека ведёт журнал. Инфраструктура ведения журнала зависит только от ведения журнала JDK и имеет ряд адаптеров, которые даже позволяют создавать пользовательские структуры ведения журнала. Выбранный адаптер ведения журнала устанавливается свойством hazelcast.logging.type со следующими параметрами:

  • Ведение журнала JDK, это по умолчанию.
  • log4j
  • SLF4J
  • никто

Я использовал Log4j2, поэтому я выбрал slf4j и вставил четыре jar-файла, необходимые для его работы.

Спиннинг распределенных потоков

Как и многие классы в Hazelcast, IExecutorService реализует интерфейс из библиотек Java, ExecutorService. Этот интерфейс определяет, что такое пул потоков. Интерфейс является частью пакета java.util.concurrent и существует с Java 1.5. Пакет также имеет реализации, к которым можно получить доступ из java.util.concurrent.Executors. Я хотел бы иметь что-то подобное в Java 1.4 или 1.3 или 1.2 или 1.1. Делать пулы потоков было весело, пока не возникли тупики. Теперь я могу использовать пулы библиотеки Java, достаточно хорошо для меня.

У ExecutorServices есть интересная «особенность». Нужно закрыть их, иначе служба не исчезнет. При первом использовании я вызвал утечку памяти и выключил JVM. Я обнаружил ошибку во время собственного тестирования, поэтому клиенту никогда не приходилось видеть мой опыт обучения. IExecutorService имеет другую складку. Служба не исчезнет, ​​пока не завершатся все потоки. Это вызвало много нечистых отключений. Вы были предупреждены!

IExecutorServices может делиться потоками несколькими различными способами. Вот они подробно:

Любой экземпляр

Это когда кто-то вызывает только submit(Callable call). Это больше, чем просто случайная установка потока в кластер. Он выполняет некоторую балансировку нагрузки с этим потоком, чтобы экземпляр не был забит потоками.

Для конкретного члена

Это делается с помощью submit(Callable call, Member member) . Это отправляет поток конкретному члену кластера. Нет балансировки нагрузки здесь; просто отправка члену. Будьте осторожны, можно легко перегружать элемент и действительно тормозить любую выполняемую обработку. Я мог бы видеть это как способ создания собственного балансировщика нагрузки.

В коллекцию членов

Да, можно отправить тему нескольким участникам. Когда я делал свой пример кодирования, все участники ведут себя так, как будто они получили свою собственную ветку и не разделяют ее. Если кто-то реализует Callable <T> в качестве своей реализации потока, метод возвращает Map of Futures, используя члены в качестве ключа. Если кто-то использует Runnable, он ничего не возвращает.

Члену с правильным ключом

Записи в IMap могут находиться в любом месте кластера. Если необходимо выполнить обработку этой записи, локальный поток должен будет подтянуть запись по сети. Это может быть проблемой, если запись очень большая. Лучшим способом было бы перенести, как мы надеемся, меньшую ветку на запись. Для этого кластер должен знать, куда его отправлять. Следовательно, вызов submit(Callable call, Object key) .

Для всех участников

Это работает так же, как отправка в коллекцию участников, но это все из них, как в каждом элементе в кластере. Это может стать «забавным», если в кластере много участников. Я думаю, что я слышал до 1000 членов в одном кластере. Убедитесь, что это то, что вы хотите, прежде чем это называется.

Использование ExecutionCallback

Это в основном способ отправить некоторые потоки и позволить результатам возвращаться асинхронно. Каждый использует ExecutionCallback, если один поток представлен. Один использует MultiExecutionCallback, если задействовано более одного участника.

Пример кода

Прежде чем начать, позвольте мне сказать, что у меня нет примера для каждого метода в IExecutorService. У меня есть пример для каждого обсуждаемого типа, однако. Еще одна вещь о коде примера. В учебных целях в предыдущих статьях я сделал несколько кодировок копирования и вставки, чтобы каждый пример мог стоять сам по себе, и каждый мог получить контекст того, что и куда. Я сделал это совсем немного в третьей части . Если кто-то этого не заметил, посмотри еще раз.

На этот раз я не сделал этого, потому что было бы скопировано много кода, и результаты были бы довольно уродливыми. Я использовал Enum и думаю, что результаты были очень хорошими. Я думал, что Enum был хорошим выбором из-за ограниченного числа примеров и позволил мне иметь возможность показывать код кусками, которые понятны, если сначала была показана структура.

С этим объяснением, давайте двигаться дальше!

Фреймворк

Это основные биты. Он состоит из основного класса и класса потока. Обратите внимание, как основной класс показывает, как поток может быть вызван.

Основной

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package hazelcastservice;
 
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 *
 * @author Daryl
 */
public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);
    public static final String SERVICE_NAME = "spinnerella";
    public static final int NUM_INSTANCES = 5;
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        System.setProperty("hazelcast.logging.type", "slf4j");
        List<HazelcastInstance> instances = new ArrayList<>(NUM_INSTANCES);
        for(int i = 0; i < NUM_INSTANCES; i++) {
            instances.add(Hazelcast.newHazelcastInstance());
            logger.info("instance {} up", i);
        }
 
        IExecutorService spinner = instances.get(0).getExecutorService(SERVICE_NAME);
        try {
            HazelcastIExecutorServiceExamples.TO_SOME_MEMBER.example(instances, spinner);
            HazelcastIExecutorServiceExamples.TO_PARTICULAR_MEMBER.example(instances, spinner);
            HazelcastIExecutorServiceExamples.ON_THE_KEY_OWNER.example(instances, spinner);
            HazelcastIExecutorServiceExamples.ON_A_SET_OF_MEMBERS.example(instances, spinner);
            HazelcastIExecutorServiceExamples.ON_ALL_MEMBERS.example(instances, spinner);
            HazelcastIExecutorServiceExamples.CALLBACK.example(instances, spinner);
            HazelcastIExecutorServiceExamples.MULTIPLE_MEMBERS_WITH_CALLBACK.example(instances, spinner);
             
            //Lets setup a loop to make sure they are all done (Especially the callback ones)
            for(HazelcastIExecutorServiceExamples example: HazelcastIExecutorServiceExamples.values()) {
                while(!example.isDone()) {
                    Thread.sleep(1000);
                }
            }
        } catch(ExecutionException ee) {
            logger.warn("Can't finish the job", ee);
        } catch(InterruptedException ie) {
            logger.warn("Everybody out of the pool", ie);
        } finally {
            // time to clean up my toys
            boolean allClear = false;
             
            while(!allClear) {
                try {
                    Thread.sleep(1000);
                    Hazelcast.shutdownAll();
                    allClear = true;
                } catch(InterruptedException ie) {
                    //got interrupted. try again
                } catch(RejectedExecutionException ree) {
                    logger.debug("caught a RejectedExecutionException");
                    allClear = false;
                }
            }
             
            logger.info("All done");
        }
    }
}

Нить

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package hazelcastservice;
 
import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * This class was inspired by the song "I Like to Move it" from the movie
 * Madagascar by Dreamworks.  I offer NO apologies for using it. 
 *
 * To those software developers who like consistent results, I used java.util.Random to
 * make it loop inconsistently each time call is called. 
 *
 * Sometimes you need to make your own entertainment.
 * @author Daryl
 */
public class MoveItMoveIt implements Callable<Integer>, Serializable {
    private static final Logger logger = LoggerFactory.getLogger(MoveItMoveIt.class);
    private static final int UPPER_BOUND = 15;
         
    @Override
    public Integer call() throws Exception {
        Random random = new Random();
        int howMany = random.nextInt(UPPER_BOUND);
//        int howMany = 2;
        for(int i = 0; i < howMany; i++) {
            logger.info("I like to Move it Move it!");
        }
        logger.info("Move it!");
        return howMany;
    }
}

Особенности

Здесь я показываю различные типы вызовов, которые обсуждались. Помните, что это куски класса Enum. Выполнение — это защищенная переменная, и необходимо реализовать public void example(List<HazelcastInstance> instances, IExecutorService spinner) .

Любой экземпляр

01
02
03
04
05
06
07
08
09
10
TO_SOME_MEMBER() {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
                throws ExecutionException, InterruptedException {
            logger.info("Submit to some member.");
            Future<Integer> howMany = spinner.submit(new MoveItMoveIt());
            logger.info("It moved it {} times", howMany.get());
            done = true;
        }
    }

Для конкретного члена

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
TO_PARTICULAR_MEMBER {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
                throws ExecutionException, InterruptedException {
            logger.info("Submit to a particular member.");
            Member member = getRandomMember(instances);
            logger.debug("member is {}", member);
            Future<Integer> howMany = spinner.submitToMember(new MoveItMoveIt(), member);
            logger.info("It moved it {} times.", howMany.get());
            done = true;
        }
         
        private Member getRandomMember(List<HazelcastInstance> instances) {
            Set<Member> members = instances.get(0).getCluster().getMembers();
            int i = 0;
            int max = new Random().nextInt(instances.size());
            Iterator<Member> iterator = members.iterator();
            Member member = iterator.next();
            while(iterator.hasNext() && (i < max)) {
                member = iterator.next();
                i++;
            }
            return member;
        }
    }

В коллекцию членов

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ON_A_SET_OF_MEMBERS {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("Send to some of the members");
            Set<Member> randomMembers = getRandomMembers(instances);
            Map<Member, Future<Integer>> results =
                    spinner.submitToMembers(new MoveItMoveIt(), randomMembers);
            for(Future<Integer> howMany: results.values()) {
                logger.info("It moved {} times", howMany.get());
            }
            done = true;
        }
         
        private Set<Member> getRandomMembers(List<HazelcastInstance> instances) {
            int max = new Random().nextInt(instances.size());
            Set<Member> newSet = new HashSet<>(instances.size());
            int k = 0;
            Iterator<Member> i = instances.get(0).getCluster().getMembers().iterator();
            while(i.hasNext() && k < max) {
                newSet.add(i.next());
                k++;
            }
            return newSet;
        }
    }

Члену с правильным ключом

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
ON_THE_KEY_OWNER {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("Send to the one owning the key");
            HazelcastInstance randomInstance = getRandomInstance(instances);
            IMap<Long, Boolean> map = randomInstance.getMap("default");
            Long one = 1L;
            map.put(one, Boolean.TRUE);
             
            Future<Integer> howMany = spinner.submitToKeyOwner(new MoveItMoveIt(), one);
            logger.info("It moved it {} times.", howMany.get());
            done = true;
        }
         
        private HazelcastInstance getRandomInstance(List<HazelcastInstance> instances) {
            return instances.get(new Random().nextInt(instances.size()));
        }
 
    }

Для всех участников

01
02
03
04
05
06
07
08
09
10
11
12
13
ON_ALL_MEMBERS {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("Send to all members");
            Map<Member, Future<Integer>> results =
                    spinner.submitToAllMembers(new MoveItMoveIt());
            for(Future<Integer> howMany: results.values()) {
                logger.info("It moved {} times", howMany.get());
            }
            done = true;
        }
    }

Использование ExecutionCallback

Этот пример кода содержит два фрагмента кода для отображения одного обратного вызова и нескольких обратных вызовов.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
CALLBACK {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("example with a callback");
            spinner.submit(new MoveItMoveIt(), new ExecutionCallback<Integer>() {
                @Override
                public void onResponse(Integer response) {
                    logger.info("It moved {} times", response);
                    done = true;
                }
 
                @Override
                public void onFailure(Throwable thrwbl) {
                    logger.error("trouble in the callback", thrwbl);
                    done = true;
                }
            });
        }       
    },
    MULTIPLE_MEMBERS_WITH_CALLBACK {
        @Override
        public void example(List<HazelcastInstance> instances, IExecutorService spinner)
            throws ExecutionException, InterruptedException {
            logger.info("running on multiple members with callback");
            spinner.submitToAllMembers(new MoveItMoveIt(), new MultiExecutionCallback() {
 
                @Override
                public void onResponse(Member member, Object o) {
                    logger.info("member finished with {} moves", o);
                }
 
                @Override
                public void onComplete(Map<Member, Object> map) {
                    logger.info("All members completed");
                    for(Object value: map.values()) {
                        logger.info("It moved {} times", value);
                    }
                    done = true;
                }
            });
        }

Вывод

Было хорошо опубликовать мой собственный код / ​​идеи снова в моем блоге. Я быстро взглянул на возможности IExecutorService от Hazelcast. Мой пример кода следовал принципу СУХОЙ. Код в полном объеме можно найти здесь .

использованная литература

Как всегда с моими руководствами Hazelcast, моя информация взята из документации Hazelcast, которую можно найти здесь .