Я рад объявить о первом выпуске ReactiveInflux, разработанного на Pygmalios . InfluxDB пропустил неблокирующий драйвер для Scala и Java . Неизменность, тестируемость и расширяемость являются ключевыми характеристиками ReactiveInflux. В сочетании с поддержкой Apache Spark это оружие выбора.
Он внутренне использует API-интерфейс Play Framework WS, который представляет собой богатый асинхронный HTTP-клиент, созданный поверх Async Http Client .
Особенности
- асинхронный (неблокирующий) интерфейс для Scala
- синхронный (блокирующий) интерфейс для Scala и Java
- поддерживает потоковую передачу Spark и Spark
- неизменность
- способность быть свидетелем в суде
- растяжимость
Совместимость
- InfluxDB 0.11, 0.10 и 0.9 (возможно, даже старше)
- Скала 2.11 и 2.10
- Java 7 и выше
- Apache Spark 1.4 и выше
Scala асинхронный (неблокирующий) пример
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
db.create().flatMap { _ = > val point = Point( time = DateTime.now(), measurement = "measurement1" , tags = Map( "t1" -> "A" , "t2" -> "B" ), fields = Map( "f1" -> 10.3 , "f2" -> "x" , "f3" -> - 1 , "f4" -> true ) ) db.write(point).flatMap { _ = > db.query( "SELECT * FROM measurement1" ).flatMap { queryResult = > println(queryResult.row.mkString) db.drop() } } } } |
Scala синхронный (блокирующий) пример
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
implicit val awaitAtMost = 10 .seconds db.create() val point = Point( time = DateTime.now(), measurement = "measurement1" , tags = Map( "t1" -> "A" , "t2" -> "B" ), fields = Map( "f1" -> 10.3 , "f2" -> "x" , "f3" -> - 1 , "f4" -> true ) ) db.write(point) val queryResult = db.query( "SELECT * FROM measurement1" ) println(queryResult.row.mkString) db.drop() } |
Пример синхронного Java (блокирования)
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
// Use Influx at the provided URL ReactiveInfluxConfig config = new JavaReactiveInfluxConfig( long awaitAtMostMillis = 30000 ; try (SyncReactiveInflux reactiveInflux = new JavaSyncReactiveInflux( config, awaitAtMostMillis)) { SyncReactiveInfluxDb db = reactiveInflux.database( "example1" ); db.create(); Map tags = new HashMap<>(); tags.put( "t1" , "A" ); tags.put( "t2" , "B" ); Map fields = new HashMap<>(); fields.put( "f1" , 10.3 ); fields.put( "f2" , "x" ); fields.put( "f3" , - 1 ); fields.put( "f4" , true ); Point point = new JavaPoint( DateTime.now(), "measurement1" , tags, fields ); db.write(point); QueryResult queryResult = db.query( "SELECT * FROM measurement1" ); System.out.println(queryResult.getRow().mkString()); db.drop(); } |
Пример Apache Spark Scala
01
02
03
04
05
06
07
08
09
10
11
|
val point 1 = Point( time = DateTime.now(), measurement = "measurement1" , tags = Map( "tagKey1" -> "tagValue1" , "tagKey2" -> "tagValue2" ), fields = Map( "fieldKey1" -> "fieldValue1" , "fieldKey2" -> 10.7 ) ) sc.parallelize(Seq(point 1 )).saveToInflux() |
Пример потоковой передачи Apache Spark
01
02
03
04
05
06
07
08
09
10
11
12
13
|
val point 1 = Point( time = DateTime.now(), measurement = "measurement1" , tags = Map( "tagKey1" -> "tagValue1" , "tagKey2" -> "tagValue2" ), fields = Map( "fieldKey1" -> "fieldValue1" , "fieldKey2" -> 10.7 ) ) val queue = new mutable.Queue[RDD[Point]] queue.enqueue(ssc.sparkContext.parallelize(Seq(point 1 ))) ssc.queueStream(queue).saveToInflux() |
Пример Apache Spark Java
1
2
3
|
... SparkInflux sparkInflux = new SparkInflux( "example" , 1000 ); sparkInflux.saveToInflux(sc.parallelize(Collections.singletonList(point))); |
Пример потоковой передачи Java Apache Spark
1
2
3
4
5
|
... SparkInflux sparkInflux = new SparkInflux( "example" , 1000 ); Queue> queue = new LinkedList<>(); queue.add(ssc.sparkContext().parallelize(Collections.singletonList(point))); sparkInflux.saveToInflux(ssc.queueStream(queue)); |
Кредит Пигмалиосу
Высокотехнологичный стартап, базирующийся в Братиславе, Словакия, инвестирует в передовые технологии, чтобы обеспечить быстрый рост в области прогнозной розничной аналитики в реальном времени.