Event Streams

Introducing the Event Streams Module

The Event Streams module is a complete distributed publish-subscribe messaging system based on the concept of streams. It allows you to create event-driven applications: programs written to respond to actions generated by the user or the system. In a computing context, an event is any identifiable occurrence that has significance for system hardware or software. As such, events include both user-generated actions, like mouse clicks and keystrokes, and system-generated events, such as program loading.

The Event Streams module implements a new data type, introduced with DMSContainer 4.0.0+, which models a “log data” structure in a more abstract way. Like a log file, often implemented as a file opened in append-only mode, an event stream is primarily an append-only data structure.

What makes Event Streams so useful is the fact that it implements additional features compared to a plain log file:

  • A set of blocking operations allowing consumers to wait for new data added to a stream by producers;
  • A mandatory JSON object attached to each event;
  • A roles system which allows only specific users to read and/or write on the event stream queues.

Event-driven programming separates event-processing logic from the rest of a program’s code. The event-driven approach contrasts with batch processing. Because event-driven programming is an approach rather than a type of language, event-driven apps can be created in any programming language. Depending on the specific application, event-driven processing can improve responsiveness, throughput and flexibility.

Event stream processing overview

Event stream processing (ESP) is the practice of taking action on a series of data points that originate from a system that continuously creates data (a.k.a. produces events). The term “event” refers to each data point in the system, and “stream” refers to the ongoing delivery of those events. A series of events can also be referred to as “streaming data” or “data streams.” Actions that are taken on those events include aggregations (e.g., calculations such as sum, mean, standard deviation), analytics (e.g., predicting a future event based on patterns in the data), transformations (e.g., changing a number into a date format), enrichment (e.g., combining the data point with other data sources to create more context and meaning), and ingestion (e.g., inserting the data into a database).

Event stream processing architecture sample

Event stream processing is often viewed as complementary to batch processing. Batch processing is about taking action on a large set of static data (“data at rest”), while event stream processing is about taking action on a constant flow of data (“data in motion”). Event stream processing is necessary for situations where action needs to be taken as soon as possible. This is why event stream processing environments are often described as “real-time processing”.

How Does Event Stream Processing Work?

Event stream processing works by handling a data set one data point at a time. Rather than viewing data as a whole set, event stream processing deals with a flow of continuously created data. This requires a specialized set of technologies, rather than “files”, “tables”, and other sub-optimal solutions often used for these kinds of needs.

In an event stream processing environment, there are two main classes of technologies:

  1. the system that stores the events (DMSContainer Internal Storage)
  2. the technology that helps developers write applications that take action on the events (DMSContainer JSON-RPC API)

The former component pertains to data storage, and stores data based on a timestamp. For example, you might capture outside temperature every minute of the day and treat that as an event stream. Each “event” is the temperature measurement accompanied by the exact time of the measurement. The latter (known as “stream processors” or “stream processing engines”) is truly the “event stream processing” component and lets you take action on the incoming data using the JSON-RPC API.
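
For instance, a single event in the temperature stream above could be modeled as a JSON object like the following (a hypothetical payload; the actual shape is defined by the producer):

{
 "timestamp": "2024-05-01T10:41:00Z",
 "temperature_celsius": 21.5
}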

Applications publish data as a stream of events while other applications pick up that stream and consume it when they want. Because all events are stored, applications can hook into this stream and consume as required—in batch, real time or near-real-time. This means that you can truly decouple systems and enable proper agile development. Furthermore, a new system can subscribe to the stream and catch up with historic data up until the present before existing systems are properly decommissioned.

What kind of applications can benefit from Event Stream Processing and DMSContainer Event Stream Module?

In their book Event Processing in Action, authors Peter Niblett and Dr. Opher Etzion describe some purposes of event-driven applications:

  • Your application might be naturally centered on events. Such applications involve some kind of sensor that detects and reports events, and their purpose is to analyze and react to these events.
  • Your application might need to identify and react to certain situations (either good or bad) as they occur. An event-driven approach, where changes in state are monitored as they happen, lets an application respond in a much more timely fashion than a batch approach, where the detection process runs only intermittently.
  • Your application might involve analysis of a large amount of data in order to provide some output to be delivered to a human user or some other application. By treating the input data as events, you can use an event-driven approach to distribute this analysis across multiple computing nodes.
  • The event-driven approach can give you a way of extending an existing application in a flexible, non-invasive manner. Rather than changing the original application to add the extra function, it's sometimes possible to instrument the original application by adding event producers to it (for example, by processing the log files that it produces). The additional functionality can then be implemented by processing the events generated by these event producers.

The Event Streams module can help in all of these scenarios. The secret to long-term success is that the infrastructure is open to any technology and architectural pattern.

Example Use Cases

Use cases such as payment processing, fraud detection, anomaly detection, predictive maintenance, and IoT analytics all rely on immediate action on data. All of these use cases deal with data points in a continuous stream, each associated with a specific point in time. These are classic event stream processing examples because the order and timing of the data points help with identifying patterns and trends that represent an important insight for users.

Event stream processing is also valuable when data granularity is critical. For example, the actual changes to a stock price are often more important to a trader than the stock price itself, and event stream processing lets you track all the changes along the way to make better trading decisions. The practice of change data capture (CDC), in which all individual changes to a database are tracked, is another event stream processing use case. In CDC, downstream systems can use the stream of individual updates to a database for purposes such as identifying usage patterns that can help define optimization strategies, as well as tracking changes for auditing requirements.

Event Streams Module Main Features

  • Asynchronous
  • Subscribe/Unsubscribe architecture
  • Each queue can have a MaxSize or TTL (Time To Live)
  • Each message can specify a custom TTL, which the internal purging mechanism uses instead of the queue default
  • User access and permissions by Roles

Required roles

  • event_writer: a user owning this role can append messages to the queues
  • event_reader: a user owning this role can read messages from the queues

Predefined Users

DMS provides predefined users to handle event streams. They are in the “disabled” state by default:

  • user_event_reader: it has read-only access. It can only dequeue messages. Default password “pwd1”.
  • user_event_writer: it has write-only access. It can only enqueue messages. Default password “pwd1”.
  • user_event: it has read and write access. Default password “pwd1”.
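
For example, once one of these users has been enabled, you can log in with it through the Event Streams proxy described below (a minimal sketch; the Login method is shown in the full examples later in this section):

lJObj := lProxy.Login('user_event', 'pwd1');
try
  lToken := lJObj.S['token']; // session token for the following calls
finally
  lJObj.Free;
end;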

Event Streams Module Proxy

To use Event Streams from your client you need to download a proxy. Open your browser and connect to your DMS server at this URL: https://localhost/info.

Your URL may differ; replace “localhost” with the correct DMS server location.

DMS proxies

Enqueueing Messages

To enqueue (insert) a message on a queue, you can use the following methods:

  • EnqueueMessage
  • EnqueueMultipleMessages


EnqueueMessage Proxy Method

This method sends a single message to a queue. You need to log in before using this method.

function EnqueueMessage(const Token: string; const QueueName: string; Message: TJsonObject): TJDOJsonObject;

Params:

  • Token: is a string. It is a session token returned by the Login method.
  • QueueName: is a string. It's the name of the queue to enqueue the message to. If the queue doesn't exist, it will be created with the first message.
  • Message: is a JSONObject. It contains the message data (mandatory).

The method returns a JSONObject.

{
	"messageid":"xxxxx",
	"queuename":"test1"
}

MessageID is a sequential number for the message in the queue. It is used as a placeholder to get the next message.

To enqueue a message, follow the example below:

var
  lJObj: TJsonObject;
  lProxy: TEventStreamsRPCProxy;
  lToken: string;
begin
  // Create the proxy pointing at the Event Streams JSON-RPC endpoint
  lProxy := TEventStreamsRPCProxy.Create(DMS_SERVER_URL + '/eventstreamsrpc');
  try
    lProxy.RPCExecutor.SetOnValidateServerCertificate(OnValidateCert);
    // Log in and keep the session token for the following calls
    lJObj := lProxy.Login(DMS_USERNAME, DMS_PWD);
    try
      lToken := lJObj.S['token'];
    finally
      lJObj.Free;
    end;

    // Build the message payload
    lJObj := TJsonObject.Create;
    try
      lJObj.S['timestamp'] := TimeToStr(Now);
      lJObj.I['sender_pid'] := fPID;
      lJObj.I['value'] := StrToInt(EditValue.Text);
      // A clone is passed so lJObj can still be read and freed below
      lProxy.EnqueueMessage(lToken, EditQueueName.Text, lJObj.Clone);
      EditValue.Text := (lJObj.I['value'] + 1).ToString;
    finally
      lJObj.Free;
    end;
  finally
    lProxy.Free;
  end;
end;

EnqueueMultipleMessages Proxy Method

This method sends one or more messages, possibly to different queues. You need to log in before using this method.

function EnqueueMultipleMessages(const Token: string; Messages: TJsonArray): TJDOJsonObject;

Params:

  • Token: is a string. Login session token.
  • Messages: is a JSONArray of objects. Each object must declare a property named “queue” which contains the name of the queue to send the message to.
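
Here is a minimal sketch following the same pattern as the EnqueueMessage example above. Note that the shape of each array element beyond the “queue” property is an assumption here (a hypothetical “message” property carries the payload); check the official samples for the exact format.

var
  lProxy: TEventStreamsRPCProxy;
  lMessages: TJsonArray;
  lItem: TJsonObject;
  lResp: TJsonObject;
  lToken: string;
begin
  lProxy := TEventStreamsRPCProxy.Create(DMS_SERVER_URL + '/eventstreamsrpc');
  try
    lProxy.RPCExecutor.SetOnValidateServerCertificate(OnValidateCert);
    lResp := lProxy.Login(DMS_USERNAME, DMS_PWD);
    try
      lToken := lResp.S['token'];
    finally
      lResp.Free;
    end;

    lMessages := TJsonArray.Create;
    try
      // First message, addressed to queue "test1"
      lItem := lMessages.AddObject;
      lItem.S['queue'] := 'test1';
      lItem.O['message'].S['timestamp'] := TimeToStr(Now); // hypothetical payload property
      // Second message, addressed to queue "test2"
      lItem := lMessages.AddObject;
      lItem.S['queue'] := 'test2';
      lItem.O['message'].I['value'] := 42; // hypothetical payload property
      // A clone is passed so lMessages can still be freed below,
      // mirroring the EnqueueMessage sample
      lResp := lProxy.EnqueueMultipleMessages(lToken, TJsonArray(lMessages.Clone));
      lResp.Free;
    finally
      lMessages.Free;
    end;
  finally
    lProxy.Free;
  end;
end;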

Dequeuing Messages

To dequeue messages you can use the following methods:

  • DequeueMessage
  • DequeueMultipleMessage

DequeueMultipleMessage Proxy Method

Use this method to dequeue messages from a queue.

function DequeueMultipleMessage(const Token: string; const QueueName: string; LastKnownID: string; const MaxMessageCount: Integer; const TimeoutSec: Int64): TJDOJsonObject;

Params:

  • Token: is a string. it’s login session token.

  • Queuename: is a string. It’s the queue name.

  • LastKnownID: is a string. It’s the last queue message ID read from the queue.

    First time you dequeue e queue, you don’t have this information. You can use the following predefined constant values:

    • __last__ : it gets the last message from the queue
    • __first__: it gets the first message from the queue
  • MaxMessageCount: it is an integer. It’s the number of messages to read since last read (LastKnownID).

  • TimeoutSec: is an integer. It represents the seconds the method waits for messages (max 1 min 5 seconds).

The method returns a JSONObject with message information.

Here is an example. You can pass as LastKnownID an actual ID or one of the constants defined above.

procedure TMainForm.DequeueMessage(const QueueName, LastKnownID: String);
var
  lProxy: TEventStreamsRPCProxy;
  lJObj: TJsonObject;
  lLastMgsID: string;
  lToken: string;
  lJMessage: TJsonObject;
begin
  lProxy := TEventStreamsRPCProxy.Create(DMS_SERVER_URL + '/eventstreamsrpc');
  try
    lProxy.RPCExecutor.SetOnValidateServerCertificate(OnValidateCert);
    lJObj := lProxy.Login(DMS_USERNAME, DMS_PWD);
    try
      lToken := lJObj.S['token'];
    finally
      lJObj.Free;
    end;
    LogStart('Dequeue ' + LastKnownID);
    try
      lLastMgsID := LastKnownID;
      try
        // Ask for at most one message, waiting up to the configured timeout
        lJMessage := lProxy.DequeueMultipleMessage(lToken, QueueName, lLastMgsID, 1,
          StrToInt(EditTimeout.Text));
        try
          // A "timeout" flag set to true means no message arrived within TimeoutSec
          if lJMessage.B['timeout'] then
          begin
            Memo1.Lines.Add(lJMessage.ToJSON);
          end
          else
          begin
            if chkUpdateKID.Checked then
            begin
              // Store the ID of the message just read so the next call can resume from it
              EditLastKnownID.Text := lJMessage.A['data'][0].S['messageid'];
            end;
            Memo1.Lines.Add(lJMessage.ToJSON());
          end;
        finally
          lJMessage.Free;
        end;
      except
        on E: Exception do
        begin
          Memo1.Lines.Add(E.Message);
        end;
      end;
      ScrollToLastLine(Memo1);
    finally
      LogEnd;
    end;
  finally
    lProxy.Free;
  end;
end;

DequeueMessage Proxy Method

Use this method to dequeue a message from a queue.

function DequeueMessage(const Token: string; const QueueName: string; LastKnownID: string; const TimeoutSec: Int64): TJDOJsonObject;

Params:

  • Token: is a string. It's the login session token.

  • QueueName: is a string. It's the queue name.

  • LastKnownID: is a string. It's the ID of the last message read from the queue.

    The first time you dequeue from a queue, you don't have this information. You can use the following predefined constant values:

    • __last__ : it gets the last message from the queue
    • __first__: it gets the first message from the queue
  • TimeoutSec: is an integer. It represents the number of seconds the method waits for messages (maximum 1 minute and 5 seconds).

The method returns a JSONObject with message information.
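
As a minimal sketch, following the same pattern as the DequeueMultipleMessage example above (the “timeout” flag in the response is assumed to behave the same way):

lJMessage := lProxy.DequeueMessage(lToken, 'test1', '__last__', 30);
try
  if lJMessage.B['timeout'] then
    Memo1.Lines.Add('No message arrived within 30 seconds')
  else
    Memo1.Lines.Add(lJMessage.ToJSON);
finally
  lJMessage.Free;
end;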

DeleteQueue Proxy Method

Use this method to remove a queue.

procedure DeleteQueue(const Token: string; const QueueName: string);

Params:

  • Token: is a string. It's the login session token.
  • QueueName: is a string. It's the queue name.
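
For example, reusing the proxy and session token from the examples above:

lProxy.DeleteQueue(lToken, 'test1');
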
GetQueueSize Proxy Method

This method returns the number of messages in the queue.

function GetQueueSize(const Token: string; const QueueName: string): TJDOJsonObject;

Params:

  • Token: is a string. It's the login session token.
  • QueueName: is a string. It's the queue name.

It returns a JSONObject:

{
 "message_count":10,
 "queuename":"test1"   
}
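
For example, reusing the proxy and session token from the examples above:

lResp := lProxy.GetQueueSize(lToken, 'test1');
try
  Memo1.Lines.Add(Format('test1 contains %d messages', [lResp.I['message_count']]));
finally
  lResp.Free;
end;
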
GetQueuesInfo Proxy Method

Use this method to get the list of queues.

function GetQueuesInfo(const Token: string; const NameFilter: string): TJDOJsonObject;

Params:

  • Token: is a string. It's the login session token.
  • NameFilter: is a string. It can be empty. If it has a value, it is used to extract all queues whose name starts with it.

It returns a JSONObject with the list of queues:

{
 "queue_names": ["test1", ...]
}
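
For example, to list all queues, reusing the proxy and session token from the examples above (lResp: TJsonObject and I: Integer are assumed to be declared):

lResp := lProxy.GetQueuesInfo(lToken, '');
try
  for I := 0 to lResp.A['queue_names'].Count - 1 do
    Memo1.Lines.Add(lResp.A['queue_names'].S[I]);
finally
  lResp.Free;
end;
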
GetNextMessageByTimestamp Proxy Method

Use this method to get the first message after a particular timestamp.

function GetNextMessageByTimestamp(const Token: string; const QueueName: string; TimeStamp: TDateTime; IsUTC: Boolean): TJDOJsonObject;

Params:

  • Token: is a string. It's the login session token.
  • QueueName: is a string. It's the queue name.
  • TimeStamp: is a timestamp. It's the date and time used to search for the next message (greater or equal).
  • IsUTC: is a boolean. It declares whether the timestamp is UTC or not.

It returns a JSONObject with message data.
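
For example, to get the first message enqueued in the last 24 hours (reusing the proxy and session token from the examples above; Now - 1 is a local, non-UTC timestamp):

lResp := lProxy.GetNextMessageByTimestamp(lToken, 'test1', Now - 1, False);
try
  Memo1.Lines.Add(lResp.ToJSON);
finally
  lResp.Free;
end;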

Full example

Full example code with all the features explained is available in the official samples provided.