Buscar
Social
Ofertas laborales ES
« La semana pasada en javaHispano | Main | JavaHispano Podcast - 138 - Introducción a Liferay (Entrevista a Jorge Ferrer) »
lunes
mar192012

Escalamiento horizontal ACID de RDBMS con JEPLayer

Es posible que necesites la capacidad de las bases NoSQL de escalar horizontalmente, bien, estás de suerte, porque la necesidad de escalar suele ser un signo de éxito de un servicio, pero antes de cruzar el Rubicón, es conveniente intentar antes agotar las posibilidades de escalamiento de tu base de datos relacional de siempre.

La primera opción típica para escalar con una única instancia de RDBMS es añadir algún tipo de caché en memoria tal y como Ehcache o memcached. Esta opción es válida y popular, pero como cualquiera puede fácilmente imaginar, una caché en memoria es una versión muy pobre de una base de datos relacional respecto a lo que respecta a posibilidades de búsqueda, es más, es una versión pobre de una RDBMS en memoria tal y como HSQLDB. Una caché en memoria no va a ayudarte mucho cuando ejecutes cualquier clase de query SQL más allá de la típica búsqueda por clave primaria, por otra parte dicha caché exige un trabajo extra para mantener sincronizados ambos entornos, RDBMS y caché, por ejemplo la correcta sincronización de la caché respecto a un rollback de una transacción tras varias operaciones de escritura en la base de datos, es un problema muy tedioso que si no se realiza puede dejar la caché en un estado incoherente y con errores posteriores impredecibles (registros en la caché que realmente no existen en la base de datos), salvo que la propia caché sea participe también en la transacción.

Este tipo de problemas invitan a pensar sobre la viabilidad de reemplazar una caché en memoria con… una RDBMS actuando como caché. Los fabricantes de RDBMS no son tontos, cualquier base de datos relacional decente intentará cachear en memoria todo lo que pueda, por supuesto el acceso a una base de datos será significativamente más lento que una caché en memoria por ejemplo porque las escrituras se realizan en disco y porque hay por medio llamadas de red y transformación de datos, pero como vimos anteriormente una caché en memoria apenas es útil en operaciones de consulta de elementos por clave primaria por lo que cualquier cosa que mejore la escalabilidad de consultas SQL más complejas siempre será una mejora, por no hablar de que la transaccionalidad de las operaciones forma parte fundamental de cualquier RDBMS.

El razonamiento previo respecto al rendimiento es absurdo cuando estamos hablando de la típica combinación caché en memoria/RDBMS respecto a disponer solamente de la RDBMS, por supuesto una caché mejora el rendimiento, sin embargo no estamos hablando del rendimiento de un único nodo sino de escalar horizontalmente, es decir, ejecutar la misma aplicación en varios nodos, es en esta situación en la que las posibilidades de inconsistencia entre los nodos de la caché y las diferentes instancias de la RDBMS se multiplican, y aunque ciertamente cachés como EhCache pueden tener replicación síncrona, mantener la consistencia entre cachés y RDBMs en un rollback distribuido hace las cosas aún más complicadas.

Sí, mi propuesta es usar una RDBMS por servidor de aplicaciones/nodo como una forma de sofisticada caché distribuida en donde las operaciones de lectura tienen lugar en la RDMBS local y las de escritura se efectúan en todas las instancias de la RDBMS. Si no tienes problemas con el tamaño de la base de datos, es posible mantener varias RDBMSs con los mismos datos sin pérdida de las características relacionales y ACID como veremos más adelante (a pesar de que este enfoque podría soportar algún tipo de sharding).

Es obvio que surge la cuestión del coste de escribir simultáneamente a todas las instancias de RDBMS para que estén en sincronización, hay que tener en cuenta que en una aplicación típica:

  1. La proporción del número de lecturas es órdenes de magnitud más grande que el de escrituras.
  2. El tamaño de los datos que se leen en cada consulta y en conjunto es órdenes de magnitud más grande que los datos que se escriben en una operación y más aún en conjunto.

