Статьи

Итерации для императивных программистов

Когда я впервые услышал слово iteratee, я подумал, что это шутка. Оказывается, это была не шутка, на самом деле есть также перечислители (это нормально) и перечислители (вы меня убиваете). Если вы императивный программист или, скорее, программист, который чувствует себя более комфортно при написании императивного кода, чем функционального кода, то вы можете быть немного ошеломлены всеми представлениями об итераторах, потому что все они предполагают, что вы думаете с функциональной точки зрения , Ну, я только что выучил итераций, и, хотя с функциональным программированием я чувствую себя все более и более комфортно, я все еще думаю, что в глубине души я вынужденный программист. Это сделало обучение итераторов очень сложным для меня Поэтому, пока я все еще нахожусь в императивном мышлении, я подумал, что это очень хорошая возможность объяснить итераторам с точки зрения императивных программистов, не принимая никаких функциональных знаний как должное. Если вы император-программист, который хочет изучать итераций, то это пост для вас. Я собираюсь специально рассмотреть API Iteratee от Play, но изученные здесь концепции применимы ко всем Iteratee в целом.

Итак, давайте начнем с объяснения того, чего пытаются достичь итераторы и их коллеги. Итерируемый – это метод реактивной обработки потоков данных, который очень легко компонуется. Под реактивной я имею в виду неблокирующую, то есть вы реагируете на данные, доступные для чтения, и реагируете на возможность записи данных. Под композируемым я имею в виду то, что вы пишете простые итерированные, которые хорошо выполняют одну небольшую вещь, затем вы используете их в качестве строительных блоков для написания итераторов, которые делают большие вещи, и вы используете их в качестве строительных блоков для написания итераторов, чтобы делать еще большие вещи, и скоро. На каждом этапе все просто и легко рассуждать.

Реактивная обработка потока

Если вы ищете информацию об итераторах, то, я думаю, вы уже немного знаете о том, что такое реактивная обработка потоков. Давайте сопоставим это с синхронным IO-кодом:

1
2
3
trait InputStream {
  def read(): Byte
}

Так что это должно быть очень знакомо, если вы хотите прочитать байт, вы вызываете read . Если в настоящее время нет доступного для чтения байта, этот вызов заблокируется, и ваш поток будет ждать, пока не будет доступен байт. С реактивными потоками, очевидно, все наоборот, вы передаете обратный вызов потоку, из которого хотите получать данные, и он будет вызывать его, когда он будет готов предоставить вам данные. Поэтому обычно вы можете реализовать черту, которая выглядит следующим образом:

1
2
3
trait InputStreamHandler {
  def onByte(byte: Byte)
}

Итак, прежде чем мы продолжим, давайте посмотрим, как этого можно достичь в чисто функциональном мире. На данный момент я не хочу, чтобы вы спрашивали, почему мы хотим действовать таким образом, вы увидите это позже, но если вы знаете что-то о функциональном программировании, вы знаете, что все имеет тенденцию быть неизменными, а функции не имеют никакой стороны эффекты. Признак выше должен иметь побочные эффекты, потому что, если вы не игнорируете байты, переданные onByte , вы должны каким-то образом изменять свое состояние (или что-то другое) в этой функции. Итак, как мы обрабатываем данные без изменения нашего состояния? Ответ так же, как работают другие неизменяемые структуры данных, мы возвращаем копию себя, обновленную с новым состоянием. Поэтому, если InputStreamHandler был быть функциональным, он мог бы выглядеть так:

1
2
3
trait InputStreamHandler {
  def onByte(byte: Byte): InputStreamHandler
}

И пример реализации, которая читает входные данные в seq, может выглядеть так:

1
2
3
class Consume(data: Seq[Byte]) extends InputStreamHandler {
  def onByte(byte: Byte) = new Consume(data :+ byte)
}

