Propagación entre colas

Hemos visto como trabajar con colas y también que los procedimientos de encolar y desencolar sólo permiten trabajar con colas locales (situadas en la misma BBDD desde donde se llaman).

Cuando queremos encolar en una BBDD y desencolar en otra, debemos configurar la propagación entre colas. Esta propagación entre colas también se puede utilizar para pasar de una cola  a otra en la misma BBDD.

Consideraciones sobre la propagación

Las principales consideraciones sobre la propagación entre colas son:

  • La cola origen debe ser una cola MultiConsumer (obligatoriamente)
  • Puede haber 1 o varias colas destino.
  • Las colas destino pueden estar situadas en la misma BBDD o bien en otra BBDD.
  • Las colas destino pueden ser MultiConsumer o SingleConsumer.
    • Si la cola destino es MultiConsumer, puede ser, a su vez, cola origen para otras propagaciones.

Ideas generales sobre la propagación entre colas

Para realizar la propagación entre colas, se debe:

  • Crear los dblinks apropiados para comunicar la BBDD origen y destino.
    • Estos dblinks se deben crear aunque la BBDD origen y destino sea la misma.
      • Téngase en cuenta que, en la BBDD Oracle, un dblink indica un canal de conunicaciones entre BBDDs o entre procesos de la misma BBDD.
  • En la cola origen, se debe añadir un subscriptor por cada propagación (o cola destino).
    • Por esta razón, la cola origen debe ser MultiConsumer.
    • Al definir los subscriptores, se debe indicar el dblink a utilizar en la propagación.
  • Finalmente, se debe activar la propagación indicando la frecuencia y la cola de destino.
    • Estos dblinks que sirven para propagar datos entre BBDD y se activan periódicamente, se llaman scheduled dblinks.

Parámetros de la BBDD

Para propagar los datos entre las colas, se utilizan JOBs de BBDD.

El número de jobs empleados depende de los parámetros indicados en la propagación y el número de dblinks implicados. Más adelante se indicarán fórmulas para ver el número de jobs necesarios.

Por todo ello, el parámetro de la BBDD job_queue_processes debe ser mayor que 0 y se debe ajustar convenientemente. Téngase en cuenta que el valor de este parámetro en un máximo y se irán creando jobs según se necesiten hasta este máximo.

Si job_queue_processes tiene un valor demasiado bajo, algunas propagaciones no se realizarán o bien se realizarán con mucho retraso.

Consideramos que no es mala idea empezar con un valor job_queue_processes = 10. Si en la BBDD este valor es inferior, se debe cambiar:

SQL> ALTER SYSTEM SET job_queue_processes = 10 SCOPE=BOTH;

Creación de las colas para un ejemplo práctico de propagación

Vamos a ver un ejemplo práctico de propagación. Se puede realizar con una única Base de Datos.

Vamos a crear:

  1. Los tipos de datos
  2. La cola origen MultiConsumer
  3. Una cola destino SingleConsumer
  4. Una cola destino MultiConsuler
  5. Y vamos a configurar la propagación y ponerla en marcha.

Muchos de estos pasos ya los hemos visto en los pasos anteriores: sencillamente, los vamos a repetir.

Para facilidad, vamos a trabajar con una sola BBDD y un único usuario. La dificultad es la misma que trabajando con diversas BBDD y/o usuarios.

Se aconseja seguir los pasos que se indican a continuación

Partir de un entorno 'cero'

Por si acaso en el esquema de trabajo hay colas creadas de tests anteriores, hacemos una limpieza ejecutando las sentencias de http://www.ora-600.es/aqcleanup#drop

Crear los tipos de datos y procedimientos

Vamos a crear los tipos de datos para factura ejecutando las sentencias de http://www.ora-600.es/aqcola1#type

Si trabajáramos con varias Bases de Datos, estas sentencias se deben ejecutar en todas ellas: el tipo de datos de la cola origen y destino debe ser el mismo (después veremos excepciones).

Por si no los teníamos, y para facilitarnos la tarea, vamos a crear los procedimientos de encolar, ejecutando las sentencias de http://www.ora-600.es/aqcola1#encola y el procedeimiento de llenar un mensaje y encolarlo en la cola ejecutando las sentecnias de http://www.ora-600.es/aqcola1#encolar_mensaje

Crear la cola origen MultiConsumer

La cola origen será la cola 'CM_FACTURA' creada sobre la queue table TM_FACTURA

Para ello ejecutamos las sentencias de http://www.ora-600.es/aqcola2#tm_factura

Crear la cola destino SingleConsumer