Las operaciones de escritura necesitarán más tiempo cuantos más nodos sean añadidos al cluster, las operaciones de escritura tendrán lugar secuencialmente pero podrían ser en paralelo si son soportadas, al mismo tiempo las lecturas son realizadas en el nodo local por lo que se obtiene un incremento lineal en las consultas a medida que se aumenta el número de nodos, debido en parte a que el moderno uso en las bases de datos relacionales de la técnica de “multiversión” en escritura, hace que éstas no sean bloqueantes respecto a las operaciones de lectura. Es el compromiso entre la mejora de la escalabilidad en lecturas y la degradación progresiva de las escrituras la que decidirá cuantos nodos podemos llegar a usar. Hay que tener en cuenta de que estamos hablando de escalabilidad horizontal y de mantener un sistema ACID siempre en sincronización de múltiples RDBMS, es decir las mismas características a las que estamos acostumbrados en nuestras aplicaciones transaccionales mono-RDBMS.

Por supuesto para cumplir con el compromiso ACID necesitamos JTA en el caso de Java.

JTA proporciona dos cosas:

  1. Operaciones atómicas en varias bases de datos
  2. Transacciones distribuidas, es decir propagación de la transacción entre varios nodos ya sea en el acceso a la misma base de datos o varias.

En nuestro caso solo nos interesa el punto 1).

Este enfoque es la alternativa manual a la replicación síncrona ofrecida por algunas bases de datos, es otra opción de hacer lo mismo con un mayor control, o bien es “la opción” cuando la base de datos en cuestión no ofrece replicación síncrona (por ejemplo MySQL). El motivo de escribir este artículo es la constatación de que es muy difícil de encontrar literatura que ilustre este enfoque manual usando JTA.

