Статьи

Использование java.util.concurrent.BlockingQueue как rx.Observable

Классический шаблон производитель-потребитель относительно прост в Java, поскольку у нас есть java.util.concurrent.BlockingQueue. Чтобы избежать занятого ожидания и подверженной ошибкам ручной блокировки, мы просто используем put () и take (). Они оба блокируют, если очередь заполнена или пуста соответственно. Все, что нам нужно, это куча потоков, которые ссылаются на одну и ту же очередь: одни производят, а другие потребляют. И, конечно, очередь должна иметь ограниченную емкость, иначе у нас скоро не хватит памяти, если производители превзойдут потребителей. Грег Янг не мог подчеркнуть это правило во время Devoxx Poland:


Никогда, никогда не создавайте неограниченную очередь

Производитель-потребитель, использующийBlockingQueue

Вот самый простой пример. Сначала нам нужен производитель, который помещает объекты в общую очередь:

import lombok.Value;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@Value
class Producer implements Runnable {
 
  private final BlockingQueue<User> queue;
 
  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final User user = new User("User " + System.currentTimeMillis());
        log.info("Producing {}", user);
        queue.put(user);
        TimeUnit.SECONDS.sleep(1);
      }
    } catch (Exception e) {
      log.error("Interrupted", e);
    }
  }
}

Producer просто публикует экземпляр  User класса (каким бы он ни был) в данную очередь каждую секунду. Очевидно, что в реальной жизни помещение  User в очередь будет результатом каких-то действий в системе, таких как вход пользователя в систему. Аналогичным образом потребитель берет новые элементы из очереди и обрабатывает их:

@Slf4j
@Value
class Consumer implements Runnable {
 
  private final BlockingQueue<User> queue;
 
  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final User user = queue.take();
        log.info("Consuming: {}", user);
      }
    } catch (Exception e) {
      log.error("Interrupted", e);
    }
  }
}

Опять же, в реальной жизни обработка будет означать хранение в базе данных или запуск некоторого обнаружения мошенничества для пользователя. Мы используем очередь, чтобы отделить поток обработки от потребляющего потока, например, чтобы уменьшить задержку. Чтобы запустить простой тест, давайте раскрутим несколько потоков производителей и потребителей:

BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
final List<Runnable> runnables = Arrays.asList(
    new Producer(queue),
    new Producer(queue),
    new Consumer(queue),
    new Consumer(queue),
    new Consumer(queue)
);
 
final List<Thread> threads = runnables
    .stream()
    .map(runnable -> new Thread(runnable, threadName(runnable)))
    .peek(Thread::start)
    .collect(toList());
 
TimeUnit.SECONDS.sleep(5);
threads.forEach(Thread::interrupt);
 
//...
 
private static String threadName(Runnable runnable) {
  return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
}

У нас есть 2 производителя и 3 потребителя, кажется, все работает. В реальной жизни вы, вероятно, имели бы некоторые неявные потоки производителей, такие как потоки обработки HTTP-запросов. На стороне потребителя вы, скорее всего, использовали бы пул потоков. Этот шаблон работает хорошо, но особенно потребляющая сторона довольно низкого уровня.

Вводя ObservableQueue<T>

Цель этой статьи — представить абстракцию, которая ведет себя как очередь со стороны производителя, но как Observable от RxJava на стороне потребителя. Другими словами, мы можем рассматривать объекты, добавленные в очередь, как поток, который мы можем отображать, фильтровать, создавать и т. Д. На стороне клиента. Интересно, что это уже не очередь под капотом. ObservableQueue <T> просто перенаправляет все новые объекты прямо подписчикам и не буферизует события в случае, если никто не слушает (« горячая » наблюдаемая).

ObservableQueue <T> — это не сама по себе очередь, а просто мост между одним API и другим. Это похоже на java.util.concurrent.SynchronousQueue, но если никто не заинтересован в потреблении, объект просто отбрасывается.

