Однако есть некоторые объекты, которые, безусловно, дорого обходятся при создании. Такие объекты, как потоки, объекты подключения к базе данных и т. Д., Не являются легковесными объектами и их создание немного дороже. В любом приложении мы требуем использования нескольких объектов вышеупомянутого вида. Поэтому было бы замечательно, если бы был очень простой способ создать и поддерживать пул объектов такого типа, чтобы объекты можно было динамически использовать и использовать повторно, не заботясь о клиентском коде о жизненном цикле объектов.
Прежде чем писать код для пула объектов, давайте сначала определим основные требования, которым должен отвечать любой пул объектов.
- Пул должен позволять клиентам использовать объект, если таковой имеется.
- Он должен повторно использовать объекты, как только они возвращаются в пул клиентом.
- При необходимости он должен иметь возможность создавать больше объектов для удовлетворения растущих потребностей клиента.
- Он должен обеспечивать надлежащий механизм выключения, чтобы при выключении не происходило утечек памяти.
Излишне говорить, что вышеперечисленные пункты послужат основой интерфейса, который мы представим нашим клиентам.
Таким образом, наше объявление интерфейса будет следующим:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package com.test.pool; /** * Represents a cached pool of objects. * * @author Swaranga * * @param < T > the type of object to pool. */ public interface Pool< T > { /** * Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation. * * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available. * In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown * when the thread waits for an object to become available. * * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not. * If any object is available the call returns it * else the call returns < code >null< /code >. * * The validity of the objects are determined using the * {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >. * * @return T one of the pooled objects. */ T get(); /** * Releases the object and puts it back to the pool. * * The mechanism of putting the object back to the pool is * generally asynchronous, * however future implementations might differ. * * @param t the object to return to the pool */ void release(T t); /** * Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources. * Releasing resources are done * via the < code >invalidate()< /code > * method of the {@link Validator} interface. */ void shutdown(); } |
Приведенный выше интерфейс специально сделан очень простым и универсальным для поддержки любых типов объектов. Он предоставляет методы для получения / возврата объекта из / в пул. Он также предоставляет механизм отключения для удаления объектов.
Сейчас мы пытаемся создать реализацию вышеупомянутого интерфейса. Но перед этим важно отметить, что идеальный метод release () сначала попытается проверить, можно ли повторно использовать объект, возвращаемый клиентом. Если да, то он вернет его в пул, иначе объект должен быть отброшен. Мы хотим, чтобы каждая реализация интерфейса пула следовала этому правилу. Поэтому перед созданием конкретной реализации мы создаем абстрактную реализацию, которая накладывает это ограничение на последующие реализации. Наша абстрактная реализация будет называться удивительно AbstractPool и ее определение будет следующим:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package com.test.pool; /** * Represents an abstract pool, that defines the procedure * of returning an object to the pool. * * @author Swaranga * * @param < T > the type of pooled objects. */ abstract class AbstractPool < T > implements Pool < T > { /** * Returns the object to the pool. * The method first validates the object if it is * re-usable and then puts returns it to the pool. * * If the object validation fails, * some implementations * will try to create a new one * and put it into the pool; however * this behaviour is subject to change * from implementation to implementation * */ @Override public final void release(T t) { if (isValid(t)) { returnToPool(t); } else { handleInvalidReturn(t); } } protected abstract void handleInvalidReturn(T t); protected abstract void returnToPool(T t); protected abstract boolean isValid(T t); } |
В приведенном выше классе мы сделали обязательным, чтобы пулы объектов проверяли объект, прежде чем возвращать его в пул. Чтобы настроить поведение своих пулов, реализации могут свободно выбирать способ реализации трех абстрактных методов. Они решат, используя свою собственную логику, как проверить, является ли объект действительным для повторного использования [метод validate ()], что делать, если объект, возвращаемый клиентом, недопустим [метод handleInvalidReturn ()], и фактическую логику вернуть действительный объект в пул [метод returnToPool ()].
Теперь, имея вышеупомянутый набор классов, мы почти готовы к конкретной реализации. Но подвох состоит в том, что, поскольку вышеупомянутые классы предназначены для поддержки общих пулов объектов, следовательно, общая реализация вышеупомянутых классов не будет знать, как проверять объект [так как объекты будут общими :-)]. Следовательно, нам нужно что-то еще, что поможет нам в этом.
Что нам действительно нужно, так это обычный способ проверки объекта, чтобы конкретным реализациям пула не пришлось беспокоиться о типе проверяемых объектов. Поэтому мы представляем новый интерфейс Validator, который определяет методы для проверки объекта. Наше определение интерфейса Validator будет следующим:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.test.pool; /** * Represents the functionality to * validate an object of the pool * and to subsequently perform cleanup activities. * * @author Swaranga * * @param < T > the type of objects to validate and cleanup. */ public static interface Validator < T > { /** * Checks whether the object is valid. * * @param t the object to check. * * @return <code>true</code> * if the object is valid else <code>false</code>. */ public boolean isValid(T t); /** * Performs any cleanup activities * before discarding the object. * For example before discarding * database connection objects, * the pool will want to close the connections. * This is done via the * <code>invalidate()</code> method. * * @param t the object to cleanup */ public void invalidate(T t); } |
Приведенный выше интерфейс определяет методы для проверки допустимости объекта, а также метод для аннулирования и объекта. Метод invalidate следует использовать, когда мы хотим отбросить объект и очистить всю память, используемую этим экземпляром. Обратите внимание, что этот интерфейс сам по себе не имеет большого значения и имеет смысл только при использовании в контексте пула объектов. Таким образом, мы определяем этот интерфейс внутри интерфейса пула верхнего уровня. Это аналогично интерфейсам Map и Map.Entry в библиотеке коллекций Java. Следовательно, наш интерфейс пула становится следующим:
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
|
package com.test.pool; /** * Represents a cached pool of objects. * * @author Swaranga * * @param < T > the type of object to pool. */ public interface Pool< T > { /** * Returns an instance from the pool. * The call may be a blocking one or a non-blocking one * and that is determined by the internal implementation. * * If the call is a blocking call, * the call returns immediately with a valid object * if available, else the thread is made to wait * until an object becomes available. * In case of a blocking call, * it is advised that clients react * to {@link InterruptedException} which might be thrown * when the thread waits for an object to become available. * * If the call is a non-blocking one, * the call returns immediately irrespective of * whether an object is available or not. * If any object is available the call returns it * else the call returns < code >null< /code >. * * The validity of the objects are determined using the * {@link Validator} interface, such that * an object < code >o< /code > is valid if * < code > Validator.isValid(o) == true < /code >. * * @return T one of the pooled objects. */ T get(); /** * Releases the object and puts it back to the pool. * * The mechanism of putting the object back to the pool is * generally asynchronous, * however future implementations might differ. * * @param t the object to return to the pool */ void release(T t); /** * Shuts down the pool. In essence this call will not * accept any more requests * and will release all resources. * Releasing resources are done * via the < code >invalidate()< /code > * method of the {@link Validator} interface. */ void shutdown(); /** * Represents the functionality to * validate an object of the pool * and to subsequently perform cleanup activities. * * @author Swaranga * * @param < T > the type of objects to validate and cleanup. */ public static interface Validator < T > { /** * Checks whether the object is valid. * * @param t the object to check. * * @return <code>true</code> * if the object is valid else <code>false</code>. */ public boolean isValid(T t); /** * Performs any cleanup activities * before discarding the object. * For example before discarding * database connection objects, * the pool will want to close the connections. * This is done via the * <code>invalidate()</code> method. * * @param t the object to cleanup */ public void invalidate(T t); } } |
Мы почти готовы к конкретной реализации. Но перед этим нам нужно одно заключительное оружие, которое на самом деле является самым важным оружием в пуле объектов. Это называется «возможность создавать новые объекты». Поскольку наши пулы объектов будут общими, они должны знать, как создавать новые объекты для заполнения его пула. Эта функциональность также не должна зависеть от типа пула объектов и должна быть обычным способом создания новых объектов. Способом сделать это будет интерфейс, называемый ObjectFactory, который определяет только один метод — «как создать новый объект». Наш интерфейс ObjectFactory выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
package com.test.pool; /** * Represents the mechanism to create * new objects to be used in an object pool. * * @author Swaranga * * @param < T > the type of object to create. */ public interface ObjectFactory < T > { /** * Returns a new instance of an object of type T. * * @return T an new instance of the object of type T */ public abstract T createNew(); } |
Мы наконец-то закончили с нашими вспомогательными классами и теперь создадим конкретную реализацию интерфейса Pool. Поскольку нам нужен пул, который можно использовать в параллельных приложениях, мы создадим пул блокировок, который блокирует клиента, если в пуле нет доступных объектов. Механизм блокировки будет блокироваться бесконечно, пока объекты не станут доступными. Реализация такого рода приводит к появлению другого метода, который будет блокировать только в течение заданного периода времени ожидания, если какой-либо объект станет доступным до истечения времени ожидания, что объект будет возвращен, в противном случае по истечении времени ожидания вместо ожидания возвращается нулевой объект , Эта реализация аналогична реализации LinkedBlockingQueue Java Concurrency API, и, таким образом, перед реализацией реального класса мы представляем другую реализацию, BlockingPool, которая аналогична интерфейсу BlockingQueue API параллелизма Java.
Следовательно, объявление интерфейса Blockingpool выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
package com.test.pool; import java.util.concurrent.TimeUnit; /** * Represents a pool of objects that makes the * requesting threads wait if no object is available. * * @author Swaranga * * @param < T > the type of objects to pool. */ public interface BlockingPool < T > extends Pool < T > { /** * Returns an instance of type T from the pool. * * The call is a blocking call, * and client threads are made to wait * indefinitely until an object is available. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented. * * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available, * the current implementations * sets the interrupted state of the thread * to <code>true</code> and returns null. * However this is subject to change * from implementation to implementation. * * @return T an instance of the Object * of type T from the pool. */ T get(); /** * Returns an instance of type T from the pool, * waiting up to the * specified wait time if necessary * for an object to become available.. * * The call is a blocking call, * and client threads are made to wait * for time until an object is available * or until the timeout occurs. * The call implements a fairness algorithm * that ensures that a FCFS service is implemented. * * Clients are advised to react to InterruptedException. * If the thread is interrupted while waiting * for an object to become available, * the current implementations * set the interrupted state of the thread * to <code>true</code> and returns null. * However this is subject to change * from implementation to implementation. * * * @param time amount of time to wait before giving up, * in units of <tt>unit</tt> * @param unit a <tt>TimeUnit</tt> determining * how to interpret the * <tt>timeout</tt> parameter * * @return T an instance of the Object * of type T from the pool. * * @throws InterruptedException * if interrupted while waiting */ T get( long time, TimeUnit unit) throws InterruptedException; } |
И наша реализация BoundedBlockingPool будет выглядеть следующим образом:
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
package com.test.pool; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public final class BoundedBlockingPool < T > extends AbstractPool < T > implements BlockingPool < T > { private int size; private BlockingQueue < T > objects; private Validator < T > validator; private ObjectFactory < T > objectFactory; private ExecutorService executor = Executors.newCachedThreadPool(); private volatile boolean shutdownCalled; public BoundedBlockingPool( int size, Validator < T > validator, ObjectFactory < T > objectFactory) { super (); this .objectFactory = objectFactory; this .size = size; this .validator = validator; objects = new LinkedBlockingQueue < T >(size); initializeObjects(); shutdownCalled = false ; } public T get( long timeOut, TimeUnit unit) { if (!shutdownCalled) { T t = null ; try { t = objects.poll(timeOut, unit); return t; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return t; } throw new IllegalStateException( 'Object pool is already shutdown' ); } public T get() { if (!shutdownCalled) { T t = null ; try { t = objects.take(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } return t; } throw new IllegalStateException( 'Object pool is already shutdown' ); } public void shutdown() { shutdownCalled = true ; executor.shutdownNow(); clearResources(); } private void clearResources() { for (T t : objects) { validator.invalidate(t); } } @Override protected void returnToPool(T t) { if (validator.isValid(t)) { executor.submit( new ObjectReturner(objects, t)); } } @Override protected void handleInvalidReturn(T t) { } @Override protected boolean isValid(T t) { return validator.isValid(t); } private void initializeObjects() { for ( int i = 0 ; i < size; i++) { objects.add(objectFactory.createNew()); } } private class ObjectReturner < E > implements Callable < Void > { private BlockingQueue < E > queue; private E e; public ObjectReturner(BlockingQueue < E > queue, E e) { this .queue = queue; this .e = e; } public Void call() { while ( true ) { try { queue.put(e); break ; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } return null ; } } } |
Выше приведен базовый пул объектов, поддерживаемый внутренне LinkedBlockingQueue. Единственный интересный метод — метод returnToPool (). Поскольку внутреннее хранилище является пулом блокировок, если мы попытаемся поместить возвращенный элемент непосредственно в LinkedBlockingPool, он может заблокировать клиента, если очередь заполнена. Но мы не хотим, чтобы клиент пула объектов блокировался только для выполнения повседневных задач, таких как возврат объекта в пул. Таким образом, мы выполнили реальную задачу вставки объекта в LinkedBlockingQueue в качестве асинхронной задачи и передачи его экземпляру Executor, чтобы поток клиента мог немедленно вернуться.
Теперь мы будем использовать вышеуказанный пул объектов в нашем коде. Мы будем использовать пул объектов для объединения некоторых объектов подключения к базе данных. Следовательно, нам понадобится Validator для проверки наших объектов подключения к базе данных.
Наш JDBCConnectionValidator будет выглядеть так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.test; import java.sql.Connection; import java.sql.SQLException; import com.test.pool.Pool.Validator; public final class JDBCConnectionValidator implements Validator < Connection > { public boolean isValid(Connection con) { if (con == null ) { return false ; } try { return !con.isClosed(); } catch (SQLException se) { return false ; } } public void invalidate(Connection con) { try { con.close(); } catch (SQLException se) { } } } |
И наш JDBCObjectFactory, который позволит объектному пулу создавать новые объекты, будет выглядеть следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
package com.test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import com.test.pool.ObjectFactory; public class JDBCConnectionFactory implements ObjectFactory < Connection > { private String connectionURL; private String userName; private String password; public JDBCConnectionFactory( String driver, String connectionURL, String userName, String password) { super (); try { Class.forName(driver); } catch (ClassNotFoundException ce) { throw new IllegalArgumentException( 'Unable to find driver in classpath' , ce); } this .connectionURL = connectionURL; this .userName = userName; this .password = password; } public Connection createNew() { try { return DriverManager.getConnection( connectionURL, userName, password); } catch (SQLException se) { throw new IllegalArgumentException( 'Unable to create new connection' , se); } } } |
Теперь мы создаем пул объектов JDBC, используя вышеуказанные Validator и ObjectFactory:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
package com.test; import java.sql.Connection; import com.test.pool.Pool; import com.test.pool.PoolFactory; public class Main { public static void main(String[] args) { Pool < Connection > pool = new BoundedBlockingPool < Connection > ( 10 , new JDBCConnectionValidator(), new JDBCConnectionFactory( '' , '' , '' , '' ) ); //do whatever you like } } |
В качестве бонуса за чтение всего поста. Я предоставлю еще одну реализацию интерфейса «Пул», которая по сути является неблокирующим пулом объектов. Единственное отличие этой реализации от предыдущей состоит в том, что эта реализация не блокирует клиента, если элемент недоступен, а возвращает нуль. Здесь это идет:
001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
|
package com.test.pool; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.Semaphore; public class BoundedPool < T > extends AbstractPool < T > { private int size; private Queue < T > objects; private Validator < T > validator; private ObjectFactory < T > objectFactory; private Semaphore permits; private volatile boolean shutdownCalled; public BoundedPool( int size, Validator < T > validator, ObjectFactory < T > objectFactory) { super (); this .objectFactory = objectFactory; this .size = size; this .validator = validator; objects = new LinkedList < T >(); initializeObjects(); shutdownCalled = false ; } @Override public T get() { T t = null ; if (!shutdownCalled) { if (permits.tryAcquire()) { t = objects.poll(); } } else { throw new IllegalStateException( 'Object pool already shutdown' ); } return t; } @Override public void shutdown() { shutdownCalled = true ; clearResources(); } private void clearResources() { for (T t : objects) { validator.invalidate(t); } } @Override protected void returnToPool(T t) { boolean added = objects.add(t); if (added) { permits.release(); } } @Override protected void handleInvalidReturn(T t) { } @Override protected boolean isValid(T t) { return validator.isValid(t); } private void initializeObjects() { for ( int i = 0 ; i < size; i++) { objects.add(objectFactory.createNew()); } } } |
Учитывая, что у нас сейчас две сильные реализации, лучше позволить пользователям создавать наши пулы через фабрику со значимыми именами. Вот фабрика:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package com.test.pool; import com.test.pool.Pool.Validator; /** * Factory and utility methods for * {@link Pool} and {@link BlockingPool} classes * defined in this package. * This class supports the following kinds of methods: * * <ul> * <li> Method that creates and returns a default non-blocking * implementation of the {@link Pool} interface. * </li> * * <li> Method that creates and returns a * default implementation of * the {@link BlockingPool} interface. * </li> * </ul> * * @author Swaranga */ public final class PoolFactory { private PoolFactory() { } /** * Creates a and returns a new object pool, * that is an implementation of the {@link BlockingPool}, * whose size is limited by * the <tt> size </tt> parameter. * * @param size the number of objects in the pool. * @param factory the factory to create new objects. * @param validator the validator to * validate the re-usability of returned objects. * * @return a blocking object pool * bounded by <tt> size </tt> */ public static < T > Pool < T > newBoundedBlockingPool( int size, ObjectFactory < T > factory, Validator < T > validator) { return new BoundedBlockingPool < T > ( size, validator, factory); } /** * Creates a and returns a new object pool, * that is an implementation of the {@link Pool} * whose size is limited * by the <tt> size </tt> parameter. * * @param size the number of objects in the pool. * @param factory the factory to create new objects. * @param validator the validator to validate * the re-usability of returned objects. * * @return an object pool bounded by <tt> size </tt> */ public static < T > Pool < T > newBoundedNonBlockingPool( int size, ObjectFactory < T > factory, Validator < T > validator) { return new BoundedPool < T >(size, validator, factory); } } |
Таким образом, наши клиенты теперь могут создавать пулы объектов в более удобочитаемой форме:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
package com.test; import java.sql.Connection; import com.test.pool.Pool; import com.test.pool.PoolFactory; public class Main { public static void main(String[] args) { Pool < Connection > pool = PoolFactory.newBoundedBlockingPool( 10 , new JDBCConnectionFactory( '' , '' , '' , '' ), new JDBCConnectionValidator()); //do whatever you like } } |
И так заканчивается наш длинный пост. Этот был давно пора. Не стесняйтесь использовать его, измените его, добавьте больше реализаций.
Приятного кодирования и не забудьте поделиться!
Ссылка: общий и параллельный объектный пул от нашего партнера JCG Сармы Сваранги в блоге Java HotSpot .