Статьи

Ленивые последовательности в Scala и Clojure

Ленивые последовательности (также известные как потоки ) представляют собой интересную функциональную структуру данных, о которой вы, возможно, никогда не слышали. В основном, ленивая последовательность — это список, который не полностью известен / вычислен, пока вы его не используете. Представьте себе список, который очень дорог, чтобы создать, и вы не хотите вычислять слишком много — но все же позволяют клиентам потреблять столько, сколько они хотят или нуждаются. Подобно итератору, однако итераторы разрушительны — как только вы их прочитаете, они исчезнут. Ленивые последовательности с другой стороны помнят уже вычисленные элементы.

Обратите внимание, что эта абстракция даже позволяет нам создавать и работать с бесконечными потоками! Вполне возможно создать ленивую последовательность простых чисел или ряд Фибоначчи . Клиент сам решает, сколько элементов он хочет использовать — и только это

многие будут генерироваться. Сравните его с готовым списком, который должен быть предварительно вычислен до первого использования, и итератором, который забывает о уже вычисленных значениях.

Помните, однако, что ленивые последовательности всегда просматриваются с самого начала, поэтому, чтобы найти N-й элемент, ленивые последовательности должны вычислять предшествующие N-1 элементы.

Я стараюсь избегать чисто академических примеров, поэтому не будет примеров ряда Фибоначчи. Вы найдете это в каждой статье на эту тему. Вместо этого мы реализуем что-то полезное — утилиту тестирования выражений Cron , возвращающую последовательность очередных срабатываний. Мы уже реализовывали тестирование выражений Cron , используя рекурсию и итератор. Чтобы быстро резюмировать, мы хотели бы убедиться, что наше выражение Cron является правильным и срабатывает, когда мы действительно этого ожидаем. Кварцевый планировщик предоставляет удобный CronExpression.getNextValidTimeAfter(Date) который возвращает время следующего срабатывания после указанной даты. Если мы хотим вычислить, например, следующие десять раз, нам нужно вызывать этот метод десять раз, но! Результат первого вызова должен быть передан в качестве аргумента для второго вызова — ведь когда мы знаем, когда задание будет запущено в первый раз, мы хотим знать, каково время следующего вызова (после первого). И продолжая, чтобы найти третье время вызова, мы должны передать второе время вызова в качестве аргумента. Это описание привело нас к простому рекурсивному алгоритму:

1
2
3
4
5
def findTriggerTimesRecursive(expr: CronExpression, after: Date): List[Date] =
    expr getNextValidTimeAfter after match {
        case null => Nil
        case next => next :: findTriggerTimesRecursive(expr, next)
    }

getNextValidTimeAfter() может возвращать null чтобы указать, что выражение Cron никогда не сработает снова (например, оно выполняется только в течение 2013 года, и мы уже достигли конца года) Однако это решение имеет несколько проблем:

  • мы на самом деле не знаем, сколько будущих дат нужно клиенту, поэтому мы, скорее всего, генерируем слишком много ненужных циклов ЦП 1
  • еще хуже, некоторые выражения Cron никогда не заканчиваются. "0 0 17 * * ? *" Будет выполняться в 17:00 каждый день, каждый год, до бесконечности. У нас определенно не так много времени и памяти
  • наша реализация не является хвостовой рекурсивной. Легко исправить, хотя

Что если бы у нас была «подобная списку» структура данных, которую мы могли бы обойти и работать с ней, как с любой другой последовательностью, но без нетерпеливой оценки? Вот реализация в Scala of Stream[Date] которая вычисляет время следующего запуска только при необходимости:

1
2
3
4
5
def findTriggerTimes(expr: CronExpression, after: Date): Stream[Date] =
    expr getNextValidTimeAfter after match {
        case null => Stream.Empty
        case next => next #:: findTriggerTimes(expr, next)
    }

Смотри внимательно, так как он почти идентичен! Мы заменили List[Date] на Stream[Date] (оба реализуют LinearSeq ), Nil на Stream.Empty и :: with #:: . Последнее изменение имеет решающее значение. #:: method (да, это метод…) принимает tl: => Stream[A]по имени . Это означает, что findTriggerTimes(expr, next) здесь не вызывается! На самом деле это замыкание, которое мы передаем функции #:: высшего порядка. Это закрытие оценивается только при необходимости. Давайте немного поиграем с этим кодом:

01
02
03
04
05
06
07
08
09
10
11
val triggerTimesStream = findTriggerTimes("0 0 17 L-3W 6-9 ? *")
  