Вот первая экспериментальная реализация. Это просто игрушечный код, не считайте его готовым к производству. Также мы значительно упростим это позже:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
 
  private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());
  private final Observable<T> observable = Observable.create(subscriber -> {
    subscriber.add(new Subscription() {
      @Override
      public void unsubscribe() {
        subscribers.remove(subscriber);
      }
 
      @Override
      public boolean isUnsubscribed() {
        return false;
      }
    });
    subscribers.add(subscriber);
  });
 
  public Observable<T> observe() {
    return observable;
  }
 
  @Override
  public boolean add(T t) {
    return offer(t);
  }
 
  @Override
  public boolean offer(T t) {
    subscribers.forEach(subscriber -> subscriber.onNext(t));
    return true;
  }
 
  @Override
  public T remove() {
    return noSuchElement();
  }
 
  @Override
  public T poll() {
    return null;
  }
 
  @Override
  public T element() {
    return noSuchElement();
  }
 
  private T noSuchElement() {
    throw new NoSuchElementException();
  }
 
  @Override
  public T peek() {
    return null;
  }
 
  @Override
  public void put(T t) throws InterruptedException {
    offer(t);
  }
 
  @Override
  public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
    return offer(t);
  }
 
  @Override
  public T take() throws InterruptedException {
    throw new UnsupportedOperationException("Use observe() instead");
  }
 
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return null;
  }
 
  @Override
  public int remainingCapacity() {
    return 0;
  }
 
  @Override
  public boolean remove(Object o) {
    return false;
  }
 
  @Override
  public boolean containsAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean addAll(Collection<? extends T> c) {
    c.forEach(this::offer);
    return true;
  }
 
  @Override
  public boolean removeAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean retainAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public void clear() {
  }
 
  @Override
  public int size() {
    return 0;
  }
 
  @Override
  public boolean isEmpty() {
    return true;
  }
 
  @Override
  public boolean contains(Object o) {
    return false;
  }
 
  @Override
  public Iterator<T> iterator() {
    return Collections.emptyIterator();
  }
 
  @Override
  public Object[] toArray() {
    return new Object[0];
  }
 
  @Override
  public <T> T[] toArray(T[] a) {
    return a;
  }
 
  @Override
  public int drainTo(Collection<? super T> c) {
    return 0;
  }
 
  @Override
  public int drainTo(Collection<? super T> c, int maxElements) {
    return 0;
  }
 
  @Override
  public void close() throws IOException {
    subscribers.forEach(rx.Observer::onCompleted);
  }
}

Есть несколько интересных фактов об этом:

  1. Мы должны отслеживать всех подписчиков, то есть потребителей, которые хотят получать новые товары. Если один из подписчиков больше не заинтересован, мы должны удалить такого подписчика, иначе произойдет утечка памяти (продолжайте читать!)
  2. Эта очередь ведет себя так, как будто она всегда была пустой. Он никогда не содержит никаких элементов — когда вы помещаете что-то в эту очередь, он автоматически передается подписчикам и забывается
  3. Технически эта очередь не ограничена (!), То есть вы можете поместить столько элементов, сколько захотите. Однако, поскольку элементы передаются всем подписчикам (если они есть) и немедленно отбрасываются, эта очередь фактически всегда пуста (см. Выше).
  4. Тем не менее, возможно, что производитель генерирует слишком много событий, и потребители не могут идти в ногу с этим — RxJava теперь имеет поддержку обратного давления, не описанную в этой статье.

Производитель может использовать ObservableQueue <T>, как и любой другой BlockingQueue <T>, при условии, что я правильно реализовал контракт очереди. Однако потребитель выглядит намного легче и умнее:

final ObservableQueue<User> users = new ObservableQueue<>();
final Observable<User> observable = users.observe();
 
users.offer(new User("A"));
observable.subscribe(user -> log.info("User logged in: {}", user));
users.offer(new User("B"));
users.offer(new User("C"));