La cola destino será la cola 'C_FACTURA' creada sobre la queue table T_FACTURA

Para ello ejecutamos las sentencias de http://www.ora-600.es/aqcola1#t_factura y http://www.ora-600.es/aqcola1#c_factura

Crear la cola destino MultiConsumer

La cola destino será la cola 'CMD_FACTURA' creada sobre la queue table TMD_FACTURA

Para ello ejecutaremos las sentencias (estas sentencias son las mismas que las de la crecaión de la cola CM_FACTURA, pero cambiando los nombres de la cola y la queue table):

-- Creacion de la queue table indicando multiple_consumers
BEGIN 
dbms_aqadm.create_queue_table ( 
      queue_table=> 'tmd_factura',        -- nombre de la Queue table
      queue_payload_type=> 'factura',     -- Tipo de datos que soporta entre comillas
      multiple_consumers=> TRUE,          -- si es multiple consumers
      message_grouping=> DBMS_AQADM.NONE  -- si agrupa los mensajes por transacciones
  );
COMMIT;        --> no olvidar el commit
END;
/
-- Creacion de la cola
BEGIN 
  dbms_aqadm.create_queue(
        queue_name=> 'cmd_factura',              -- Nombre de la cola
        queue_table=> 'tmd_factura',             -- Queue table en la que se crea
        queue_type=> DBMS_AQADM.NORMAL_QUEUE     -- Hay colas normales y de error
       ); 
COMMIT;
END;
/
BEGIN 
   dbms_aqadm.start_queue('cmd_factura', TRUE, TRUE); -- permitir encolar y desencolar
COMMIT;
END;
/

Configuracion de la propagacion

Ahora que ya tenemos creadas todas las colas, vamos a configurar la propagación.

Recordemos que deseamos propagar el contenido de la cola CM_FACTURA hacia las colas C_FACTURA (SingleConsumer) y CMD_FACTURA (Multiconsumer).

Para ello, debemos hacer:

  1. Crear los dblinks de comunicación entre las BBDDs.
  2. Crear los subscriptores adecuados en la cola origen.
  3. Después, debemos activar la propagación.

Creación de los dblinks

Los nombres de los dblinks dependen del valor del parámetro global_names. En nuestro caso, global_name = FALSE y podemos utilizar cualquier nombre de dblink.

Si el valor del parámetros global_names = TRUE, el nombre del dblink debe coincidir con el valor DB_UNIQUE_NAME de la BBDD destino.

Aunque trabajemos con una única Base de Datos, vamos acrear un dblink por cada propagación (Si global_names = TRUE, solo podremos crear un dblink para cada BBDD destino). Para ello ejecutamos:

CREATE DATABASE LINK dblink.c_factura
CONNECT TO usuario
IDENTIFIED BY password
USING 'tnsnames_entry'
/
CREATE DATABASE LINK dblink.cmd_factura
CONNECT TO usuario
IDENTIFIED BY password
USING 'tnsnames_entry'
/

Por supuesto, usuario y password deben ser los correspondientes al destino. tnsnames_entry debe ser la entrada en el fichero tnsnames.ora que apunta a la BBDD de destino.

Es conveniente hacer un test del buen funcionamiento de los dblinks. Basta con ejecutar:

SELECT * FROM dual@dblink.c_factura;
SELECT * FROM dual@dblink.cmd_factura;

Estas sentencias no deben dar error.

Verificación de la compatibilidad de las colas

Este paso no es imprescindible, pero sí que es aconsejable.

Vamos a ver si las colas origen y destino son compatibles.

Para ello se utliza el procedimiento DBMS_AQADM.VERIFY_QUEUE_TYPES.

  • Si devuelve 1: Las colas son compatibles
  • Si devuelve 0: Las colas son incompatibles
  • Cualquier otro valor: es un error Oracle que se produce durante la verificación.

Así, ejecutamos:

set serveroutput on
declare
   rc binary_integer;
begin
  DBMS_AQADM.VERIFY_QUEUE_TYPES (
   'cm_factura',        -- src_queue_name    IN    VARCHAR2,
   'c_factura',         -- dest_queue_name   IN    VARCHAR2,
   'dblink.c_factura',  -- destination       IN    VARCHAR2 DEFAULT NULL,
   rc);                 -- rc                OUT   BINARY_INTEGER);
   dbms_output.put_line('Cola c_factura: ' || rc);
  DBMS_AQADM.VERIFY_QUEUE_TYPES (
   'cm_factura',          -- src_queue_name    IN    VARCHAR2,
   'cmd_factura',         -- dest_queue_name   IN    VARCHAR2,
   'dblink.cmd_factura',  -- destination       IN    VARCHAR2 DEFAULT NULL,
   rc);                   -- rc                OUT   BINARY_INTEGER);
   dbms_output.put_line('Cola cmd_factura: ' || rc);
