Статьи

Простая и легкая реализация пула

Пулы объектов — это контейнеры, которые содержат определенное количество объектов. Когда объект извлекается из пула, он недоступен в пуле, пока не будет возвращен. У объектов в пуле есть жизненный цикл: создание, проверка, уничтожение и т. Д. Пул помогает лучше управлять доступными ресурсами. Есть много примеров использования. Особенно на серверах приложений существуют пулы источников данных, пулы потоков и т. Д. Пулы следует использовать в таких случаях, как:

  • Высокочастотное использование одних и тех же объектов
  • Объекты очень большие и занимают много памяти
  • Объектам нужно много времени для инициализации
  • Объекты используют массивные операции ввода-вывода (потоки, сокеты, БД и т. Д.)
  • Объекты не являются потокобезопасными

Когда я искал реализацию пула для одного из моих Java-проектов, я обнаружил, что многие люди ссылаются на пул Apache Commons . Apache Commons Pool предоставляет API для пула объектов. Есть интерфейсы ObjectPool, ObjectPoolFactory, PoolableObjectFactory и множество реализаций. Пул предоставляет методы addObject, loanObject, invalidateObject, returnObject для добавления, получения, удаления и возврата объектов назад. PoolableObjectFactory определяет поведение объектов в пуле и предоставляет различные обратные вызовы для операций пула.

Изучив детали реализации, я обнаружил, что Apache Commons Pool не является легковесной реализацией, которая является чрезмерной нагрузкой для моих целей. Кроме того, он использует ключевое слово старой Java, синхронизированное для множества методов, которые не рекомендуется использовать. Java 5 представила среду Executor для параллелизма Java (многопоточность). Фреймворк Executor здесь предпочтительнее. Я решил реализовать простой и легкий пул, который я хотел бы представить здесь. Это только один класс Java. Я думаю, что этого достаточно, если вам не нужны обратные вызовы и другие сложные вещи. Я создал проект easy-pool на GitHub .

Реализация пула основана на ConcurrentLinkedQueue из пакета java.util.concurrent. ConcurrentLinkedQueue — это потокобезопасная очередь, основанная на связанных узлах. Эта очередь упорядочивает элементы по принципу FIFO (первым пришел-первым вышел). Моя реализация универсального пула выглядит следующим образом

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
public abstract class ObjectPool<T>
{
    private ConcurrentLinkedQueue<T> pool;
 
    private ScheduledExecutorService executorService;
 
    /**
     * Creates the pool.
     *
     * @param minIdle minimum number of objects residing in the pool
     */
    public ObjectPool(final int minIdle) {
        // initialize pool
        initialize(minIdle);
    }
 
    /**
     * Creates the pool.
     *
     * @param minIdle            minimum number of objects residing in the pool
     * @param maxIdle            maximum number of objects residing in the pool
     * @param validationInterval time in seconds for periodical checking of minIdle / maxIdle conditions in a separate thread.
     *                           When the number of objects is less than minIdle, missing instances will be created.
     *                           When the number of objects is greater than maxIdle, too many instances will be removed.
     */
    public ObjectPool(final int minIdle, final int maxIdle, final long validationInterval) {
        // initialize pool
        initialize(minIdle);
 
        // check pool conditions in a separate thread
        executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleWithFixedDelay(new Runnable()
        {
            @Override
            public void run() {
                int size = pool.size();
                if (size < minIdle) {
                    int sizeToBeAdded = minIdle - size;
                    for (int i = 0; i < sizeToBeAdded; i++) {
                        pool.add(createObject());
                    }
                } else if (size > maxIdle) {
                    int sizeToBeRemoved = size - maxIdle;
                    for (int i = 0; i < sizeToBeRemoved; i++) {
                        pool.poll();
                    }
                }
            }
        }, validationInterval, validationInterval, TimeUnit.SECONDS);
    }
 
    /**
     * Gets the next free object from the pool. If the pool doesn't contain any objects,
     * a new object will be created and given to the caller of this method back.
     *
     * @return T borrowed object
     */
    public T borrowObject() {
        T object;
        if ((object = pool.poll()) == null) {
            object = createObject();
        }
 
        return object;
    }
 
    /**
     * Returns object back to the pool.
     *
     * @param object object to be returned
     */
    public void returnObject(T object) {
        if (object == null) {
            return;
        }
 
        this.pool.offer(object);
    }
 
    /**
     * Shutdown this pool.
     */
    public void shutdown() {
        if (executorService != null) {
            executorService.shutdown();
        }
    }
 
    /**
     * Creates a new object.
     *
     * @return T new object
     */
    protected abstract T createObject();
 
    private void initialize(final int minIdle) {
        pool = new ConcurrentLinkedQueue<T>();
 
        for (int i = 0; i < minIdle; i++) {
            pool.add(createObject());
        }
    }
}

Абстрактный класс ObjectPool предоставляет два основных метода: loanObject для получения следующего свободного объекта из пула и returnObject для возврата заимствованного объекта обратно в пул. Если пул не содержит каких-либо объектов, будет создан новый объект и возвращен вызывающей стороне метода loanObject. Создание объекта происходит в методе createObject. Любой класс, который расширяет абстрактный класс ObjectPool, должен только реализовать этот метод, и пул готов к использованию. Как вы можете видеть, я также использую ScheduledExecutorService из пакета java.util.concurrent. Для чего это хорошо? Вы можете указать минимальное и максимальное количество объектов, находящихся в пуле. ScheduledExecutorService запускает специальную задачу в отдельном потоке и отслеживает периодику в указанное время (параметр validationInterval) минимального и максимального количества объектов в пуле. Когда количество объектов меньше минимального, будут созданы отсутствующие экземпляры. Когда количество объектов превышает максимальное, будет удалено слишком много экземпляров. Это иногда полезно для баланса потребляющих память объектов в пуле и т. Д.

