Статьи

Путешествие к идемпотентности и временному разделению

Идемпотентность в HTTP означает, что один и тот же запрос может быть выполнен несколько раз с тем же эффектом, как если бы он был выполнен только один раз. Если вы замените текущее состояние какого-либо ресурса новым, независимо от того, сколько раз вы это сделаете, конечное состояние будет таким же, как если бы вы делали это только один раз. Чтобы привести более конкретный пример: удаление пользователя идемпотентно, потому что независимо от того, сколько раз вы удаляете данного пользователя по уникальному идентификатору, в конечном итоге этот пользователь будет удален. С другой стороны, создание нового пользователя не является идемпотентом, потому что запрос такой операции дважды создаст двух пользователей. В терминах HTTP вот что говорит RFC 2616: 9.1.2 Идемпотентные методы :

9.1.2. Идемпотентные методы

Методы также могут иметь свойство « идемпотентности », заключающееся в том, что […] побочные эффекты от N> 0 идентичных запросов такие же, как и для одного запроса. Методы GET, HEAD, PUT и DELETE разделяют это свойство. Кроме того, методы OPTIONS и TRACE НЕ ДОЛЖНЫ иметь побочных эффектов, и поэтому являются по своей сути идемпотентными.

Временная связь является нежелательным свойством системы, где правильное поведение неявно зависит от измерения времени. Проще говоря, это может означать, что, например, система работает только тогда, когда все компоненты присутствуют одновременно. Для блокировки обмена запросом-ответом (ReST, SOAP или любая другая форма RPC) требуется, чтобы и клиент, и сервер были доступны одновременно, что является примером такого эффекта.

Имея общее представление о том, что означают эти концепции, давайте рассмотрим простой пример — многопользовательская ролевая онлайн-игра . Наш искусственный сценарий использования следующий: игрок отправляет SMS с премиальным рейтингом, чтобы купить виртуальный меч внутри игры. Наш HTTP-шлюз вызывается при доставке SMS, и нам необходимо сообщить InventoryService , развернутому на другом компьютере. Текущий API включает в себя ReST и выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
@RestController
class SmsController {
  
    private final RestOperations restOperations;
  
    @Autowired
    public SmsController(RestOperations restOperations) {
        this.restOperations = restOperations;
    }
  
    @RequestMapping(value = "/sms/{phoneNumber}", method = POST)
    public void handleSms(@PathVariable String phoneNumber) {
        Optional<Player> maybePlayer = phoneNumberToPlayer(phoneNumber);
        maybePlayer
                .map(Player::getId)
                .map(this::purchaseSword)
                .orElseThrow(() -> new IllegalArgumentException("Unknown player for phone number " + phoneNumber));
    }
  
    private long purchaseSword(long playerId) {
        Sword sword = new Sword();
        HttpEntity<String> entity = new HttpEntity<>(sword.toJson(), jsonHeaders());
        restOperations.postForObject(
            "http://inventory:8080/player/{playerId}/inventory",
            entity, Object.class, playerId);
        return playerId;
    }
  
    private HttpHeaders jsonHeaders() {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        return headers;
    }
  
    private Optional<Player> phoneNumberToPlayer(String phoneNumber) {
        //...
    }
}

Который в свою очередь генерирует запрос, подобный этому:

01
02
03
04
05
06
07
08
09
10
> POST /player/123123/inventory HTTP/1.1
> Host: inventory:8080
> Content-type: application/json
>
> {"type": "sword", "strength": 100, ...}
  
< HTTP/1.1 201 Created
< Content-Length: 75
< Content-Type: application/json;charset=UTF-8
< Location: http://inventory:8080/player/123123/inventory/1

Это довольно просто. SmsController просто пересылает соответствующие данные в inventory:8080 сервис inventory:8080 с помощью меча, который был куплен. Эта служба немедленно или через некоторое время возвращает 201 Created HTTP-ответ, подтверждающий, что операция прошла успешно. Кроме того, ссылка на ресурс создается и возвращается, так что вы можете запросить его. Можно сказать: ReST современное состояние. Однако, если вы хотя бы немного заботитесь о деньгах своих клиентов и понимаете, что такое ACID (чему еще нужно научиться при обмене биткойнов: см. [1] , [2] , [3] и [4] ) — этот API слишком хрупкий и подвержен ошибкам. Представьте себе все эти типы ошибок:

  1. Ваш запрос не дошел до сервера inventory
  2. ваш запрос достиг сервера, но отказался
  3. сервер принял соединение, но не смог прочитать запрос
  4. запрос на чтение сервера, но завис
  5. сервер обработал запрос, но не смог отправить ответ
  6. сервер отправил 200 OK ответа, но он был потерян, и вы так и не получили его
  7. ответ сервера получен, но клиент не смог его обработать
  8. ответ сервера отправлен, но время ожидания клиента ранее

