Статьи

Синхронизация снимков и инкрементов с однопоточностью

Многие из приложений, которые я пишу в эти дни, содержат много данных — настолько, что нет никакого разумного способа непрерывно отправлять все это. Вместо этого большинство приложений, с которыми я работаю, будут иметь возможность получать снимок текущего состояния и возможность получать дельты (инкременты), которые должны быть применены к предыдущему снимку. Чтобы еще больше усложнить ситуацию, неполные данные неприемлемы, а порядок имеет значение. Этот тип среды порождает множество решений для синхронизации снимков и инкрементов. Эта статья об использовании однопоточности (через jetlang ) для синхронизации и гарантированной точности.

Давайте рассмотрим очень простой пример: у вас есть два процесса — клиент и сервер. У сервера есть список, и клиент должен отобразить этот список — полностью и по порядку. Список на клиенте также необходимо обновлять всякий раз, когда список на сервере обновляется.

Есть несколько проблем, с которыми вы можете столкнуться в многопоточной среде.

  • Если вы запрашиваете моментальный снимок, а затем начинаете прослушивание инкрементальных файлов, вы можете пропустить данные, которых нет в моментальном снимке, но транслировались до того, как вы начали слушать инкрементные файлы.
  • Если вы начнете слушать инкременты и одновременно запросите моментальный снимок, вы можете применить инкрементальный моментальный снимок, даже если моментальный снимок уже отражает инкрементный.
  • Если вы сначала начнете слушать инкременты, вам понадобится какой-то способ отбросить инкременты, которые уже отражены в снимке.

Пришло время войти в некоторый код.

Вот простой серверный код.

;; server
(defonce server-list (atom []))

(defonce appender (atom nil))
(defonce appending-fiber (atom nil))
(defonce subscriber (atom identity))

(defn subscribe [f]
  (reset! subscriber f))

(defn publish-to-client [m]
  (@subscriber m))

(defn get-snapshot []
  (publish-to-client {:type :snapshot :val @server-list}))

(defn append-to-list []
  (when (< 9 (count @server-list))
    (swap! server-list butlast))
  (let [incremental (rand-nth (range 100))]
    (swap! server-list conj incremental)
    (publish-to-client {:type :incremental :val incremental})))