END;
/
Cola c_factura: 1
Cola cmd_factura: 1

Por tanto, las colas son compatibles

Creación de los subscriptores

Hemos visto ya la creación de los subscriptores en el apartado de colas MultiConsumer.

Los subscriptores se crean a partir de una estructura SYS.AQ$_AGENT definido como:

TYPE SYS.AQ$_AGENT IS OBJECT (
   name       VARCHAR2(30),
   address    VARCHAR2(1024),
   protocol   NUMBER  DEFAULT 0);
  • Name es el nombre del agente
    • Si la cola destino es una cola SingleConsumer, Name debe ser NULL.
    • Si la cola destino es MultipleConsumer, Name tiene el significado de los receptores y subscriptores de las colas MultiConsumer:
      • Si Name es NULL, el mensaje irá destinado a todos los subscriptores de la cola remota.
        • En la cola remota, los subscriptors deben estar creados antes de la propagación.
      • Si Name no es NULL, el mensaje irá destinado al receptor de la cola remota indicado como Name.
  • Protocol: Solo está definido el 0 e indica que es via TCP/IP
  • Address es el nombre destino en formato <cola>@<dblink>
    • Si el dblink fuera público, el formato de address sería: <esquema>.<cola>@<dblink>

Vamos a crear los subscriptores de la cola local. Para la cola remora MultiConsumer, creamos un subscriptor con nombre. Los mensajes se enviarán al receptor subscriptor_1 de la cola remota:

-- Para la SingleConsumer
BEGIN
   dbms_aqadm.add_subscriber(
       queue_name => 'cm_factura',
       subscriber => sys.aq$_agent(NULL, 'C_FACTURA@dblink.c_factura' ,0)
      );
   commit;
END;
/
-- Para la MultiConsumer
BEGIN
   dbms_aqadm.add_subscriber(
       queue_name => 'cm_factura',
       subscriber => sys.aq$_agent('subscriptor_1', 'CMD_FACTURA@dblink.cmd_factura' ,0)
      );
   commit;
END;
/

Encolar algunos mensajes y ver el contenido de la cola CM_FACTURA

Ahora, podemos encolar algunos mensajes y ver como está la cola CM_FACTURA. Como la propagación aún no está activada, los mensajes permanecen en la cola CM_FACTURA.

SQL> exec encolar_mensaje(3,'cm_factura');
SQL> exec encolar_mensaje(4,'cm_factura');
SQL> COMMIT;

 Después de encolar estos dos mensajes, veamos como han quedado las vistas:

SQL> -- ver los subscriptores
SQL> select * from AQ$TM_FACTURA_S;

QUEUE        NAME           ADDRESS                                      PROTOCOL TRANSFORMATION
------------ -------------- -------------------------------------------- -------- ---------------
CM_FACTURA                  "ESQUEMA"."C_FACTURA"@DBLINK.C_FACTURA              0         
CM_FACTURA   SUBSCRIPTOR_1  "ESQUEMA"."CMD_FACTURA"@DBLINK.CMD_FACTURA          0

SQL> -- Ver el contenido de la cola
SQL> select queue, msg_id, msg_state, enq_txn_id, consumer_name, address
     from aq$tm_factura;

QUEUE       MSG_ID                            MSG_STATE  ENQ_TXN_ID  CONSUMER_NAME  ADDRESS
----------- --------------------------------- ---------- ----------- -------------- ------------------------------------------
CM_FACTURA  73ECB670400B0382E040007F01017DFB  READY      9.9.4961    SUBSCRIPTOR_1  "ESQUEMA"."CMD_FACTURA"@DBLINK.CMD_FACTURA
CM_FACTURA  73ECB670400B0382E040007F01017DFB  READY      9.9.4961                   "ESQUEMA"."C_FACTURA"@DBLINK.C_FACTURA
CM_FACTURA  73ECB670400C0382E040007F01017DFB  READY      9.9.4961    SUBSCRIPTOR_1  "ESQUEMA"."CMD_FACTURA"@DBLINK.CMD_FACTURA
CM_FACTURA  73ECB670400C0382E040007F01017DFB  READY      9.9.4961                   "ESQUEMA"."C_FACTURA"@DBLINK.C_FACTURA

