Статьи

Упрощение торговой системы с Akka

Мои коллеги разрабатывают торговую систему, которая обрабатывает довольно большой поток входящих транзакций. Каждая транзакция охватывает один Instrument (например, облигации или акции) и имеет некоторые (сейчас) неважные свойства. Они застряли с Java (<8), поэтому давайте придерживаться этого:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
class Instrument implements Serializable, Comparable<Instrument> {
    private final String name;
 
    public Instrument(String name) {
        this.name = name;
    }
 
    //...Java boilerplate
 
}
 
public class Transaction {
    private final Instrument instrument;
 
    public Transaction(Instrument instrument) {
        this.instrument = instrument;
    }
 
    //...Java boilerplate
 
}

Instrument впоследствии будет использоваться в качестве ключа в HashMap , поэтому в будущем мы будем активно внедрять Comparable<Instrument> . Это наш домен, теперь требования:

  1. Транзакции поступают в систему и должны быть обработаны (что бы это ни значило) как можно скорее
  2. Мы свободны обрабатывать их в любом порядке
  3. … Однако транзакции для одного и того же инструмента должны обрабатываться последовательно в том же порядке, в котором они были введены.

Первоначальная реализация была простой — поместите все входящие транзакции в очередь (например, ArrayBlockingQueue ) с одним потребителем. Это удовлетворяет последнему требованию, поскольку очередь сохраняет строгий порядок FIFO для всех транзакций. Но такая архитектура предотвращает параллельную обработку несвязанных транзакций для различных инструментов, что приводит к бесполезному увеличению пропускной способности. Неудивительно, что эта реализация, несмотря на несомненную простоту, стала узким местом.

Первой идеей было как-то разделить входящие транзакции по инструментам и обрабатывать инструменты по отдельности. Мы придумали следующую структуру данных:

01
02
03
04
05
06
07
08
09
10
11
priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues =
    new ConcurrentHashMap<Instrument, Queue<Transaction>>();
 
public void accept(Transaction tx) {
    final Instrument instrument = tx.getInstrument();
    if (queues.get(instrument) == null) {
        queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());
    }
    final Queue<Transaction> queue = queues.get(instrument);
    queue.add(tx);
}

Тьфу! Но худшее еще впереди. Как убедиться, что не более одного потока обрабатывает каждую очередь за раз? В конце концов, в противном случае два потока могут выбрать элементы из одной очереди (один инструмент) и обработать их в обратном порядке, что недопустимо. Самый простой случай — иметь Thread в очереди — он не будет масштабироваться, поскольку мы ожидаем десятки тысяч различных инструментов. Таким образом, мы можем сказать N потоков и позволить каждому из них обрабатывать подмножество очередей, например, instrument.hashCode() % N сообщает нам, какой поток заботится о данной очереди. Но это все еще не идеально по трем причинам:

  1. Один поток должен «наблюдать» много очередей, вероятнее всего занятых ожиданием, перебирая их все время. Альтернативно очередь может как-то разбудить родительский поток
  2. В худшем случае все инструменты будут иметь конфликтующие хеш-коды, ориентированные только на один поток — что фактически совпадает с нашим первоначальным решением
  3. Это просто чертовски сложно! Красивый код не сложен!

Реализация этого чудовища возможна, но сложна и подвержена ошибкам. Более того, есть еще одно нефункциональное требование: инструменты приходят и уходят, а со временем их становится сотни тысяч. Через некоторое время мы должны удалить записи на нашей карте, представляющие инструменты, которые не были видны в последнее время. В противном случае мы получим утечку памяти.

Если вы можете найти более простое решение, дайте мне знать. А пока позвольте мне рассказать вам, что я предложил моим коллегам. Как вы можете догадаться, это был Акка — и это оказалось невероятно простым. Нам нужны два типа актеров: Dispatcher и Processor . Dispatcher имеет один экземпляр и получает все входящие транзакции. Его обязанность заключается в том, чтобы найти или породить актера рабочего Processor для каждого Instrument и отправить транзакцию к нему:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Dispatcher extends UntypedActor {
 
    private final Map<Instrument, ActorRef> instrumentProcessors =
        new HashMap<Instrument, ActorRef>();
 
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Transaction) {
            dispatch(((Transaction) message));
        } else {
            unhandled(message);
        }
    }
 
    private void dispatch(Transaction tx) {
        final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());
        processor.tell(tx, self());
    }
 
    private ActorRef findOrCreateProcessorFor(Instrument instrument) {
        final ActorRef maybeActor = instrumentProcessors.get(instrument);
        if (maybeActor != null) {
            return maybeActor;
        } else {
            final ActorRef actorRef = context().actorOf(
                Props.create(Processor.class), instrument.getName());
            instrumentProcessors.put(instrument, actorRef);
            return actorRef;
        }
    }
}