En nuestro ejemplo de “prueba de concepto” usaremos JEPLayer (http://code.google.com/p/jeplayer/), JEPLayer introduce una gestión muy sencilla de transacciones JTA en varias bases de datos (DataSources) muy útil para este tipo de uso.

JPELayer es un ORM de bajo nivel sobre JDBC creado para liberar a los programadores de las típicas tareas tediosas que implican el uso de JDBC y la demarcación de transacciones (transacciones JDBC y JTA emulando la semántica de las transacciones de JavaEE), con una programación estilo IoC (Inversion of Control) pero de una manera convencional a través de listeners registrados manualmente.

El siguiente código demostrativo es incompleto y está extraído de la distribución con código fuente de JEPLayer 1.0.1. Este ejemplo es una simulación del acceso de múltiples accesos concurrentes ejecutando aleatoriamente selects, inserciones y borrados. Las consultas SELECT son ejecutados siempre en la misma base de datos (la base de datos local), las inserciones y borrados son ejecutados en varias bases de datos dentro de la misma transacción local JTA y aleatoriamente simulando errores (excepciones) con el fin de forzar el rollback coordinado en todas las bases de datos por parte del proveedor de JTA. Ha sido testado con los proveedores JTA, JOTM y Atomikos (el JTA de GlassFish has sido probado en JEPLayer pero no en este ejemplo).   

Es necesario configurar varias instancias de bases de datos (en este ejemplo se usa MySQL) en diferentes nodos y ejecutar este código en cada nodo al mismo tiempo para simular una situación de alta concurrencia en todos los nodos del cluster.

Código extraído de la clase test.scaling.TestScalingJTA

    public void test(final TestScalingConf conf,final JEPLJTAMultipleDataSource jdsMgr,

                      final PersonDAOScalingTest[] personDaoArr) throws Exception

    {

        final int[] inserted = new int[1];

        final int[] deleted = new int[1];

        final int[] select = new int[1];

         int numberOfThreads = conf.getNumberOfThreads();

         Random randRoot = new Random();

        final Random[] randArr = new Random[numberOfThreads];

        for(int i = 0; i < numberOfThreads; i++)

            randArr[i] = new Random(randRoot.nextLong());

         Thread[] threadArray = new Thread[numberOfThreads];

         final boolean[] run = new boolean[]{false};

        for(int i = 0; i < threadArray.length; i++)

        {

            final int threadNumber = i;

            Thread thread = new Thread()

            {

                @Override

                public void run()

                {

                    while(!run[0]) Thread.yield();

                    try

                    {

                        executeActionsByThread(conf,jdsMgr, personDaoArr,

                                              randArr[threadNumber],inserted,deleted,select);

                    }

                    catch (Exception ex)   {  throw new RuntimeException(ex);   }

                }

            };

            thread.start();

            threadArray[i] = thread;

        }

         long start = System.currentTimeMillis();

         run[0] = true;

         for(int i = 0; i < threadArray.length; i++)

            threadArray[i].join();

         long end = System.currentTimeMillis();

        long lapse = end - start;

        System.out.println("LAPSE: " + lapse);

        System.out.println("INSERTED: " + inserted[0] + ", per second: " + (1000.0*inserted[0]/lapse));

        System.out.println("DELETED: " + deleted[0] + ", per second: " + (1000.0*deleted[0]/lapse));

        System.out.println("SELECTS: " + select[0] + ", per second: " + (1000.0*select[0]/lapse));

    }

 

    public void executeActionsByThread(TestScalingConf conf,final JEPLJTAMultipleDataSource jdsMgr,

                    final PersonDAOScalingTest[] personDaoArr,final Random rand,

                     final int[] inserted,final int[] deleted,final int[] select) throws Exception

    {

        int loopsPerRepetition = conf.getNumberOfLoopsEveryRepetition();

        final int masterDataSourceIndex =

                                  TestScalingJTAShared.getMasterDataSourceIndex(conf,personDaoArr);

        final int closerDataSourceIndex =

                                  TestScalingJTAShared.getCloserDataSourceIndex(conf,personDaoArr);

        int ratioSelectChange = conf.getRatioSelectChange();

        int ratioInsertDelete = conf.getRatioInsertDelete();

        final boolean testRollback = conf.getTestRollback();

         for(int loop = 0; loop < loopsPerRepetition; loop++)

        {

            int rndNum = rand.nextInt(ratioSelectChange);

            if (rndNum == 0)

            {

                int rndNumIns = rand.nextInt(ratioInsertDelete);

                if (rndNumIns == 0)

                {

                    JEPLTask task = new JEPLTask()

                    {

                        @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.REQUIRED)

                        public Object exec() throws Exception

                        {

                            int index = rand.nextInt(personDaoArr.length);

                            PersonDAOScalingTest dao = personDaoArr[index];

                            List<Person> list = dao.selectRangeOrderByIdDesc(0,1);

                            if (list.size() > 0)

                            {

                                Person person = list.get(0);

                                TestScalingJTAShared.deletePerson(masterDataSourceIndex,person,personDaoArr,testRollback,rand);

                                deleted[0]++;

                            }

                            return null;

                        }

                    };

                    try  {   jdsMgr.exec(task);    }

                    catch(JEPLException ex)

                    {

                        if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))

                            throw new RuntimeException("Unexpected",ex);

                        else

                            System.out.println("EXPECTED ROLLBACK (DELETE)");

                    }

                }

                else

                {

                    JEPLTask task = new JEPLTask()

                    {

                        @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.REQUIRED)

                        public Object exec() throws Exception

                        {

                            TestScalingJTAShared.insertPerson(masterDataSourceIndex,personDaoArr,testRollback,rand);

                            inserted[0]++;

                            return null;

                        }

                    };

                     try  {   jdsMgr.exec(task);   }

                    catch(JEPLException ex)

                    {

                        if (ex.getCause() == null || !ex.getCause().getMessage().startsWith("FALSE ERROR"))

                            throw new RuntimeException("Unexpected",ex);

                        else

                            System.out.println("EXPECTED ROLLBACK (INSERT)");

                    }

                }

            }

            else

            {

                JEPLTask task = new JEPLTask()

                {

                    @JEPLTransactionalJTA(propagation=JEPLTransactionPropagation.NOT_SUPPORTED)

                    public Object exec() throws Exception

                    {

                        PersonDAOScalingTest dao = personDaoArr[closerDataSourceIndex];

                        dao.selectRangeOrderByIdDesc(0,50);

                        select[0]++;

                        return null;

                    }

                };

                jdsMgr.exec(task);

            }

        }

    }

