Статьи

Использование потокового API Twitter с помощью Spring Integration

1. Обзор

Известно, что Spring Integration имеет множество разъемов для взаимодействия с внешними системами. Twitter не был исключением и долгое время, так как Spring Social был готовым решением, которое Spring Integration использовала для подключения к социальным сетям.

1.1 Spring Social EOL

К сожалению, Spring Social достиг конца своей жизни , проект сейчас находится в режиме обслуживания. Причина, по которой Spring Team решила, что не будет дальше разрабатывать Spring Social, заключалась в том, что стало скучно поддерживать привязки API-интерфейса к API-интерфейсам социальных сетей.

Помимо этого, после выпуска Spring Framework 5 разработчики хотели использовать ее модель реактивного программирования, и для этого потребовалось бы команде повторно внедрить реактивные привязки Spring Social рядом с существующей.

Разработчикам теперь рекомендуется либо реализовать собственную привязку, либо использовать одну из специально созданных библиотек для подключения к социальным сетям.

1.2 Модуль Twitter для интеграции с Spring перенесен в расширения

Тот факт, что Spring Social теперь находится в режиме обслуживания, вынудил команду Spring Integration переместить модуль поддержки Twitter из основного проекта в расширения. Поскольку Spring Social не будет получать обновления, он будет построен на более ранней версии Spring Framework. Это может привести к конфликту пути класса и также помешает развитию Spring Integration.

Поэтому, начиная с Spring Integration 5.1, модуль Twitter доступен как расширение .

1.3 Какие есть альтернативы?

Twitter4J — это неофициальная библиотека Java для API Twitter, разработанная и поддерживаемая Юсуке Ямамото . Официальная библиотека HBC (созданная Twitter) — это HTTP-клиент Java для использования потокового API Twitter. Последний не видел крупных обновлений с 2016 года, в то время как Twitter4J регулярно получает обновления.

Реализация собственной привязки API также является опцией. В проектах на основе Spring RestTemplate, безусловно, является опцией, и это простой способ сделать вызовы REST.

Это руководство использует Twitter4J в потоковом режиме таким образом, чтобы его можно было интегрировать в поток сообщений Spring Integration.

1.4 Как работает Twitter Streaming?

Короче говоря, ваше приложение открывает единственное соединение с API Twitter, и новые результаты отправляются через это соединение всякий раз, когда происходят новые совпадения . Напротив, наоборот — доставка данных в пакетном режиме через повторяющиеся запросы к REST API.

Потоковая передача обеспечивает механизм доставки с низкой задержкой, который может поддерживать очень высокую пропускную способность, не имея дело с ограничением скорости.

2. Пример проекта

Пример проекта, который демонстрирует интеграцию потокового API Twitter в поток сообщений Spring Integration, доступен на GitHub : https://github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming .

2.1. Maven Зависимости

Поскольку Spring Social — это EOL, мы не будем опираться на это. Все, что мы привлекаем, это spring -gration- core и twitter4j-stream .

01
02
03
04
05
06
07
08
09
10
11
12
<dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>
 
    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.1</version>
    </dependency>
  </dependencies>

В этом проекте также используется поддержка тестирования Lombok и Spring Boot , но это не обязательно.

2,3. Прослушиваемый источник сообщений с помощью Spring Integration

Spring Integration предоставляет поддержку для реализации компонентов входящих сообщений . Они делятся на опрос и прослушивание .

Оригинальный адаптер Inbound Twitter Channel Adapter , основанный на Spring Social и теперь перенесенный в расширения, является потребителем опроса . То есть вы должны предоставить конфигурацию опроса, чтобы использовать ее. С другой стороны, Twitter применяет ограничения скорости, чтобы управлять тем, как часто приложение может получать обновления. Вы должны были принять во внимание ограничение скорости при использовании старого адаптера канала Twitter, чтобы настроенные интервалы опроса соответствовали политикам Twitter.