Итак, теперь у нас есть императивные и функциональные черты, которые реагируют на наш входной поток, и вы можете подумать, что это все, что нужно для реактивных потоков. Если это так, вы ошибаетесь. Что если мы не готовы обрабатывать данные при onByte метода onByte ? Если мы строим структуры в памяти, это никогда не будет иметь место, но если, например, мы храним их в файле или в базе данных при получении данных, то, скорее всего, так и будет. Таким образом, реактивные потоки являются двусторонними, не только вы, потребитель потока, который реагирует на ввод, производитель потока должен реагировать на то, что вы готовы к вводу.

Теперь это возможно реализовать в императивном мире, хотя вещи начинают выглядеть намного более функциональными. Мы просто начинаем использовать фьючерсы:

1
2
3
trait InputStreamHandler {
  def onByte(byte: Byte): Future[Unit]
}

Итак, когда поток, который мы потребляем, имеет для нас байт, он вызывает onByte , а затем присоединяет обратный вызов к будущему, которое мы возвращаем, чтобы передать следующий байт, когда он будет готов. Если вы посмотрите на API асинхронного канала Netty, вы увидите, что он использует именно этот шаблон. Мы также можем реализовать нечто подобное для неизменяемого функционального API:

1
2
3
trait InputStreamHandler {
  def onByte(byte: Byte): Future[InputStreamHandler]
}

И вот здесь у нас есть функциональное решение для обработки реактивного потока. Но для начала это не очень хорошо, так как обработчики не могут связаться с кодом, который их использует, о том, что они больше не хотят получать ввод, или если они столкнулись с ошибкой (исключения не одобряются) по функциональному программированию). Мы могли бы добавить вещи, чтобы справиться с этим, но очень скоро наш интерфейс станет довольно сложным, трудно разбить на мелкие кусочки, которые можно составить, и т. Д. Я не собираюсь оправдывать это сейчас, я думаю, вы увидите это позже когда я покажу вам, как легко создавать итерации.

Итак, на этом этапе я надеюсь, что вы поняли два важных момента. Во-первых, обработка реактивного потока означает двойную реакцию: и ваш код должен реагировать на готовность потока, и поток должен реагировать на то, что вы готовы. Во-вторых, когда я говорю, что нам нужно функциональное решение, я имею в виду решение, в котором все является неизменным, и это достигается тем, что наши потоковые обработчики создают свои копии каждый раз, когда получают / отправляют данные. Если вы поняли эти два важных момента, то теперь мы можем перейти к представлению повторяющихся.

Iteratees

Есть несколько вещей, которые наш интерфейс еще не рассмотрел. Во-первых, как поток сообщает нам, что он закончен, то есть, что у него больше нет данных для нас? Чтобы сделать это, вместо прямой передачи байта, мы собираемся абстрагировать наш байт от типа Input[Byte] , и этот тип может иметь три возможных реализации: EOF, элемент или пустой. Давайте пока не будем беспокоиться о том, почему нам нужно пустое, но предположим, что по какой-то причине мы можем захотеть пройти пустым. Вот как выглядит Input :

1
2
3
4
5
6
7
sealed trait Input[+E]
 
object Input {
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]
  case class El[+E](e: E) extends Input[E]
}

Обновляя наш InputStreamHandler , мы теперь получаем что-то похожее на это:

1
2
3
trait InputStreamHandler[E] {
  def onInput(in: Input[E]): Future[InputStreamHandler[E]]
}

Теперь, обновив наш Consumer прежде, чтобы справиться с этим, это может выглядеть так:

1
2
3
4
5
6
class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onInput(in: Input[Byte]) = in match {
    case El(byte) => Future.successful(new Consume(data :+ byte))
    case _ => Future.successful(this)
  }
}

Вы можете видеть, что когда мы получаем EOF или Empty , нам нечего делать, чтобы изменить наше состояние, поэтому мы просто возвращаемся снова. Если бы мы писали в другой поток, мы могли бы при получении EOF закрыть этот поток (или, скорее, отправить ему EOF ).