El siguiente método extraído de TestScalingJTAShared muestra como insertamos en todas las bases de datos, las inserciones son ejecutadas secuencialmente. En este ejemplo hay una base de datos “master”, esto es debido a que se usa la generación automática de claves en MySQL y esta generación no es transaccional en MySQL, si tú generas tu propia clave primaria no es necesario que haya un master cuyo código de inserción sea ligeramente diferente al resto, ahora bien el orden de las operaciones en las diferentes bases de datos ha de ser el mismo en todos los nodos para evitar dead-locks debidos a bloqueos de escritura.

    public static Person insertPerson(int masterDSIndex,PersonDAOScalingTest[] personDaoArr,

                         boolean testRollback,Random rand)

    {

        Person person = new Person();

        person.setName("A Person object");

        person.setPhone("1111111");

        person.setEmail("hello@world.com");

        person.setAge(20);

 

        PersonDAOScalingTest dao = personDaoArr[masterDSIndex];

        dao.insertKeyGenerated(person);

 

        for(int i = 0; i < personDaoArr.length ; i++)

        {

            if (i == masterDSIndex) continue;

 

            if (testRollback && rand.nextInt(3) == 0)

                throw new RuntimeException("FALSE ERROR INSERT");

            PersonDAOScalingTest currDao = personDaoArr[i];

            currDao.insertKeyNotGenerated(person);

        }

         return person;

    }

El resultado es claro, un cluster ACID plenamente relacional salvo que se use algún tipo de particionamiento de datos (sharding), incremento lineal de la escalabilidad de operaciones de lectura y degradación lineal de las operaciones de escritura (aunque una opción a explorar sería el realizar las operaciones de escritura concurrentemente si el proveedor de JTA lo permite). Si la relación entre lecturas y escrituras es alta y el tiempo de cada escritura es aceptable se conseguirá un incremento de posibles usuarios concurrentes a medida que se añadan más nodos al cluster.   

¿Has intentado alguna vez algo así?

¿Cómo escalas tu RDBMS horizontalmente?

 

PrintView Printer Friendly Version

EmailEmail Article to Friend

Reader Comments (5)

Excelente análisis, aunque no comparto tus premisas de partida. Existen desde hace tiempo plataformas de caché "distribuidas" como Coherence de Oracle y eXtremeScale de IBM que proporcionan una forma de consultar información muy parecida a SQL y unos mecanismos de sincronización con la persistencia que desde mi punto de vista resuelven la mayoría de los problemas.

Otra opción que tampoco veo reflejada, es el uso de una Base de datos en memoria como Timesten de Oracle y soliddb de IBM, que pueden ayudarnos en aquellos escenarios en los que no tengamos una demanda extra de escala, en datos, y no podamos o queramos modificar la aplicación. Estos dos tipos de productos se sincronizan perfectamente con los Sistemas Gestores de BD's.

marzo 21, 2012 | Unregistered CommenterJose Luis

Jose Luis yo planteo un escenario muy concreto que es escalar en un escenario plenamente relacional ACID, si renuncias a alguna de esas características las opciones se multiplican.

En el caso de Coherence y eXtremeScale, su soporte de JTA puede ser una opción a considerar, en el artículo consideré esa opción en el caso de Ehcaché, aunque tendrás que reconocer que muy transparente no es respecto a una solución 100% SQL.

