Статьи

Получаешь ошибки с ядром Clojure.async

Несколько месяцев назад кто-то в офисе указал нам на интересное предложение о работе от CartoDB, которое выглядело следующим образом:

Далее следует технический тест для этого предложения о работе в CARTO: https://boards.greenhouse.io/cartodb/jobs/705852#.WSvORxOGPUI

Создайте следующее и сделайте так быстро, как вы можете, используя Python 3 (vanilla). Чем быстрее он работает, тем больше вы будете удивлять нас!

Ваш код должен:

Все это наиболее эффективным способом, который вы можете придумать.

Вот и все. Заставь его летать!

Не забудьте подать заявку здесь !: https://boards.greenhouse.io/cartodb/jobs/705852#.WSvORxOGPUI

Прочитав это, я помню, что подумал: «Человек, это хороший повод, чтобы покопаться в core.async Clojure»…… и чем раньше сказал, чем сделал , это то, что я делал несколько дней назад. Вот мои заметки об этом опыте.

Когда начать?

Я уже был знаком с теорией, лежащей в основе core.async (я говорю о передаче последовательных процессов или просто о CSP), но когда пришло время настроить мой проект и начать выкладывать свое решение, я обнаружил, что документация была между дефицитной и обескураживающей , При этом неожиданный спад не остановил меня:

Все это плюс несколько часов разочарования помогли наконец увидеть свет в конце туннеля. Теперь пришло время для моего собственного кодирования.

неблокирующее чтение файла

Итак, первое, с чего я хотел начать, было чтение всех строк из файла неблокирующим способом , и именно эта цель привела меня к следующей реализации:

1
2
3
4
5
6
7
8
(defn stream-lines-from [path]                                                                                                                                                                
  (let                                                                                                                                                                         
    (go                                                                                                                                                                                       
      (with-open [rdr (io/reader path)]                                                                                                                                                       
        (doseq [line (line-seq rdr)]                                                                                                                                                          
          (>! c line)))                                                                                                                                                                       
      (close! c))                                                                                                                                                                             
    c))

Позвольте мне разобрать это для вас:

  1. Я начну с определения канала c с размером буфера 1024 . Размер не обязателен, но я подумал, что было бы неплохо буферизовать строки, чтобы избежать максимально возможной блокировки потока-потребителя.
  2. Тогда у нас есть блок-блок — краеугольный камень библиотеки. Это главным образом позволяет асинхронно выполнять тело, возвращаясь немедленно к вызывающему потоку. В этом конкретном блоке мы будем делать следующее:
    1. Начинает читать строки из исходного файла
    2. Каждая строка передается на канал с помощью >! оператор.
    3. Когда достигнут конец файла, канал закрывается при закрытии! оператор.
  3. Наконец, с нашим определенным блоком go мы возвращаем канал для дальнейшего потребления.

Подсчет линий

Я не уверен, как вам всем нравится решать проблему, но лично я всегда начинаю с решения самых крошечных из них и начинаю искать решение оттуда — по одной проблеме за раз. В этом конкретном случае я разработал простую функциональность, где, учитывая канал линий , я могу считать их все, пока он не закроется. И вот что я сделал:

1
2
3
4
5
(defn w-counter [channel]
  (go-loop [total 0]
    (if-some [_ (<! channel)]
      (recur (inc total))
      total)))

