Статьи

Горизонтальное масштабирование RDBMS с JTA и JEPLayer

В наши дни мы привыкли слышать, насколько хороши базы данных NoSQL для горизонтального масштабирования, верно? Вам повезло, потому что необходимость масштабирования является признаком успешного обслуживания. Но прежде чем пересечь Рубикон , подумайте дважды: попробуйте заранее исчерпать возможности горизонтального масштабирования предпочитаемой вами СУБД.

Я не буду обсуждать, насколько расширяются базы данных NoSQL, поскольку все знают, что СУБД были с нами много лет, и еще много лет вперед. Для этого есть много причин, одна из них — это богатство языка запросов, которое очень трудно победить.

Первый типичный вариант масштабирования только с одним экземпляром RDBMS — это добавление некоторого кеша в памяти. Этот вариант может подойти, но, как вы можете легко понять, кэш в памяти — это версия СУБД для бедняков. Кроме того, это версия RDMBS для оперативной памяти для бедного человека. Кэш в памяти не очень поможет, когда вы выполняете какие-либо SQL-запросы, кроме типичного вида на основе первичного ключа, и кэш не избавляет вас от необходимости усердно работать над синхронизацией обеих сред, RDBMS и кеша. , Например, подумайте о проблеме отката транзакции, такой же вид отката должен выполняться в кеше, если в кеше не поддерживается такое же поведение транзакции .

Подобные вещи заставили меня задуматься об удобстве замены кэша в памяти на … СУБД. СУБД пытаются перехватить как можно больше в памяти, конечно, они намного медленнее, чем кэш в памяти, потому что операции записи сохраняются на диск, но вы цените его значение, когда любой SQL-запрос кэшируется бесплатно и транзакции работают нормально без стычек!

Предыдущие рассуждения абсурдны, когда речь идет о сравнении типичного кэша в памяти / RDBMS с прямым доступом к одной RDBMS. Конечно, в памяти / RDBMS работает лучше, независимо от того, сколько RDBMS кеширует. Но я говорю о горизонтальном масштабировании ACID, то есть запуске приложения на нескольких узлах. Это ситуация, когда использование кэша в памяти становится проблемой, если вы хотите синхронизировать все узлы, избегая при этом какого-либо несоответствия (я не говорю о типичных постоянных данных, которые никогда не будут изменены). Например, подумайте о проблеме синхронизации вашего кэша в памяти, когда происходит откат многострочной транзакции.

Использование вашей СУБД в качестве кеша

Да, мое предложение заключается в использовании СУБД для каждого сервера приложений / узла, как своего рода сложное кэширование. Если у вас нет проблем с размером базы данных, вы можете поддерживать несколько СУБД с одними и теми же данными без потери реляционных возможностей (несмотря на то, что этот подход может поддерживать некоторый тип шардинга).

Я дурак? Да … возможно … но мы говорим о горизонтальном масштабировании 🙂

Прежде чем поднять руку, чтобы спросить о проблеме записи в несколько СУБД (узлов) для синхронизации всех данных, я знаю об этой проблеме и Я уверен, что вы согласны со мной, что в типичном приложении:

  1. Чтение подавляющее чаще, чем пишет
  2. Записанные данные на порядки меньше, чем считанные данные

Таким образом, не возникает серьезных проблем при записи в несколько удаленных баз данных и при чтении в ближайшей (локальной) СУБД.

Операции записи занимают линейно больше времени, когда в облако добавляется больше узлов (если операции записи выполняются последовательно, если поддерживается, асинхронная запись может быть опцией), но в то же время пропускная способность чтения данных увеличивается линейно, поскольку операции чтения происходят локально (каждый узел читает данные в локальной РСУБД). В наши дни с мультиверсионными RDBM операции чтения практически не блокируются. Конечно, операции записи могут быть слишком дорогими, когда вы добавляете много узлов, хотя это зависит от ваших требований, и вам решать, когда этот тип масштабирования достигнет неприемлемого уровня производительности для записи данных. Я говорю о горизонтальных масштабируемых RDBM, когда-либо синхронизированных с использованием транзакций без потери реляционных возможностей, то есть с теми же гарантиями ACID одной RDBMS, но с несколькими экземплярами RDBMS.
  
