Escalamiento horizontal ACID de RDBMS con JEPLayer
lunes, marzo 19, 2012 at 11:38AM
jmarranz in JEPLayer, ORM, RDBMS, escalar, javaSE

Es posible que necesites la capacidad de las bases NoSQL de escalar horizontalmente, bien, estás de suerte, porque la necesidad de escalar suele ser un signo de éxito de un servicio, pero antes de cruzar el Rubicón, es conveniente intentar antes agotar las posibilidades de escalamiento de tu base de datos relacional de siempre.

La primera opción típica para escalar con una única instancia de RDBMS es añadir algún tipo de caché en memoria tal y como Ehcache o memcached. Esta opción es válida y popular, pero como cualquiera puede fácilmente imaginar, una caché en memoria es una versión muy pobre de una base de datos relacional respecto a lo que respecta a posibilidades de búsqueda, es más, es una versión pobre de una RDBMS en memoria tal y como HSQLDB. Una caché en memoria no va a ayudarte mucho cuando ejecutes cualquier clase de query SQL más allá de la típica búsqueda por clave primaria, por otra parte dicha caché exige un trabajo extra para mantener sincronizados ambos entornos, RDBMS y caché, por ejemplo la correcta sincronización de la caché respecto a un rollback de una transacción tras varias operaciones de escritura en la base de datos, es un problema muy tedioso que si no se realiza puede dejar la caché en un estado incoherente y con errores posteriores impredecibles (registros en la caché que realmente no existen en la base de datos), salvo que la propia caché sea participe también en la transacción.

Este tipo de problemas invitan a pensar sobre la viabilidad de reemplazar una caché en memoria con… una RDBMS actuando como caché. Los fabricantes de RDBMS no son tontos, cualquier base de datos relacional decente intentará cachear en memoria todo lo que pueda, por supuesto el acceso a una base de datos será significativamente más lento que una caché en memoria por ejemplo porque las escrituras se realizan en disco y porque hay por medio llamadas de red y transformación de datos, pero como vimos anteriormente una caché en memoria apenas es útil en operaciones de consulta de elementos por clave primaria por lo que cualquier cosa que mejore la escalabilidad de consultas SQL más complejas siempre será una mejora, por no hablar de que la transaccionalidad de las operaciones forma parte fundamental de cualquier RDBMS.

El razonamiento previo respecto al rendimiento es absurdo cuando estamos hablando de la típica combinación caché en memoria/RDBMS respecto a disponer solamente de la RDBMS, por supuesto una caché mejora el rendimiento, sin embargo no estamos hablando del rendimiento de un único nodo sino de escalar horizontalmente, es decir, ejecutar la misma aplicación en varios nodos, es en esta situación en la que las posibilidades de inconsistencia entre los nodos de la caché y las diferentes instancias de la RDBMS se multiplican, y aunque ciertamente cachés como EhCache pueden tener replicación síncrona, mantener la consistencia entre cachés y RDBMs en un rollback distribuido hace las cosas aún más complicadas.

Sí, mi propuesta es usar una RDBMS por servidor de aplicaciones/nodo como una forma de sofisticada caché distribuida en donde las operaciones de lectura tienen lugar en la RDMBS local y las de escritura se efectúan en todas las instancias de la RDBMS. Si no tienes problemas con el tamaño de la base de datos, es posible mantener varias RDBMSs con los mismos datos sin pérdida de las características relacionales y ACID como veremos más adelante (a pesar de que este enfoque podría soportar algún tipo de sharding).

Es obvio que surge la cuestión del coste de escribir simultáneamente a todas las instancias de RDBMS para que estén en sincronización, hay que tener en cuenta que en una aplicación típica:

  1. La proporción del número de lecturas es órdenes de magnitud más grande que el de escrituras.
  2. El tamaño de los datos que se leen en cada consulta y en conjunto es órdenes de magnitud más grande que los datos que se escriben en una operación y más aún en conjunto.

Las operaciones de escritura necesitarán más tiempo cuantos más nodos sean añadidos al cluster, las operaciones de escritura tendrán lugar secuencialmente pero podrían ser en paralelo si son soportadas, al mismo tiempo las lecturas son realizadas en el nodo local por lo que se obtiene un incremento lineal en las consultas a medida que se aumenta el número de nodos, debido en parte a que el moderno uso en las bases de datos relacionales de la técnica de “multiversión” en escritura, hace que éstas no sean bloqueantes respecto a las operaciones de lectura. Es el compromiso entre la mejora de la escalabilidad en lecturas y la degradación progresiva de las escrituras la que decidirá cuantos nodos podemos llegar a usar. Hay que tener en cuenta de que estamos hablando de escalabilidad horizontal y de mantener un sistema ACID siempre en sincronización de múltiples RDBMS, es decir las mismas características a las que estamos acostumbrados en nuestras aplicaciones transaccionales mono-RDBMS.