Следующее, что мы собираемся сделать, – это сделать так, чтобы наш обработчик сразу потреблял ввод без необходимости создавать будущее. Чтобы сделать это, вместо прямой передачи байта, мы передадим функцию, которая принимает функцию в качестве параметра, и эта функция будет принимать байт в качестве параметра. Итак, наш обработчик, когда он будет готов, создаст функцию для обработки байта, а затем вызовет функцию, которая была ему передана, с этой функцией. Мы назовем первую функцию cont функцией, которая сокращена от continue, и означает, что когда вы будете готовы продолжить получать ввод, вызовите меня. Слишком много функций? Давайте посмотрим на код:

1
2
3
trait InputStreamHandler[E] {
  def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]
}

Теперь, откуда пришло это Future[B] ? B – это просто механизм, используемый потоком для передачи состояния обратно самому себе. Как обработчик, нам не нужно беспокоиться о том, что это такое, мы просто должны удостовериться, что в конечном итоге cont функцию cont , и в конечном итоге убедиться, что возвращаемый им B возвращает его нашему вызывающему. И как это выглядит в нашем Итеритеру Потребителя? Давайте посмотрим:

1
2
3
4
5
6
class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler {
  def onByte(cont: (Input[Byte] => InputStreamHandler) => Future[B]) = cont {
    case Input.El(byte) => new Consume(data :+ byte)
    case _ => this
  }
}

Вы можете видеть в нашем простом случае готовности обрабатывать ввод немедленно, мы просто немедленно вызываем cont , нам больше не нужно беспокоиться о создании фьючерсов. Если мы хотим обрабатывать ввод асинхронно, это немного сложнее, но мы рассмотрим это позже.

Теперь у нас есть один последний шаг в создании нашего API для Iteratee. Как обработчик сообщает потоку, что он завершил получение данных? Для этого может быть две причины, одна из них – получение данных завершено. Например, если наш обработчик является анализатором JSON, он, возможно, достиг конца объекта, который он анализировал, и поэтому больше не хочет его получать. Другая причина в том, что обнаружена ошибка, для синтаксического анализатора JSON это может быть синтаксическая ошибка или, если он отправляет данные в другой поток, это может быть ошибка ввода-вывода в этом потоке.

Чтобы позволить нашему итерирующемуся общаться с потоком, мы собираемся создать черту, которая представляет его состояние. Мы назовем этот признак Step , и три состояния, в которых может находиться итератор, будут Cont , Done и Error . Наше состояние Cont будет содержать нашу функцию Input[Byte] => InputStreamHandler , чтобы поток мог ее вызывать. Наше состояние Done будет содержать наш результат (в случае ConsumeSeq[Byte] ), а наше состояние Error будет содержать сообщение об ошибке.

В дополнение к этому наши состояния « Done и « Error должны содержать оставшиеся входные данные, которые они не потребляли. Это будет важно, когда мы собираем итераторов вместе, так что, как только один итератор закончил потреблять входные данные из потока, следующий может выбрать, где остановился первый. Это одна из причин, почему нам нужен Input.Empty , потому что, если мы действительно потребляем весь ввод, нам нужен какой-то способ указать это.

Итак, вот наша черта Step :

1
2
3
4
5
6
7
sealed trait Step[E, +A]
 
object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => InputStreamHandler[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

Параметр типа E – это тип ввода, который хочет принять наш итератор, а A – то, что он производит. Итак, наша черта обработчика теперь выглядит так:

1
2
3
trait InputStreamHandler[E, A] {
  def onInput[B](step: Step[E, A] => Future[B]): Future[B]
}

И наш потребитель реализован так:

1
2
3
4
5
6
7
8
9
class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte, Seq[Byte]] {
  def onInput(step: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Cont({
    case Input.El(byte) => new Consume(data :+ byte)
    case Input.EOF => new InputStreamHandler[Byte, Seq[Byte]] {
      def onInput(cont: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Done(data, Input.Empty))
    }      
    case Input.Empty => this
  }))
}

Одна большая разница, которую вы сейчас заметили, заключается в том, что когда мы получаем EOF , мы фактически передаем Done в функцию step, чтобы сказать, что мы закончили, потребляя ввод.