Конечно, чтобы получить поведение ACID при записи в несколько баз данных, вам нужна JTA, и JTA предоставляет две вещи:

  1. атомарные операции над несколькими базами данных, используемыми в одном узле
  2. распределенные транзакции

Нам нужно использовать только первый.

Этот подход является просто ручной альтернативой синхронной автоматической репликации, предлагаемой некоторыми базами данных, это еще одна синхронная опция, если вы хотите полностью контролировать проблему масштабируемости или когда в вашей СУБД нет опции синхронной репликации (как в MySQL). Вам будет трудно найти литературу об этом ручном подходе, и это одна из причин этой статьи.

Некоторый код 

JEPLayer — это низкоуровневая ORM, созданная для того, чтобы избавить разработчиков от типичных утомительных задач JDBC и демаркации транзакций. Особое внимание уделяется транзакциям, включая транзакции JTA с той же семантикой, что и у JavaEE. JEPLayer обеспечивает уникальную поддержку JTA для нескольких баз данных (источников данных).

Следующий фрагмент кода взят из JEPLayer 1.0.1, распределение исходного кода, класс test.scaling.TestScalingJTA. Этот пример представляет собой симуляцию многих сильно параллельных запросов, выполняющих произвольный выбор, вставку и удаление. Запросы выбора выполняются в одной и той же базе данных (локальной базе данных), а операции вставки и удаления выполняются в одной и той же локальной транзакции JTA, случайным образом моделируя исключения для принудительного скоординированного отката провайдером JTA. Протестировано JOTM и Atomikosпоставщики автономных JTA, JTA в GlassFish, как сообщается, работают с JEPLayer, но в этом примере это не тестировалось.

Вам необходимо настроить несколько баз данных (в этом примере используется MySQL) на разных узлах и выполнить этот код на каждом узле одновременно, чтобы имитировать тяжелый параллелизм на нескольких узлах.

    public void test(final TestScalingConf conf,final JEPLJTAMultipleDataSource jdsMgr,final PersonDAOScalingTest[] personDaoArr) throws Exception
    {
        final int[] inserted = new int[1];
        final int[] deleted = new int[1];
        final int[] select = new int[1];

        int numberOfThreads = conf.getNumberOfThreads();

        Random randRoot = new Random();
        final Random[] randArr = new Random[numberOfThreads];
        for(int i = 0; i < numberOfThreads; i++)
            randArr[i] = new Random(randRoot.nextLong());

        Thread[] threadArray = new Thread[numberOfThreads];

        final boolean[] run = new boolean[]{false};
        for(int i = 0; i < threadArray.length; i++)
        {
            final int threadNumber = i;
            Thread thread = new Thread()
            {
                @Override
                public void run()
                {
                    while(!run[0]) Thread.yield();
                    try
                    {
                        executeActionsByThread(conf,jdsMgr, personDaoArr, randArr[threadNumber],inserted,deleted,select);
                    }
                    catch (Exception ex)
                    {
                        throw new RuntimeException(ex);
                    }
                }
            };
            thread.start();
            threadArray[i] = thread;
        }

        long start = System.currentTimeMillis();

        run[0] = true;

        for(int i = 0; i < threadArray.length; i++)
            threadArray[i].join();

        long end = System.currentTimeMillis();
        long lapse = end - start;
        System.out.println("LAPSE: " + lapse);
        System.out.println("INSERTED: " + inserted[0] + ", per second: " + (1000.0*inserted[0]/lapse));
        System.out.println("DELETED: " + deleted[0] + ", per second: " + (1000.0*deleted[0]/lapse));
        System.out.println("SELECTS: " + select[0] + ", per second: " + (1000.0*select[0]/lapse));
    }

    public void executeActionsByThread(TestScalingConf conf,final JEPLJTAMultipleDataSource jdsMgr,final PersonDAOScalingTest[] personDaoArr,final Random rand, final int[] inserted,final int[] deleted,final int[] select) throws Exception
    {
        int loopsPerRepetition = conf.getNumberOfLoopsEveryRepetition();
        final int masterDataSourceIndex = TestScalingJTAShared.getMasterDataSourceIndex(conf,personDaoArr);
        final int closerDataSourceIndex = TestScalingJTAShared.getCloserDataSourceIndex(conf,personDaoArr);
        int ratioSelectChange = conf.getRatioSelectChange();
        int ratioInsertDelete = conf.getRatioInsertDelete();
        final boolean testRollback = conf.getTestRollback();

        for(int loop = 0; loop < loopsPerRepetition; loop++)
        {
            int rndNum = rand.nextInt(ratioSelectChange);
            if (rndNum == 0)
            {
                int rndNumIns = rand.nextInt(ratioInsertDelete);
                if (rndNumIns == 0)
                {
                    JEPLTask task = new JEPLTask()
                    {
                        @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.REQUIRED)
                        public Object exec() throws Exception
                        {
                            int index = rand.nextInt(personDaoArr.length);
                            PersonDAOScalingTest dao = personDaoArr[index];
                            List<Person> list = dao.selectRangeOrderByIdDesc(0,1);
                            if (list.size() > 0)
                            {
                                Person person = list.get(0);
                                TestScalingJTAShared.deletePerson(masterDataSourceIndex,person,personDaoArr,testRollback,rand);
                                deleted[0]++;
                            }
                            return null;
                        }
                    };
                    try
                    {
                        jdsMgr.exec(task);
                    }
                    catch(JEPLException ex)
                    {
                        if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))
                            throw new RuntimeException("Unexpected",ex);
                        else
                            System.out.println("EXPECTED ROLLBACK (DELETE)");
                    }
                }
                else
                {
                    JEPLTask task = new JEPLTask()
                    {
                        @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.REQUIRED)
                        public Object exec() throws Exception
                        {
                            TestScalingJTAShared.insertPerson(masterDataSourceIndex,personDaoArr,testRollback,rand);
                            inserted[0]++;
                            return null;
                        }
                    };

                    try
                    {
                        jdsMgr.exec(task);
                    }
                    catch(JEPLException ex)
                    {
                        if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))
                            throw new RuntimeException("Unexpected",ex);
                        else
                            System.out.println("EXPECTED ROLLBACK (INSERT)");
                    }
                }
            }
            else
            {
                JEPLTask task = new JEPLTask()
                {
                    @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.NOT_SUPPORTED)
                    public Object exec() throws Exception
                    {
                        PersonDAOScalingTest dao = personDaoArr[closerDataSourceIndex];
                        dao.selectRangeOrderByIdDesc(0,50);
                        select[0]++;
                        return null;
                    }
                };
                jdsMgr.exec(task);
            }
        }
    }