Код выше печатает  "B" и  "C" только. "A" теряется  в дизайне,  так как  ObservableQueueотбрасывает предметы, если никто не слушает. Очевидно, что  Producer класс теперь использует  usersочередь. Все отлично работает, вы можете позвонить  users.observe() в любой момент и применить один из десятков  Observable операторов. Однако есть одно предостережение: по умолчанию RxJava не навязывает поток, поэтому потребление происходит в том же потоке, что и при создании! Мы потеряли самую важную особенность модели производитель-потребитель — разделение потоков. К счастью, в RxJava все декларативно, а также планирование потоков:

users
    .observe()
    .observeOn(Schedulers.computation())
    .forEach(user ->
            log.info("User logged in: {}", user)
    );

Теперь давайте посмотрим на реальную силу RxJava. Представьте, что вы хотите посчитать, сколько пользователей регистрируется в секунду, где каждый вход помещается как событие в очередь:

users
  .observe()
  .map(User::getName)
  .filter(name -> !name.isEmpty())
  .window(1, TimeUnit.SECONDS)
  .flatMap(Observable::count)
  .doOnCompleted(() -> log.info("System shuts down"))
  .forEach(c -> log.info("Logins in last second: {}", c));

Производительность также приемлема, такая очередь может принимать около 3 миллионов объектов в секунду на моем ноутбуке с одним подписчиком. Рассматривайте этот класс как переходник от устаревших систем, использующих очереди к современному реактивному миру. Но ждать! Использовать  ObservableQueue<T>легко, но реализация с  subscribers синхронизированным набором кажется слишком низкоуровневой. К счастью, есть  Subject<T, T>Subject это «другая сторона»  Observable — вы можете выдвигать события,  Subject но они все еще реализуются  Observable, поэтому вы можете легко создавать произвольные  Observable. Посмотрите, как красиво  ObservableQueue выглядит одна из Subject реализаций:

public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {
 
  private final Subject<T, T> subject = PublishSubject.create();
 
  public Observable<T> observe() {
    return subject;
  }
 
  @Override
  public boolean add(T t) {
    return offer(t);
  }
 
  @Override
  public boolean offer(T t) {
    subject.onNext(t);
    return true;
  }
 
  @Override
  public void close() throws IOException {
    subject.onCompleted();
  }
  @Override
  public T remove() {
    return noSuchElement();
  }
 
  @Override
  public T poll() {
    return null;
  }
 
  @Override
  public T element() {
    return noSuchElement();
  }
 
  private T noSuchElement() {
    throw new NoSuchElementException();
  }
 
  @Override
  public T peek() {
    return null;
  }
 
  @Override
  public void put(T t) throws InterruptedException {
    offer(t);
  }
 
  @Override
  public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
    return offer(t);
  }
 
  @Override
  public T take() throws InterruptedException {
    throw new UnsupportedOperationException("Use observe() instead");
  }
 
  @Override
  public T poll(long timeout, TimeUnit unit) throws InterruptedException {
    return null;
  }
 
  @Override
  public int remainingCapacity() {
    return 0;
  }
 
  @Override
  public boolean remove(Object o) {
    return false;
  }
 
  @Override
  public boolean containsAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean addAll(Collection<? extends T> c) {
    c.forEach(this::offer);
    return true;
  }
 
  @Override
  public boolean removeAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public boolean retainAll(Collection<?> c) {
    return false;
  }
 
  @Override
  public void clear() {
  }
 
  @Override
  public int size() {
    return 0;
  }
 
  @Override
  public boolean isEmpty() {
    return true;
  }
 
  @Override
  public boolean contains(Object o) {
    return false;
  }
 
  @Override
  public Iterator<T> iterator() {
    return Collections.emptyIterator();
  }
 
  @Override
  public Object[] toArray() {
    return new Object[0];
  }
 
  @Override
  public <T> T[] toArray(T[] a) {
    return a;
  }
 
  @Override
  public int drainTo(Collection<? super T> c) {
    return 0;
  }
 
  @Override
  public int drainTo(Collection<? super T> c, int maxElements) {
    return 0;
  }
 
}

Реализация выше намного чище, и нам вообще не нужно беспокоиться о синхронизации потоков.