Давайте реализуем тестовые классы, чтобы показать использование конкретного пула. Во-первых, нам нужен класс, представляющий объекты в пуле, который имитирует длительный процесс. Этот класс, называемый ExportingProcess, нуждается в некотором времени для создания экземпляра.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ExportingProcess {
 
    private String location;
 
    private long processNo = 0;
 
    public ExportingProcess(String location, long processNo) {
        this.location = location;
        this.processNo = processNo;
 
        // doing some time expensive calls / tasks
        // ...
 
        // for-loop is just for simulation
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
        }
 
        System.out.println("Object with process no. " + processNo + " was created");
    }
 
    public String getLocation() {
        return location;
    }
 
    public long getProcessNo() {
        return processNo;
    }
}

Второй класс реализует интерфейс Runnable и моделирует некоторую задачу, выполняемую потоком. В методе run мы заимствуем экземпляр ExportingProcess и позже вернем его обратно в пул.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ExportingTask implements Runnable {
 
    private ObjectPool<ExportingProcess> pool;
 
    private int threadNo;
 
    public ExportingTask(ObjectPool<ExportingProcess> pool, int threadNo) {
        this.pool = pool;
        this.threadNo = threadNo;
    }
 
    public void run() {
        // get an object from the pool
        ExportingProcess exportingProcess = pool.borrowObject();
 
        System.out.println("Thread " + threadNo +
                ": Object with process no. " + exportingProcess.getProcessNo() + " was borrowed");
 
        // do something
        // ...
 
        // for-loop is just for simulation
        for (int i = 0; i < 100000; i++) {
        }
 
        // return ExportingProcess instance back to the pool
        pool.returnObject(exportingProcess);
 
        System.out.println("Thread " + threadNo +
                ": Object with process no. " + exportingProcess.getProcessNo() + " was returned");
    }
}

Теперь в классе JUnit TestObjectPool мы создаем пул объектов типа ExportingProcess. Это происходит с помощью нового ObjectPool <ExportingProcess> (4, 10, 5). Параметры описаны в комментариях ниже.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
 
public class TestObjectPool
{
    private ObjectPool<ExportingProcess> pool;
 
    private AtomicLong processNo = new AtomicLong(0);
 
    @Before
    public void setUp() {
        // Create a pool of objects of type ExportingProcess. Parameters:
        // 1) Minimum number of special ExportingProcess instances residing in the pool = 4
        // 2) Maximum number of special ExportingProcess instances residing in the pool = 10
        // 3) Time in seconds for periodical checking of minIdle / maxIdle conditions in a separate thread = 5.
        //    When the number of ExportingProcess instances is less than minIdle, missing instances will be created.
        //    When the number of ExportingProcess instances is greater than maxIdle, too many instances will be removed.
        //    If the validation interval is negative, no periodical checking of minIdle / maxIdle conditions
        //    in a separate thread take place. These boundaries are ignored then.
        pool = new ObjectPool<ExportingProcess>(4, 10, 5)
        {
            protected ExportingProcess createObject() {
                // create a test object which takes some time for creation
                return new ExportingProcess("/home/temp/", processNo.incrementAndGet());
            }
        };
    }
 
    @After
    public void tearDown() {
        pool.shutdown();
    }
 
    @Test
    public void testObjectPool() {
        ExecutorService executor = Executors.newFixedThreadPool(8);
 
        // execute 8 tasks in separate threads
        executor.execute(new ExportingTask(pool, 1));
        executor.execute(new ExportingTask(pool, 2));
        executor.execute(new ExportingTask(pool, 3));
        executor.execute(new ExportingTask(pool, 4));
        executor.execute(new ExportingTask(pool, 5));
        executor.execute(new ExportingTask(pool, 6));
        executor.execute(new ExportingTask(pool, 7));
        executor.execute(new ExportingTask(pool, 8));
 
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Тестовый вывод выглядит как

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
Object with process no. 1 was created
Object with process no. 2 was created
Object with process no. 3 was created
Object with process no. 4 was created
Thread 2: Object with process no. 2 was borrowed
Thread 1: Object with process no. 1 was borrowed
Thread 2: Object with process no. 2 was returned
Thread 3: Object with process no. 3 was borrowed
Thread 4: Object with process no. 4 was borrowed
Thread 1: Object with process no. 1 was returned
Thread 4: Object with process no. 4 was returned
Thread 8: Object with process no. 4 was borrowed
Thread 5: Object with process no. 1 was borrowed
Thread 7: Object with process no. 3 was borrowed
Thread 3: Object with process no. 3 was returned
Thread 6: Object with process no. 2 was borrowed
Thread 7: Object with process no. 3 was returned
Thread 5: Object with process no. 1 was returned
Thread 8: Object with process no. 4 was returned
Thread 6: Object with process no. 2 was returned

Как видно, первый поток, обращающийся к пулу, создает минимум объектов, находящихся в пуле. Запустив этот тестовый класс несколько раз, мы можем обнаружить, что иногда 4 объекта заимствуются один за другим, и в пуле создается новый 5. объект. Все тестовые классы доступны в GitHub .