Obsérvese:

  • Hay dos MSG_ID distintos (aunque muy parecidos) correspondientes a los dos mensajes encolados
  • Dado que se han encolado en la misma transacción, ENQ_TXN_ID es el mismo para todos.
  • CONSUMER_NAME y ADDRESS se corresponden con los subscriptores especificados.

Activar la propagación

El último paso que nos queda es activar la propagación.

La activación se realiza con una llamada al procedimiento DBMS_AQADM.SCHEDULE_PROPAGATION que está definido como:

DBMS_AQADM.SCHEDULE_PROPAGATION (
   queue_name          IN    VARCHAR2,
   destination         IN    VARCHAR2 DEFAULT NULL,
   start_time          IN    DATE     DEFAULT SYSDATE,
   duration            IN    NUMBER   DEFAULT NULL,
   next_time           IN    VARCHAR2 DEFAULT NULL,
   latency             IN    NUMBER   DEFAULT 60,
   destination_queue   IN    VARCHAR2 DEFAULT NULL);

Los parámetros son autoexplicativos:

  • Queue_name: es la cola origen
  • Destination: es el dblink de destino
    • De forma indirecta, este deblink identifica las colas de destino (a través de los subscriptores)
    • Si el mismo dblink estuviera incluido en la definición de dos o mas subscriptores subcriptores, al activar la propagación se activaria la propagación de todas las colas implicacdas.
  • Start_time: Cuando empieza.
  • Duration: Duración de la propagación
  • Next_time: Cuando acabe la propagación, cuando tendrá lugar la siguiente la siguiente.
  • Latency: Mientras la propagación está en marcha, tiempo máximo de espera, es segundos, desde que se deposita un mensaje en la cola origen y se propaga
  • Destination_queue: lo veremos más adelante.

Creamos la propagación ejecutando:

begin
dbms_aqadm.schedule_propagation('CM_FACTURA', 'dblink.c_factura', SYSDATE, 3600, 'sysdate+(1/(24*60))', 0);
commit;
end;
/
begin
dbms_aqadm.schedule_propagation('CM_FACTURA', 'dblink.cmd_factura', SYSDATE, 3600, 'sysdate+(1/(24*60))', 0);
commit;
end;
/

Obsérvese que realizamos un commit después de cada llamada a schedule_propagation

Con estos parámetros:

  • La propagación empieza inmediatamente
  • Se está 1 hora propagando
  • y se vuelve  a empezar otra propagación al cabo de 1 minuto
  • En cuanto se encola un mensaje, se propaga inmediatamente.

Con esta parametrización, siempre hay 2 JOBs de BBDD dedicados a la propagación.

Estado de las colas después de la propagación

Después de la propagación, el estado de las colas queda como sigue:

SQL> select count(*) from AQ$TM_FACTURA;

COUNT(*)
----------
0

SQL> select queue, msg_id, msg_state, enq_txn_id, consumer_name, address
from aq$tmd_factura;

QUEUE       MSG_ID                            MSG_STATE  ENQ_TXN_ID  CONSUMER_NAME ADDRESS
----------- --------------------------------- ---------- ----------- -------------- -------
CMD_FACTURA 740B2222A7AB6904E040007F010117E3  READY      10.0.4545   SUBSCRIPTOR_1
CMD_FACTURA 740B2222A7AC6904E040007F010117E3  READY      10.0.4545   SUBSCRIPTOR_1


SQL> select queue, msg_id, msg_state, enq_txn_id
2 from aq$t_factura;

QUEUE       MSG_ID                            MSG_STATE  ENQ_TXN_ID
----------- --------------------------------- ---------- ----------
C_FACTURA   740B7CCE2A6FB3FAE040007F0101249E  READY      5.45.4848
C_FACTURA   740B7CCE2A70B3FAE040007F0101249E  READY      5.45.4848

Es decir:

  • La cola origen CM_FACTURA está vacia (tabla AQ$TM_FACTURA)
  • La cola MultiConsumer CMD_Factura tiene dos mensajes. Para desencolarlos, se debe especificar el subscriptor de nombre subscriptor_1
  • La cola SingleConsumer C_FACTURA tiene los dos mensajes que debe tener.
  • Los MSG_IDs de las colas son distintos.
    • En las mismas vistas hay el campo que indica el ORIGINAL_MSG_ID (no se muestra)

Una vez activada la propagación, en la Consola de Enterprisse Manager, el Colas Avanzadas -> Administraión se pueden ver las propagaciones de forma gráfica. En la parte de Programación (Schedule), seleccionando cada uno de ellos, se pueden ver estdísticas de propagación.