println(triggerTimesStream)
//Stream(Thu Jun 27 17:00:00 CEST 2013, ?)
  
val firstThree = triggerTimesStream take 3
println(firstThree.toList)
//List(Thu Jun 27 17:00:00 CEST 2013, Mon Jul 29 17:00:00 CEST 2013, Wed Aug 28 17:00:00 CEST 2013)
  
println(triggerTimesStream)
//Stream(Thu Jun 27 17:00:00 CEST 2013, Mon Jul 29 17:00:00 CEST 2013, Wed Aug 28 17:00:00 CEST 2013, ?)

Смотри внимательно. Первоначально печать потока едва показывает первый элемент. Вопросительный знак в Stream.toString представляет неизвестную оставшуюся часть потока. Затем мы берем первые три элемента. Интересно, что мы должны преобразовать результат в List . Один только вызов take(3) едва возвращает другой поток, что еще больше откладывает оценку на максимально возможный срок. Но печать исходного потока снова показывает все три элемента, но четвертый еще не известен.

Давайте сделаем что-то более продвинутое. Скажем, мы хотели бы узнать, когда выражение Cron сработает в сотый раз? И сколько раз он сработает в течение одного года с сегодняшнего дня?

1
2
3
4
5
6
7
val hundredth = triggerTimesStream.drop(99).head
  
val calendar = new GregorianCalendar()
calendar.add(Calendar.YEAR, 1)
val yearFromNow = calendar.getTime
  
val countWithinYear = triggerTimesStream.takeWhile(_ before yearFromNow).size

Вычислить время 100-го огня довольно просто — просто отбросьте первые 99 дат и возьмите первое из оставшихся. Однако слово discard немного неудачно — эти элементы вычисляются и кэшируются в triggerTimesStream поэтому в следующий раз, когда мы попытаемся получить доступ к любому из первых 100 элементов, они будут доступны немедленно. Интересный факт: Stream[T] в Scala является неизменным и поточно-ориентированным, но он постоянно изменяется внутри вас, пока вы выполняете его. Но это деталь реализации.

Вы можете takeWhile(...).size почему я использую takeWhile(...).size вместо простого filter(...).size или даже count(...) ? Что ж, из определения, что времена срабатывания в нашем потоке растут, поэтому, если мы хотим считать даты только в течение одного года, в тот момент, когда мы найдем первую несоответствующую дату, мы можем остановиться. Но это не только микрооптимизация. Помните, что потоки могут быть бесконечными? Думаю об этом. А пока мы перенесем нашу маленькую утилиту на Clojure.

Clojure

поток ( lazy-seq ) в Clojure:

1
2
3
4
5
(defn find-trigger-times [expr after]
    (let [next (. expr getNextValidTimeAfter after)]
        (case next
            nil []
            (cons next (lazy-seq (find-trigger-times expr next))))))

Это почти точный перевод кода Scala, за исключением того, что используется одно связывание let для захвата результата getNextValidTimeAfter() . Менее грамотный, но более компактный перевод может быть выполнен с помощью формы if-let :

1
2
3
4
(defn find-trigger-times [expr after]
    (if-let [next (. expr getNextValidTimeAfter after)]
        (cons next (lazy-seq (find-trigger-times expr next)))
        []))

if-let сочетает в себе условие и привязку. Если выражение, привязанное к next является ложным (или nil в нашем случае), 3-я строка вообще не оценивается. Вместо этого возвращается результат четвертой строки (пустая последовательность). Эти две реализации эквивалентны. Для полноты давайте посмотрим, как получить 100-й элемент и посчитать количество дат, соответствующих выражению Cron, в течение одного года:

01
02
03
04
05
06
07
08
09
10
(def expr (new CronExpression "0 0 17 L-3W 6-9 ? *"))
(def trigger-times (find-trigger-times expr (new Date)))
  
(def hundredth (first (drop 99 trigger-times)))
  
(def year-from-now (let [calendar (new GregorianCalendar)]
    (. calendar add Calendar/YEAR 1)
    (. calendar getTime)))
  
