Idioma:English
juanma

CEP/ESP: Procesamiento y correlación de gran cantidad de eventos en arquitecturas SOA
Juan M. Reina - Director de Tecnología

El "matrimonio" entre ESP/CEP y las arquitecturas orientadas a servicios son un tema de actualidad y cuya aplicación práctica en proyectos SOA verá la luz en los próximos meses. En este modelo, se deben combinar los mecanismos de publicación-suscripción basados en mensajes con la capacidad de procesar y correlacionar en tiempo real eventos de negocio de alto nivel. En este artículo se exponen los principales conceptos asociados al modelo de procesamiento y correlación de grandes cantidades de eventos complejos, conocido con sus siglas inglesas por ESP (Event Stream Processing) y CEP (Complex Event Processing) y cómo integrar esta infraestructura en una arquitectura SOA. Asimismo se expone un ejemplo de aplicación de ESP/CEP haciendo uso de la solución Open-Source Esper.

Introducción

Los sistemas de publicación y subscripción basados en eventos siempre han formado parte de las arquitecturas relacionadas con la integración de sistemas o EAI a través de plataformas MOM (Message Oriented Middleware). Más recientemente se han integrado en las arquitecturas SOA dentro de una infraestructura ESB (Enterprise Service Bus).
Asimismo, desde hace ya varios anos, los sistemas BAM (Business Activity Monitoring) permiten el procesamiento de eventos de negocio desde distintas fuentes así como su notificación para que los responsables de la monitorización tomen una decisión en función del contexto del evento.
Por lo tanto, el reto actual, no es la generación, captura o notificación de dichos eventos, sino el poder procesar gran cantidad de sucesos producidos en tiempo real, la correlación de dichos eventos generando respuestas automáticas basadas en la semántica del evento y por último, su integración en una arquitectura SOA.

En este artículo se exponen los principios del modelo ESP/CEP y cómo se puede integrar dicho modelo en una arquitectura SOA, incluyendo un ejemplo de aplicación que hace uso de la solución Open-Source ESP/CEP de referencia: el proyecto Esper.

Definición de ESP/CEP

En primer lugar es necesario distinguir entre ESP y CEP, aunque es importante señalar que aún no hay una definición formal y consensuada por toda la industria.
Podríamos afirmar que el objetivo de ESP es la captura y el procesamiento de una gran cantidad de eventos en una ventana temporal concreta. Es lo que se conoce como "corriente" de eventos ('event stream' en inglés). Esta corriente se podría definir como una secuencia de eventos, del mismo tipo, ordenados en el tiempo. Por su parte, CEP tiene por meta la captura y procesamiento de eventos de diferente tipo de una forma desordenada en la llamada "nube" de eventos ('event cloud' en inglés). Digamos pues que una nube de eventos puede contener muchas corrientes de eventos y una corriente es un caso "especial" de nube (los eventos son del mismo tipo).

Una arquitectura ESP/CEP debe ser capaz de procesar las "nubes y corrientes" de eventos generados por uno o varios productores, almacenar y clasificar los eventos en un repositorio teniendo en cuanta la dimensión temporal (cuando se producen los eventos), tener mecanismos para detectar patrones complejos y generar respuestas automáticas a los consumidores de los patrones detectados.

Componentes de ESP/CEP

Pongamos un sencillo ejemplo de ESP/CEP: Supongamos un sistema financiero que tiene por objetivo el procesamiento en tiempo real de las acciones y de las noticias asociadas a las compañías que cotizan en la bolsa de Madrid.

  • Mediante ESP, podríamos obtener la cotización media que ha tenido una acción en una 'corriente' de eventos concreta, es decir, en un intervalo de tiempo determinado: por ejemplo, el valor medio de una companía como media de los valores recibidos en los últimos 5 minutos de sesión.
  • Mediante CEP, podríamos ir más allá y definir patrones complejos que correlacionen la publicación de una noticia con la cotización de una companía y que tome decisiones al respecto: por ejemplo, una regla podría ser si aparece una noticia de una companía y en un intervalo temporal determinado la cotización de la acción sufre una brusca bajada entonces vender automáticamente las acciones de dicha empresa.

