Статьи

Как исправить условия гонки с оптимистической блокировкой с помощью пессимистической блокировки

резюмировать

В моем предыдущем посте я объяснил преимущества использования явной оптимистической блокировки . Как мы тогда обнаружили, существует очень короткое временное окно, в котором параллельная транзакция может все еще зафиксировать изменение цены Продукта непосредственно перед тем, как наша текущая транзакция будет зафиксирована.

Эта проблема может быть изображена следующим образом:

explicitlockinglockmodeoptimisticracecondition

  • Алиса получает продукт
  • Затем она решает заказать его
  • Продукт оптимистический замок приобретен
  • Заказ добавляется в текущий сеанс базы данных транзакций.
  • Версия продукта проверяется процедурой явной оптимистической блокировки Hibernate
  • Механизм цен управляет изменением цены Продукта.
  • Сделка Алисы совершается без осознания того, что цена Продукта только что изменилась

Репликация вопроса

Поэтому нам нужен способ внедрить изменение цены Продукта между оптимистической проверкой блокировки и фиксацией транзакции заказа.

Проанализировав исходный код Hibernate, мы обнаруживаем, что метод SessionImpl.beforeTransactionCompletion () вызывает текущий настроенный обратный вызов Interceptor.beforeTransactionCompletion () сразу после внутреннего обработчика этапа actionQueue (где проверяется явная оптимистическая версия заблокированной сущности):

01
02
03
04
05
06
07
08
09
10
public void beforeTransactionCompletion(TransactionImplementor hibernateTransaction) {
    LOG.trace( "before transaction completion" );
    actionQueue.beforeTransactionCompletion();
    try {
        interceptor.beforeTransactionCompletion( hibernateTransaction );
    }
    catch (Throwable t) {
        LOG.exceptionInBeforeTransactionCompletionInterceptor( t );
    }
}

Вооружившись этой информацией, мы можем настроить тест, чтобы повторить наше состояние гонки:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private AtomicBoolean ready = new AtomicBoolean();
private final CountDownLatch endLatch = new CountDownLatch(1);
 
