Статьи

Лучшие абстракции с core.async

core.async  — это 
 библиотека
Clojure, реализующая 
последовательные процессы связи , подход, позволяющий структурировать код как производители и потребители сообщений, передаваемых по каналам. CSP — это подход к параллельной работе в программе, который существует в качестве сильной альтернативы программированию, ориентированному на
обратный вызов, которое  присутствует в 
NodeJS  (например).

Я был большим поклонником библиотеки core.async Clojure с тех пор, как впервые услышал об этом, и с нетерпением  использовал ее  несколькими способами. Я вижу core.async как выполнение двух функций:  Enabler  и  рационализатор .

Активаторы позволяют достичь чего-то такого, чего не могло бы быть, по крайней мере без большого объема работы и неопределенности. Большинство технологий, которые мы используем, — это средства от самого языка до библиотек и сред. Например, после использования Clojure какое-то время вы обнаружите, что алгоритмы выражения с использованием отложенной оценки понятны, просты и желательны. Clojure обеспечивает ленивую оценку благодаря встроенной поддержке в основной язык и библиотеку. Вы не хотели бы строить все это самостоятельно.

Аналогично, core.async предоставляет вашему приложению множество желаемых функций; Разбиение ваших процессов на последовательные строительные блоки, соединенные каналами, является отличным способом построения серверов с высокой пропускной способностью и быстротой реагирования. Это основная часть того, что представляет собой core.async, и он рисует очень желательную картину работы вашего сервера, всегда поддерживая небольшой пул потоков горячим и занятым.

Но core.async также может выступать в роли упрощителя. Речь идет не только о производительности и масштабируемости, но и о том, как выразить себя более четко.

Я работал над небольшим количеством кода для распределения кодов оплаты (для проекта в  Aviso ). Коды связаны со счетом, который должен быть оплачен; код платежа действует как временный псевдоним, который можно отправить на телефон пользователя.

Коды короткие: шесть буквенных символов, таких как UWESHL. По сути, коды представляют собой число в базе 26 в диапазоне от 0 (AAAAAA) до 308 915 775 (ZZZZZZ).

Чтобы легко распределить их в кластере серверов с минимальной координацией, диапазон возможных чисел разбит на блоки по 64 КБ по 4 КБ каждый (что составляет до 28 бит, что покрывает большую часть диапазона того, что может быть выражено). в шести базовых 26 цифрах). База данных используется, чтобы позволить серверам распределять блоки, и тогда любой отдельный сервер может выделить поток из 4096 кодов в блоке, не касаясь базы данных снова.

Хорошо, вот в чем проблема; Мой код доходит до того, что ему нужен один из этих кодов оплаты, поэтому вызывается функция для предоставления следующего кода оплаты. Мой первый проход выглядел примерно так:

(defn next-payment-code
  [db state]
  (loop []
    (let [initial @state
          [[code new-state] (generate-payment-code db initial)]
      (if (compare-and-set! state initial new-state)
        code
        (recur)))))

 

https://gist.github.com/hlship/8271a2744dc4705a9a46

В его основе лежит атом, который используется для отслеживания состояния; идентификатор блока кода платежа и последний присвоенный индекс. Я не показываю generate-payment-code, но он должен сделать несколько вещей: выделить новый блок при необходимости, затем перейти к следующему индексу кода платежа, затем сгенерировать строку из шести символов… и вернуть новое значение и новое состояние (необходимо для генерации  следующего  кода платежа). И, конечно, код должен включать в себя спин-цикл на случай, если несколько потоков выделят коды в один и тот же момент.

Это некрасиво во многих отношениях; немаловажным из которых является то, что, как функция, потребитель  кодов платежей должен предоставить конкретные аргументы (база данных, атом состояния), необходимые для  генератора  кодов платежей. Это чувствует себя  законченным и хрупким.

Итак, когда я выяснял, как будет выглядеть код внутри generate-payment-code, у меня появилась эта восхитительная искра понимания: если бы я мог взглянуть на эту проблему по-другому, не как вызываемую функцию, а как канал для получения значений от, я мог бы закончить с лучшим дизайном, проще в реализации.

Канал core.async — это идеальный способ изолировать генерацию кодов оплаты от их потребления. Но это оставляет вопрос о том, сколько кодов оплаты должно быть сгенерировано. Как оказалось, ответ … все они! Давайте взглянем:

(def payment-codes-per-block 4096)

(defn create-payment-code-feed
  [db]
  (let [payment-code-feed (chan 10)]
    (go-loop []
             (let [block-id (allocate-block db)
                   block-start (* block-id payment-codes-per-block)]
               (-> (onto-chan payment-code-feed
                              (->>
                                (range block-start (+ block-start payment-codes-per-block))
                                (map sa/int->six-alpha))
                              ;; Don't close!
                              false)
                   ;; park here until all of that block has been consumed.
                   <!))
             ;; Loop forever.
             (recur))
    payment-code-feed))
https://gist.github.com/hlship/bd3938545395be7e0545

Возвращенный канал подачи имеет размер буфера 10… это означает, что блок go всегда будет пытаться иметь 10 кодов оплаты «в очереди» впереди любых потребителей.

Цикл бесконечен; он выделяет блок из базы данных, а затем генерирует все ключи для этого блока; функция on-chan читает из seq и помещает каждое последующее значение в канал. На первый взгляд кажется, что он немедленно сгенерирует все возможные коды платежей и поместит их все в канал, заполняя всю доступную память. К счастью, это не так.

Визуализируйте, что делает этот код: при запуске он выделяет идентификатор первого блока и генерирует  ленивую  последовательность всех кодов оплаты для этого блока. Сразу же канал подачи заполняется первыми 10 кодами из ленивой последовательности. Так как канал подачи теперь заполнен, цикл go внутри парков on-chan.

В конце концов, какой-то другой поток получает значение из канала; как только это происходит, процесс, созданный on-chan, не обрабатывает парки, берет еще одно значение из ленивой последовательности и помещает его в канал подачи.

Намного позже, из  канала подачи было удалено достаточное количество кодов оплаты  , чтобы 4096-й код оплаты из блока можно было вставить  в  канал подачи. В этот момент процесс create-payment-code-feed, припаркованный в строке 16, отменяет парковку… и процесс начинается снова, сразу же выделяется новый блок.

Возможно, самая запутанная часть состоит в том, что происходит, когда приложение закрывается … что происходит с двумя процессами (один непосредственно внутри create-payment-code-feed, а вложенный — в into-chan)?

Ответ таков: все просто самоочищается. Под крышками каналов core.async находятся те же самые обратные вызовы, которые так высмеиваются, когда они появляются в JavaScript. Когда процесс припаркован, на самом деле происходит добавление обратного вызова в канал… например, в строке 16 обратный вызов добавляется в канал, возвращаемый on-chan. Внутри on-chan есть обратный вызов на канале канала. Как только система выключается, внешние ссылки на канал подачи будут освобождены; в конце концов, обратные вызовы, связанные с каналом подачи и каналом on-chan, сформируют маленький островок самоссылки, и ничто не будет направлено вовне … в какой момент каналы, обратные вызовы, ленивая последовательность и все остальное будет GCed.

Весь этот подход очень привлекателен… он позволяет вам выражать операции с состоянием и асинхронные операции, используя тот же подход, который обычно применяется к операциям без сохранения состояния и синхронным операциям в любом месте вашего кода. Это дразнит отдельную логику, которая обычно комбинируется из-за кратковременного удобства или необходимости.

Как и тогда, когда я сначала изучал объектно-ориентированное программирование, а затем сначала изучал функциональное программирование, я подозреваю, что буду продолжать изучать более совершенные и изящные способы использования каналов. Это что-то с нетерпением жду.