Как обычно, здесь есть пара новых конструкций:

  • Блок go-loop позволяет нам выполнить цикл внутри блока go. Это то же самое, что и делать (иди (цикл …). Кроме того, в качестве напоминания go-block возвращает вызывающему каналу канал, который получит результат тела (в данном конкретном сценарии общее количество строк).
  • <! Оператор — это способ, которым мы должны прочитать значение из канала, если он доступен, иначе он будет парковать поток. Стоит отметить, что когда канал закрыт , он возвращает ноль , что приводит к возобновлению подсчета.

Как вы можете видеть, выполняя некоторую рекурсию, я продолжаю потреблять и считать линии из канала до тех пор, пока он не закроется ( если кто-то делает работу плавно ) , возвращая окончательный счет вызывающей стороне. Теперь, когда все было готово, я хотел добиться следующей операции подсчета, но одновременно с каким-то образом контролировать количество счетчиков, выполняющих эту работу. Итак, после некоторой разработки, основанной на REPL, вот что я получил:

1
2
3
4
(defn count-lines [channel npar]
  (let [counters (for [_ (range npar)]
                   (w-counter channel))]
    (async/reduce + 0 (async/merge counters))))

И это был момент, когда вещи начали щелкать в моей голове. Просто выполняя простое понимание с использованием моей предыдущей функции w-counter вместе с асинхронной комбинацией слияния и уменьшения, она наконец добилась цели всего за 3 чертовых строки кода . Итак, что это за два комбинатора?

  • async / merge позволил мне взять все значения из всех каналов и вернуть один канал, содержащий все из них. Это было удобно, чтобы собрать все частичные результаты от всех моих работников.
  • Асинхронизация / уменьшение была самой лучшей чертой, поскольку она позволяла мне объединять все частичные подсчеты в конечные результаты, в данном конкретном случае — общий подсчет.

Вы можете представить поток как что-то вроде этого:

Острота!

Хитрая хитрая агрегация

С подсчетом строк в моем кармане пришло время заняться агрегацией, но, чтобы понять, почему я сделал то, что сделал, давайте начнем с проверки структуры исходного файла.

1
2
3
4
5
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2016-01-01 00:00:00,2016-01-01 00:00:00,2,1.10,-73.990371704101563,40.734695434570313,1,N,-73.981842041015625,40.732406616210937,2,7.5,0.5,0.5,0,0,0.3,8.8
2,2016-01-01 00:00:00,2016-01-01 00:00:00,5,4.90,-73.980781555175781,40.729911804199219,1,N,-73.944473266601563,40.716678619384766,1,18,0.5,0.5,0,0,0.3,19.3
2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,10.54,-73.984550476074219,40.6795654296875,1,N,-73.950271606445313,40.788925170898438,1,33,0.5,0.5,0,0,0.3,34.3
2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,4.75,-73.99346923828125,40.718990325927734,1,N,-73.962242126464844,40.657333374023437,2,16.5,0,0.5,0,0,0.3,17.3

Легко видеть, что первая строка всегда будет включать заголовки, а остальная часть файла будет состоять из значений в формате CSV . Теперь, если вы помните упражнение, он попросил нас агрегировать поле tip_amount, но в соответствии с этим мы должны сначала выяснить, какой индекс должен использоваться для определения местоположения агрегируемого значения .

По правде говоря, я начал с простого решения крошечной проблемы, поэтому мне пришла в голову идея, прочитав первое значение канала (которое совпадает с заголовками), я могу определить функцию, способную извлекать значение, которое я ищу каждый раз, когда вводить строку, и вот что я получил:

1
2
3
4
5
6
7
8
(defn extract-field-fn [fname channel]
  (let [headers   (<!! channel)
        field-idx (->> (split headers  #",")
                       (map-indexed vector)
                       (filter (fn [[_ v]] (= v fname)))
                       first
                       first)]
    #(java.lang.Double/parseDouble (nth (split % #",") field-idx))))

Он начинается с ожидания первого значения из канала (помните, что мы обязательно выводим индекс перед выполнением агрегации), и именно поэтому я использовал <!! заблокировать, пока значение не станет доступным. Затем, с удобной первой строкой, я разделил ее, чтобы позже найти позицию, в которой находилось поле с именем fname , и, наконец, вернуть частичную функцию, которая может извлекать и анализировать запрошенное поле по требованию.

После этого следующей проблемой, которая должна быть решена, было вычисление среднего значения последовательности значений, поступающих из канала. К этому времени я усвоил несколько очень важных уроков, которые проложили мне путь к следующей реализации:

1
2
3
4
5
(defn w-aggregate [channel]
  (go-loop
    (if-some [n (<! channel)]
      (recur (inc c) (+ n s))
      [s c])))

Я надеюсь, вы видите, что я следовал аналогичной реализации, как и для подсчета строк, но на этот раз мне пришлось суммировать и подсчитывать каждое значение, пока канал не закроется. Легкий покой.

Наконец, с этими двумя небольшими функциями я смог построить свое асинхронное решение средней задачи по полю, которое выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
(defn aggregate-field [fname channel npar]
  (let [as-value-fn       (extract-field-fn fname channel)
        fvalue-chan       (pipe channel (chan 1024 (map as-value-fn)))
        fvalue-aggregator (for [_ (range npar)]
                            (w-aggregate fvalue-chan))]
    (go
      (let [[sum count] (<! (async/reduce
                             #(apply map + [%1 %2])
                             [0.0 0]
                             (async/merge fvalue-aggregator)))]
        (/ sum count)))))

Это более или менее то, что происходит:

  1. Мы начнем с создания частичной функции, которая по имени поля и каналу определяет функцию, которая считывает значение поля из строки.
  2. Следующее, что мы делаем, это направляем входящий канал-канал в канал -значение-поле … почему я так поступил? … потому что каналы поддерживают преобразователи !!! Здесь я определяю преобразователь, способный отображать строку в значение, которое необходимо агрегировать (map as-value-fn), и использую его для определения нового канала, который испускает все значения, которые необходимо агрегировать. Это было очень хорошо.
  3. Используя преобразованный канал fvalue-chan вместе с функцией для понимания, используя функцию w-aggregate , все, что мне нужно было сделать, это подсчитать и суммировать входящие значения одновременно.
  4. Наконец, комбинируя запросы на уменьшение и объединение для одной и той же цели, я собираю частичные подсчеты и суммы, чтобы окончательно вычислить среднее значение запрошенного значения, просто разделив их.

Вы можете изобразить поток следующим образом:

Делать это все одновременно

Как решить проблему с количеством строк и средним значением поля, как их объединить? И именно тогда ссылка на API сыграла важную роль в этой истории, так как мне удалось найти оператор mult . Согласно документации ..

Создает и возвращает мульт (iple) предоставленного канала. Каналы, содержащие копии канала, могут быть созданы с помощью «tap» и отключены с помощью «untap».

и это щелкнуло … снова … поэтому я попробовал это

1
2
3
4
5
6
7
8
9
(defn process! [path fname npar]
  (let [lines         (mult (stream-lines-from path))
        aggregate-tap (tap lines (chan 1024))
        count-tap     (tap lines (chan 1024))]
    (<!! (async/reduce
          (fn [_ v] (println v))
          ""
          (async/merge [(aggregate-field fname aggregate-tap npar)
                        (count-lines count-tap npar)])))))

И это сработало !! Давайте посмотрим, что у нас здесь:

  • Помните функцию stream-lines-from ? Это место, где вы его используете, мы указываем путь к исходному файлу и возвращаем канал, где будут опубликованы все строки, но, поскольку нам нужно использовать эти строки дважды (для подсчета и усреднения), мы объявляем этот канал как mult.
  • Затем дважды коснемся канала линий : один раз для подсчета потока, а другой — для среднего потока. Используя tap , мы можем скопировать многоканальный канал на предоставленный канал, и, таким образом, мы можем без труда отправить каждую строку в подсчет и усреднение потоков, используя параллелизм!
  • Наконец, мы передаем в каждом из каналов отводов функции агрегатного поля и счетчика и ожидаем, пока все они завершатся блокирующим (оператор <!! ) слиянием / уменьшением .

Последние мысли

Это летало? Не совсем: подсчет был «быстрым» (на моем ноутбуке это заняло около минуты), но агрегирование значений было медленнее, чем ожидалось (около 5 минут). Честно говоря, здесь также нужно учитывать множество факторов: мой ноутбук довольно старый, я не потратил достаточно времени, чтобы покопаться в подробностях о внутренних устройствах, просто назову несколько, но это также было чрезвычайно Трудно найти информацию о рекомендациях по производительности от сообщества, и это, на мой взгляд, является важным препятствием, о котором следует помнить, когда вы рассматриваете вопрос о принятии нового стека. Сообщество является ключевым.

В целом, я осмелюсь сказать, что это была забавная поездка, я был рад видеть, что JVM все еще является прекрасным программным обеспечением с множеством возможностей, и что Clojure все еще может сохранить эту красоту в Concurrent Land, но я все еще склоняюсь к другие решения, а не использование core.async для высокопроизводительной системы, у меня не было того внутреннего ощущения, которое я испытывал при использовании Akka или Rx, по крайней мере, на данный момент.

Вы можете сделать это быстрее? Маякни мне! Не упустите этот шанс, вы можете стать шутником с Clojure СЕЙЧАС!

Опубликовано на Java Code Geeks с разрешения Гильермо Селиги, партнера нашей программы JCG . Посмотрите оригинальную статью здесь: ПОЛУЧИТЕ ШВИФТИ С ЯДРОМ CLOJURE. ASYNC

Мнения, высказанные участниками Java Code Geeks, являются их собственными.