Статьи

Использование отложенных очередей на практике

Часто бывают случаи использования, когда у вас есть какая-то работа или очередь на работу, и есть необходимость не обрабатывать каждый рабочий элемент или работу сразу, а с некоторой задержкой. Например, если пользователь нажимает кнопку, которая запускает какую-то работу, и через секунду пользователь понимает, что ошибся, и работа вообще не должна начинаться. Или, например, может быть случай использования, когда некоторые рабочие элементы в очереди должны быть удалены после некоторой задержки (истечение срока).

Существует множество реализаций, но я хотел бы описать использование классов параллельной среды чистого JDK: DelayedQueue и Delayed interface.

Позвольте мне начать с простого (и пустого) интерфейса, который определяет рабочий элемент. Я пропускаю детали реализации, такие как свойства и методы, так как они не важны.

1
2
3
4
5
package com.example.delayed;
 
public interface WorkItem {
   // Some properties and methods here
}

Следующий класс в нашей модели будет представлять отложенный рабочий элемент и реализовывать интерфейс Delayed . Есть только несколько основных концепций, которые нужно принять во внимание: сама задержка и фактическое время, когда был представлен соответствующий рабочий элемент. Так рассчитывается срок годности. Итак, давайте сделаем это, введя класс PostponedWorkItem .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.delayed;
 
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
 
public class PostponedWorkItem implements Delayed {
    private final long origin;
    private final long delay;
    private final WorkItem workItem;
 
    public PostponedWorkItem( final WorkItem workItem, final long delay ) {
        this.origin = System.currentTimeMillis();
        this.workItem = workItem;
        this.delay = delay;
    }
 
    @Override
    public long getDelay( TimeUnit unit ) {
        return unit.convert( delay - ( System.currentTimeMillis() - origin ),
                TimeUnit.MILLISECONDS );
    }
 
    @Override
    public int compareTo( Delayed delayed ) {
        if( delayed == this ) {
            return 0;
        }
 
        if( delayed instanceof PostponedWorkItem ) {
            long diff = delay - ( ( PostponedWorkItem )delayed ).delay;
            return ( ( diff == 0 ) ? 0 : ( ( diff < 0 ) ? -1 : 1 ) );
        }
 
        long d = ( getDelay( TimeUnit.MILLISECONDS ) - delayed.getDelay( TimeUnit.MILLISECONDS ) );
        return ( ( d == 0 ) ? 0 : ( ( d < 0 ) ? -1 : 1 ) );
    }
}

Как видите, мы создаем новый экземпляр класса и сохраняем текущее системное время в свойстве внутреннего источника . Метод getDelayed вычисляет фактическое время, оставшееся до истечения срока действия рабочего элемента. Задержка — это внешняя настройка, которая является параметром конструктора. Обязательная реализация Comparable <Delayed> требуется, поскольку Delayed расширяет этот интерфейс.

Теперь мы в основном сделали! Чтобы завершить пример, давайте удостоверимся, что один и тот же рабочий элемент не будет представлен дважды в рабочую очередь путем реализации equals и hashCode (реализация довольно тривиальна и не должна требовать каких-либо комментариев).

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class PostponedWorkItem implements Delayed {
    ...
 
    @Override
    public int hashCode() {
        final int prime = 31;
 
        int result = 1;
        result = prime * result + ( ( workItem == null ) ? 0 : workItem.hashCode() );
 
        return result;
    }
 
    @Override
    public boolean equals( Object obj ) {
        if( this == obj ) {
            return true;
        }
 
        if( obj == null ) {
            return false;
        }
 
        if( !( obj instanceof PostponedWorkItem ) ) {
            return false;
        }
 
        final PostponedWorkItem other = ( PostponedWorkItem )obj;
        if( workItem == null ) {
            if( other.workItem != null ) {
                return false;
            }
        } else if( !workItem.equals( other.workItem ) ) {
            return false;
        }
 
        return true;
    }
}

Последний шаг — это введение какого-то менеджера, который будет планировать рабочие элементы и периодически опрашивать просроченные: встречайте класс WorkItemScheduler .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.delayed;
 
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
 
public class WorkItemScheduler {
    private final long delay = 2000; // 2 seconds
 
    private final BlockingQueue< PostponedWorkItem > delayed =
            new DelayQueue< PostponedWorkItem >();
 
    public void addWorkItem( final WorkItem workItem ) {
        final PostponedWorkItem postponed = new PostponedWorkItem( workItem, delay );
        if( !delayed.contains( postponed )) {
            delayed.offer( postponed );
        }
    }
 
    public void process() {
        final Collection< PostponedWorkItem > expired = new ArrayList< PostponedWorkItem >();
        delayed.drainTo( expired );
 
        for( final PostponedWorkItem postponed: expired ) {
            // Do some real work here with postponed.getWorkItem()
        }
    }
}

Использование BlockingQueue гарантирует безопасность потоков и высокий уровень параллелизма. Метод процесса должен запускаться периодически, чтобы опустошить очередь рабочих элементов. Это может быть аннотировано @ Scheduled аннотацией из Spring Framework или аннотацией @Schedule EJB из JEE 6 .

Наслаждайтесь!

Справка: Использование отложенных очередей на практике от нашего партнера по JCG Андрея Редько в блоге Андрея Редько .