En el último apartado expondremos cómo se podría implementar este sistema utilizando la solución Open Source Esper.

Arquitectura solución ESP/CEP

Una típica arquitectura SOA esta formada por los siguientes elementos:

  • Sistemas Legacy que a través de 'Binding Component' se conecte a un ESB (Enterprise Service Bus)
  • Servicios Web que proporcionen funcionalidad de negocio al ESB
  • Un Motor BPEL para realizar la coreografía de Servicios Web
  • Un workflow BPM para el modelado de negocio de aquellos procesos que requieran de intervención humana
  • Aplicaciones de usuario final y herramientas de monitorización (BAM)

El componente a añadir sería un Gestor de Eventos Complejos el cual tendría que procesar y almacenar todos los eventos producidos en el ESB y en base a patrones ser capaz de inferir eventos complejos y generar respuestas automáticas.

Arquitectura SOA con gestor de eventos complejos

EPL: Event Pattern Language

La disciplina ESP/CEP necesita de un nuevo enfoque por el cual, a través de un lenguaje, se permita no solo la consulta y el procesamiento de eventos sino el ser capaz de añadir a los eventos un "dimensión" temporal y lo que es más importante inferir nuevos eventos basados en patrones semánticos.
Este lenguaje se conoce como EPL (Event Pattern Language). EPL amplía y extiende a SQL en la definición de reglas temporales y en la aplicación de reglas no lineales para el procesamiento de eventos.
Algunos ejemplos de EPL's son:

  • RAPIDE-EPL
  • STRAW-EPL
  • StreamSQL

La aproximación de la plataforma Open Source Esper, ha sido la creación de un lenguaje de consulta similar a SQL, llamado EQL (Event Query Languaje), que permite filtrar eventos en función de una ventana temporal, crear nuevas corrientes de eventos a partir de una o varias corrientes existentes y definir patrones que faciliten aplicar reglas de negocio a los eventos capturados.

Lo veremos en el siguiente ejemplo de aplicación.

Ejemplo ESP/CEP utilizando Esper

Descripción del problema
El proyecto Esper es una implementación Java del concepto CEP (y por tanto también de su subconjunto ESP) y puede formar parte de una arquitectura SOA 2.0. Como implementación de ESP/CEP, permite el procesamiento a gran escala de flujos de eventos ofreciendo un excelente rendimiento. Esta capacidad de proceso es la piedra angular y objetivo del equipo de desarrollo de Esper.

En el mundo real, existen casos en los que un retraso o latencia de pocos segundos puede resultar en un grave perjuicio económico, organizativo o incluso vital. En este artículo vamos a plantear, desarrollar y resolver un problema que requiere de respuesta en tiempo real. Afortunadamente el perjuicio de no resolver adecuadamente la situación que se plantea a continuación tan sólo es económico.

El problema a resolver sería el siguiente: Poseemos una pequeña empresa de gestión de carteras de valores. Nuestra clientela ha aumentado considerablemente en un período de tiempo muy corto gracias a una correcta gestión de la información financiera. Nos hemos especializado en reaccionar con mucha celeridad ante bruscas caídas o subidas de los valores de nuestros clientes justo tras la publicación de alguna noticia relacionada con estos valores. Esto nos ha llevado a ganar prestigio y más clientes pero nos ha creado un problema, ya que no somos capaces de responder con la celeridad del principio. Lo que antes hacíamos a mano porque teníamos pocos clientes, noticias y pocos valores a analizar, debido al repentino aumento de nuestra clientela ahora ya no es posible, debemos automatizar al máximo esta tarea.
Para ello hemos contactado con una empresa que proporciona información de la cotización bursátil y de noticias a través de Internet utilizando los siguientes servicios web:

  • El servicio web de cotización bursátil nos enviará una notificación en tiempo real con la cotización de cada valor en una sesión por lo que el volumen de información y su frecuencia es alto.
  • El servicio web de noticias financieras nos enviará una notificación cuando se produce una noticia relativa a alguna companía que cotice en bolsa, estas noticias no tienen una periodicidad determinada.

