Многие из приложений, которые я пишу в эти дни, содержат много данных — настолько, что нет никакого разумного способа непрерывно отправлять все это. Вместо этого большинство приложений, с которыми я работаю, будут иметь возможность получать снимок текущего состояния и возможность получать дельты (инкременты), которые должны быть применены к предыдущему снимку. Чтобы еще больше усложнить ситуацию, неполные данные неприемлемы, а порядок имеет значение. Этот тип среды порождает множество решений для синхронизации снимков и инкрементов. Эта статья об использовании однопоточности (через jetlang ) для синхронизации и гарантированной точности.
Давайте рассмотрим очень простой пример: у вас есть два процесса — клиент и сервер. У сервера есть список, и клиент должен отобразить этот список — полностью и по порядку. Список на клиенте также необходимо обновлять всякий раз, когда список на сервере обновляется.
Есть несколько проблем, с которыми вы можете столкнуться в многопоточной среде.
- Если вы запрашиваете моментальный снимок, а затем начинаете прослушивание инкрементальных файлов, вы можете пропустить данные, которых нет в моментальном снимке, но транслировались до того, как вы начали слушать инкрементные файлы.
- Если вы начнете слушать инкременты и одновременно запросите моментальный снимок, вы можете применить инкрементальный моментальный снимок, даже если моментальный снимок уже отражает инкрементный.
- Если вы сначала начнете слушать инкременты, вам понадобится какой-то способ отбросить инкременты, которые уже отражены в снимке.
Пришло время войти в некоторый код.
Вот простой серверный код.
;; server (defonce server-list (atom [])) (defonce appender (atom nil)) (defonce appending-fiber (atom nil)) (defonce subscriber (atom identity)) (defn subscribe [f] (reset! subscriber f)) (defn publish-to-client [m] (@subscriber m)) (defn get-snapshot [] (publish-to-client {:type :snapshot :val @server-list})) (defn append-to-list [] (when (< 9 (count @server-list)) (swap! server-list butlast)) (let [incremental (rand-nth (range 100))] (swap! server-list conj incremental) (publish-to-client {:type :incremental :val incremental}))) (defn server-start [] (reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start)) (reset! appender (.scheduleAtFixedRate @appending-fiber append-to-list 500 500 java.util.concurrent.TimeUnit/MILLISECONDS))) (defn server-stop [] (.dispose @appender) (.dispose @appending-fiber))
Приведенный выше код содержит список серверов, который представляет собой список, который представляет упорядоченные случайные числа, генерируемые на стороне сервера. Наша задача — отразить этот список в нашем клиенте. Запланированное задание добавления и добавление волокна сохраняются, чтобы упростить запуск и остановку добавления. Функции запуска сервера и остановки сервера предоставляются для удобства, если вы решите запустить этот пример локально.
Атом подписчика и функция подписки — это простой способ для клиента подписаться на моментальные снимки и инкрементальные копии. Функция публикации для клиента разыменовывает fn и немедленно вызывает его со снимком или инкрементно. В продвинутом приложении логика публикации и подписки, вероятно, будет включать сокет или систему обмена сообщениями — наше решение целенаправленно наивно, чтобы сосредоточиться на точке поста: синхронизации.
Функция get-snapshot публикует текущее состояние списка серверов для клиента. Функция добавления к списку удаляет элементы, поэтому легко увидеть, как меняется список серверов — без увеличения данных до неуправляемого размера в prod этого (скорее всего) не будет; однако остальная часть кода в append-to-list довольно типична для обычной практики — генерировать дельту, применять ее к локальному списку и публиковать ее клиентам.
Глядя на этот код, легко увидеть, что одно волокно добавляется в список и публикуется на клиенте, а другое волокно возвращает значение get-snapshot. Этот код может работать, но способ, которым он в настоящее время записан, не может гарантировать точность данных.
Давайте посмотрим на некоторый клиентский код.
;; client (defonce client-list (atom nil)) (defn handle-update [{:keys [type val]}] (if (= type :snapshot) (reset! client-list val) (when @client-list (when (< 9 (count @client-list)) (swap! client-list butlast)) (swap! client-list conj val)))) (defn client-start [] (subscribe handle-update) (get-snapshot)) (defn client-stop [] (subscribe identity))
Функция запуска клиента подписывается на обновления сервера, а затем запрашивает моментальный снимок. Функция обновления дескриптора сбрасывает список клиентов на снимке и добавляет инкремент к существующему списку. (примечание: список клиентов хранится в 10 элементах для простоты, как и сервер — я бы не ожидал, что этот тип кода будет в prod).
Ниже приведен полный снимок текущего кода.
(ns synchro.core) ;; server (defonce server-list (atom [])) (defonce appender (atom nil)) (defonce appending-fiber (atom nil)) (defonce subscriber (atom identity)) (defn subscribe [f] (reset! subscriber f)) (defn publish-to-client [m] (@subscriber m)) (defn get-snapshot [] (publish-to-client {:type :snapshot :val @server-list})) (defn append-to-list [] (when (< 9 (count @server-list)) (swap! server-list butlast)) (let [incremental (rand-nth (range 100))] (swap! server-list conj incremental) (publish-to-client {:type :incremental :val incremental}))) (defn server-start [] (reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start)) (reset! appender (.scheduleAtFixedRate @appending-fiber append-to-list 500 500 java.util.concurrent.TimeUnit/MILLISECONDS))) (defn server-stop [] (.dispose @appender) (.dispose @appending-fiber)) ;; client (defonce client-list (atom nil)) (defn handle-update [{:keys [type val]}] (if (= type :snapshot) (reset! client-list val) (when @client-list (when (< 9 (count @client-list)) (swap! client-list butlast)) (swap! client-list conj val)))) (defn client-start [] (subscribe handle-update) (get-snapshot)) (defn client-stop [] (subscribe identity)) (comment (server-start) @server-list (reset! server-list []) (server-stop) (client-start) @client-list (client-stop) [@server-list @client-list (= @server-list @client-list)] )
Код клиента и сервера такой же, как и выше, но этот пример также содержит некоторые вызовы функций в комментарии. На этом этапе вы можете вставить этот код в ваш любимый редактор, запустить клиент и сервер и проверить оба списка. Частота обновления настолько велика, что вы даже можете сравнить два списка, и весьма вероятно, что они равны.
Для многих проблем этого кода может быть достаточно; однако, как мы отмечали выше, у вас определенно есть возможность увидеть недействительное состояние. С помощью этого конкретного кода добавляемое волокно может обновить атом с добавочным X, на основном волокне get-snapshot может разыскивать снимок с включенным X (и опубликовать его), а затем добавляющее волокно также может опубликовать добавочное X. К счастью, есть простое решение, опубликуйте снимок, обновите список серверов и опубликуйте инкрементные файлы на одном волокне.
Приведенный ниже код показывает, как легко создать джетланговое волокно и выполнить анонимную функцию.
(ns synchro.core) ;; server (defonce server-list (atom [])) (defonce appender (atom nil)) (defonce appending-fiber (atom nil)) (defonce subscriber (atom identity)) (defonce synchro-fiber (atom nil)) (defn subscribe [f] (reset! subscriber f)) (defn publish-to-client [m] (@subscriber m)) (defn get-snapshot [] (.execute @synchro-fiber #(publish-to-client {:type :snapshot :val @server-list}))) (defn append-to-list [] (.execute @synchro-fiber #(do (when (< 9 (count @server-list)) (swap! server-list butlast)) (let [incremental (rand-nth (range 100))] (swap! server-list conj incremental) (publish-to-client {:type :incremental :val incremental}))))) (defn server-start [] (reset! synchro-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start)) (reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start)) (reset! appender (.scheduleAtFixedRate @appending-fiber append-to-list 500 500 java.util.concurrent.TimeUnit/MILLISECONDS))) (defn server-stop [] (.dispose @appender) (.dispose @appending-fiber) (.dispose @synchro-fiber)) ;; client (defonce client-list (atom nil)) (defn handle-update [{:keys [type val]}] (if (= type :snapshot) (reset! client-list val) (when @client-list (when (< 9 (count @client-list)) (swap! client-list butlast)) (swap! client-list conj val)))) (defn client-start [] (subscribe handle-update) (get-snapshot)) (defn client-stop [] (subscribe identity)) (comment (server-start) @server-list (reset! server-list []) (server-stop) (client-start) @client-list (client-stop) [@server-list @client-list (= @server-list @client-list)] )
Как видите, с кодом мало что изменилось. Мы определили другое волокно, synchro-fiber, которое мы будем использовать для однопоточности наших обновлений списка серверов и наших публикаций клиенту. Synchro-fiber будет выполнять runnables (в нашем примере анонимные функции), которые помещаются в его очередь по порядку. Тело get-snapshot и append-to-list было немного изменено, чтобы вызвать функцию execute с их предыдущим телом как анонимной функцией. Другие технические различия также верны — код запускается не сразу, он больше не блокируется, а возвращаемое значение было изменено. Хотя все эти наблюдения верны, они не имеют отношения к тому, чего мы пытались достичь.
Используя волокна Jetlang, мы достигли нашей цели — мы можем гарантировать, что моментальные снимки и инкрементальные копии будут легко синхронизировать (без идентификаторов последовательности), точно и по порядку. Конечно, вам нужно использовать оба этих сообщения в одном волокне, но это должно быть одинаково легко осуществимо.