Во всех этих случаях вы просто получаете исключение на стороне клиента и не знаете, каково состояние сервера. Технически вы должны повторить неудавшиеся запросы, но, поскольку POST не идемпотентен, вы можете в конечном итоге наградить игрока более чем одним мечом (в случаях 5-8). Но без повторов вы можете потерять деньги игрока, не дав ему его драгоценный артефакт. Должен быть лучший способ.

Превращение POST в идемпотентный PUT

В некоторых случаях на удивление просто перейти от POST к идемпотентному PUT, в основном перенеся генерацию идентификатора с сервера на клиент. В POST это был сервер, который сгенерировал ID меча и отправил его обратно клиенту в заголовке Location . Получается, что мы с нетерпением генерируем UUID на стороне клиента и немного меняем семантику, плюс достаточно соблюдения некоторых ограничений на стороне сервера:

01
02
03
04
05
06
07
08
09
10
11
12
13
private long purchaseSword(long playerId) {
    Sword sword = new Sword();
    UUID uuid = sword.getUuid();
    HttpEntity<String> entity = new HttpEntity<>(sword.toJson(), jsonHeaders());
    asyncRetryExecutor
            .withMaxRetries(10)
            .withExponentialBackoff(100, 2.0)
            .doWithRetry(ctx ->
                    restOperations.put(
                            "http://inventory:8080/player/{playerId}/inventory/{uuid}",
                            entity, playerId, uuid));
    return playerId;
}

API выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
> PUT /player/123123/inventory/45e74f80-b2fb-11e4-ab27-0800200c9a66 HTTP/1.1
> Host: inventory:8080
> Content-type: application/json;charset=UTF-8
>
> {"type": "sword", "strength": 100, ...}
  
< HTTP/1.1 201 Created
< Content-Length: 75
< Content-Type: application/json;charset=UTF-8
< Location: http://inventory:8080/player/123123/inventory/45e74f80-b2fb-11e4-ab27-0800200c9a66

Почему это так важно? Проще говоря (клиент не предназначен) теперь может повторять запрос PUT столько раз, сколько он хочет. Когда сервер получает PUT в первый раз, он сохраняет меч в базе данных с UUID, сгенерированным клиентом ( 45e74f80-b2fb-11e4-ab27-0800200c9a66 ) в качестве первичного ключа. В случае второй попытки PUT мы можем либо обновить, либо отклонить такой запрос. С POST это было невозможно, потому что каждый запрос рассматривался как покупка нового меча — теперь мы можем отследить, был ли такой PUT раньше или нет. Мы просто должны помнить, что последующий PUT — это не ошибка, это запрос на обновление:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@RestController
@Slf4j
public class InventoryController {
  
    private final PlayerRepository playerRepository;
  
    @Autowired
    public InventoryController(PlayerRepository playerRepository) {
        this.playerRepository = playerRepository;
    }
  
    @RequestMapping(value = "/player/{playerId}/inventory/{invId}", method = PUT)
    @Transactional
    public void addSword(@PathVariable UUID playerId, @PathVariable UUID invId) {
        playerRepository.findOne(playerId).addSwordWithId(invId);
    }
  
}
  
interface PlayerRepository extends JpaRepository<Player, UUID> {}
  
@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
@Entity
class Sword {
  
    @Id
    @Convert(converter = UuidConverter.class)
    UUID id;
    int strength;
  
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Sword)) return false;
        Sword sword = (Sword) o;
        return id.equals(sword.id);
  
    }
  
    @Override
    public int hashCode() {
        return id.hashCode();
    }
}
  
@Data
@Entity
class Player {
  
    @Id
    @Convert(converter = UuidConverter.class)
    UUID id = UUID.randomUUID();
  
    @OneToMany(cascade = ALL, fetch = EAGER)
    @JoinColumn(name="player_id")
    Set<Sword> swords = new HashSet<>();
  
    public Player addSwordWithId(UUID id) {
        swords.add(new Sword(id, 100));
        return this;
    }
  
}