Por supuesto para cumplir con el compromiso ACID necesitamos JTA en el caso de Java.

JTA proporciona dos cosas:

  1. Operaciones atómicas en varias bases de datos
  2. Transacciones distribuidas, es decir propagación de la transacción entre varios nodos ya sea en el acceso a la misma base de datos o varias.

En nuestro caso solo nos interesa el punto 1).

Este enfoque es la alternativa manual a la replicación síncrona ofrecida por algunas bases de datos, es otra opción de hacer lo mismo con un mayor control, o bien es “la opción” cuando la base de datos en cuestión no ofrece replicación síncrona (por ejemplo MySQL). El motivo de escribir este artículo es la constatación de que es muy difícil de encontrar literatura que ilustre este enfoque manual usando JTA.

En nuestro ejemplo de “prueba de concepto” usaremos JEPLayer (http://code.google.com/p/jeplayer/), JEPLayer introduce una gestión muy sencilla de transacciones JTA en varias bases de datos (DataSources) muy útil para este tipo de uso.

JPELayer es un ORM de bajo nivel sobre JDBC creado para liberar a los programadores de las típicas tareas tediosas que implican el uso de JDBC y la demarcación de transacciones (transacciones JDBC y JTA emulando la semántica de las transacciones de JavaEE), con una programación estilo IoC (Inversion of Control) pero de una manera convencional a través de listeners registrados manualmente.

El siguiente código demostrativo es incompleto y está extraído de la distribución con código fuente de JEPLayer 1.0.1. Este ejemplo es una simulación del acceso de múltiples accesos concurrentes ejecutando aleatoriamente selects, inserciones y borrados. Las consultas SELECT son ejecutados siempre en la misma base de datos (la base de datos local), las inserciones y borrados son ejecutados en varias bases de datos dentro de la misma transacción local JTA y aleatoriamente simulando errores (excepciones) con el fin de forzar el rollback coordinado en todas las bases de datos por parte del proveedor de JTA. Ha sido testado con los proveedores JTA, JOTM y Atomikos (el JTA de GlassFish has sido probado en JEPLayer pero no en este ejemplo).   

Es necesario configurar varias instancias de bases de datos (en este ejemplo se usa MySQL) en diferentes nodos y ejecutar este código en cada nodo al mismo tiempo para simular una situación de alta concurrencia en todos los nodos del cluster.

Código extraído de la clase test.scaling.TestScalingJTA

    public void test(final TestScalingConf conf,final JEPLJTAMultipleDataSource jdsMgr,

                      final PersonDAOScalingTest[] personDaoArr) throws Exception

    {

        final int[] inserted = new int[1];

        final int[] deleted = new int[1];

        final int[] select = new int[1];

         int numberOfThreads = conf.getNumberOfThreads();

         Random randRoot = new Random();

        final Random[] randArr = new Random[numberOfThreads];

        for(int i = 0; i < numberOfThreads; i++)

            randArr[i] = new Random(randRoot.nextLong());

         Thread[] threadArray = new Thread[numberOfThreads];

         final boolean[] run = new boolean[]{false};

        for(int i = 0; i < threadArray.length; i++)

        {

            final int threadNumber = i;

            Thread thread = new Thread()

            {

                @Override

                public void run()

                {

                    while(!run[0]) Thread.yield();

                    try

                    {

                        executeActionsByThread(conf,jdsMgr, personDaoArr,

                                              randArr[threadNumber],inserted,deleted,select);

                    }

                    catch (Exception ex)   {  throw new RuntimeException(ex);   }

                }

            };

            thread.start();

            threadArray[i] = thread;

        }

         long start = System.currentTimeMillis();

         run[0] = true;

         for(int i = 0; i < threadArray.length; i++)

            threadArray[i].join();

         long end = System.currentTimeMillis();

        long lapse = end - start;

        System.out.println("LAPSE: " + lapse);

        System.out.println("INSERTED: " + inserted[0] + ", per second: " + (1000.0*inserted[0]/lapse));

        System.out.println("DELETED: " + deleted[0] + ", per second: " + (1000.0*deleted[0]/lapse));

        System.out.println("SELECTS: " + select[0] + ", per second: " + (1000.0*select[0]/lapse));

    }

 

    public void executeActionsByThread(TestScalingConf conf,final JEPLJTAMultipleDataSource jdsMgr,

                    final PersonDAOScalingTest[] personDaoArr,final Random rand,

                     final int[] inserted,final int[] deleted,final int[] select) throws Exception

    {

        int loopsPerRepetition = conf.getNumberOfLoopsEveryRepetition();

        final int masterDataSourceIndex =

                                  TestScalingJTAShared.getMasterDataSourceIndex(conf,personDaoArr);

        final int closerDataSourceIndex =

                                  TestScalingJTAShared.getCloserDataSourceIndex(conf,personDaoArr);

        int ratioSelectChange = conf.getRatioSelectChange();

        int ratioInsertDelete = conf.getRatioInsertDelete();

        final boolean testRollback = conf.getTestRollback();

         for(int loop = 0; loop < loopsPerRepetition; loop++)

        {

            int rndNum = rand.nextInt(ratioSelectChange);

            if (rndNum == 0)

            {

                int rndNumIns = rand.nextInt(ratioInsertDelete);

                if (rndNumIns == 0)

                {

                    JEPLTask task = new JEPLTask()

                    {

                        @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.REQUIRED)

                        public Object exec() throws Exception

                        {

                            int index = rand.nextInt(personDaoArr.length);

                            PersonDAOScalingTest dao = personDaoArr[index];

                            List<Person> list = dao.selectRangeOrderByIdDesc(0,1);

                            if (list.size() > 0)

                            {

                                Person person = list.get(0);

                                TestScalingJTAShared.deletePerson(masterDataSourceIndex,person,personDaoArr,testRollback,rand);

                                deleted[0]++;

                            }

                            return null;

                        }

                    };

                    try  {   jdsMgr.exec(task);    }

                    catch(JEPLException ex)

                    {

                        if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))

                            throw new RuntimeException("Unexpected",ex);

                        else

                            System.out.println("EXPECTED ROLLBACK (DELETE)");

                    }

                }

                else

                {

                    JEPLTask task = new JEPLTask()

                    {

                        @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.REQUIRED)

                        public Object exec() throws Exception

                        {

                            TestScalingJTAShared.insertPerson(masterDataSourceIndex,personDaoArr,testRollback,rand);

                            inserted[0]++;

                            return null;

                        }

                    };

                     try  {   jdsMgr.exec(task);   }

                    catch(JEPLException ex)

                    {

                        if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))

                            throw new RuntimeException("Unexpected",ex);

                        else

                            System.out.println("EXPECTED ROLLBACK (INSERT)");

                    }

                }

            }

            else

            {

                JEPLTask task = new JEPLTask()

                {

                    @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.NOT_SUPPORTED)

                    public Object exec() throws Exception

                    {

                        PersonDAOScalingTest dao = personDaoArr[closerDataSourceIndex];

                        dao.selectRangeOrderByIdDesc(0,50);

                        select[0]++;

                        return null;

                    }

                };

                jdsMgr.exec(task);

            }

        }

    }

