The Event Streams module is a complete distributed publish-subscribe messaging system based on the concept of streams. EventStreams module allows to create event-driven applications. An event-driven application is a computer program that is written to respond to actions generated by the user or the system. In a computing context, an event is any identifiable occurrence that has significance for system hardware or software. As such, events include both user-generated actions like mouse clicks and keystrokes and system-generated events such as program loading.
The Event Streams module implements a new data type introduced with DMSContainer 4.0.0+, which models a “log data” structure in a more abstract way. Like a log file, often implemented as a file open in append only mode, event streams are primarily an append only data structure.
What makes Event Streams so useful is the fact that it implements additional features compared to a plain log file:
Event-driven programming separates event-processing logic from the rest of a program’s code. The event-driven approach contrasts with batch processing. Because event-driven programming is an approach rather than a type of language, event-driven apps can be created in any programming language. Depending on the specific application, event-driven processing can improve responsiveness, throughput and flexibility.
Event stream processing (ESP) is the practice of taking action on a series of data points that originate from a system that continuously creates data (a.k.a. produces events). The term “event” refers to each data point in the system, and “stream” refers to the ongoing delivery of those events. A series of events can also be referred to as “streaming data” or “data streams.” Actions that are taken on those events include aggregations (e.g., calculations such as sum, mean, standard deviation), analytics (e.g., predicting a future event based on patterns in the data), transformations (e.g., changing a number into a date format), enrichment (e.g., combining the data point with other data sources to create more context and meaning), and ingestion (e.g., inserting the data into a database).
Event stream processing is often viewed as complementary to batch processing. Batch processing is about taking action on a large set of static data (“data at rest”), while event stream processing is about taking action on a constant flow of data (“data in motion”). Event stream processing is necessary for situations where action needs to be taken as soon as possible. This is why event stream processing environments are often described as “real-time processing”.
Event stream processing works by handling a data set by one data point at a time. Rather than view data as a whole set, event stream processing is about dealing with a flow of continuously created data. This requires a specialized set of technologies which are not “files”, “tables” and other sub-optimal solutions often used for these kind of necessities.
In an event stream processing environment, there are two main classes of technologies:
The former component pertains to data storage, and stores data based on a timestamp. For example, you might capture outside temperature every minute of the day and treat that as an event stream. Each “event” is the temperature measurement accompanied by the exact time of the measurement. The latter (known as “stream processors” or “stream processing engines”) is truly the “event stream processing” component and lets you take action on the incoming data using the JSON-RPC API.
Applications publish data as a stream of events while other applications pick up that stream and consume it when they want. Because all events are stored, applications can hook into this stream and consume as required—in batch, real time or near-real-time. This means that you can truly decouple systems and enable proper agile development. Furthermore, a new system can subscribe to the stream and catch up with historic data up until the present before existing systems are properly decommissioned.
In their book Event-Driven Processing in Action, authors Peter Niblett and Dr. Opher Etzion describe some purposes of event-driven applications:
Event Stream module can help in all of these scenarios. The secret to long-term success is that the infrastructure is open to any technology and architectural pattern.
Use cases such as payment processing, fraud detection, anomaly detection, predictive maintenance, and IoT analytics all rely on immediate action on data. All of these use cases deal with data points in a continuous stream, each associated with a specific point in time. These are classic event stream processing examples because the order and timing of the data points help with identifying patterns and trends that represent an important insight for users.
Event stream processing is also valuable when data granularity is critical. For example, the actual changes to a stock price are often more important to a trader than the stock price itself, and event stream processing lets you track all the changes along the way to make better trading decisions. The practice of change data capture (CDC), in which all individual changes to a database are tracked, is another event stream processing use case. In CDC, downstream systems can use the stream of individual updates to a database for purposes such as identifying usage patterns that can help define optimization strategies, as well as tracking changes for auditing requirements.
DMS provides predefine users to handle events streams. They are on “disabled” state as default:
To use Event Streams from your client you need to download a proxy. Open you browser and connect to your DMS server typing this url: https://localhost/info.
Your url may be different, change “localhost” with the right DMS Server location.
To enqueue or insert a message on a queue, you can use methods:
.
This method sends a single message on a queue. You need to log in before using this method.
function EnqueueMessage(const Token: string; const QueueName: string; Message: TJsonObject): TJDOJsonObject;
params:
The method returns a JSONObject.
{
"messageid":"xxxxx",
"queuename":"test1"
}
MessageID is sequential number for the message in the queue. It is used as a placeholder to get next message.
To enqueue follow the example below:
var
lJObj: TJsonObject;
lProxy: TEventStreamsRPCProxy;
lToken: string;
begin
lProxy := TEventStreamsRPCProxy.Create(DMS_SERVER_URL + '/eventstreamsrpc');
try
lProxy.RPCExecutor.SetOnValidateServerCertificate(OnValidateCert);
lJObj := lProxy.Login(DMS_USERNAME, DMS_PWD);
try
lToken := lJObj.S['token'];
finally
lJObj.Free;
end;
lJObj := TJsonObject.Create;
try
lJObj.S['timestamp'] := TimeToStr(now);
lJObj.I['sender_pid'] := fPID;
lJObj.I['value'] := StrToInt(EditValue.Text);
lProxy.EnqueueMessage(lToken, EditQueueName.Text, lJObj.Clone);
EditValue.Text := (lJObj.I['value'] + 1).ToString;
finally
lJObj.Free;
end;
finally
lProxy.Free;
end;
This methods sends one or more messages to different queues. You need login before using this method.
function EnqueueMultipleMessages(const Token: string; Messages: TJsonArray): TJDOJsonObject;
Params:
To dequeue messages you can use methods:
Use this method to dequeue messages from a queue.
function DequeueMultipleMessage(const Token: string; const QueueName: string; LastKnownID: string; const MaxMessageCount: Integer; const TimeoutSec: Int64): TJDOJsonObject;
Params:
Token: is a string. it’s login session token.
Queuename: is a string. It’s the queue name.
LastKnownID: is a string. It’s the last queue message ID read from the queue.
First time you dequeue e queue, you don’t have this information. You can use the following predefined constant values:
MaxMessageCount: it is an integer. It’s the number of messages to read since last read (LastKnownID).
TimeoutSec: is an integer. It represents the seconds the method waits for messages (max 1 min 5 seconds).
The method returns a JSONObject with message informations.
Here an example. You can pass as LastKnownID the ID or the consts defined before.
procedure TMainForm.DequeueMessage(const QueueName, LastKnownID: String);
var
lProxy: TEventStreamsRPCProxy;
lJObj: TJsonObject;
lLastMgsID: string;
lToken: string;
lJMessage: TJsonObject;
begin
lProxy := TEventStreamsRPCProxy.Create(DMS_SERVER_URL + '/eventstreamsrpc');
try
lProxy.RPCExecutor.SetOnValidateServerCertificate(OnValidateCert);
lJObj := lProxy.Login(DMS_USERNAME, DMS_PWD);
try
lToken := lJObj.S['token'];
finally
lJObj.Free;
end;
LogStart('Dequeue ' + LastKnownID);
try
lLastMgsID := LastKnownID;
try
lJMessage := lProxy.DequeueMultipleMessage(lToken, QueueName, lLastMgsID, 1,
StrToInt(EditTimeout.Text));
try
if lJMessage.B['timeout'] then
begin
Memo1.Lines.Add(lJMessage.ToJSON);
end
else
begin
if chkUpdateKID.Checked then
begin
EditLastKnownID.Text := lJMessage.A['data'][0].S['messageid'];
end;
Memo1.Lines.Add(lJMessage.ToJSON());
end;
finally
lJMessage.Free;
end;
except
on E: Exception do
begin
Memo1.Lines.Add(E.Message);
end;
end;
ScrollToLastLine(Memo1);
finally
LogEnd;
end;
finally
lProxy.Free;
end;
Use this method to dequeue a message from a queue.
function DequeueMessage(const Token: string; const QueueName: string; LastKnownID: string; const TimeoutSec: Int64): TJDOJsonObject;
Params:
Token: is a string. it’s login session token.
Queuename: is a string. It’s the queue name.
LastKnownID: is a string. It’s the last queue message ID read from the queue.
First time, you dequeue e queue, you don’t have this information. You can use the following predefined constant values:
TimeoutSec: is an integer. It represents the seconds the method waits for messages (max 1 min 5 seconds).
The method returns a JSONObject with message informations.
Use this method to remove a queue.
procedure DeleteQueue(const Token: string; const QueueName: string);
Params:
Use this method returns the number of messages in the queue.
function GetQueueSize(const Token: string; const QueueName: string): TJDOJsonObject;
Params:
it returns a JSONObject:
{
"message_count":10,
"queuename":"test1"
}
Use this method to get the list of queues.
function GetQueueSize(const Token: string; const NameFilter: string): TJDOJsonObject;
Params:
it returns a JSONObject, with a list of queues:
{
"queue_names":['test1',...]
}
Use this method to get the first message after a particular timestamp
function GetNextMessageByTimestamp(const Token: string; const QueueName: string; TimeStamp: TDateTime; IsUTC: Boolean): TJDOJsonObject;
Params:
It returns a JSONObject with message data.
Full example code with all the features explained are available in the official samples provided.