marzo 21, 2012 | Registered Commenterjmarranz

Hola jmarranz, reconozco que la integración de una plataforma de cache distribuida (DCP = Distributed Cache Platform) es bastante intrusiva en el código, es más, al existir una especificación, tan pobre como la JSR 107, que no estandariza prácticamente nada, podemos quedar atrapados en un proveedor. De momento, solo nos queda desarrollar con cierto patrones que nos aislen de los proveedores en la medida que podamos!

Como te dije, la solución que propones me parece correcta, solo quería mostrar mi desacuerdo contigo al descartar, al inicio del artículo, las DCP, comparandolas con las posibilidades de búsqueda de un gestor de BD´s y poniendo en duda las funcionalidades relacionadas con las transaccionalidad. Las dos DCP que os describí en el post anterior soportan:
1.- Las propiedades típicas de los SGBD relacionadas con la transaccionalidad: ACID (Atomicity, Consistency, Isolation and Durability)

2.- Un lenguaje de consultas de alto nivel como SQL pero orientado a Objetos, quizás mas parecido al lenguaje de consultas sobre objetos que a SQL. En eXtreme Scale "ObjectGrid query language (OGQL)" y en Coherence "Coherence Query Language (CohQL).

Os dejo algún ejemplo para que podais ver lo que os digo:
--> select * from "employees" where emp_id contains any (5,10,15,20)
--> select homeAddress.state, age, count() from "ContactInfoCache" group by homeAddress.state, age
-> select key().firstName, key().lastName, age from "ContactInfoCache" where age > 42

3.- Es posible definir el nivel de asilamiento

4.- Existe el soporte XA, por lo menos en Coherence en eXtreme Scale lo desconozco, para soportar transacciones distribuidas.

5.- Las dos tienen soporte a algo parecido a los procedimiento almacenados de las BD´s. Con la ventaja de que al estar distribuido en varios nodos las consultas se hacen en paralelo mejorando el rendiemiento y tiempo de respuesta.

6.- Las dos implementan algo parecido a los triggers, Coherence mediante eventos Java.

7.- Las dos tienen un concepto parecido al de las Vistas (Continuous Query).

etc.

Otra de las cosas buenas con respecto a cualquier ORM, es que trabajan en el mundo de los "Objetos", no hay mapeos entre el mundo relacional y el mundo de objetos, cosa que a priori parece natural si usamos un lenguaje como Java, C++ o cualquiera con orientación a objetos, y esto no quita para que la información se almacene en una BD´s a posteriori o en la misma transacción.

Acabo resaltado lo malo, y es que al no existir un estandar real la integración de un producto de este tipo es totalmente intrusiva. Estaría bien que los fabricantes y comiters se pusieran de acuerdo en este punto como en el resto de API´s.

Un saludo y gracias por la respuesta. Soy usuario de Javahispano desde casi sus comienzos, pero es la primera vez que aporto algo, y he de reconocer que es muy gratificante!

marzo 22, 2012 | Unregistered CommenterJose Luis

Pues despues de tantos anyos el estreno ha sido por la puerta grande :-)

marzo 22, 2012 | Registered Commenterjmarranz

Garcias jmarranz, si el poco tiempo que me queda me lo permite intentaré compartir con vosotros más reflexiones, estoy seguro que nos enriquece a todos!

De todas formas, el debate que intenta despertar mi post, por si sirve para otro "hilo más", es si es el momento de afrontar "realmente" otras soluciones de persistencia que no estén basadas en un modelo relacional. ¿Estamos preparados para luchar con la informática tradicional? Seguro que más de uno sabe a lo que me refiero y lo sufre cada día en su trabajo.

marzo 23, 2012 | Registered Commenterjoseluisdavid

PostPost a New Comment

Enter your information below to add a new comment.

My response is on my own website »
Author Email (optional):
Author URL (optional):
Post:
 
Some HTML allowed: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <code> <em> <i> <strike> <strong>