И вот теперь мы создали наш интерфейс Iteratee. Наше наименование не совсем правильное, поэтому мы переименуем этот признак, очевидно, в Iteratee , и переименуем onInput для fold , поскольку мы сворачиваем наше состояние в один результат. И вот теперь мы получаем наш интерфейс:

1
2
3
trait Iteratee[E, +A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B]
}

Итераторы на практике

До сих пор мы начали с требований традиционного императивного входного потока и описали, что итерируемый имеет к этому ограничение. Но, глядя на приведенный выше код, вы можете подумать, что использовать их действительно сложно. Кажется, что они намного сложнее, чем нужно, по крайней мере концептуально, для реализации реактивных потоков. Что ж, получается, что хотя мы пока продемонстрировали основы интерфейса iteratee, есть еще много чего, что может предложить полноценный API для iteratee, и как только мы начнем понимать это и использовать его, вы начнете видеть насколько сильны, просты и полезны итераторы.

Итак, помните, как итераторы неизменны? И помните, как итераторы могут находиться в одном из трех состояний: cont, done и error, и в зависимости от того, в каком состоянии он находится, он передаст свой соответствующий класс шага в функцию folder? Что ж, если итератор является неизменным и может находиться в одном из трех состояний, то он может быть только в том состоянии, в котором он находится, и, следовательно, он будет только когда-либо передавать этот соответствующий шаг в функцию папки. Если итератор завершен, он завершен, не имеет значения, сколько раз вы вызываете его функцию fold , он никогда не станет продолжением или ошибкой, а значение done никогда не изменится, он только передаст шаг Done в папку функция с тем же значением A и тем же оставшимся над входом. Из-за этого нам понадобится только одна реализация готового итератора, она выглядит так:

1
2
3
case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))
}

Это единственный готовый итератор, который вам когда-либо понадобится, чтобы указать, что вы сделали. В приведенном выше итераторе Consume , когда мы достигли EOF , мы создали готового итератора с использованием анонимного внутреннего класса, нам не нужно было этого делать, мы могли просто использовать вышеупомянутый итератор Done . То же самое относится и к ошибочным итераторам:

1
2
3
case class Error[E](msg: String, e: Input[E]) extends Iteratee[E, Nothing] {
  def fold[B](folder: Step[E, Nothing] => Future[B]): Future[B] = folder(Step.Error(msg, e))
}

Вы можете быть удивлены, обнаружив, что то же самое относится и к контрагентам – контрагент просто передает функцию в папку, и эта функция, поскольку итератор является неизменным, никогда не изменится. Следовательно, следующий итератор обычно будет достаточно для ваших требований:

1
2
3
case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Cont(k))
}

Итак, давайте перепишем нашего итератора потребления, чтобы использовать эти вспомогательные классы:

1
2
3
4
5
def consume(data: Array[Byte]): Iteratee[Byte, Array[Byte]] = Cont {
  case Input.El(byte) => consume(data :+ byte)
  case Input.EOF => Done(data)
  case Input.Empty => consume(data)
}

Парсер CSV

Теперь мы выглядим намного проще, наш код сфокусирован на простой обработке различных типов ввода, которые мы можем получить, и возвращении правильного результата. Итак, давайте начнем писать несколько разных итераторов. На самом деле, давайте напишем итератор, который анализирует файл CSV из потока символов. Наш анализатор CSV будет поддерживать необязательные поля кавычек и экранирование кавычек с двойной кавычкой.

Нашим первым шагом будет написание строительных блоков нашего парсера. Прежде всего, мы хотим написать что-то, что пропускает некоторые виды пробелов. Итак, давайте напишем общее назначение drop в то время как iteratee:

1
2
3
4
5
def dropWhile(p: Char => Boolean): Iteratee[Char, Unit] = Cont {
  case in @ Input.El(char) if !p(char) => Done(Unit, in)
  case in @ Input.EOF => Done(Unit, in)
  case _ => dropWhile(p)
}

