Статьи

RxJava: от будущего к наблюдаемому

Впервые я столкнулся с Reactive Extensions около 4 лет назад в блоге Мэтью Подвысоцкого, но потом почти ничего о нем не слышал, пока несколько недель назад не увидел выступления Мэтью в Code Mesh .

Похоже, что в последнее время популярность возросла, и я заметил, что теперь есть версия Java под названием RxJava, написанная Netflix .

Я решил попробовать, изменив некоторый код, который я написал, исследуя функцию MERGE на Cypher, чтобы представить Observable вместо Futures.

Напомним, у нас есть 50 потоков, и мы делаем 100 итераций, где создаем случайные пары (пользователь, событие). Мы создаем максимум 10 пользователей и 50 событий, и цель состоит в том, чтобы одновременно отправлять запросы на одни и те же пары.

В примере моего другого поста я отбрасывал результат каждого запроса, тогда как здесь я возвращал результат обратно, поэтому у меня было что подписаться.

Схема кода выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class MergeTimeRx
{
    public static void main( final String[] args ) throws InterruptedException, IOException
    {
        String pathToDb = "/tmp/foo";
        FileUtils.deleteRecursively( new File( pathToDb ) );
 
        GraphDatabaseService db = new GraphDatabaseFactory().newEmbeddedDatabase( pathToDb );
        final ExecutionEngine engine = new ExecutionEngine( db );
 
        int numberOfThreads = 50;
        int numberOfUsers = 10;
        int numberOfEvents = 50;
        int iterations = 100;
 
        Observable<ExecutionResult> events = processEvents( engine, numberOfUsers, numberOfEvents, numberOfThreads, iterations );
 
        events.subscribe( new Action1<ExecutionResult>()
        {
            @Override
            public void call( ExecutionResult result )
            {
                for ( Map<String, Object> row : result )
                {
                }
            }
        } );
 
        ....
    }
 
}

Хорошая вещь об использовании RxJava в том, что там нет упоминания о том, как мы получили нашу коллекцию ExecutionResult , это не важно. У нас просто есть их поток, и, вызывая функцию подписки в Observable, мы будем информироваться о том, когда будет доступна другая.

Большинство примеров, которые я нашел, показывают, как генерировать события из одного потока, но я хотел использовать пул потоков, чтобы можно было одновременно запускать множество запросов. Метод processEvents в итоге выглядел так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private static Observable<ExecutionResult> processEvents( final ExecutionEngine engine, final int numberOfUsers, final int numberOfEvents, final int numberOfThreads, final int iterations )
    {
        final Random random = new Random();
        final List<Integer> userIds = generateIds( numberOfUsers );
        final List<Integer> eventIds = generateIds( numberOfEvents );
 
        return Observable.create( new Observable.OnSubscribeFunc<ExecutionResult>()
        {
            @Override
            public Subscription onSubscribe( final Observer<? super ExecutionResult> observer )
            {
                final ExecutorService executor = Executors.newFixedThreadPool( numberOfThreads );
 
                List<Future<ExecutionResult>> jobs = new ArrayList<>();
                for ( int i = 0; i < iterations; i++ )
                {
                    Future<ExecutionResult> job = executor.submit( new Callable<ExecutionResult>()
                    {
                        @Override
                        public ExecutionResult call()
                        {
                            Integer userId = userIds.get( random.nextInt( numberOfUsers ) );
                            Integer eventId = eventIds.get( random.nextInt( numberOfEvents ) );
 
                            return engine.execute(
                                    "MERGE (u:User {id: {userId}})\n" +
                                    "MERGE (e:Event {id: {eventId}})\n" +
                                    "MERGE (u)-[:HAS_EVENT]->(e)\n" +
                                    "RETURN u, e",
                                    MapUtil.map( "userId", userId, "eventId", eventId ) );
                        }
                    } );
                    jobs.add( job );
                }
 
                for ( Future<ExecutionResult> future : jobs )
                {
                    try
                    {
                        observer.onNext( future.get() );
                    }
                    catch ( InterruptedException | ExecutionException ignored )
                    {
                    }
                }
 
                observer.onCompleted();
                executor.shutdown();
 
                return Subscriptions.empty();
            }
        } );
    }

Я не уверен, что это правильный способ использования Observable, поэтому, пожалуйста, дайте мне знать в комментариях, если я ошибаюсь.

Я не был уверен, каков правильный способ обработки ошибок. Изначально у меня был вызов наблюдателя # onError в блоке catch, но это означает, что больше не создаются события, которые были не тем, что я хотел.

Код доступен как суть, если вы хотите поиграть с ним. Я добавил следующую зависимость, чтобы получить библиотеку RxJava:

1
2
3
4
5
<dependency>
      <groupId>com.netflix.rxjava</groupId>
      <artifactId>rxjava-core</artifactId>
      <version>0.15.1</version>
    </dependency>

Ссылка: RxJava: От будущего к наблюдаемому от нашего партнера по JCG Марка Нидхэма в блоге Марка Нидхэма .