Существует множество реализаций, но я хотел бы описать использование классов параллельной среды чистого JDK: DelayedQueue и Delayed interface.
Позвольте мне начать с простого (и пустого) интерфейса, который определяет рабочий элемент. Я пропускаю детали реализации, такие как свойства и методы, так как они не важны.
|
1
2
3
4
5
|
package com.example.delayed;public interface WorkItem { // Some properties and methods here} |
Следующий класс в нашей модели будет представлять отложенный рабочий элемент и реализовывать интерфейс Delayed . Есть только несколько основных концепций, которые нужно принять во внимание: сама задержка и фактическое время, когда был представлен соответствующий рабочий элемент. Так рассчитывается срок годности. Итак, давайте сделаем это, введя класс PostponedWorkItem .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.example.delayed;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class PostponedWorkItem implements Delayed { private final long origin; private final long delay; private final WorkItem workItem; public PostponedWorkItem( final WorkItem workItem, final long delay ) { this.origin = System.currentTimeMillis(); this.workItem = workItem; this.delay = delay; } @Override public long getDelay( TimeUnit unit ) { return unit.convert( delay - ( System.currentTimeMillis() - origin ), TimeUnit.MILLISECONDS ); } @Override public int compareTo( Delayed delayed ) { if( delayed == this ) { return 0; } if( delayed instanceof PostponedWorkItem ) { long diff = delay - ( ( PostponedWorkItem )delayed ).delay; return ( ( diff == 0 ) ? 0 : ( ( diff < 0 ) ? -1 : 1 ) ); } long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) ); return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? -1 : 1 ) ); }} |
Как видите, мы создаем новый экземпляр класса и сохраняем текущее системное время в свойстве внутреннего источника . Метод getDelayed вычисляет фактическое время, оставшееся до истечения срока действия рабочего элемента. Задержка — это внешняя настройка, которая является параметром конструктора. Обязательная реализация Comparable <Delayed> требуется, поскольку Delayed расширяет этот интерфейс.
Теперь мы в основном сделали! Чтобы завершить пример, давайте удостоверимся, что один и тот же рабочий элемент не будет представлен дважды в рабочую очередь путем реализации equals и hashCode (реализация довольно тривиальна и не должна требовать каких-либо комментариев).
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class PostponedWorkItem implements Delayed { ... @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() ); return result; } @Override public boolean equals( Object obj ) { if( this == obj ) { return true; } if( obj == null ) { return false; } if( !( obj instanceof PostponedWorkItem ) ) { return false; } final PostponedWorkItem other = ( PostponedWorkItem )obj; if( workItem == null ) { if( other.workItem != null ) { return false; } } else if( !workItem.equals( other.workItem ) ) { return false; } return true; }} |
Последний шаг — это введение какого-то менеджера, который будет планировать рабочие элементы и периодически опрашивать просроченные: встречайте класс WorkItemScheduler .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.example.delayed;import java.util.ArrayList;import java.util.Collection;import java.util.concurrent.BlockingQueue;import java.util.concurrent.DelayQueue;public class WorkItemScheduler { private final long delay = 2000; // 2 seconds private final BlockingQueue< PostponedWorkItem > delayed = new DelayQueue< PostponedWorkItem >(); public void addWorkItem( final WorkItem workItem ) { final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay ); if( !delayed.contains( postponed )) { delayed.offer( postponed ); } } public void process() { final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >(); delayed.drainTo( expired ); for( final PostponedWorkItem postponed: expired ) { // Do some real work here with postponed.getWorkItem() } }} |
Использование BlockingQueue гарантирует безопасность потоков и высокий уровень параллелизма. Метод процесса должен запускаться периодически, чтобы опустошить очередь рабочих элементов. Это может быть аннотировано @ Scheduled аннотацией из Spring Framework или аннотацией @Schedule EJB из JEE 6 .
Наслаждайтесь!
Справка: Использование отложенных очередей на практике от нашего партнера по JCG Андрея Редько в блоге Андрея Редько .