Классический шаблон производитель-потребитель относительно прост в Java, поскольку у нас есть java.util.concurrent.BlockingQueue. Чтобы избежать занятого ожидания и подверженной ошибкам ручной блокировки, мы просто используем put () и take (). Они оба блокируют, если очередь заполнена или пуста соответственно. Все, что нам нужно, это куча потоков, которые ссылаются на одну и ту же очередь: одни производят, а другие потребляют. И, конечно, очередь должна иметь ограниченную емкость, иначе у нас скоро не хватит памяти, если производители превзойдут потребителей. Грег Янг не мог подчеркнуть это правило во время Devoxx Poland:
Никогда, никогда не создавайте неограниченную очередь
Производитель-потребитель, использующийBlockingQueue
Вот самый простой пример. Сначала нам нужен производитель, который помещает объекты в общую очередь:
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Value
class Producer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = new User("User " + System.currentTimeMillis());
log.info("Producing {}", user);
queue.put(user);
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
Producer просто публикует экземпляр User
класса (каким бы он ни был) в данную очередь каждую секунду. Очевидно, что в реальной жизни помещение User
в очередь будет результатом каких-то действий в системе, таких как вход пользователя в систему. Аналогичным образом потребитель берет новые элементы из очереди и обрабатывает их:
@Slf4j
@Value
class Consumer implements Runnable {
private final BlockingQueue<User> queue;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
final User user = queue.take();
log.info("Consuming: {}", user);
}
} catch (Exception e) {
log.error("Interrupted", e);
}
}
}
Опять же, в реальной жизни обработка будет означать хранение в базе данных или запуск некоторого обнаружения мошенничества для пользователя. Мы используем очередь, чтобы отделить поток обработки от потребляющего потока, например, чтобы уменьшить задержку. Чтобы запустить простой тест, давайте раскрутим несколько потоков производителей и потребителей:
BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(
new Producer(queue),
new Producer(queue),
new Consumer(queue),
new Consumer(queue),
new Consumer(queue)
);
final List<Thread> threads = runnables
.stream()
.map(runnable -> new Thread(runnable, threadName(runnable)))
.peek(Thread::start)
.collect(toList());
TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);
//...
private static String threadName(Runnable runnable) {
return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}
У нас есть 2 производителя и 3 потребителя, кажется, все работает. В реальной жизни вы, вероятно, имели бы некоторые неявные потоки производителей, такие как потоки обработки HTTP-запросов. На стороне потребителя вы, скорее всего, использовали бы пул потоков. Этот шаблон работает хорошо, но особенно потребляющая сторона довольно низкого уровня.
Вводя ObservableQueue<T>
Цель этой статьи — представить абстракцию, которая ведет себя как очередь со стороны производителя, но как Observable от RxJava на стороне потребителя. Другими словами, мы можем рассматривать объекты, добавленные в очередь, как поток, который мы можем отображать, фильтровать, создавать и т. Д. На стороне клиента. Интересно, что это уже не очередь под капотом. ObservableQueue <T> просто перенаправляет все новые объекты прямо подписчикам и не буферизует события в случае, если никто не слушает (« горячая » наблюдаемая).
ObservableQueue <T> — это не сама по себе очередь, а просто мост между одним API и другим. Это похоже на java.util.concurrent.SynchronousQueue, но если никто не заинтересован в потреблении, объект просто отбрасывается.
Вот первая экспериментальная реализация. Это просто игрушечный код, не считайте его готовым к производству. Также мы значительно упростим это позже:
public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Observable<T> observable = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
subscribers.remove(subscriber);
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscribers.add(subscriber);
});
public Observable<T> observe() {
return observable;
}
@Override
public boolean add(T t) {
return offer(t);
}
@Override
public boolean offer(T t) {
subscribers.forEach(subscriber -> subscriber.onNext(t));
return true;
}
@Override
public T remove() {
return noSuchElement();
}
@Override
public T poll() {
return null;
}
@Override
public T element() {
return noSuchElement();
}
private T noSuchElement() {
throw new NoSuchElementException();
}
@Override
public T peek() {
return null;
}
@Override
public void put(T t) throws InterruptedException {
offer(t);
}
@Override
public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
return offer(t);
}
@Override
public T take() throws InterruptedException {
throw new UnsupportedOperationException("Use observe() instead");
}
@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public int remainingCapacity() {
return 0;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
return false;
}
@Override
public boolean addAll(Collection<? extends T> c) {
c.forEach(this::offer);
return true;
}
@Override
public boolean removeAll(Collection<?> c) {
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
return false;
}
@Override
public void clear() {
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<T> iterator() {
return Collections.emptyIterator();
}
@Override
public Object[] toArray() {
return new Object[0];
}
@Override
public <T> T[] toArray(T[] a) {
return a;
}
@Override
public int drainTo(Collection<? super T> c) {
return 0;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
return 0;
}
@Override
public void close() throws IOException {
subscribers.forEach(rx.Observer::onCompleted);
}
}
Есть несколько интересных фактов об этом:
- Мы должны отслеживать всех подписчиков, то есть потребителей, которые хотят получать новые товары. Если один из подписчиков больше не заинтересован, мы должны удалить такого подписчика, иначе произойдет утечка памяти (продолжайте читать!)
- Эта очередь ведет себя так, как будто она всегда была пустой. Он никогда не содержит никаких элементов — когда вы помещаете что-то в эту очередь, он автоматически передается подписчикам и забывается
- Технически эта очередь не ограничена (!), То есть вы можете поместить столько элементов, сколько захотите. Однако, поскольку элементы передаются всем подписчикам (если они есть) и немедленно отбрасываются, эта очередь фактически всегда пуста (см. Выше).
- Тем не менее, возможно, что производитель генерирует слишком много событий, и потребители не могут идти в ногу с этим — RxJava теперь имеет поддержку обратного давления, не описанную в этой статье.
Производитель может использовать ObservableQueue <T>, как и любой другой BlockingQueue <T>, при условии, что я правильно реализовал контракт очереди. Однако потребитель выглядит намного легче и умнее:
final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();
users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));
Код выше печатает "B"
и "C"
только. "A"
теряется в дизайне, так как ObservableQueue
отбрасывает предметы, если никто не слушает. Очевидно, что Producer
класс теперь использует users
очередь. Все отлично работает, вы можете позвонить users.observe()
в любой момент и применить один из десятков Observable
операторов. Однако есть одно предостережение: по умолчанию RxJava не навязывает поток, поэтому потребление происходит в том же потоке, что и при создании! Мы потеряли самую важную особенность модели производитель-потребитель — разделение потоков. К счастью, в RxJava все декларативно, а также планирование потоков:
users
.observe()
.observeOn(Schedulers.computation())
.forEach(user ->
log.info("User logged in: {}", user)
);
Теперь давайте посмотрим на реальную силу RxJava. Представьте, что вы хотите посчитать, сколько пользователей регистрируется в секунду, где каждый вход помещается как событие в очередь:
users
.observe()
.map(User::getName)
.filter(name -> !name.isEmpty())
.window(1, TimeUnit.SECONDS)
.flatMap(Observable::count)
.doOnCompleted(() -> log.info("System shuts down"))
.forEach(c -> log.info("Logins in last second: {}", c));
Производительность также приемлема, такая очередь может принимать около 3 миллионов объектов в секунду на моем ноутбуке с одним подписчиком. Рассматривайте этот класс как переходник от устаревших систем, использующих очереди к современному реактивному миру. Но ждать! Использовать ObservableQueue<T>
легко, но реализация с subscribers
синхронизированным набором кажется слишком низкоуровневой. К счастью, есть Subject<T, T>
. Subject
это «другая сторона» Observable
— вы можете выдвигать события, Subject
но они все еще реализуются Observable
, поэтому вы можете легко создавать произвольные Observable
. Посмотрите, как красиво ObservableQueue
выглядит одна из Subject
реализаций:
public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
private final Subject<T, T> subject = PublishSubject.create();
public Observable<T> observe() {
return subject;
}
@Override
public boolean add(T t) {
return offer(t);
}
@Override
public boolean offer(T t) {
subject.onNext(t);
return true;
}
@Override
public void close() throws IOException {
subject.onCompleted();
}
@Override
public T remove() {
return noSuchElement();
}
@Override
public T poll() {
return null;
}
@Override
public T element() {
return noSuchElement();
}
private T noSuchElement() {
throw new NoSuchElementException();
}
@Override
public T peek() {
return null;
}
@Override
public void put(T t) throws InterruptedException {
offer(t);
}
@Override
public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
return offer(t);
}
@Override
public T take() throws InterruptedException {
throw new UnsupportedOperationException("Use observe() instead");
}
@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public int remainingCapacity() {
return 0;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
return false;
}
@Override
public boolean addAll(Collection<? extends T> c) {
c.forEach(this::offer);
return true;
}
@Override
public boolean removeAll(Collection<?> c) {
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
return false;
}
@Override
public void clear() {
}
@Override
public int size() {
return 0;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<T> iterator() {
return Collections.emptyIterator();
}
@Override
public Object[] toArray() {
return new Object[0];
}
@Override
public <T> T[] toArray(T[] a) {
return a;
}
@Override
public int drainTo(Collection<? super T> c) {
return 0;
}
@Override
public int drainTo(Collection<? super T> c, int maxElements) {
return 0;
}
}
Реализация выше намного чище, и нам вообще не нужно беспокоиться о синхронизации потоков.