Ленивые последовательности (также известные как потоки ) представляют собой интересную функциональную структуру данных, о которой вы, возможно, никогда не слышали. В основном, ленивая последовательность — это список, который не полностью известен / вычислен, пока вы его не используете. Представьте себе список, который очень дорог, чтобы создать, и вы не хотите вычислять слишком много — но все же позволяют клиентам потреблять столько, сколько они хотят или нуждаются. Подобно итератору, однако итераторы разрушительны — как только вы их прочитаете, они исчезнут. Ленивые последовательности с другой стороны помнят уже вычисленные элементы.
Обратите внимание, что эта абстракция даже позволяет нам создавать и работать с бесконечными потоками! Вполне возможно создать ленивую последовательность простых чисел или ряд Фибоначчи . Клиент сам решает, сколько элементов он хочет использовать — и только это
многие будут генерироваться. Сравните его с готовым списком, который должен быть предварительно вычислен до первого использования, и итератором, который забывает о уже вычисленных значениях.
Помните, однако, что ленивые последовательности всегда просматриваются с самого начала, поэтому, чтобы найти N-й элемент, ленивые последовательности должны вычислять предшествующие N-1 элементы.
Я стараюсь избегать чисто академических примеров, поэтому не будет примеров ряда Фибоначчи. Вы найдете это в каждой статье на эту тему. Вместо этого мы реализуем что-то полезное — утилиту тестирования выражений Cron , возвращающую последовательность очередных срабатываний. Мы уже реализовывали тестирование выражений Cron , используя рекурсию и итератор. Чтобы быстро резюмировать, мы хотели бы убедиться, что наше выражение Cron является правильным и срабатывает, когда мы действительно этого ожидаем. Кварцевый планировщик предоставляет удобный CronExpression.getNextValidTimeAfter(Date)
который возвращает время следующего срабатывания после указанной даты. Если мы хотим вычислить, например, следующие десять раз, нам нужно вызывать этот метод десять раз, но! Результат первого вызова должен быть передан в качестве аргумента для второго вызова — ведь когда мы знаем, когда задание будет запущено в первый раз, мы хотим знать, каково время следующего вызова (после первого). И продолжая, чтобы найти третье время вызова, мы должны передать второе время вызова в качестве аргумента. Это описание привело нас к простому рекурсивному алгоритму:
1
2
3
4
5
|
def findTriggerTimesRecursive(expr: CronExpression, after: Date): List[Date] = expr getNextValidTimeAfter after match { case null => Nil case next => next :: findTriggerTimesRecursive(expr, next) } |
getNextValidTimeAfter()
может возвращать null
чтобы указать, что выражение Cron никогда не сработает снова (например, оно выполняется только в течение 2013 года, и мы уже достигли конца года) Однако это решение имеет несколько проблем:
- мы на самом деле не знаем, сколько будущих дат нужно клиенту, поэтому мы, скорее всего, генерируем слишком много ненужных циклов ЦП 1
- еще хуже, некоторые выражения Cron никогда не заканчиваются.
"0 0 17 * * ? *"
Будет выполняться в 17:00 каждый день, каждый год, до бесконечности. У нас определенно не так много времени и памяти - наша реализация не является хвостовой рекурсивной. Легко исправить, хотя
Что если бы у нас была «подобная списку» структура данных, которую мы могли бы обойти и работать с ней, как с любой другой последовательностью, но без нетерпеливой оценки? Вот реализация в Scala of Stream[Date]
которая вычисляет время следующего запуска только при необходимости:
1
2
3
4
5
|
def findTriggerTimes(expr: CronExpression, after: Date): Stream[Date] = expr getNextValidTimeAfter after match { case null => Stream.Empty case next => next #:: findTriggerTimes(expr, next) } |
Смотри внимательно, так как он почти идентичен! Мы заменили List[Date]
на Stream[Date]
(оба реализуют LinearSeq
), Nil
на Stream.Empty
и ::
with #::
. Последнее изменение имеет решающее значение. #::
method (да, это метод…) принимает tl: => Stream[A]
— по имени . Это означает, что findTriggerTimes(expr, next)
здесь не вызывается! На самом деле это замыкание, которое мы передаем функции #::
высшего порядка. Это закрытие оценивается только при необходимости. Давайте немного поиграем с этим кодом:
01
02
03
04
05
06
07
08
09
10
11
|
val triggerTimesStream = findTriggerTimes( "0 0 17 L-3W 6-9 ? *" ) println(triggerTimesStream) //Stream(Thu Jun 27 17:00:00 CEST 2013, ?) val firstThree = triggerTimesStream take 3 println(firstThree.toList) //List(Thu Jun 27 17:00:00 CEST 2013, Mon Jul 29 17:00:00 CEST 2013, Wed Aug 28 17:00:00 CEST 2013) println(triggerTimesStream) //Stream(Thu Jun 27 17:00:00 CEST 2013, Mon Jul 29 17:00:00 CEST 2013, Wed Aug 28 17:00:00 CEST 2013, ?) |
Смотри внимательно. Первоначально печать потока едва показывает первый элемент. Вопросительный знак в Stream.toString
представляет неизвестную оставшуюся часть потока. Затем мы берем первые три элемента. Интересно, что мы должны преобразовать результат в List
. Один только вызов take(3)
едва возвращает другой поток, что еще больше откладывает оценку на максимально возможный срок. Но печать исходного потока снова показывает все три элемента, но четвертый еще не известен.
Давайте сделаем что-то более продвинутое. Скажем, мы хотели бы узнать, когда выражение Cron сработает в сотый раз? И сколько раз он сработает в течение одного года с сегодняшнего дня?
1
2
3
4
5
6
7
|
val hundredth = triggerTimesStream.drop( 99 ).head val calendar = new GregorianCalendar() calendar.add(Calendar.YEAR, 1 ) val yearFromNow = calendar.getTime val countWithinYear = triggerTimesStream.takeWhile(_ before yearFromNow).size |
Вычислить время 100-го огня довольно просто — просто отбросьте первые 99 дат и возьмите первое из оставшихся. Однако слово discard немного неудачно — эти элементы вычисляются и кэшируются в triggerTimesStream
поэтому в следующий раз, когда мы попытаемся получить доступ к любому из первых 100 элементов, они будут доступны немедленно. Интересный факт: Stream[T]
в Scala является неизменным и поточно-ориентированным, но он постоянно изменяется внутри вас, пока вы выполняете его. Но это деталь реализации.
Вы можете takeWhile(...).size
почему я использую takeWhile(...).size
вместо простого filter(...).size
или даже count(...)
? Что ж, из определения, что времена срабатывания в нашем потоке растут, поэтому, если мы хотим считать даты только в течение одного года, в тот момент, когда мы найдем первую несоответствующую дату, мы можем остановиться. Но это не только микрооптимизация. Помните, что потоки могут быть бесконечными? Думаю об этом. А пока мы перенесем нашу маленькую утилиту на Clojure.
Clojure
поток ( lazy-seq
) в Clojure:
1
2
3
4
5
|
(defn find-trigger-times [expr after] (let [next (. expr getNextValidTimeAfter after)] ( case next nil [] (cons next (lazy-seq (find-trigger-times expr next)))))) |
Это почти точный перевод кода Scala, за исключением того, что используется одно связывание let
для захвата результата getNextValidTimeAfter()
. Менее грамотный, но более компактный перевод может быть выполнен с помощью формы if-let
:
1
2
3
4
|
(defn find-trigger-times [expr after] ( if -let [next (. expr getNextValidTimeAfter after)] (cons next (lazy-seq (find-trigger-times expr next))) [])) |
if-let
сочетает в себе условие и привязку. Если выражение, привязанное к next
является ложным (или nil
в нашем случае), 3-я строка вообще не оценивается. Вместо этого возвращается результат четвертой строки (пустая последовательность). Эти две реализации эквивалентны. Для полноты давайте посмотрим, как получить 100-й элемент и посчитать количество дат, соответствующих выражению Cron, в течение одного года:
01
02
03
04
05
06
07
08
09
10
|
(def expr ( new CronExpression "0 0 17 L-3W 6-9 ? *" )) (def trigger-times (find-trigger-times expr ( new Date))) (def hundredth (first (drop 99 trigger-times))) (def year-from-now (let [calendar ( new GregorianCalendar)] (. calendar add Calendar/YEAR 1 ) (. calendar getTime))) (take- while #(.before % year-from-now) trigger-times) |
Обратите внимание, что мы снова используем take-while
вместо простого filter
Пространство и время сложность
Представьте, что вы используете filter()
вместо takeWhile()
чтобы вычислить, сколько раз сработает триггер Cron в следующем году. Помните, что потоки в целом (и наш поток Cron в частности) могут быть бесконечными. Простой filter()
в Stream
будет работать, пока не достигнет конца — что может никогда не произойти с бесконечным потоком. То же самое относится и к таким простым методам, как size
— Stream
будет оценивать все больше и больше, пока не достигнет конца. Но раньше ваша программа заполнит все пространство кучи. Почему? Поскольку после оценки элемента Stream[T]
кеширует его на потом. Случайно удержание головы большого Stream
является еще одной опасностью:
1
2
3
|
val largeStream: Stream[Int] = //,.. //... val smallerStream = largeStream drop 1000000 |
smallerStream
— это ссылка на поток без первого миллиона элементов. Но эти элементы все еще кэшируются в оригинальном largeStream
. Пока вы сохраняете ссылку на него, они хранятся в памяти. В тот момент, largeStream
ссылка largeStream
выходит из области видимости, первый миллион элементов подходит для сборки мусора, а на оставшуюся часть потока все еще ссылаются.
Вышеприведенное обсуждение в равной степени относится и к Scala и Clojure. Как вы видите, вы должны быть очень осторожны при работе с ленивыми последовательностями. Они очень мощные и вездесущие в функциональных языках — но
« С большой силой приходит большая ответственность » . В тот момент, когда вы начинаете играть с возможно бесконечными сущностями, вы должны быть осторожны.
повторять
Если у вас больше опыта работы с Clojure или Scala, вам может быть интересно, почему я не использовал (iterate fx)
или Stream.iterate()
. Эти вспомогательные методы хороши, когда у вас есть бесконечный поток, и когда каждый элемент может быть вычислен как функция предыдущего. Очевидно, что поток Cron не может воспользоваться этим удобным инструментом, так как он может быть конечным, как показано ранее. Но для полного завершения приведем гораздо более короткую, но неправильную реализацию с использованием iterate
:
1
2
3
4
|
def infiniteFindTriggerTimes(expr: CronExpression, after: Date) = Stream.iterate(expr getNextValidTimeAfter after){last => expr getNextValidTimeAfter last } |
… И Clojure:
1
2
3
4
|
(defn find-trigger-times [expr after] (iterate #(. expr getNextValidTimeAfter %) (. expr getNextValidTimeAfter after))) |
Идея в обоих случаях проста: мы предоставляем начальный элемент x
(первый аргумент в Scala, второй в Clojure) и функцию f
которая преобразует предыдущий элемент в текущий. Другими словами, мы создаем следующий поток: [x, f(x), f(f(x)), f(f(f(x))), ...]
.
Реализации выше работают, пока не достигнут конца потока (если есть). Таким образом, чтобы закончить с чем-то положительным, мы будем использовать iterate
для получения бесконечного потока простых чисел (извиняюсь за такую теоретическую проблему), используя наивное prime?
предикат:
01
02
03
04
05
06
07
08
09
10
|
(defn- divisors [x] (filter #(zero? (rem x %)) (range 2 (inc (Math/sqrt x))))) (defn- prime? [x] (empty? (divisors x))) (defn- next-prime [after] (loop [x (inc after)] ( if (prime? x) x (recur (inc x))))) (def primes (iterate next-prime 2 )) |
Я надеюсь, что и идея, и реализация понятны. Если число не имеет делителей, оно считается простым. next-prime
возвращает последующее простое число, большее заданного значения. Таким образом (next-prime 2)
дает 3
, (next-prime 3)
дает 5
и так далее. Используя эту функцию, мы можем построить ленивую последовательность primes
, просто предоставив первое простое число и next-prime
простую функцию.
Вывод
Ленивые последовательности (или потоки) являются отличными абстракциями, которые невозможно или утомительно представлять в императивных языках. Они выглядят как обычные списки, но они оцениваются только при необходимости. И Scala, и Clojure оказывают им большую поддержку, и они ведут себя одинаково. Вы можете отображать, фильтровать, вырезать и т. Д. В потоках, и они никогда не вычисляют свои элементы, если это не нужно. Более того, они кэшируют уже вычисленные значения, но при этом сохраняют многопоточность Однако, имея дело с бесконечностью, нужно быть осторожным. Если вы попытаетесь невинно сосчитать элементы бесконечного потока или найти несуществующий элемент (например, primes.find(_ == 10)
), никто вас не спасет.
1 —
Полная реализация getNextValidTimeAfter()
имеет длину 400 строк .