С другой стороны, прослушивающие входящие компоненты проще и обычно требуют реализации только MessageProducerSupport . Такой компонент прослушивания выглядит следующим образом.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class MyMessageProducer extends MessageProducerSupport {
 
  public MyMessageProducer(MessageChannel outputChannel) {
    // Defining an output channel is required
    setOutputChannel(outputChannel);
  }
 
  @Override
  protected void onInit() {
    super.onInit();
    // Custom initialization - if applicable - comes here
  }
 
  @Override
  public void doStart() {
    // Lifecycle method for starting receiving messages
  }
 
  @Override
  public void doStop() {
    // Lifecycle method for stopping receiving messages
  }
 
  private void receiveMessage() {
    // Receive data from upstream service
    SomeData data = ...;
 
    // Convert it to a message as appropriate and send it out
    this.sendMessage(MessageBuilder.withPayload(data).build());
  }
 
}

Есть только два обязательных элемента:

  • Канал выходного сообщения должен быть определен
  • sendMessage должен вызываться всякий раз, когда компонент получает сообщение

При желании вы можете взять на себя управление инициализацией компонента и управлять его жизненным циклом.

Поскольку потоковый API Twitter по своей сути управляется сообщениями, поведение прослушивания вполне естественно. Давайте посмотрим, как Twitter4J может быть включен в такой контекст.

2,4. Подключитесь к Twitter Streaming API с помощью Twitter4J

Twitter4J управляет нюансами соединения, получая и получая обновления от потокового API Twitter. Все, что нам нужно сделать, это получить экземпляр TwitterStream , подключить слушателя и определить фильтрацию.

2.4.1. Создание TwitterStream

Потоковые примеры на сайте Twitter4J предполагают, что экземпляр TwitterStream должен быть создан через TwitterStreamFactory . Это имеет смысл, однако в контексте приложения Spring мы хотим, чтобы это был управляемый компонент.

FactoryBean — это простой и FactoryBean способ содержать подробности создания FactoryBean .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {
 
  @Override
  public Class<?> getObjectType() {
    return TwitterStream.class;
  }
 
  @Override
  protected TwitterStream createInstance() {
    return new twitter4j.TwitterStreamFactory().getInstance();
  }
 
  @Override
  protected void destroyInstance(TwitterStream twitterStream) {
    twitterStream.shutdown();
  }
 
}

Хотя мы могли бы также представить его как обычный bean-компонент без создания FactoryBean , это не позаботилось бы о его правильном завершении.

2.4.2. Присоединение слушателя и определение фильтрации

Это будет обязанностью нашей пользовательской реализации MessageProducer .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {
 
  private final TwitterStream twitterStream;
 
  private List<Long> follows;
  private List<String> terms;
 
  private StatusListener statusListener;
  private FilterQuery filterQuery;
 
  public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {
    this.twitterStream = twitterStream;
    setOutputChannel(outputChannel);
  }
 
  @Override
  protected void onInit() {
    super.onInit();
 
    statusListener = new StatusListener();
 
    long[] followsArray = null;
 
    if (!CollectionUtils.isEmpty(follows)) {
      followsArray = new long[follows.size()];
      for (int i = 0; i < follows.size(); i++) {
        followsArray[i] = follows.get(i);
      }
    }
 
    String[] termsArray = null;
    if (!CollectionUtils.isEmpty(terms)) {
      termsArray = terms.toArray(new String[0]);
    }
 
    filterQuery = new FilterQuery(0, followsArray, termsArray);
  }
 
  @Override
  public void doStart() {
    twitterStream.addListener(statusListener);
    twitterStream.filter(filterQuery);
  }
 
  @Override
  public void doStop() {
    twitterStream.cleanUp();
    twitterStream.clearListeners();
  }
 
  public void setFollows(List<Long> follows) {
    this.follows = follows;
  }
 
  public void setTerms(List<String> terms) {
    this.terms = terms;
  }
 
  StatusListener getStatusListener() {
    return statusListener;
  }
 
  FilterQuery getFilterQuery() {
    return filterQuery;
  }
 
  class StatusListener extends StatusAdapter {
 
    @Override
    public void onStatus(Status status) {
      sendMessage(MessageBuilder.withPayload(status).build());
    }
 
    @Override
    public void onException(Exception ex) {
      log.error(ex.getMessage(), ex);
    }
 
    @Override
    public void onStallWarning(StallWarning warning) {
      log.warn(warning.toString());
    }
 
  }
}

Методы жизненного цикла, предоставляемые MessageProducerSupport и интерфейсом управления TwitterStream, прекрасно сочетаются друг с другом. Это также позволит нам при необходимости остановить и запустить компонент во время выполнения.

2.4.3. Конфигурация Java

Хотя Spring может автоматически связывать компоненты, я все же предпочитаю управлять зависимостями с помощью ручной настройки.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@Configuration
public class TwitterConfig {
 
  @Bean
  TwitterStreamFactory twitterStreamFactory() {
    return new TwitterStreamFactory();
  }
 
  @Bean
  TwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {
    return twitterStreamFactory.getInstance();
  }
 
  @Bean
  MessageChannel outputChannel() {
    return MessageChannels.direct().get();
  }
 
  @Bean
  TwitterMessageProducer twitterMessageProducer(
      TwitterStream twitterStream, MessageChannel outputChannel) {
 
    TwitterMessageProducer twitterMessageProducer =
        new TwitterMessageProducer(twitterStream, outputChannel);
 
    twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));
 
    return twitterMessageProducer;
  }
 
  @Bean
  IntegrationFlow twitterFlow(MessageChannel outputChannel) {
    return IntegrationFlows.from(outputChannel)
        .transform(Status::getText)
        .handle(m -> log.info(m.getPayload().toString()))
        .get();
  }
 
}

Важной частью здесь является то, как наш пользовательский производитель сообщений интегрируется с потоком сообщений. По сути, нам не нужно ничего делать, кроме перечисления сообщений на выходном канале производителя.

2.5. тестирование

Только Чак Норрис тестирует код в производстве. Тем не менее, обычные смертные люди, как вы и я, мы пишем тестовые случаи.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class TwitterMessageProducerTest {
 
  @MockBean
  private TwitterStream twitterStream;
 
  @Autowired
  private PollableChannel outputChannel;
 
  @Autowired
  private TwitterMessageProducer twitterMessageProducer;
 
  @Test
  public void shouldBeInitialized() {
    StatusListener statusListener = twitterMessageProducer.getStatusListener();
    verify(twitterStream).addListener(statusListener);
 
    FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();
    verify(twitterStream).filter(filterQuery);
  }
 
  @Test
  public void shouldReceiveStatus() {
    StatusListener statusListener = twitterMessageProducer.getStatusListener();
 
    Status status = mock(Status.class);
    statusListener.onStatus(status);
 
    Message<?> statusMessage = outputChannel.receive();
    assertSame(status, statusMessage.getPayload());
  }
 
  @Import(TwitterConfig.class)
  static class TestConfig {
 
    @Bean
    MessageChannel outputChannel() {
      return MessageChannels.queue(1).get();
    }
 
  }
 
}

Мне нравится дизайн Twitter4J, потому что он использует интерфейсы. Большинство важных частей библиотеки представлены как обычные интерфейсы. TwitterStream является исключением из этого. То есть в тестовых случаях его можно легко смоделировать.

6. Заключение

  • Spring Social теперь EoL — новых возможностей не будет
  • Twitter-модуль Spring Integration доступен как расширение — он был удален из основного проекта.
  • Адаптер входящего канала Twitter является потребителем опроса — вам нужно иметь дело с ограничением скорости при выборе интервала опроса
  • API потоковой передачи Twitter соответствует поведению прослушивания адаптера входящего канала
См. Оригинальную статью здесь: Использование потокового API Twitter с помощью Spring Integration

Мнения, высказанные участниками Java Code Geeks, являются их собственными.