Статьи

Ахой, там обратные вызовы!

Потому что это моя сумка, мне нравится JavaScript . На самом деле, я полюбил асинхронный стиль программирования JavaScritp, ориентированный на обратный вызов . Следовательно, когда я оказываюсь в среде, отличной от JavaScript, например, в Java , я склонен пропускать обратные вызовы. Перезвоните

Хорошей новостью является то, что вы можете эмулировать асинхронные обратные вызовы в Java. Фактически, я сделал это недавно с библиотекой, которую я назвал Ахой! , который является асинхронным адаптером SQS для библиотеки AWS Java SQS .

Для непосвященных SQS представляет собой облачную платформу обмена сообщениями — с SQS вы можете создавать очереди и помещать сообщения в эти очереди, которые затем могут быть прочитаны — позже или немедленно каким-либо другим процессом или таким же точным процессом. Все это использует чрезвычайно избыточную архитектуру Amazon для обеспечения чрезвычайно высокой доступности при одновременном доступе.

Асинхронные обратные вызовы в Java могут быть реализованы с помощью двух функций: анонимных классов (содержащих один метод) и пакета Java java.util.concurrent .

Поскольку Java не позволяет вам легко передавать функции (или методы) в качестве параметра для имитации обратного вызова, вы можете создать интерфейс, содержащий один метод, который в основном имитирует функцию. В случае Ahoy есть два интерфейса: MessageSendCallback и MessageReceivedCallback — оба имеют один метод: onSend и onReceive соответственно. Соответственно, основной класс Ahoy!, SQSAdapter название SQSAdapter предоставляет два простых метода: send и receive и оба принимают соответствующий интерфейс обратного вызова.

Самый простой обратный вызов для понимания — это метод receive . Как вы можете себе представить, receive предназначено для обработки поведения, когда сообщение получено из определенной очереди. Таким образом, метод receive определяется следующим образом:

SQSAdapter метод получения

1
public void receive(final MessageReceivedCallback callback) {}

Интерфейс MessageReceivedCallback выглядит следующим образом:

Интерфейс MessageReceivedCallback

1
2
3
public interface MessageReceivedCallback {
    public void onReceive(String messageId, String message);
}

Обратите внимание, что метод onReceive принимает идентификатор сообщения (который относится к SQS) и само сообщение, которое в случае SQS всегда является String (эта String может содержать все, что вы хотите, имейте в виду: JSON, XML, последовательность байтов , и т.д).

Таким образом, клиенты Ахой! обеспечить предполагаемое поведение сообщения при его получении. Такое поведение может быть для записи чего-либо в базу данных, создания другого сообщения и отправки его в другую очередь, вы называете это.

Теперь интересная часть — реализация метода receive Ahoy! Для достижения асинхронности я использовал Java-пакет java.util.concurrent , который, к сожалению, кажется недооцененным.

Реализация метода приема с обратным вызовом

01
02
03
04
05
06
07
08
09
10
11
12
13
14
private void receive(final AmazonSQS sqs, final String queueURL, final MessageReceivedCallback callback) {
  pool.execute(new Runnable() {
    public void run() {
      final List<Message> messages = sqs.receiveMessage(
              new ReceiveMessageRequest(queueURL).withMaxNumberOfMessages(10).withWaitTimeSeconds(20)).getMessages();
      if (messages.size() > 0) {
          for (final Message message : messages) {
            callback.onReceive(message.getMessageId(), message.getBody());
            sqs.deleteMessage(new DeleteMessageRequest(queueURL, message.getReceiptHandle()));
          }
      }
    }
  });
}

При фиксированном пуле потоков создается поток, который ожидает поступления сообщений в определенную очередь; когда он появляется, переданный в MessageReceivedCalledback вызывается для каждого сообщения.

Для примера того, как это работает для клиентов Ahoy !, вот тестовый пример, который проверяет выполнение обратного вызова:

Метод получения реализован

1
2
3
4
5
6
7
8
final boolean[] wasReceived = {false};
ahoy.receive(new MessageReceivedCallback() {
  public void onReceive(String messageId, String message) {
    wasReceived[0] = true;
    assertNotNull("message id was null", messageId);
    assertEquals("message wasn't " + origMessage, origMessage, message);
  }
});

Аналогично, отправка сообщения аналогична — создается новый экземпляр Runnable , который отправляет конкретное сообщение и вызывает переданный в MessageSentCallback onSend , передавая идентификатор вновь отправленных сообщений.

Метод отправки также является асинхронным

01
02
03
04
05
06
07
08
09
10
private void send(final AmazonSQS sqs, final String queueURL, final String message, final MessageSentCallback callback) {
  pool.execute(new Runnable() {
    public void run() {
      SendMessageResult res = sqs.sendMessage(new SendMessageRequest(queueURL, message));
      if (callback != null) {
        callback.onSend(res.getMessageId());
      }
    }
  });
}

Кстати, AWS Java SDK предоставляет асинхронный клиент ; однако реализация этого клиента использует Futures в Java. В то время как Futures — это опрятная концепция , реализация Ahoy! Более удобна ( по крайней мере для меня и в том, как я использую SQS ), чем Futures, так как после отправки или получения сообщения не происходит никакого опроса.

Хотя обратные вызовы не обязательно изначально поддерживаются в Java, вы можете довольно эмулировать их и достичь того же уровня краткости кода, что и в JavaScript. И если вам нужен удобный способ взаимодействия с AWS SQS, тогда дайте Ahoy! попробовать ! Ты можешь копать это, чувак?

Ссылка: Ahoy There Callbacks! от нашего партнера JCG Эндрю Гловера в блоге The Disco Blog .