В приведенном выше фрагменте кода было сделано несколько ярлыков, например, добавление хранилища непосредственно в контроллер, а также аннотирование с помощью @Transactional . Но ты получил идею. Также обратите внимание, что этот код довольно оптимистичен, если предположить, что два меча с одинаковым UUID не вставляются в одно и то же время. В противном случае возникнет исключение нарушения ограничения.

Примечание 1: Я использую тип UUID в моделях контроллера и JPA. Они не поддерживаются «из коробки», для JPA вам нужен специальный конвертер:

01
02
03
04
05
06
07
08
09
10
11
public class UuidConverter implements AttributeConverter<UUID, String> {
    @Override
    public String convertToDatabaseColumn(UUID attribute) {
        return attribute.toString();
    }
  
    @Override
    public UUID convertToEntityAttribute(String dbData) {
        return UUID.fromString(dbData);
    }
}

Аналогично для Spring MVC (только в одну сторону):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
@Bean
GenericConverter uuidConverter() {
    return new GenericConverter() {
        @Override
        public Set<ConvertiblePair> getConvertibleTypes() {
            return Collections.singleton(new ConvertiblePair(String.class, UUID.class));
        }
  
        @Override
        public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
            return UUID.fromString(source.toString());
        }
    };
}

Примечание 2: если вы не можете изменить клиента, вы можете отслеживать дубликаты, сохраняя хэш каждого запроса на стороне сервера. Таким образом, когда один и тот же запрос отправляется несколько раз (повторяется клиентом), он игнорируется. Однако иногда у нас может быть законный вариант использования для отправки одного и того же запроса дважды (например, покупка двух мечей в течение короткого периода времени).

Временная связь — недоступность клиента

Вы думаете, что вы умны, но PUT с повторными попытками недостаточно. Прежде всего, клиент может умереть при повторной попытке неудачных запросов. Если сервер серьезно поврежден или не работает, повторная попытка может занять минуты или даже часы. Вы не можете просто заблокировать ваш входящий HTTP-запрос только потому, что одна из ваших нижестоящих зависимостей не работает — вы должны обрабатывать такие запросы асинхронно в фоновом режиме — если это возможно. Но увеличение времени повторения увеличивает вероятность смерти или перезапуска клиента, что приведет к потере нашего запроса. Представьте, что мы получили премиальные SMS, но InventoryService в данный момент не работает. Мы можем повторить попытку через секунду, два, четыре и т. Д., Но что, если InventoryService работал в течение нескольких часов и случилось так, что наш сервис также был перезапущен? Мы только что потеряли это SMS и меч никогда не был передан геймеру.

Ответ на этот вопрос — сначала сохранить ожидающий запрос, а затем обработать его в фоновом режиме. При получении SMS мы едва храним идентификатор игрока в таблице базы данных, которая называется pending_purchases . Фоновый планировщик или событие запускает асинхронный поток, который будет собирать все ожидающие покупки и пытаться отправить их в InventoryService (возможно, даже в пакетном режиме?). Периодические пакетные потоки, работающие каждую минуту или даже секунду и собирающие все ожидающие запросы, неизбежно приведут к задержке и ненужным трафик базы данных. Таким образом, я собираюсь использовать планировщик Quartz, который будет планировать повторную работу для каждого ожидающего запроса:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@RestController
class SmsController {
  
    private Scheduler scheduler;
  
    @Autowired
    public SmsController(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
  
    @RequestMapping(value = "/sms/{phoneNumber}", method = POST)
    public void handleSms(@PathVariable String phoneNumber) {
        phoneNumberToPlayer(phoneNumber)
                .map(Player::getId)
                .map(this::purchaseSword)
                .orElseThrow(() -> new IllegalArgumentException("Unknown player for phone number " + phoneNumber));
    }
  
    private UUID purchaseSword(UUID playerId) {
        UUID swordId = UUID.randomUUID();
        InventoryAddJob.scheduleOn(scheduler, Duration.ZERO, playerId, swordId);
        return swordId;
    }
  
    //...
  
}

И сама работа

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
public class InventoryAddJob implements Job {
  
    @Autowired private RestOperations restOperations;
    @lombok.Setter private UUID invId;
    @lombok.Setter private UUID playerId;
  
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        try {
            tryPurchase();
        } catch (Exception e) {
            Duration delay = Duration.ofSeconds(5);
            log.error("Can't add to inventory, will retry in {}", delay, e);
            scheduleOn(context.getScheduler(), delay, playerId, invId);
        }
    }
  