(defn server-start []
  (reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
  (reset! appender (.scheduleAtFixedRate @appending-fiber
                                         append-to-list 500 500
                                         java.util.concurrent.TimeUnit/MILLISECONDS)))

(defn server-stop []
  (.dispose @appender)
  (.dispose @appending-fiber))

Приведенный выше код содержит список серверов, который представляет собой список, который представляет упорядоченные случайные числа, генерируемые на стороне сервера. Наша задача — отразить этот список в нашем клиенте. Запланированное задание добавления и добавление волокна сохраняются, чтобы упростить запуск и остановку добавления. Функции запуска сервера и остановки сервера предоставляются для удобства, если вы решите запустить этот пример локально.

Атом подписчика и функция подписки — это простой способ для клиента подписаться на моментальные снимки и инкрементальные копии. Функция публикации для клиента разыменовывает fn и немедленно вызывает его со снимком или инкрементно. В продвинутом приложении логика публикации и подписки, вероятно, будет включать сокет или систему обмена сообщениями — наше решение целенаправленно наивно, чтобы сосредоточиться на точке поста: синхронизации.

Функция get-snapshot публикует текущее состояние списка серверов для клиента. Функция добавления к списку удаляет элементы, поэтому легко увидеть, как меняется список серверов — без увеличения данных до неуправляемого размера в prod этого (скорее всего) не будет; однако остальная часть кода в append-to-list довольно типична для обычной практики — генерировать дельту, применять ее к локальному списку и публиковать ее клиентам.

Глядя на этот код, легко увидеть, что одно волокно добавляется в список и публикуется на клиенте, а другое волокно возвращает значение get-snapshot. Этот код может работать, но способ, которым он в настоящее время записан, не может гарантировать точность данных.

Давайте посмотрим на некоторый клиентский код.

;; client
(defonce client-list (atom nil))

(defn handle-update [{:keys [type val]}]
  (if (= type :snapshot)
    (reset! client-list val)
    (when @client-list
      (when (< 9 (count @client-list))
        (swap! client-list butlast))
      (swap! client-list conj val))))

(defn client-start []
  (subscribe handle-update)
  (get-snapshot))

(defn client-stop []
  (subscribe identity))

Функция запуска клиента подписывается на обновления сервера, а затем запрашивает моментальный снимок. Функция обновления дескриптора сбрасывает список клиентов на снимке и добавляет инкремент к существующему списку. (примечание: список клиентов хранится в 10 элементах для простоты, как и сервер — я бы не ожидал, что этот тип кода будет в prod).

Ниже приведен полный снимок текущего кода.

(ns synchro.core)

;; server
(defonce server-list (atom []))

(defonce appender (atom nil))
(defonce appending-fiber (atom nil))
(defonce subscriber (atom identity))

(defn subscribe [f]
  (reset! subscriber f))

(defn publish-to-client [m]
  (@subscriber m))

(defn get-snapshot []
  (publish-to-client {:type :snapshot :val @server-list}))

(defn append-to-list []
  (when (< 9 (count @server-list))
    (swap! server-list butlast))
  (let [incremental (rand-nth (range 100))]
    (swap! server-list conj incremental)
    (publish-to-client {:type :incremental :val incremental})))

(defn server-start []
  (reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
  (reset! appender (.scheduleAtFixedRate @appending-fiber
                                         append-to-list 500 500
                                         java.util.concurrent.TimeUnit/MILLISECONDS)))

(defn server-stop []
  (.dispose @appender)
  (.dispose @appending-fiber))

;; client
(defonce client-list (atom nil))

(defn handle-update [{:keys [type val]}]
  (if (= type :snapshot)
    (reset! client-list val)
    (when @client-list
      (when (< 9 (count @client-list))
        (swap! client-list butlast))
      (swap! client-list conj val))))

(defn client-start []
  (subscribe handle-update)
  (get-snapshot))

(defn client-stop []
  (subscribe identity))

(comment
  (server-start)
  @server-list
  (reset! server-list [])
  (server-stop)

  (client-start)
  @client-list
  (client-stop)

  [@server-list @client-list (= @server-list @client-list)]
  )

Код клиента и сервера такой же, как и выше, но этот пример также содержит некоторые вызовы функций в комментарии. На этом этапе вы можете вставить этот код в ваш любимый редактор, запустить клиент и сервер и проверить оба списка. Частота обновления настолько велика, что вы даже можете сравнить два списка, и весьма вероятно, что они равны.

Для многих проблем этого кода может быть достаточно; однако, как мы отмечали выше, у вас определенно есть возможность увидеть недействительное состояние. С помощью этого конкретного кода добавляемое волокно может обновить атом с добавочным X, на основном волокне get-snapshot может разыскивать снимок с включенным X (и опубликовать его), а затем добавляющее волокно также может опубликовать добавочное X. К счастью, есть простое решение, опубликуйте снимок, обновите список серверов и опубликуйте инкрементные файлы на одном волокне.

Приведенный ниже код показывает, как легко создать джетланговое волокно и выполнить анонимную функцию.

(ns synchro.core)

;; server
(defonce server-list (atom []))

(defonce appender (atom nil))
(defonce appending-fiber (atom nil))
(defonce subscriber (atom identity))
(defonce synchro-fiber (atom nil))

(defn subscribe [f]
  (reset! subscriber f))

(defn publish-to-client [m]
  (@subscriber m))

(defn get-snapshot []
  (.execute @synchro-fiber
            #(publish-to-client {:type :snapshot :val @server-list})))

(defn append-to-list []
  (.execute @synchro-fiber
            #(do
               (when (< 9 (count @server-list))
                 (swap! server-list butlast))
               (let [incremental (rand-nth (range 100))]
                 (swap! server-list conj incremental)
                 (publish-to-client {:type :incremental :val incremental})))))

(defn server-start []
  (reset! synchro-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
  (reset! appending-fiber (doto (org.jetlang.fibers.ThreadFiber.) .start))
  (reset! appender (.scheduleAtFixedRate @appending-fiber
                                         append-to-list 500 500
                                         java.util.concurrent.TimeUnit/MILLISECONDS)))

(defn server-stop []
  (.dispose @appender)
  (.dispose @appending-fiber)
  (.dispose @synchro-fiber))

;; client
(defonce client-list (atom nil))

(defn handle-update [{:keys [type val]}]
  (if (= type :snapshot)
    (reset! client-list val)
    (when @client-list
      (when (< 9 (count @client-list))
        (swap! client-list butlast))
      (swap! client-list conj val))))

(defn client-start []
  (subscribe handle-update)
  (get-snapshot))

(defn client-stop []
  (subscribe identity))

(comment
  (server-start)
  @server-list
  (reset! server-list [])
  (server-stop)

  (client-start)
  @client-list
  (client-stop)

  [@server-list @client-list (= @server-list @client-list)]
  )

Как видите, с кодом мало что изменилось. Мы определили другое волокно, synchro-fiber, которое мы будем использовать для однопоточности наших обновлений списка серверов и наших публикаций клиенту. Synchro-fiber будет выполнять runnables (в нашем примере анонимные функции), которые помещаются в его очередь по порядку. Тело get-snapshot и append-to-list было немного изменено, чтобы вызвать функцию execute с их предыдущим телом как анонимной функцией. Другие технические различия также верны — код запускается не сразу, он больше не блокируется, а возвращаемое значение было изменено. Хотя все эти наблюдения верны, они не имеют отношения к тому, чего мы пытались достичь.

Используя волокна Jetlang, мы достигли нашей цели — мы можем гарантировать, что моментальные снимки и инкрементальные копии будут легко синхронизировать (без идентификаторов последовательности), точно и по порядку. Конечно, вам нужно использовать оба этих сообщения в одном волокне, но это должно быть одинаково легко осуществимо.