Статьи

Как интегрировать пользовательские источники данных в Apache Spark

В наши дни потоковая передача данных является горячей темой, и Apache Spark является отличной средой для потоковой передачи. В этом посте я покажу, как интегрировать пользовательские источники данных в Spark.

Spark Streaming дает нам возможность осуществлять потоковую передачу из различных источников, используя один и тот же лаконичный API для доступа к потокам данных, выполнения запросов SQL или создания алгоритмов машинного обучения. Эти возможности делают Spark предпочтительной средой для потоковых (или любого типа рабочего процесса) приложений, поскольку мы можем использовать все аспекты этой среды.

Задача состоит в том, чтобы выяснить, как интегрировать пользовательские источники данных в Spark, чтобы мы могли использовать его возможности без необходимости перехода на более стандартные источники. Может показаться логичным измениться, но в некоторых случаях это просто невозможно или неудобно сделать.

Потоковые пользовательские приемники

Spark предлагает различные точки расширения, как мы могли видеть, когда расширяли API источника данных здесь, чтобы интегрировать наше пользовательское хранилище данных в Spark SQL.

В этом примере мы собираемся сделать то же самое, но мы также собираемся расширить API потоковой передачи, чтобы мы могли осуществлять потоковую передачу из любого места .

Для реализации нашего пользовательского получателя нам нужно расширить класс Receiver [A]. Обратите внимание, что у него есть аннотация типа, поэтому мы можем обеспечить безопасность типов в нашем DStream с точки зрения потокового клиента.

Мы собираемся использовать этот пользовательский приемник для потоковой передачи заказов, которые одно из наших приложений отправляет через сокет.

Структура данных, проходящих через сеть, выглядит следующим образом:

1
2
3
4
5
6
7
1 5
1 1 2
2 1 1
2 1 1
4 1 1
2 2
1 2 2

Сначала мы получаем идентификатор заказа и общую сумму заказа, а затем мы получаем отдельные позиции заказа. Первое значение — это идентификатор товара, второе — это идентификатор заказа (который соответствует значению идентификатора заказа), а затем стоимость товара. В этом примере у нас есть два заказа. У первого есть четыре предмета, а у второго только один предмет.

Идея состоит в том, чтобы скрыть все это от нашего приложения Spark, так что то, что оно получает в DStream, представляет собой полный порядок, определенный для потока следующим образом:

1
val orderStream: DStream[Order] = .....
1
val orderStream: DStream[Order] = .....

В то же время мы также используем приемник для потоковой передачи нашего пользовательского потокового источника. Несмотря на то, что он отправляет данные через сокет, будет довольно сложно использовать стандартный поток сокетов от Spark, поскольку мы не сможем контролировать поступление данных, и у нас будет проблема согласования заказов в приложении. сам. Это может быть очень сложно, так как, как только мы попадаем в пространство приложений, мы работаем параллельно, и трудно синхронизировать все эти входящие данные. Однако в пространстве получателя легко создавать заказы из необработанного входного текста.

Давайте посмотрим, как выглядит наша первоначальная реализация.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)
 
class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {
 
  override def onStart(): Unit = {
 
    println("starting...")
 
    val thread = new Thread("Receiver") {
      override def run() {receive() }
    }
 
    thread.start()
  }
 
  override def onStop(): Unit = stop("I am done")
 
  def receive() = ....
}
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
case class Order(id: Int, total: Int, items: List[Item] = null)
case class Item(id: Int, cost: Int)
 
class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY)  {
 
  override def onStart(): Unit = {
 
    println("starting...")
 
    val thread = new Thread("Receiver") {
      override def run() {receive() }
    }
 
    thread.start()
  }
 
  override def onStop(): Unit = stop("I am done")
 
  def receive() = ....
}

Наш OrderReceiver расширяет Receiver [Order], что позволяет нам хранить Order (аннотированный тип) внутри Spark. Нам также необходимо реализовать методы onStart () и onStop (). Обратите внимание, что onStart () создает поток, поэтому он не блокирует, что очень важно для правильного поведения.

Теперь давайте посмотрим на метод получения, где волшебство действительно происходит.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def receive() = {
    val socket = new Socket(host, port)
    var currentOrder: Order = null
    var currentItems: List[Item] = null
 
    val reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))
 
    while (!isStopped()) {
      var userInput = reader.readLine()
 
      if (userInput == null) stop("Stream has ended")
      else {
        val parts = userInput.split(" ")
 
        if (parts.length == 2) {
          if (currentOrder != null) {
            store(Order(currentOrder.id, currentOrder.total, currentItems))
          }
 
          currentOrder = Order(parts(0).toInt, parts(1).toInt)
          currentItems = List[Item]()
        }
        else {
          currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems
        }
      }
    }
  }
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def receive() = {
    val socket = new Socket(host, port)
    var currentOrder: Order = null
    var currentItems: List[Item] = null
 
    val reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))
 
    while (!isStopped()) {
      var userInput = reader.readLine()
 
      if (userInput == null) stop("Stream has ended")
      else {
        val parts = userInput.split(" ")
 
        if (parts.length == 2) {
          if (currentOrder != null) {
            store(Order(currentOrder.id, currentOrder.total, currentItems))
          }
 
          currentOrder = Order(parts(0).toInt, parts(1).toInt)
          currentItems = List[Item]()
        }
        else {
          currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems
        }
      }
    }
  }

Здесь мы создаем сокет и указываем его на наш источник, а затем просто начинаем читать из него до тех пор, пока не будет отправлена ​​команда останова или пока у нашего сокета больше не будет данных. Обратите внимание, что мы читаем ту же структуру, которую мы определили ранее (как наши данные отправляются). После того как мы полностью прочитали ордер, мы вызываем store (…), чтобы он был сохранен в Spark.

Здесь ничего не остается, кроме как использовать наш приемник в нашем приложении, которое выглядит следующим образом:

1
2
3
4
5
val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))
  
val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))
1
2
3
4
5
val config = new SparkConf().setAppName("streaming")
val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(5))
  
val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))

Обратите внимание, как мы создали поток, используя наш пользовательский OrderReceiver (поток val был аннотирован только для ясности, но это не обязательно). С этого момента мы используем поток (DString [Order]) как любой другой поток, который мы использовали в любом другом приложении.

1
2
3
4
5
6
stream.foreachRDD { rdd =>
      rdd.foreach(order => {
            println(order.id))             
            order.items.foreach(println)
      }
    }
1
2
3
4
5
6
stream.foreachRDD { rdd =>
      rdd.foreach(order => {
            println(order.id))             
            order.items.foreach(println)
      }
    }

Резюме

Spark Streaming очень удобен при обработке источников, генерирующих бесконечные данные. Вы можете использовать тот же API, который вы используете для Spark SQL и других компонентов в системе, но он также достаточно гибок, чтобы его можно было расширять для удовлетворения ваших конкретных потребностей.