    private void tryPurchase() {
        restOperations.put(/*...*/);
    }
  
    public static void scheduleOn(Scheduler scheduler, Duration delay, UUID playerId, UUID invId) {
        try {
            JobDetail job = newJob()
                    .ofType(InventoryAddJob.class)
                    .usingJobData("playerId", playerId.toString())
                    .usingJobData("invId", invId.toString())
                    .build();
            Date runTimestamp = Date.from(Instant.now().plus(delay));
            Trigger trigger = newTrigger().startAt(runTimestamp).build();
            scheduler.scheduleJob(job, trigger);
        } catch (SchedulerException e) {
            throw new RuntimeException(e);
        }
    }
  
}

Каждый раз, когда мы получаем премиальные SMS, мы планируем немедленное выполнение асинхронной работы. Кварц позаботится о сохранении (если приложение закрывается, задание будет выполнено как можно скорее после перезапуска). Более того, если этот конкретный экземпляр выйдет из строя, другой может поднять это задание — или мы можем сформировать кластер и запросить балансировку нагрузки между ними: один экземпляр получает SMS, другой — меч в InventoryService . Очевидно, что если HTTP-вызов завершится неудачно, повторная попытка будет перенесена позже, все будет транзакционным и безопасным. В реальном коде вы, вероятно, добавили бы максимальный предел повторов, а также экспоненциальную задержку, но вы поняли идею.

Временная связь — клиент и сервер не могут встретиться

Наша борьба за правильное выполнение повторных попыток является признаком неясной временной связи между клиентом и сервером — они должны жить вместе в одно и то же время. Технически это не обязательно. Представьте, что геймер отправляет электронное письмо с заказом в службу поддержки клиентов, которое они обрабатывают в течение 48 часов, изменяя свой инвентарь вручную. То же самое можно применить к нашему случаю, но заменив почтовый сервер каким-либо брокером сообщений, например JMS:

1
2
3
4
5
6
7
8
9
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
    return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
  
@Bean
JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
    return new JmsTemplate(connectionFactory);
}

Установив соединение ActiveMQ, мы можем просто отправить запрос на покупку брокеру:

1
2
3
4
5
6
7
8
9
private UUID purchaseSword(UUID playerId) {
    final Sword sword = new Sword(playerId);
    jmsTemplate.send("purchases", session -> {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText(sword.toJson());
        return textMessage;
    });
    return sword.getUuid();
}

Полностью заменив протокол синхронного запроса-ответа на обмен сообщениями по теме JMS, мы временно отделим клиента от сервера. Им больше не нужно жить одновременно. Более того, более одного производителя и потребителя могут взаимодействовать друг с другом. Например, у вас может быть несколько каналов закупок и, что более важно, несколько заинтересованных сторон, а не только InventoryService . Еще лучше, если вы используете специализированную систему обмена сообщениями, такую ​​как Kafka, вы можете технически хранить дни (месяцы?) Сообщений без потери производительности. Преимущество заключается в том, что если вы добавите другого потребителя событий покупки в систему рядом с InventoryService он сразу же получит много исторических данных. Более того, теперь ваше приложение временно связано с брокером, поэтому, поскольку Kafka распространяется и реплицируется, в этом случае он работает лучше.

Недостатки асинхронного обмена сообщениями

Синхронный обмен данными, используемый в ReST, SOAP или любой форме RPC, прост для понимания и реализации. Кому небезразлична эта абстракция, просачивается с точки зрения латентности (локальный вызов метода обычно на несколько порядков быстрее по сравнению с удаленным, не говоря уже о том, что он может потерпеть неудачу по многим причинам, неизвестным локально), он быстро разрабатывается. Одним из настоящих предостережений при обмене сообщениями является канал обратной связи. Вы можете больше просто « отправить » (« вернуть ») сообщение обратно, так как нет ответного канала. Вам либо нужна очередь ответов с некоторым идентификатором корреляции, либо временные одноразовые очереди ответов на запрос. Также мы немного соврали, утверждая, что размещение посредника сообщений между двумя системами устраняет временную связь. Да, но теперь мы подключены к шине обмена сообщениями, которая также может отключиться, тем более что она часто находится под высокой нагрузкой, а иногда и не реплицируется должным образом.

В этой статье показаны некоторые проблемы и частичные решения для обеспечения гарантий в распределенных системах. Но, в конце концов, помните, что семантику « ровно один раз » практически невозможно реализовать, поэтому дважды проверьте, что они вам действительно нужны.