(take-while #(.before % year-from-now) trigger-times)

Обратите внимание, что мы снова используем take-while вместо простого filter

Пространство и время сложность

Представьте, что вы используете filter() вместо takeWhile() чтобы вычислить, сколько раз сработает триггер Cron в следующем году. Помните, что потоки в целом (и наш поток Cron в частности) могут быть бесконечными. Простой filter() в Stream будет работать, пока не достигнет конца — что может никогда не произойти с бесконечным потоком. То же самое относится и к таким простым методам, как sizeStream будет оценивать все больше и больше, пока не достигнет конца. Но раньше ваша программа заполнит все пространство кучи. Почему? Поскольку после оценки элемента Stream[T] кеширует его на потом. Случайно удержание головы большого Stream является еще одной опасностью:

1
2
3
val largeStream: Stream[Int] = //,..
//...
val smallerStream = largeStream drop 1000000

smallerStream — это ссылка на поток без первого миллиона элементов. Но эти элементы все еще кэшируются в оригинальном largeStream . Пока вы сохраняете ссылку на него, они хранятся в памяти. В тот момент, largeStream ссылка largeStream выходит из области видимости, первый миллион элементов подходит для сборки мусора, а на оставшуюся часть потока все еще ссылаются.

Вышеприведенное обсуждение в равной степени относится и к Scala и Clojure. Как вы видите, вы должны быть очень осторожны при работе с ленивыми последовательностями. Они очень мощные и вездесущие в функциональных языках — но
« С большой силой приходит большая ответственность » . В тот момент, когда вы начинаете играть с возможно бесконечными сущностями, вы должны быть осторожны.

повторять

Если у вас больше опыта работы с Clojure или Scala, вам может быть интересно, почему я не использовал (iterate fx) или Stream.iterate() . Эти вспомогательные методы хороши, когда у вас есть бесконечный поток, и когда каждый элемент может быть вычислен как функция предыдущего. Очевидно, что поток Cron не может воспользоваться этим удобным инструментом, так как он может быть конечным, как показано ранее. Но для полного завершения приведем гораздо более короткую, но неправильную реализацию с использованием iterate :

1
2
3
4
def infiniteFindTriggerTimes(expr: CronExpression, after: Date) =
    Stream.iterate(expr getNextValidTimeAfter after){last =>
        expr getNextValidTimeAfter last
    }

… И Clojure:

1
2
3
4
(defn find-trigger-times [expr after]
    (iterate
        #(. expr getNextValidTimeAfter %)
        (. expr getNextValidTimeAfter after)))

Идея в обоих случаях проста: мы предоставляем начальный элемент x (первый аргумент в Scala, второй в Clojure) и функцию f которая преобразует предыдущий элемент в текущий. Другими словами, мы создаем следующий поток: [x, f(x), f(f(x)), f(f(f(x))), ...] .

Реализации выше работают, пока не достигнут конца потока (если есть). Таким образом, чтобы закончить с чем-то положительным, мы будем использовать iterate для получения бесконечного потока простых чисел (извиняюсь за такую ​​теоретическую проблему), используя наивное prime? предикат:

01
02
03
04
05
06
07
08
09
10
(defn- divisors [x]
    (filter #(zero? (rem x %))
        (range 2 (inc (Math/sqrt x)))))
(defn- prime? [x] (empty? (divisors x)))
(defn- next-prime [after]
    (loop [x (inc after)]
        (if (prime? x)
            x
            (recur (inc x)))))
(def primes (iterate next-prime 2))

Я надеюсь, что и идея, и реализация понятны. Если число не имеет делителей, оно считается простым. next-prime возвращает последующее простое число, большее заданного значения. Таким образом (next-prime 2) дает 3 , (next-prime 3) дает 5 и так далее. Используя эту функцию, мы можем построить ленивую последовательность primes , просто предоставив первое простое число и next-prime простую функцию.

Вывод

Ленивые последовательности (или потоки) являются отличными абстракциями, которые невозможно или утомительно представлять в императивных языках. Они выглядят как обычные списки, но они оцениваются только при необходимости. И Scala, и Clojure оказывают им большую поддержку, и они ведут себя одинаково. Вы можете отображать, фильтровать, вырезать и т. Д. В потоках, и они никогда не вычисляют свои элементы, если это не нужно. Более того, они кэшируют уже вычисленные значения, но при этом сохраняют многопоточность Однако, имея дело с бесконечностью, нужно быть осторожным. Если вы попытаетесь невинно сосчитать элементы бесконечного потока или найти несуществующий элемент (например, primes.find(_ == 10) ), никто вас не спасет.

1
Полная реализация getNextValidTimeAfter() имеет длину 400 строк .

Ссылка: Ленивые последовательности в Scala и Clojure от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей .