Статьи

Представляем ReactiveInflux: неблокирующий драйвер InfluxDB для Scala и Java, поддерживающий Apache Spark

Я рад объявить о первом выпуске ReactiveInflux, разработанного на Pygmalios . InfluxDB пропустил неблокирующий драйвер для Scala и Java . Неизменность, тестируемость и расширяемость являются ключевыми характеристиками ReactiveInflux. В сочетании с поддержкой Apache Spark это оружие выбора.

Он внутренне использует API-интерфейс Play Framework WS, который представляет собой богатый асинхронный HTTP-клиент, созданный поверх Async Http Client .

Особенности

  • асинхронный (неблокирующий) интерфейс для Scala
  • синхронный (блокирующий) интерфейс для Scala и Java
  • поддерживает потоковую передачу Spark и Spark
  • неизменность
  • способность быть свидетелем в суде
  • растяжимость

Совместимость

  • InfluxDB 0.11, 0.10 и 0.9 (возможно, даже старше)
  • Скала 2.11 и 2.10
  • Java 7 и выше
  • Apache Spark 1.4 и выше

Scala асинхронный (неблокирующий) пример

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
val result = withInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>
  db.create().flatMap { _ =>
    val point = Point(
      time        = DateTime.now(),
      measurement = "measurement1",
      tags        = Map("t1" -> "A", "t2" -> "B"),
      fields      = Map(
        "f1" -> 10.3,
        "f2" -> "x",
        "f3" -> -1,
        "f4" -> true)
    )
    db.write(point).flatMap { _ =>
      db.query("SELECT * FROM measurement1").flatMap { queryResult =>
        println(queryResult.row.mkString)
        db.drop()
      }
    }
  }
}

Scala синхронный (блокирующий) пример

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
implicit val awaitAtMost = 10.seconds
syncInfluxDb(new URI("http://localhost:8086/"), "example1") { db =>
  db.create()
 
  val point = Point(
    time        = DateTime.now(),
    measurement = "measurement1",
    tags        = Map("t1" -> "A", "t2" -> "B"),
    fields      = Map(
      "f1" -> 10.3,
      "f2" -> "x",
      "f3" -> -1,
      "f4" -> true)
  )
  db.write(point)
 
  val queryResult = db.query("SELECT * FROM measurement1")
  println(queryResult.row.mkString)
 
  db.drop()
}

Пример синхронного Java (блокирования)

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Use Influx at the provided URL
ReactiveInfluxConfig config = new JavaReactiveInfluxConfig(
  new URI("http://localhost:8086/"));
long awaitAtMostMillis = 30000;
try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux(
  config, awaitAtMostMillis)) {
    SyncReactiveInfluxDb db = reactiveInflux.database("example1");
    db.create();
 
    Map tags = new HashMap<>();
    tags.put("t1", "A");
    tags.put("t2", "B");
 
    Map fields = new HashMap<>();
    fields.put("f1", 10.3);
    fields.put("f2", "x");
    fields.put("f3", -1);
    fields.put("f4", true);
 
    Point point = new JavaPoint(
        DateTime.now(),
        "measurement1",
        tags,
        fields
    );
    db.write(point);
 
    QueryResult queryResult = db.query("SELECT * FROM measurement1");
    System.out.println(queryResult.getRow().mkString());
 
    db.drop();
}

Пример Apache Spark Scala

01
02
03
04
05
06
07
08
09
10
11
val point1 = Point(
  time        = DateTime.now(),
  measurement = "measurement1",
  tags        = Map(
    "tagKey1" -> "tagValue1",
    "tagKey2" -> "tagValue2"),
  fields      = Map(
    "fieldKey1" -> "fieldValue1",
    "fieldKey2" -> 10.7)
)
sc.parallelize(Seq(point1)).saveToInflux()

Пример потоковой передачи Apache Spark

01
02
03
04
05
06
07
08
09
10
11
12
13
val point1 = Point(
  time        = DateTime.now(),
  measurement = "measurement1",
  tags        = Map(
    "tagKey1" -> "tagValue1",
    "tagKey2" -> "tagValue2"),
  fields      = Map(
    "fieldKey1" -> "fieldValue1",
    "fieldKey2" -> 10.7)
)
val queue = new mutable.Queue[RDD[Point]]
queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))
ssc.queueStream(queue).saveToInflux()

Пример Apache Spark Java

1
2
3
...
SparkInflux sparkInflux = new SparkInflux("example", 1000);
sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point)));

Пример потоковой передачи Java Apache Spark

1
2
3
4
5
...
SparkInflux sparkInflux = new SparkInflux("example", 1000);
Queue> queue = new LinkedList<>();
queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point)));
sparkInflux.saveToInflux(ssc.queueStream(queue));

Кредит Пигмалиосу

Высокотехнологичный стартап, базирующийся в Братиславе, Словакия, инвестирует в передовые технологии, чтобы обеспечить быстрый рост в области прогнозной розничной аналитики в реальном времени.