Многие из приложений, которые я пишу в эти дни, содержат много данных — настолько, что нет никакого разумного способа непрерывно отправлять все это. Вместо этого большинство приложений, с которыми я работаю, будут иметь возможность получать снимок текущего состояния и возможность получать дельты (инкременты), которые должны быть применены к предыдущему снимку. Чтобы еще больше усложнить ситуацию, неполные данные неприемлемы, а порядок имеет значение. Этот тип среды порождает множество решений для синхронизации снимков и инкрементов. Эта статья об использовании однопоточности (через jetlang ) для синхронизации и гарантированной точности.
Давайте рассмотрим очень простой пример: у вас есть два процесса — клиент и сервер. У сервера есть список, и клиент должен отобразить этот список — полностью и по порядку. Список на клиенте также необходимо обновлять всякий раз, когда список на сервере обновляется.
Есть несколько проблем, с которыми вы можете столкнуться в многопоточной среде.
- Если вы запрашиваете моментальный снимок, а затем начинаете прослушивание инкрементальных файлов, вы можете пропустить данные, которых нет в моментальном снимке, но транслировались до того, как вы начали слушать инкрементные файлы.
- Если вы начнете слушать инкременты и одновременно запросите моментальный снимок, вы можете применить инкрементальный моментальный снимок, даже если моментальный снимок уже отражает инкрементный.
- Если вы сначала начнете слушать инкременты, вам понадобится какой-то способ отбросить инкременты, которые уже отражены в снимке.
Пришло время войти в некоторый код.
Вот простой серверный код.
;; server
(defonce server-list (atom []))
(defonce appender (atom nil))
(defonce appending-fiber (atom nil))
(defonce subscriber (atom identity))
(defn subscribe [f]
(reset! subscriber f))
(defn publish-to-client [m]
(@subscriber m))
(defn get-snapshot []
(publish-to-client {:type :snapshot :val @server-list}))
(defn append-to-list []
(when (< 9 (count @server-list))
(swap! server-list butlast))
(let [incremental (rand-nth (range 100))]
(swap! server-list conj incremental)
(publish-to-client {:type :incremental :val incremental})))
(defn server-start []
(reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
(reset! appender (.scheduleAtFixedRate @appending-fiber
append-to-list 500 500
java.util.concurrent.TimeUnit/MILLISECONDS)))
(defn server-stop []
(.dispose @appender)
(.dispose @appending-fiber))
Приведенный выше код содержит список серверов, который представляет собой список, который представляет упорядоченные случайные числа, генерируемые на стороне сервера. Наша задача — отразить этот список в нашем клиенте. Запланированное задание добавления и добавление волокна сохраняются, чтобы упростить запуск и остановку добавления. Функции запуска сервера и остановки сервера предоставляются для удобства, если вы решите запустить этот пример локально.
Атом подписчика и функция подписки — это простой способ для клиента подписаться на моментальные снимки и инкрементальные копии. Функция публикации для клиента разыменовывает fn и немедленно вызывает его со снимком или инкрементно. В продвинутом приложении логика публикации и подписки, вероятно, будет включать сокет или систему обмена сообщениями — наше решение целенаправленно наивно, чтобы сосредоточиться на точке поста: синхронизации.
Функция get-snapshot публикует текущее состояние списка серверов для клиента. Функция добавления к списку удаляет элементы, поэтому легко увидеть, как меняется список серверов — без увеличения данных до неуправляемого размера в prod этого (скорее всего) не будет; однако остальная часть кода в append-to-list довольно типична для обычной практики — генерировать дельту, применять ее к локальному списку и публиковать ее клиентам.
Глядя на этот код, легко увидеть, что одно волокно добавляется в список и публикуется на клиенте, а другое волокно возвращает значение get-snapshot. Этот код может работать, но способ, которым он в настоящее время записан, не может гарантировать точность данных.
Давайте посмотрим на некоторый клиентский код.
;; client
(defonce client-list (atom nil))
(defn handle-update [{:keys [type val]}]
(if (= type :snapshot)
(reset! client-list val)
(when @client-list
(when (< 9 (count @client-list))
(swap! client-list butlast))
(swap! client-list conj val))))
(defn client-start []
(subscribe handle-update)
(get-snapshot))
(defn client-stop []
(subscribe identity))
Функция запуска клиента подписывается на обновления сервера, а затем запрашивает моментальный снимок. Функция обновления дескриптора сбрасывает список клиентов на снимке и добавляет инкремент к существующему списку. (примечание: список клиентов хранится в 10 элементах для простоты, как и сервер — я бы не ожидал, что этот тип кода будет в prod).
Ниже приведен полный снимок текущего кода.
(ns synchro.core)
;; server
(defonce server-list (atom []))
(defonce appender (atom nil))
(defonce appending-fiber (atom nil))
(defonce subscriber (atom identity))
(defn subscribe [f]
(reset! subscriber f))
(defn publish-to-client [m]
(@subscriber m))
(defn get-snapshot []
(publish-to-client {:type :snapshot :val @server-list}))
(defn append-to-list []
(when (< 9 (count @server-list))
(swap! server-list butlast))
(let [incremental (rand-nth (range 100))]
(swap! server-list conj incremental)
(publish-to-client {:type :incremental :val incremental})))
(defn server-start []
(reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
(reset! appender (.scheduleAtFixedRate @appending-fiber
append-to-list 500 500
java.util.concurrent.TimeUnit/MILLISECONDS)))
(defn server-stop []
(.dispose @appender)
(.dispose @appending-fiber))
;; client
(defonce client-list (atom nil))
(defn handle-update [{:keys [type val]}]
(if (= type :snapshot)
(reset! client-list val)
(when @client-list
(when (< 9 (count @client-list))
(swap! client-list butlast))
(swap! client-list conj val))))
(defn client-start []
(subscribe handle-update)
(get-snapshot))
(defn client-stop []
(subscribe identity))
(comment
(server-start)
@server-list
(reset! server-list [])
(server-stop)
(client-start)
@client-list
(client-stop)
[@server-list @client-list (= @server-list @client-list)]
)
Код клиента и сервера такой же, как и выше, но этот пример также содержит некоторые вызовы функций в комментарии. На этом этапе вы можете вставить этот код в ваш любимый редактор, запустить клиент и сервер и проверить оба списка. Частота обновления настолько велика, что вы даже можете сравнить два списка, и весьма вероятно, что они равны.
Для многих проблем этого кода может быть достаточно; однако, как мы отмечали выше, у вас определенно есть возможность увидеть недействительное состояние. С помощью этого конкретного кода добавляемое волокно может обновить атом с добавочным X, на основном волокне get-snapshot может разыскивать снимок с включенным X (и опубликовать его), а затем добавляющее волокно также может опубликовать добавочное X. К счастью, есть простое решение, опубликуйте снимок, обновите список серверов и опубликуйте инкрементные файлы на одном волокне.
Приведенный ниже код показывает, как легко создать джетланговое волокно и выполнить анонимную функцию.
(ns synchro.core)
;; server
(defonce server-list (atom []))
(defonce appender (atom nil))
(defonce appending-fiber (atom nil))
(defonce subscriber (atom identity))
(defonce synchro-fiber (atom nil))
(defn subscribe [f]
(reset! subscriber f))
(defn publish-to-client [m]
(@subscriber m))
(defn get-snapshot []
(.execute @synchro-fiber
#(publish-to-client {:type :snapshot :val @server-list})))
(defn append-to-list []
(.execute @synchro-fiber
#(do
(when (< 9 (count @server-list))
(swap! server-list butlast))
(let [incremental (rand-nth (range 100))]
(swap! server-list conj incremental)
(publish-to-client {:type :incremental :val incremental})))))
(defn server-start []
(reset! synchro-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
(reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
(reset! appender (.scheduleAtFixedRate @appending-fiber
append-to-list 500 500
java.util.concurrent.TimeUnit/MILLISECONDS)))
(defn server-stop []
(.dispose @appender)
(.dispose @appending-fiber)
(.dispose @synchro-fiber))
;; client
(defonce client-list (atom nil))
(defn handle-update [{:keys [type val]}]
(if (= type :snapshot)
(reset! client-list val)
(when @client-list
(when (< 9 (count @client-list))
(swap! client-list butlast))
(swap! client-list conj val))))
(defn client-start []
(subscribe handle-update)
(get-snapshot))
(defn client-stop []
(subscribe identity))
(comment
(server-start)
@server-list
(reset! server-list [])
(server-stop)
(client-start)
@client-list
(client-stop)
[@server-list @client-list (= @server-list @client-list)]
)
Как видите, с кодом мало что изменилось. Мы определили другое волокно, synchro-fiber, которое мы будем использовать для однопоточности наших обновлений списка серверов и наших публикаций клиенту. Synchro-fiber будет выполнять runnables (в нашем примере анонимные функции), которые помещаются в его очередь по порядку. Тело get-snapshot и append-to-list было немного изменено, чтобы вызвать функцию execute с их предыдущим телом как анонимной функцией. Другие технические различия также верны — код запускается не сразу, он больше не блокируется, а возвращаемое значение было изменено. Хотя все эти наблюдения верны, они не имеют отношения к тому, чего мы пытались достичь.
Используя волокна Jetlang, мы достигли нашей цели — мы можем гарантировать, что моментальные снимки и инкрементальные копии будут легко синхронизировать (без идентификаторов последовательности), точно и по порядку. Конечно, вам нужно использовать оба этих сообщения в одном волокне, но это должно быть одинаково легко осуществимо.