Para resolver el problema nos queda desarrollar un sistema de información que permita procesar las noticias y eventos y genere respuestas automáticas cuando una noticia de una companía provoque una bajada (o subida) en la cotización.
Vamos a utilizar la plataforma Open Source Esper para el desarrollo del sistema.


Desarrollo de la solución
Lo primero que debemos hacer es inicializar Esper y obtener un proveedor de servicios para que nos pueda ayudar en nuestra tarea.

Esto lo podemos hacer en una clase de utilidad como la siguiente:

Clase de Utilidad para inicializar Esper y generar sentencias y patrones EPL

Esta clase contiene tres métodos estáticos, el primero getService() inicializa Esper y nos devuelve una instancia del proveedor de servicios. En este caso le estamos diciendo a Esper que nuestros eventos serán instancias de clases del paquete com.novayre.esper.jsweb.dto. Esto nos evitará en el futuro tener que incluir el nombre completo de la clase en cada sentencia Esper. Los otros dos métodos, utilizados para generar sentencias y patrones EPL, los comentamos más adelante.
Tras la obtención de un proveedor de servicios necesitamos varios generadores de eventos, es decir, clases que nos permitan introducir en los flujos de eventos de Esper las notificaciones recibidas de los servicios web.

Para el caso de las cotizaciones bursátiles la clase transformará cada mensaje recibido en una instancia de la clase MarketValueEvent y a continuación insertará las instancias en el flujo. Aquí mostramos el fragmento de la clase que inserta una instancia de MarketValueEvent en Esper:

Fragmento que inserta un evento en Esper

Para los mensajes recibidos del servicio web de noticias financieras haremos lo mismo generando eventos de la clase CompanyNewsEvent.
Una vez realizados los pasos anteriores, tendremos los eventos en flujos manejados por Esper. El siguiente reto es poder tratar estos eventos según nuestras necesidades.
Queremos que Esper detecte cuándo la desviación estándar de alguno de los valores supere un cierto límite, en este caso el umbral es un euro.

Para ello vamos a definir una sentencia Esper, utilizando el lenguaje EQL, con la ayuda del método estático getStatement(String) de la clase EsperUtil que definimos anteriormente:

Fragmento que define una sentencia EQL y anade un listener para su tratamiento

El método getStatement(String) nos devuelve una sentencia Esper validada por el gestor de eventos y puesta en ejecución de forma inmediata, es decir, Esper evalúa y valida el texto enviado y si es válido lo ejecuta. Una sentencia Esper permanece en ejecución hasta que es detenida por la aplicación.
En esta sentencia lo que hacemos es agrupar por compañía la desviación estándar del valor de su acción en bolsa, si esta desviación supera el límite de un euro, Esper insertará un evento en el flujo FilteredMarketValueEvent y enviará una notificación al listener que hemos anadido en la segunda instrucción de la figura anterior.
En la sentencia EQL introducimos una ventana temporal, es decir, tan sólo tenemos en cuenta para nuestro cálculo los eventos producidos en los últimos 5 segundos.
Podemos ampliar, reducir o eliminar esta ventana temporal como deseemos o tengamos necesidad.

Tras esto, recibiremos notificaciones de Esper cada vez que un valor bursátil fluctúe más allá del límite que le hemos fijado. El manejo de estas notificaciones se muestra a continuación:

Clase que recibe las notificaciones de los eventos asociados a sentencias EQL

Ahora lo que queremos recibir es una notificación de Esper por cada noticia financiera enviada por el servicio web de noticias. Podríamos hacerlo así:

Fragmento que define una sentencia EQL y anade un listener para su tratamiento

Y tratar las notificaciones de la siguiente forma:

Clase que recibe las notificaciones de los eventos asociados a sentencias EQL