Так как мы просто отбрасываем ввод, наш результат на самом деле – Unit . Мы возвращаем Done если предикат не соответствует текущему char , или если мы достигаем EOF, и в противном случае мы возвращаемся снова. Обратите внимание, что когда мы закончим, мы включим входные данные, которые были переданы нам в качестве оставшихся данных, потому что это будет необходимо для использования следующим итератором. Используя этого итератора, мы теперь можем написать итератор, который отбрасывает пробел:

1
def dropSpaces = dropWhile(c => c == ' ' || c == '\t' || c == '\r')

Далее, мы собираемся написать команду take while iteratee, она будет представлять собой смесь между нашим ранее iteratee потребления, несущим состояние между каждым вызовом и отбрасыванием while iteratee:

1
2
3
4
5
6
7
8
9
def takeWhile(p: Char => Boolean, data: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, Seq[Char]] = Cont {
  case in @ Input.El(char) => if (p(char)) {
    takeWhile(p, data :+ char)
  } else {
    Done(data, in)
  }
  case in @ Input.EOF => Done(data, in)
  case _ => takeWhile(p, data)
}

Мы также хотим написать pete iteratee, который смотрит на то, что является следующим вводом, фактически не потребляя его:

1
2
3
4
5
def peek: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char), in)
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => peek
}

Обратите внимание, что наш pete iteratee должен вернуть опцию, так как, если он встречает EOF, он не может ничего вернуть.

И, наконец, мы хотим получить один итератор:

1
2
3
4
5
def takeOne: Iteratee[Char, Option[Char]] = Cont {
  case in @ Input.El(char) => Done(Some(char))
  case in @ Input.EOF => Done(None, in)
  case Input.Empty => takeOne
}

Используя итератора take one, мы создадим ожидаемый итератор, который предписывает, чтобы следующий символ появлялся следующим, иначе он выдаст ошибку:

1
2
3
4
5
def expect(char: Char): Iteratee[Char, Unit] = takeOne.flatMap {
  case Some(c) if c == char => Done(Unit)
  case Some(c) => Error('Expected ' + char + ' but got ' + c, Input.El(c))
  case None => Error('Premature end of input, expected: ' + char, Input.EOF)
}

Обратите внимание на использование flatMap здесь. Если вы не сталкивались с этим раньше, в асинхронном мире flatMap основном означает «а потом». Он применяет функцию к результату итерируемого и возвращает нового итерируемого. В нашем случае мы используем его для преобразования результата в готовый итератор или в итератор с ошибкой, в зависимости от того, соответствует ли результат ожидаемому. flatMap – это один из фундаментальных механизмов, который мы будем использовать для объединения наших итераторов.

Теперь с нашими строительными блоками мы готовы начать сборку нашего CSV-парсера. Первая часть, которую мы напишем, – это анализатор значений без кавычек. Это очень просто, мы просто хотим взять все символы, которые не являются запятой или новой строкой, с одним уловом. Мы хотим, чтобы результат был String, а не Seq[Char] как takeWhile . Давайте посмотрим, как мы это делаем:

1
def unquoted = takeWhile(c => c != ',' && c != '\n').map(v => v.mkString.trim)

Как вы можете видеть, мы использовали функцию map для преобразования конечного результата из последовательности символов в строку. Это еще один ключевой метод для итераторов, который вы найдете полезным.

Наша следующая задача – разобрать значение в кавычках. Давайте начнем с реализации, которая не учитывает экранированные кавычки. Чтобы разобрать значение в кавычках, нам нужно ожидать кавычку, а затем нам нужно принять любое значение, которое не является кавычкой, а затем нам нужно ожидать кавычку. Обратите внимание, что во время этого предложения я сказал «а потом» 2 раза? Какой метод мы можем использовать, чтобы сделать «а потом»? Это верно, метод flatMap о котором я говорил ранее. Давайте посмотрим, как выглядит наш анализатор значений в кавычках:

1
2
3
4
def quoted = expect(''')
  .flatMap(_ => takeWhile(_ != '''))
  .flatMap(value => expect(''')
    .map(_ => value.mkString))

Так что теперь вы, вероятно, можете начать видеть полезность flatMap . На самом деле это настолько полезно, не только для повторяющихся, но и для многих других вещей, что в Scala есть специальный синтаксис, который требует понимания. Давайте перепишем вышеупомянутый итератор, используя это:

1
2
3
4
5
def quoted = for {
  _     <- expect(''')
  value <- takeWhile(_ != ''')
  _     <- expect(''')
} yield value.mkString

Сейчас я надеюсь, что вы взволнованы. Как выглядит приведенный выше код? Это похоже на обычный императивный синхронный код. Прочитайте это значение, затем прочитайте это значение, затем прочитайте это значение. За исключением того, что это не синхронно, и это не обязательно. Это функционально и асинхронно. Мы взяли наши строительные блоки и скомпоновали их в кусок очень удобочитаемого кода, который дает полное представление о том, что мы делаем.

Теперь, если вы не уверены на 100% в вышеуказанном синтаксисе, значения слева от знака <- это результаты итераторов справа. Их можно использовать в любом месте в любых последующих строках, в том числе в операторе конечной yield . Подчеркивания используются, чтобы сказать, что мы не заинтересованы в значении, мы используем это для expect итератора, так как он все равно возвращает Unit. Оператор после yield является функцией карты, которая дает нам возможность взять все промежуточные значения и превратить их в один результат.

Итак, теперь, когда мы это понимаем, давайте перепишем нашего quoted итератора для поддержки экранированных кавычек. Прочитав нашу цитату, мы хотим посмотреть на следующего персонажа. Если это кавычка, то мы хотим добавить значение, которое мы только что прочитали, плюс кавычку к нашему накопленному значению и рекурсивно снова вызвать цитируемый итератор. В противном случае мы достигли конца значения.

01
02
03
04
05
06
07
08
09
10
def quoted(value: Seq[Char] = IndexedSeq[Char]()): Iteratee[Char, String] = for {
  _          <- expect(''')
  maybeValue <- takeWhile(_ != ''')
  _          <- expect(''')
  nextChar   <- peek
  value      <- nextChar match {
    case Some(''') => quoted(value ++ maybeValue :+ ''')
    case _ => Done[Char, String]((value ++ maybeValue).mkString)
  }
} yield value

Теперь нам нужно написать итератор, который может анализировать значение в кавычках или без кавычек. Мы выбираем какой из них, посмотрев на первый символ, а затем, соответственно, вернем правильного итератора.

1
2
3
4
5
6
7
8
def value = for {
  char  <- peek
  value <- char match {
    case Some(''') => quoted()
    case None => Error[Char]('Premature end of input, expected a value', Input.EOF)
    case _ => unquoted
  }
} yield value

Давайте теперь проанализируем всю строку, читая до конца строки символа.

01
02
03
04
05
06
07
08
09
10
11
def values(state: Seq[String] = IndexedSeq[String]()): Iteratee[Char, Seq[String]] = for {
  _        <- dropSpaces
  value    <- value
  _        <- dropSpaces
  nextChar <- takeOne
  values   <- nextChar match {
    case Some('\n') | None => Done[Char, Seq[String]](state :+ value)
    case Some(',') => values(state :+ value)
    case Some(other) => Error('Expected comma, newline or EOF, but found ' + other, Input.El(other))
  }
} yield values

Enumeratees

Теперь, аналогично тому, как мы анализируем значения, мы также можем анализировать каждую строку файла CSV, пока не достигнем EOF. Но на этот раз мы собираемся сделать что-то немного другое. Мы видели, как мы можем упорядочить итерации с помощью flatMap , но есть и другие возможности для создания итераторов. Другая концепция у итераторов – это перечислители. Перечисляемые адаптируют поток для использования итерируемым. Простейшие перечислители просто отображают входные значения потока как нечто другое. Так, например, вот перечислитель, который преобразует поток строк в поток целых:

1
def toInt: Enumeratee[String,Int] = Enumeratee.map[String](_.toInt)

Одним из методов на Enumeratee является transform . Мы можем использовать этот метод для применения перечисляемого к итерируемому:

1
2
val someIteratee: Iteratee[Int, X] = ...
val adaptedIteratee: Iteratee[String, X] = toInt.transform(someIteratee)

Этот метод также связан с оператором &>> , и поэтому этот код ниже эквивалентен приведенному выше коду:

1
val adaptedIteratee: Iteratee[String, X] = toInt &>> someIteratee

Мы также можем сделать перечислитель из другого итерируемого, и это именно то, что мы собираемся сделать с нашими values итерируемого. Метод Enumeratee.grouped принимает итератора и применяет его к потоку снова и снова, в результате каждое приложение является входом для подачи в итератор, который будет преобразован. Давайте посмотрим:

1
def csv = Enumeratee.grouped(values())

Теперь давайте немного креативнее с перечисляемыми. Допустим, наш CSV-файл очень большой, поэтому мы не хотим загружать его в память. Каждая строка представляет собой последовательность из 3 целых столбцов, и мы хотим суммировать каждый столбец. Итак, давайте определим перечислителя, который преобразует каждый набор значений в целые числа:

1
def toInts = Enumeratee.map[Seq[String]](_.map(_.toInt))

И еще один перечислитель для преобразования последовательности в 3-кортеж:

1
def toThreeTuple = Enumeratee.map[Seq[Int]](s => (s(0), s(1), s(2)))

И, наконец, итерируемый, чтобы суммировать их:

1
2
3
4
5
def sumThreeTuple(a: Int = 0, b: Int = 0, c: Int = 0): Iteratee[(Int, Int, Int), (Int, Int, Int)] = Cont {
  case Input.El((x, y, z)) => sumThreeTuple(a + x, b + y, c + z)
  case Input.Empty => sumThreeTuple(a, b, c)
  case in @ Input.EOF => Done((a, b, c), in)
}

Теперь собрать их все вместе. В enumeratee есть еще один метод, называемый compose , который, как вы уже догадались, позволяет вам создавать перечислимых. Здесь есть оператор псевдонима ><> . Давайте использовать это:

1
val processCsvFile = csv ><> toInts ><> toThreeTuple &>> sumThreeTuple()

Перечислители

Наконец, если итератор потребляет поток, что создает поток? Ответ перечислитель. Перечислитель может быть применен к итерируемому, используя его метод apply , который также имеет псевдоним >>> . Это оставит итератора в состоянии продолжения, готового получить больше информации. Однако, если перечислитель содержит весь поток, вместо него можно использовать метод run который отправит итерирующему EOF, как только он будет завершен. Это псевдоним для |>>> .

API-интерфейс перечислителя Play упрощает создание перечислителя, передавая последовательность входных данных в метод apply сопутствующих объектов Enumerator . Итак, мы можем создать перечислитель символов, используя следующий код:

1
2
3
val csvFile = Enumerator(
  '''1,2,3
    |4,5,6'''.stripMargin.toCharArray:_*)

И мы можем добавить это в нашу итерируемую команду следующим образом:

1
val result = csvFile |>>> processCsvFile

И нашим результатом в этом случае будет будущее, которое в конечном итоге выкупится (5, 7, 9).

Вывод

Что ж, это был долгий путь, но, надеюсь, если вы император-программист, вы не только понимаете итераторов, вы понимаете причины их разработки и то, насколько легко они сочиняются. Я также надеюсь, что вы лучше понимаете как функциональное, так и асинхронное программирование в целом. Функциональное мышление весьма отличается от императивного мышления, и я все еще обдумываю его, но особенно после того, как я вижу, с какими хорошими и простыми итераторами можно работать (как только вы их поймете), я убежден, что функциональный программирование это путь.

Если вы заинтересованы в загрузке кода из этого сообщения в блоге, или если вы хотите увидеть более сложный анализ итератора / перечисления JSON, ознакомьтесь с этим проектом GitHub , в котором есть несколько примеров, в том числе анализ потоков байтов / символов в кусках массива, а точнее чем по одному.

Ссылка: Итерации для императивных программистов от нашего партнера по JCG Джеймса Ропера в блоге блогов Джеймса и Бет Роперов .