Я рад объявить о первом выпуске ReactiveInflux, разработанного на Pygmalios . InfluxDB пропустил неблокирующий драйвер для Scala и Java . Неизменность, тестируемость и расширяемость являются ключевыми характеристиками ReactiveInflux. В сочетании с поддержкой Apache Spark это оружие выбора.
Он внутренне использует API-интерфейс Play Framework WS, который представляет собой богатый асинхронный HTTP-клиент, созданный поверх Async Http Client .
Особенности
- асинхронный (неблокирующий) интерфейс для Scala
- синхронный (блокирующий) интерфейс для Scala и Java
- поддерживает потоковую передачу Spark и Spark
- неизменность
- способность быть свидетелем в суде
- растяжимость
Совместимость
- InfluxDB 0.11, 0.10 и 0.9 (возможно, даже старше)
- Скала 2.11 и 2.10
- Java 7 и выше
- Apache Spark 1.4 и выше
Scala асинхронный (неблокирующий) пример
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
db.create().flatMap { _ => val point = Point( time = DateTime.now(), measurement = "measurement1", tags = Map("t1" -> "A", "t2" -> "B"), fields = Map( "f1" -> 10.3, "f2" -> "x", "f3" -> -1, "f4" -> true) ) db.write(point).flatMap { _ => db.query("SELECT * FROM measurement1").flatMap { queryResult => println(queryResult.row.mkString) db.drop() } } }} |
Scala синхронный (блокирующий) пример
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
implicit val awaitAtMost = 10.seconds db.create() val point = Point( time = DateTime.now(), measurement = "measurement1", tags = Map("t1" -> "A", "t2" -> "B"), fields = Map( "f1" -> 10.3, "f2" -> "x", "f3" -> -1, "f4" -> true) ) db.write(point) val queryResult = db.query("SELECT * FROM measurement1") println(queryResult.row.mkString) db.drop()} |
Пример синхронного Java (блокирования)
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// Use Influx at the provided URLReactiveInfluxConfig config = new JavaReactiveInfluxConfig(long awaitAtMostMillis = 30000;try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux( config, awaitAtMostMillis)) { SyncReactiveInfluxDb db = reactiveInflux.database("example1"); db.create(); Map tags = new HashMap<>(); tags.put("t1", "A"); tags.put("t2", "B"); Map fields = new HashMap<>(); fields.put("f1", 10.3); fields.put("f2", "x"); fields.put("f3", -1); fields.put("f4", true); Point point = new JavaPoint( DateTime.now(), "measurement1", tags, fields ); db.write(point); QueryResult queryResult = db.query("SELECT * FROM measurement1"); System.out.println(queryResult.getRow().mkString()); db.drop();} |
Пример Apache Spark Scala
|
01
02
03
04
05
06
07
08
09
10
11
|
val point1 = Point( time = DateTime.now(), measurement = "measurement1", tags = Map( "tagKey1" -> "tagValue1", "tagKey2" -> "tagValue2"), fields = Map( "fieldKey1" -> "fieldValue1", "fieldKey2" -> 10.7))sc.parallelize(Seq(point1)).saveToInflux() |
Пример потоковой передачи Apache Spark
|
01
02
03
04
05
06
07
08
09
10
11
12
13
|
val point1 = Point( time = DateTime.now(), measurement = "measurement1", tags = Map( "tagKey1" -> "tagValue1", "tagKey2" -> "tagValue2"), fields = Map( "fieldKey1" -> "fieldValue1", "fieldKey2" -> 10.7))val queue = new mutable.Queue[RDD[Point]]queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))ssc.queueStream(queue).saveToInflux() |
Пример Apache Spark Java
|
1
2
3
|
...SparkInflux sparkInflux = new SparkInflux("example", 1000);sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point))); |
Пример потоковой передачи Java Apache Spark
|
1
2
3
4
5
|
...SparkInflux sparkInflux = new SparkInflux("example", 1000);Queue> queue = new LinkedList<>();queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point)));sparkInflux.saveToInflux(ssc.queueStream(queue)); |
Кредит Пигмалиосу
Высокотехнологичный стартап, базирующийся в Братиславе, Словакия, инвестирует в передовые технологии, чтобы обеспечить быстрый рост в области прогнозной розничной аналитики в реальном времени.