Ahora podemos recibir las notificaciones de fluctuaciones acusadas tras alguna noticia financiera con el siguiente código:

Fragmento para definir un patrn EQL

En la figura anterior utilizamos el método estático getPattern(String).
Este método es similar al ya visto getStatement(String) con la diferencia que genera un patrón semántico en lugar de un flujo. El patrón es el siguiente: "Generar un nuevo evento cada vez que se genere una noticia y en los siguientes 5 segundos se genere una fluctuación"

Para procesar las notificaciones del evento asociado al patrón utilizamos un nuevo listener:

Clase que recibe las notificaciones de los eventos asociados al patrn EQL definido

En la aplicación de ejemplo hemos simulado la recepción de cotizaciones bursátiles con intervalos de un segundo. A continuación vemos el resultado de una ejecución de la aplicación de ejemplo:
NEWS!!!! Company = Novayre
MARKETVALUE: company=Novayre, value=77.11390971979608, moment=10-07-2008 18:08:00
MARKETVALUE: company=Novayre, value=75.21365862759095, moment=10-07-2008 18:08:01
MARKETVALUE: company=Novayre, value=78.25906608187235, moment=10-07-2008 18:08:02
MARKETVALUE: company=Novayre, value=74.29336083407968, moment=10-07-2008 18:08:03
MARKETVALUE: company=Novayre, value=77.94812518731693, moment=10-07-2008 18:08:04
DANGER Company = Novayre, stddev(value) = 1.7371429929268212
MARKETVALUE: company=Novayre, value=78.06954121021784, moment=10-07-2008 18:08:05
MARKETVALUE: company=Novayre, value=77.49799158888048, moment=10-07-2008 18:08:06
MARKETVALUE: company=Novayre, value=77.15722030951656, moment=10-07-2008 18:08:07
MARKETVALUE: company=Novayre, value=78.03564497449142, moment=10-07-2008 18:08:08
NEWS!!!! Company = Novayre
MARKETVALUE: company=Novayre, value=80.42591608768689, moment=10-07-2008 18:08:09
DANGER Company = Novayre, stddev(value) = 1.2817953422598312
ESP/CEP NOTIFY about Novayre *********************************************************

En la siguiente figura se muestra la secuencia de eventos en función de la corriente a la que pertenece:

Secuencia de eventos y patrones producidos en Esper

En la figura anterior vemos que la primera noticia no genera una notificación del patrón definido al producirse la fluctuación fuera de la ventana temporal. Sin embargo la siguiente noticia, al ir acompañada de una fluctuación en el segundo posterior, genera una notificación de que se ha cumplido el patrón.

Conclusiones

Hemos revisado los conceptos básicos de ESP/CEP y construido una pequeña solución que hace uso de la plataforma Esper. Podemos decir que tanto BAM como ESP/CEP se corresponden con sistemas que correlacionan eventos e identifican patrones semánticos. La diferencia radica en que cuando el sistema encuentra dichas coincidencias, BAM lo notifica a un responsable de tomar decisiones para que actúe en consecuencia mientras que ESP/CEP puede orquestar una respuesta automática sin necesidad de intervención humana.

El integrar un modelo de procesamiento ESP/CEP en una arquitectura SOA, nos debe permitir decidir qué eventos deben ser filtrados desde una perspectiva de monitorización (BAM) y cuáles pueden ser correlacionados en el tiempo o agregados en eventos de negocio de alto nivel, es decir, de mayor contenido semántico.
Esa puede ser precisamente la primera aplicación de ESP/CEP en una arquitectura SOA, dotar de una mayor capacidad de procesamiento e "inteligencia" humana a las herramientas de monitorización de actividad.
Para ello, la plataforma Open Source Esper nos ofrece una gran potencia y flexibilidad y es una solución digna de estudio e inclusión en cualquier arquitectura SOA 2.0.

Edificio Marie Curie    |   c/Leonardo Da Vinci 18, 5ª Planta   |    Parque Tecnológico Cartuja 93 - 41092 Sevilla    |    Términos y Condiciones