@Override
protected Interceptor interceptor() {
    return new EmptyInterceptor() {
        @Override
        public void beforeTransactionCompletion(Transaction tx) {
            if(ready.get()) {
                LOGGER.info("Overwrite product price asynchronously");
 
                executeNoWait(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        Session _session = getSessionFactory().openSession();
                        _session.doWork(new Work() {
                            @Override
                            public void execute(Connection connection) throws SQLException {
                                try(PreparedStatement ps = connection.prepareStatement("UPDATE product set price = 14.49 WHERE id = 1")) {
                                    ps.executeUpdate();
                                }
                            }
                        });
                        _session.close();
                        endLatch.countDown();
                        return null;
                    }
                });
                try {
                    LOGGER.info("Wait 500 ms for lock to be acquired!");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    };
}
 
@Test
public void testExplicitOptimisticLocking() throws InterruptedException {
    try {
        doInTransaction(new TransactionCallable<Void>() {
            @Override
            public Void execute(Session session) {
                try {
                    final Product product = (Product) session.get(Product.class, 1L, new LockOptions(LockMode.OPTIMISTIC));
                    OrderLine orderLine = new OrderLine(product);
                    session.persist(orderLine);
                    lockUpgrade(session, product);
                    ready.set(true);
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
                return null;
            }
        });
    } catch (OptimisticEntityLockException expected) {
        LOGGER.info("Failure: ", expected);
    }
    endLatch.await();
}
 
protected void lockUpgrade(Session session, Product product) {}

При запуске теста генерируется следующий вывод:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
#Alice selects a Product
DEBUG [main]: Query:{[select abstractlo0_.id as id1_1_0_, abstractlo0_.description as descript2_1_0_, abstractlo0_.price as price3_1_0_, abstractlo0_.version as version4_1_0_ from product abstractlo0_ where abstractlo0_.id=?][1]}
 
#Alice inserts an OrderLine
DEBUG [main]: Query:{[insert into order_line (id, product_id, unitPrice, version) values (default, ?, ?, ?)][1,12.99,0]}
#Alice transaction verifies the Product version
DEBUG [main]: Query:{[select version from product where id =?][1]}
 
#The price engine thread is started
INFO  [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Overwrite product price asynchronously
#Alice thread sleeps for 500ms to give the price engine a chance to execute its transaction
INFO  [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Wait 500 ms for lock to be acquired!
 
#The price engine changes the Product price
DEBUG [pool-1-thread-1]: Query:{[UPDATE product set price = 14.49 WHERE id = 1][]}
 
#Alice transaction is committed without realizing the Product price change
DEBUG [main]: o.h.e.t.i.j.JdbcTransaction - committed JDBC Connection

Итак, условия гонки реальны. Вы сами решаете, требует ли ваше текущее приложение более строгих требований к целостности данных, но, как правило, лучше, чем потом сожалеть.

Исправление проблемы

Чтобы исправить эту проблему, нам просто нужно добавить запрос на пессимистическую блокировку непосредственно перед завершением нашего транзакционного метода.

1
2
3
4
@Override
protected void lockUpgrade(Session session, Product product) {
    session.buildLockRequest(new LockOptions(LockMode.PESSIMISTIC_READ)).lock(product);
}

Явная общая блокировка предотвратит одновременную запись на объект, который мы ранее оптимистически заблокировали. При использовании этого метода никакая другая параллельная транзакция не может изменить Продукт до снятия этой блокировки (после того, как текущая транзакция зафиксирована или откатана).

explicitlockinglockmodeoptimisticraceconditionfix

С новым запросом на пессимистическую блокировку предыдущий тест генерирует следующий вывод:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
#Alice selects a Product
DEBUG [main]: Query:{[select abstractlo0_.id as id1_1_0_, abstractlo0_.description as descript2_1_0_, abstractlo0_.price as price3_1_0_, abstractlo0_.version as version4_1_0_ from product abstractlo0_ where abstractlo0_.id=?][1]}
 
#Alice inserts an OrderLine
DEBUG [main]: Query:{[insert into order_line (id, product_id, unitPrice, version) values (default, ?, ?, ?)][1,12.99,0]}
 
#Alice applies an explicit physical lock on the Product entity
DEBUG [main]: Query:{[select id from product where id =? and version =? for update][1,0]}
 
#Alice transaction verifies the Product version
DEBUG [main]: Query:{[select version from product where id =?][1]}
 
#The price engine thread is started
INFO  [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Overwrite product price asynchronously
#Alice thread sleeps for 500ms to give the price engine a chance to execute its transaction
INFO  [main]: c.v.h.m.l.c.LockModeOptimisticRaceConditionTest - Wait 500 ms for lock to be acquired!
 
#The price engine cannot proceed because of the Product entity was locked exclusively, so Alice transaction is committed against the ordered Product price
DEBUG [main]: o.h.e.t.i.j.JdbcTransaction - committed JDBC Connection
 
#The physical lock is released and the price engine can change the Product price
DEBUG [pool-1-thread-1]: Query:{[UPDATE product set price = 14.49 WHERE id = 1][]}

Даже несмотря на то, что мы запросили блокировку PESSIMISTIC_READ , HSQLDB может вместо этого выполнить только эксклюзивную блокировку FOR UPDATE, что эквивалентно явному режиму блокировки PESSIMISTIC_WRITE .

Вывод

Если вы удивляетесь, почему мы используем как оптимистическую, так и пессимистическую блокировку для нашей текущей транзакции, вы должны помнить, что оптимистическая блокировка является единственным возможным механизмом управления параллелизмом для диалогов с несколькими запросами .

В нашем примере сущность Product загружается по первому запросу с использованием транзакции только для чтения. У сущности Product есть связанная версия, и этот снимок сущности во время чтения будет оптимистически заблокирован во время транзакции во время записи.

Пессимистическая блокировка полезна только во время транзакции во время записи, чтобы предотвратить одновременное обновление после проверки версии объекта Product. Таким образом, и логическая блокировка, и физическая блокировка взаимодействуют для обеспечения целостности данных о цене заказа.

Пока я работал над этим сообщением в блоге, чемпион по Java Маркус Эйзел взял у меня интервью об инициативе Hibernate Master Class . Во время интервью я пытался объяснить текущие примеры постов, подчеркивая истинную важность знания ваших инструментов помимо справочной документации.

  • Код доступен на GitHub .