El siguiente método extraído de TestScalingJTAShared muestra como insertamos en todas las bases de datos, las inserciones son ejecutadas secuencialmente. En este ejemplo hay una base de datos “master”, esto es debido a que se usa la generación automática de claves en MySQL y esta generación no es transaccional en MySQL, si tú generas tu propia clave primaria no es necesario que haya un master cuyo código de inserción sea ligeramente diferente al resto, ahora bien el orden de las operaciones en las diferentes bases de datos ha de ser el mismo en todos los nodos para evitar dead-locks debidos a bloqueos de escritura.

    public static Person insertPerson(int masterDSIndex,PersonDAOScalingTest[] personDaoArr,

                         boolean testRollback,Random rand)

    {

        Person person = new Person();

        person.setName("A Person object");

        person.setPhone("1111111");

        person.setEmail("hello@world.com");

        person.setAge(20);

 

        PersonDAOScalingTest dao = personDaoArr[masterDSIndex];

        dao.insertKeyGenerated(person);

 

        for(int i = 0; i < personDaoArr.length ; i++)

        {

            if (i == masterDSIndex) continue;

 

            if (testRollback && rand.nextInt(3) == 0)

                throw new RuntimeException("FALSE ERROR INSERT");

            PersonDAOScalingTest currDao = personDaoArr[i];

            currDao.insertKeyNotGenerated(person);

        }

         return person;

    }

El resultado es claro, un cluster ACID plenamente relacional salvo que se use algún tipo de particionamiento de datos (sharding), incremento lineal de la escalabilidad de operaciones de lectura y degradación lineal de las operaciones de escritura (aunque una opción a explorar sería el realizar las operaciones de escritura concurrentemente si el proveedor de JTA lo permite). Si la relación entre lecturas y escrituras es alta y el tiempo de cada escritura es aceptable se conseguirá un incremento de posibles usuarios concurrentes a medida que se añadan más nodos al cluster.   

¿Has intentado alguna vez algo así?

¿Cómo escalas tu RDBMS horizontalmente?

 

Article originally appeared on javaHispano (http://www.javahispano.org/).
See website for complete article licensing information.