Впервые я столкнулся с Reactive Extensions около 4 лет назад в блоге Мэтью Подвысоцкого, но потом почти ничего о нем не слышал, пока несколько недель назад не увидел выступления Мэтью в Code Mesh .
Похоже, что в последнее время популярность возросла, и я заметил, что теперь есть версия Java под названием RxJava, написанная Netflix .
Я решил попробовать, изменив некоторый код, который я написал, исследуя функцию MERGE на Cypher, чтобы представить Observable вместо Futures.
Напомним, у нас есть 50 потоков, и мы делаем 100 итераций, где создаем случайные пары (пользователь, событие). Мы создаем максимум 10 пользователей и 50 событий, и цель состоит в том, чтобы одновременно отправлять запросы на одни и те же пары.
В примере моего другого поста я отбрасывал результат каждого запроса, тогда как здесь я возвращал результат обратно, поэтому у меня было что подписаться.
Схема кода выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class MergeTimeRx { public static void main( final String[] args ) throws InterruptedException, IOException { String pathToDb = "/tmp/foo" ; FileUtils.deleteRecursively( new File( pathToDb ) ); GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb ); final ExecutionEngine engine = new ExecutionEngine( db ); int numberOfThreads = 50 ; int numberOfUsers = 10 ; int numberOfEvents = 50 ; int iterations = 100 ; Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations ); events.subscribe( new Action1<ExecutionResult>() { @Override public void call( ExecutionResult result ) { for ( Map<String, Object> row : result ) { } } } ); .... } } |
Хорошая вещь об использовании RxJava в том, что там нет упоминания о том, как мы получили нашу коллекцию ExecutionResult , это не важно. У нас просто есть их поток, и, вызывая функцию подписки в Observable, мы будем информироваться о том, когда будет доступна другая.
Большинство примеров, которые я нашел, показывают, как генерировать события из одного потока, но я хотел использовать пул потоков, чтобы можно было одновременно запускать множество запросов. Метод processEvents в итоге выглядел так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations ) { final Random random = new Random(); final List<Integer> userIds = generateIds( numberOfUsers ); final List<Integer> eventIds = generateIds( numberOfEvents ); return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>() { @Override public Subscription onSubscribe( final Observer<? super ExecutionResult> observer ) { final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads ); List<Future<ExecutionResult>> jobs = new ArrayList<>(); for ( int i = 0 ; i < iterations; i++ ) { Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>() { @Override public ExecutionResult call() { Integer userId = userIds.get( random.nextInt( numberOfUsers ) ); Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) ); return engine.execute( "MERGE (u:User {id: {userId}})\n" + "MERGE (e:Event {id: {eventId}})\n" + "MERGE (u)-[:HAS_EVENT]->(e)\n" + "RETURN u, e" , MapUtil.map( "userId" , userId, "eventId" , eventId ) ); } } ); jobs.add( job ); } for ( Future<ExecutionResult> future : jobs ) { try { observer.onNext( future.get() ); } catch ( InterruptedException | ExecutionException ignored ) { } } observer.onCompleted(); executor.shutdown(); return Subscriptions.empty(); } } ); } |
Я не уверен, что это правильный способ использования Observable, поэтому, пожалуйста, дайте мне знать в комментариях, если я ошибаюсь.
Я не был уверен, каков правильный способ обработки ошибок. Изначально у меня был вызов наблюдателя # onError в блоке catch, но это означает, что больше не создаются события, которые были не тем, что я хотел.
Код доступен как суть, если вы хотите поиграть с ним. Я добавил следующую зависимость, чтобы получить библиотеку RxJava:
1
2
3
4
5
|
< dependency > < groupId >com.netflix.rxjava</ groupId > < artifactId >rxjava-core</ artifactId > < version >0.15.1</ version > </ dependency > |