Эта статья была создана в партнерстве с MongoDB . Спасибо за поддержку партнеров, которые делают возможным использование SitePoint.
MongoDB 4.0 добавляет поддержку транзакций ACID с несколькими документами.
Но подождите … Означает ли это, что MongoDB не поддерживал транзакции до сих пор?
Нет, на самом деле MongoDB всегда поддерживал транзакции в форме транзакций с одним документом. MongoDB 4.0 распространяет эти транзакционные гарантии на несколько документов, несколько выписок, несколько коллекций и несколько баз данных. Что хорошего в базе данных без какой-либо гарантии целостности транзакционных данных?
Прежде чем мы погрузимся в этот пост, вы можете найти весь код и попробовать транзакции ACID с несколькими документами здесь .
Быстрый старт
Шаг 1: Запустите MongoDB
Запустите один узел MongoDB ReplicaSet с минимальной версией 4.0.0 на локальном хосте, порт 27017.
Если вы используете Docker:
- Вы можете использовать
start-mongo.sh
.- Когда вы закончите, вы можете использовать
stop-mongo.sh
.- Если вы хотите подключиться к MongoDB с помощью оболочки Mongo, вы можете использовать
connect-mongo.sh
.Если вы предпочитаете запускать mongod вручную:
mkdir /tmp/data && mongod --dbpath /tmp/data --replSet rs
mongo --eval 'rs.initiate()'
Шаг 2: Запустите Java
Это демо содержит две основные программы:
ChangeStreams.java
иTransactions.java
.
- Change Steams позволяет вам получать уведомления о любых изменениях данных в коллекции или базе данных MongoDB.
- Процесс транзакции — это само демо.
Вам нужны две оболочки, чтобы запустить их.
Если вы используете Docker:
Первая оболочка:
./compile-docker.sh ./change-streams-docker.sh
Вторая оболочка:
./transactions-docker.sh
Если вы не используете Docker, вам необходимо установить Maven 3.5.X и JDK 10 (или минимум JDK 8, но вам нужно будет обновить версии Java в pom.xml):
Первая оболочка:
./compile.sh ./change-streams.sh
Вторая оболочка:
./transactions.sh
Давайте сравним наши существующие транзакции с одним документом с ACID-совместимыми транзакциями MongoDB 4.0 и посмотрим, как мы можем использовать эту новую функцию с Java.
До MongoDB 4.0
Даже в MongoDB 3.6 и более ранних версиях каждая операция записи представляется как транзакция, ограниченная уровнем отдельного документа на уровне хранилища. Поскольку модель документа объединяет связанные данные, которые иначе были бы смоделированы по отдельным таблицам типа «родители-потомки» в табличной схеме, атомарные операции с одним документом MongoDB обеспечивают семантику транзакций, которая удовлетворяет требованиям целостности данных большинства приложений.
Каждая типичная операция записи, модифицирующая несколько документов, фактически происходит в нескольких независимых транзакциях: по одной для каждого документа.
Давайте рассмотрим пример с очень простым приложением для управления запасами.
Прежде всего, мне нужен набор реплик MongoDB, поэтому, пожалуйста, следуйте инструкциям, приведенным выше, чтобы запустить MongoDB.
Теперь давайте вставим следующие документы в коллекцию product
:
MongoDB Enterprise rs:PRIMARY> db.product.insertMany([ { "_id" : "beer", "price" : NumberDecimal("3.75"), "stock" : NumberInt(5) }, { "_id" : "wine", "price" : NumberDecimal("7.5"), "stock" : NumberInt(3) } ])
Давайте представим, что есть распродажа, и мы хотим предложить нашим клиентам скидку 20% на все наши продукты.
Но прежде чем применять эту скидку, мы хотим отслеживать, когда эти операции происходят в MongoDB с потоками изменений.
Выполните следующее в Mongo Shell:
cursor = db.product.watch([{$match: {operationType: "update"}}]); while (!cursor.isExhausted()) { if (cursor.hasNext()) { print(tojson(cursor.next())); } }
Держите эту раковину сбоку, откройте другую раковину Монго и примените скидку:
PRIMARY> db.product.updateMany({}, {$mul: {price:0.8}}) { "acknowledged" : true, "matchedCount" : 2, "modifiedCount" : 2 } PRIMARY> db.product.find().pretty() { "_id" : "beer", "price" : NumberDecimal("3.00000000000000000"), "stock" : 5 } { "_id" : "wine", "price" : NumberDecimal("6.0000000000000000"), "stock" : 3 }
Как видите, оба документа были обновлены в одной командной строке, но не в одной транзакции.
Вот что мы можем видеть в оболочке Change Stream:
{ "_id" : { "_data" : "825B4637290000000129295A1004374DC58C611E4C8DA4E5EDE9CF309AC5463C5F6964003C62656572000004" }, "operationType" : "update", "clusterTime" : Timestamp(1531328297, 1), "ns" : { "db" : "test", "coll" : "product" }, "documentKey" : { "_id" : "beer" }, "updateDescription" : { "updatedFields" : { "price" : NumberDecimal("3.00000000000000000") }, "removedFields" : [ ] } } { "_id" : { "_data" : "825B4637290000000229295A1004374DC58C611E4C8DA4E5EDE9CF309AC5463C5F6964003C77696E65000004" }, "operationType" : "update", "clusterTime" : Timestamp(1531328297, 2), "ns" : { "db" : "test", "coll" : "product" }, "documentKey" : { "_id" : "wine" }, "updateDescription" : { "updatedFields" : { "price" : NumberDecimal("6.0000000000000000") }, "removedFields" : [ ] } }
Как вы можете видеть, время кластера (см. Ключ clusterTime
) для двух операций различно: операции clusterTime
в течение одной секунды, но счетчик отметки времени был увеличен на единицу.
Таким образом, здесь каждый документ обновляется по одному, и даже если это происходит очень быстро, кто-то другой может прочитать документы во время выполнения обновления и увидеть только один из двух продуктов со скидкой.
В большинстве случаев это то, что вы можете терпеть в своей базе данных MongoDB, потому что, насколько это возможно, мы стараемся встроить тесно связанные или связанные данные в один и тот же документ.
В результате два обновления одного и того же документа происходят в одной транзакции:
PRIMARY> db.product.update({_id: "wine"},{$inc: {stock:1}, $set: {description : "It's the best wine on Earth"}}) WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 }) PRIMARY> db.product.findOne({_id: "wine"}) { "_id" : "wine", "price" : NumberDecimal("6.0000000000000000"), "stock" : 4, "description" : "It's the best wine on Earth" }
Тем не менее, иногда вы не можете смоделировать все связанные данные в одном документе, и есть много веских причин, чтобы не вставлять документы.
MongoDB 4.0 с многодокументными ACID-транзакциями
Многодокументарные ACID-транзакции в MongoDB очень похожи на то, что вы, вероятно, уже знаете из традиционных реляционных баз данных.
Транзакции MongoDB — это диалоговый набор связанных операций, которые должны атомарно фиксироваться или полностью откатываться при выполнении «все или ничего».
Транзакции используются для обеспечения атомарности операций даже в нескольких коллекциях или базах данных. Таким образом, при чтении изоляции моментального снимка другой пользователь может видеть только все операции или ни одну из них.
Давайте теперь добавим корзину покупок в наш пример.
Для этого примера требуется 2 коллекции, потому что мы имеем дело с 2 различными бизнес-объектами: управление запасами и корзина покупок, которую каждый клиент может создать во время покупок. Жизненный цикл каждого документа в этих коллекциях отличается.
Документ в коллекции продуктов представляет собой товар, который я продаю. Здесь указана текущая цена товара и текущий запас. Я создал POJO для его представления: Product.java.
{ "_id" : "beer", "price" : NumberDecimal("3"), "stock" : NumberInt(5) }
Корзина покупок создается, когда клиент добавляет свой первый товар в корзину, и удаляется, когда клиент переходит к оформлению заказа или покидает веб-сайт. Я создал POJO для его представления: Cart.java.
{ "_id" : "Alice", "items" : [ { "price" : NumberDecimal("3"), "productId" : "beer", "quantity" : NumberInt(2) } ] }
Проблема здесь заключается в том, что я не могу продать больше, чем имею: если у меня есть 5 пива на продажу, у меня не может быть более 5 сортов пива, распределенных по разным корзинам клиентов.
Чтобы убедиться в этом, я должен убедиться, что операция создания или обновления клиентской корзины является атомарной с обновлением запаса. Вот где в игру вступает многодокументная транзакция.
Транзакция должна завершиться неудачей в случае, если кто-то попытается купить то, чего у меня нет на складе Я добавлю ограничение на товарный запас:
db.createCollection("product", { validator: { $jsonSchema: { bsonType: "object", required: [ "_id", "price", "stock" ], properties: { _id: { bsonType: "string", description: "must be a string and is required" }, price: { bsonType: "decimal", minimum: 0, description: "must be a positive decimal and is required" }, stock: { bsonType: "int", minimum: 0, description: "must be a positive integer and is required" } } } } })
Узел, что это уже включено в код Java.
Для мониторинга нашего примера мы собираемся использовать потоки изменений MongoDB, которые были представлены в MongoDB 3.6.
В каждом из потоков этого процесса под названием ChangeStreams.java
я собираюсь отслеживать одну из 2 коллекций и печатать каждую операцию с соответствующим временем кластера.
// package and imports public class ChangeStreams { private static final Bson filterUpdate = Filters.eq("operationType", "update"); private static final Bson filterInsertUpdate = Filters.in("operationType", "insert", "update"); private static final String jsonSchema = "{ $jsonSchema: { bsonType: \"object\", required: [ \"_id\", \"price\", \"stock\" ], properties: { _id: { bsonType: \"string\", description: \"must be a string and is required\" }, price: { bsonType: \"decimal\", minimum: 0, description: \"must be a positive decimal and is required\" }, stock: { bsonType: \"int\", minimum: 0, description: \"must be a positive integer and is required\" } } } } "; public static void main(String[] args) { MongoDatabase db = initMongoDB(args[0]); MongoCollection<Cart> cartCollection = db.getCollection("cart", Cart.class); MongoCollection<Product> productCollection = db.getCollection("product", Product.class); ExecutorService executor = Executors.newFixedThreadPool(2); executor.submit(() -> watchChangeStream(productCollection, filterUpdate)); executor.submit(() -> watchChangeStream(cartCollection, filterInsertUpdate)); ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor(); scheduled.scheduleWithFixedDelay(System.out::println, 0, 1, TimeUnit.SECONDS); } private static void watchChangeStream(MongoCollection<?> collection, Bson filter) { System.out.println("Watching " + collection.getNamespace()); List<Bson> pipeline = Collections.singletonList(Aggregates.match(filter)); collection.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP) .forEach((Consumer<ChangeStreamDocument<?>>) doc -> System.out.println( doc.getClusterTime() + " => " + doc.getFullDocument())); } private static MongoDatabase initMongoDB(String mongodbURI) { getLogger("org.mongodb.driver").setLevel(Level.SEVERE); CodecRegistry providers = fromProviders(PojoCodecProvider.builder().register("com.mongodb.models").build()); CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), providers); MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry); MongoClientURI uri = new MongoClientURI(mongodbURI, options); MongoClient client = new MongoClient(uri); MongoDatabase db = client.getDatabase("test"); db.drop(); db.createCollection("cart"); db.createCollection("product", productJsonSchemaValidator()); return db; } private static CreateCollectionOptions productJsonSchemaValidator() { return new CreateCollectionOptions().validationOptions( new ValidationOptions().validationAction(ValidationAction.ERROR).validator(BsonDocument.parse(jsonSchema))); } }
В этом примере у нас есть 5 сортов пива на продажу.
Алиса хочет купить 2 пива, но мы не собираемся использовать для этого новые многодокументные транзакции MongoDB 4.0. Мы будем наблюдать в потоках изменений две операции: одну, создающую корзину, и одну, обновляющую склад в 2 разных времени кластера.
Затем Алиса добавляет еще 2 пива в свою корзину, и на этот раз мы собираемся использовать транзакцию. Результатом в потоке изменений будет 2 операции, происходящие в одно и то же время кластера.
Наконец, она попытается заказать 2 дополнительных пива, но валидатор jsonSchema не сможет обновить продукт и приведет к откату. Мы не увидим ничего в потоке изменений.
Вот исходный код Transaction.java
:
// package and import public class Transactions { private static MongoClient client; private static MongoCollection<Cart> cartCollection; private static MongoCollection<Product> productCollection; private final BigDecimal BEER_PRICE = BigDecimal.valueOf(3); private final String BEER_ID = "beer"; private final Bson stockUpdate = inc("stock", -2); private final Bson filterId = eq("_id", BEER_ID); private final Bson filterAlice = eq("_id", "Alice"); private final Bson matchBeer = elemMatch("items", eq("productId", "beer")); private final Bson incrementBeers = inc("items.$.quantity", 2); public static void main(String[] args) { initMongoDB(args[0]); new Transactions().demo(); } private static void initMongoDB(String mongodbURI) { getLogger("org.mongodb.driver").setLevel(Level.SEVERE); CodecRegistry codecRegistry = fromRegistries(MongoClient.getDefaultCodecRegistry(), fromProviders( PojoCodecProvider.builder().register("com.mongodb.models").build())); MongoClientOptions.Builder options = new MongoClientOptions.Builder().codecRegistry(codecRegistry); MongoClientURI uri = new MongoClientURI(mongodbURI, options); client = new MongoClient(uri); MongoDatabase db = client.getDatabase("test"); cartCollection = db.getCollection("cart", Cart.class); productCollection = db.getCollection("product", Product.class); } private void demo() { clearCollections(); insertProductBeer(); printDatabaseState(); System.out.println("######### NO TRANSACTION #########"); System.out.println("Alice wants 2 beers."); System.out.println("We have to create a cart in the 'cart' collection and update the stock in the 'product' collection."); System.out.println("The 2 actions are correlated but can not be executed on the same cluster time."); System.out.println("Any error blocking one operation could result in stock error or beer sale we don't own."); System.out.println("---------------------------------------------------------------------------"); aliceWantsTwoBeers(); sleep(); removingBeersFromStock(); System.out.println("####################################\n"); printDatabaseState(); sleep(); System.out.println("\n######### WITH TRANSACTION #########"); System.out.println("Alice wants 2 extra beers."); System.out.println("Now we can update the 2 collections simultaneously."); System.out.println("The 2 operations only happen when the transaction is committed."); System.out.println("---------------------------------------------------------------------------"); aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(); sleep(); System.out.println("\n######### WITH TRANSACTION #########"); System.out.println("Alice wants 2 extra beers."); System.out.println("This time we do not have enough beers in stock so the transaction will rollback."); System.out.println("---------------------------------------------------------------------------"); aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback(); client.close(); } private void aliceWantsTwoExtraBeersInTransactionThenCommitOrRollback() { ClientSession session = client.startSession(); try { session.startTransaction(TransactionOptions.builder().writeConcern(WriteConcern.MAJORITY).build()); aliceWantsTwoExtraBeers(session); sleep(); removingBeerFromStock(session); session.commitTransaction(); } catch (MongoCommandException e) { session.abortTransaction(); System.out.println("####### ROLLBACK TRANSACTION #######"); } finally { session.close(); System.out.println("####################################\n"); printDatabaseState(); } } private void removingBeersFromStock() { System.out.println("Trying to update beer stock : -2 beers."); try { productCollection.updateOne(filterId, stockUpdate); } catch (MongoCommandException e) { System.out.println("##### MongoCommandException #####"); System.out.println("##### STOCK CANNOT BE NEGATIVE #####"); throw e; } } private void removingBeerFromStock(ClientSession session) { System.out.println("Trying to update beer stock : -2 beers."); try { productCollection.updateOne(session, filterId, stockUpdate); } catch (MongoCommandException e) { System.out.println("##### MongoCommandException #####"); System.out.println("##### STOCK CANNOT BE NEGATIVE #####"); throw e; } } private void aliceWantsTwoBeers() { System.out.println("Alice adds 2 beers in her cart."); cartCollection.insertOne(new Cart("Alice", Collections.singletonList(new Cart.Item(BEER_ID, 2, BEER_PRICE)))); } private void aliceWantsTwoExtraBeers(ClientSession session) { System.out.println("Updating Alice cart : adding 2 beers."); cartCollection.updateOne(session, and(filterAlice, matchBeer), incrementBeers); } private void insertProductBeer() { productCollection.insertOne(new Product(BEER_ID, 5, BEER_PRICE)); } private void clearCollections() { productCollection.deleteMany(new BsonDocument()); cartCollection.deleteMany(new BsonDocument()); } private void printDatabaseState() { System.out.println("Database state:"); printProducts(productCollection.find().into(new ArrayList<>())); printCarts(cartCollection.find().into(new ArrayList<>())); System.out.println(); } private void printProducts(List<Product> products) { products.forEach(System.out::println); } private void printCarts(List<Cart> carts) { if (carts.isEmpty()) System.out.println("No carts..."); else carts.forEach(System.out::println); } private void sleep() { System.out.println("Sleeping 3 seconds..."); try { Thread.sleep(3000); } catch (InterruptedException e) { System.err.println("Oups..."); e.printStackTrace(); } } }
Вот консоль потока изменений:
$ ./change-streams.sh Watching test.cart Watching test.product Timestamp{value=6570052721557110786, seconds=1529709604, inc=2} => Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]} Timestamp{value=6570052734442012673, seconds=1529709607, inc=1} => Product{id='beer', stock=3, price=3} Timestamp{value=6570052764506783745, seconds=1529709614, inc=1} => Product{id='beer', stock=1, price=3} Timestamp{value=6570052764506783745, seconds=1529709614, inc=1} => Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}
Как вы можете видеть здесь, мы получаем только четыре операции, потому что две последние операции никогда не были зафиксированы в базе данных, и поэтому потоку изменений нечего показывать.
Вы также можете заметить, что два первых времени кластера различаются, потому что мы не использовали транзакцию для двух первых операций, а две последние операции совместно используют одно и то же время кластера, потому что мы использовали новую систему многодокументных транзакций MongoDB 4.0, и, таким образом, они атомные.
Вот консоль Java-процесса Transaction, который суммирует все, что я сказал ранее.
$ ./transactions.sh Database state: Product{id='beer', stock=5, price=3} No carts... ######### NO TRANSACTION ######### Alice wants 2 beers. We have to create a cart in the 'cart' collection and update the stock in the 'product' collection. The 2 actions are correlated but can not be executed on the same cluster time. Any error blocking one operation could result in stock error or a sale of beer that we can't fulfill as we have no stock. --------------------------------------------------------------------------- Alice adds 2 beers in her cart. Sleeping 3 seconds... Trying to update beer stock : -2 beers. #################################### Database state: Product{id='beer', stock=3, price=3} Cart{id='Alice', items=[Item{productId=beer, quantity=2, price=3}]} Sleeping 3 seconds... ######### WITH TRANSACTION ######### Alice wants 2 extra beers. Now we can update the 2 collections simultaneously. The 2 operations only happen when the transaction is committed. --------------------------------------------------------------------------- Updating Alice cart : adding 2 beers. Sleeping 3 seconds... Trying to update beer stock : -2 beers. #################################### Database state: Product{id='beer', stock=1, price=3} Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]} Sleeping 3 seconds... ######### WITH TRANSACTION ######### Alice wants 2 extra beers. This time we do not have enough beers in stock so the transaction will rollback. --------------------------------------------------------------------------- Updating Alice cart : adding 2 beers. Sleeping 3 seconds... Trying to update beer stock : -2 beers. ##### MongoCommandException ##### ##### STOCK CANNOT BE NEGATIVE ##### ####### ROLLBACK TRANSACTION ####### #################################### Database state: Product{id='beer', stock=1, price=3} Cart{id='Alice', items=[Item{productId=beer, quantity=4, price=3}]}
Следующие шаги
Спасибо, что нашли время, чтобы прочитать мой пост — надеюсь, вы нашли его полезным и интересным.
Напоминаем, что весь код доступен в этом репозитории Github, чтобы вы могли поэкспериментировать.
Если вы ищете очень простой способ начать работу с MongoDB, вы можете сделать это всего за 5 кликов на нашей службе базы данных MongoDB Atlas в облаке.
Кроме того, многодокументные транзакции ACID — не единственная новая функция в MongoDB 4.0, поэтому не стесняйтесь взглянуть на наш бесплатный курс по MongoDB University M040: новые функции и инструменты в MongoDB 4.0 и наше руководство по новым возможностям в MongoDB 4.0, где Вы можете узнать больше о преобразованиях нативных типов, новых инструментах визуализации и аналитики и интеграции Kubernetes.