Следующий метод TestScalingJTAShared показывает, как мы вставляем одни и те же данные во все базы данных, и как вставки выполняются последовательно. В этом примере есть база данных «master», и вставка в эту базу данных немного отличается, потому что в этой базе данных генерируется первичный ключ. Это связано с тем, что автоматическая генерация первичного ключа MySQL не является транзакционной в MySQL, поэтому, если вы сгенерируете свой собственный первичный ключ, мастера не будет, и код вставки будет позором. В любом случае порядок баз данных при вставке должен быть одинаковым во всех узлах, чтобы избежать тупиковых ситуаций.

    public static Person insertPerson(int masterDSIndex,PersonDAOScalingTest[] personDaoArr,boolean testRollback,Random rand)
    {
        Person person = new Person();
        person.setName("A Person object");
        person.setPhone("1111111");
        person.setEmail("hello@world.com");
        person.setAge(20);

        PersonDAOScalingTest dao = personDaoArr[masterDSIndex];
        dao.insertKeyGenerated(person);

        for(int i = 0; i < personDaoArr.length ; i++)
        {
            if (i == masterDSIndex) continue;

            if (testRollback && rand.nextInt(3) == 0)
                throw new RuntimeException("FALSE ERROR INSERT");
            PersonDAOScalingTest currDao = personDaoArr[i];
            currDao.insertKeyNotGenerated(person);
        }

        return person;
    }

Результат очевиден: ACID-совместимый кластер базы данных, линейное увеличение пропускной способности в выборках, линейное ухудшение записи, и, если соотношение выборок и записей высокое, вы получите общий выигрыш в производительности и масштабируемости.

Вы пробовали что-то подобное для горизонтального масштабирования вашей RDBMS?

Как вы масштабируете по горизонтали?