Это очень просто. Поскольку наш актер Dispatcher фактически является однопоточным, синхронизация не требуется. Мы едва получаем Transaction , ищем или создаем Processor и передаем Transaction дальше. Вот как может выглядеть реализация Processor :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
public class Processor extends UntypedActor {
 
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Transaction) {
            process(((Transaction) message));
        } else {
            unhandled(message);
        }
    }
 
    private void process(Transaction tx) {
        log.info("Processing {}", tx);
    }
}

Это оно! Интересно, что наша реализация Akka практически идентична нашей первой идее с картой очередей. В конце концов, актер — это просто очередь и (логический) поток, обрабатывающий элементы в этой очереди. Разница в том, что Akka управляет ограниченным пулом потоков и делит его между сотнями тысяч участников. А поскольку каждый инструмент имеет своего отдельного (и «однопоточного») субъекта, последовательная обработка транзакций для каждого инструмента гарантирована.

Еще кое-что. Как указывалось ранее, существует огромное количество инструментов, и мы не хотим оставлять актеров для инструментов, которых давно не видели. Предположим, что если Processor не получил ни одной транзакции в течение часа, его следует остановить и собрать мусор. Если позже мы получим новую транзакцию для такого инструмента, мы всегда сможем ее воссоздать. Это довольно сложно — мы должны убедиться, что если транзакция прибудет, когда процессор решил удалить себя, мы не сможем потерять эту транзакцию. Вместо того, чтобы останавливаться, Processor сообщает своему родителю, что он простаивал слишком долго. Dispatcher отправит PoisonPill на него. Поскольку сообщения ProcessorIdle и Transaction обрабатываются последовательно, риск отправки транзакции на уже не существующий субъект отсутствует.

Каждый актер управляет своим жизненным циклом независимо, планируя время ожидания с помощью setReceiveTimeout :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Processor extends UntypedActor {
 
    @Override
    public void preStart() throws Exception {
        context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));
    }
 
    @Override
    public void onReceive(Object message) throws Exception {
        //...
        if (message instanceof ReceiveTimeout) {
            log.debug("Idle for two long, shutting down");
            context().parent().tell(ProcessorIdle.INSTANCE, self());
        } else {
            unhandled(message);
        }
    }
 
}
 
enum ProcessorIdle {
    INSTANCE
}

Понятно, что когда Processor не получал никаких сообщений в течение одного часа, он осторожно сообщает об этом своему родителю ( Dispatcher ). Но актер все еще жив и может обрабатывать транзакции, если они происходят точно через час. Что делает Dispatcher , так это убивает данный Processor и удаляет его с карты:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Dispatcher extends UntypedActor {
 
    private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();
 
    public void onReceive(Object message) throws Exception {
        //...
        if (message == ProcessorIdle.INSTANCE) {
            removeIdleProcessor(sender());
            sender().tell(PoisonPill.getInstance(), self());
        } else {
            unhandled(message);
        }
    }
 
    private void removeIdleProcessor(ActorRef idleProcessor) {
        instrumentProcessors.inverse().remove(idleProcessor);
    }
 
    private void dispatch(Transaction tx) {
        final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());
        processor.tell(tx, self());
    }
 
    //...
 
}

Было небольшое неудобство. Map<Instrument, ActorRef> раньше были Map<Instrument, ActorRef> . Этого оказалось недостаточно, поскольку нам внезапно пришлось удалить запись в этой карте по значению. Другими словами, нам нужно найти ключ ( Instrument ), который отображается на данный ActorRef ( Processor ). Есть разные способы справиться с этим (например, простаивающий Processor может отправить Instrumnt он обрабатывает), но вместо этого я использовал BiMap<K, V> из Гуавы. Это работает, потому что ActorRef Instrument и ActorRef уникальны (актер на инструмент). Имея BiMap я мог просто inverse() карту (от BiMap<Instrument, ActorRef> до BiMap<ActorRef, Instrument> и рассматривать ActorRef как ключ.

Этот пример Akka не более чем « привет, мир ». Но по сравнению с запутанным решением нам пришлось бы писать с использованием параллельных очередей, блокировок и пулов потоков, это прекрасно. Мои товарищи по команде были так взволнованы, что к концу дня решили переписать все свое заявление на Akka.

Ссылка: Упрощение торговой системы с Akka от нашего партнера JCG Томаша Нуркевича в блоге Java и соседстве .