Stream Processor
Mit dem Stream Processor können einlaufende Nachrichten durch Verschaltung verschiedener Verarbeitungsblöcke ("Processing Blocks") verarbeitet und anschließend zurück an den Edge Broker des RoboGate Edge gegeben werden. Zu den Funktionen gehören z.B. Datenaggregation und mathematische Berechnungen auf Grundlage einlaufender Nachrichten. Mehrere unabhängige Datenströme können unabhängig voneinander und gleichartig verarbeitet werden, siehe dazu die nachfolgende Skizze.

Konfigurationsschema
Die Konfiguration des Stream Processor beinhaltet die Definition folgender Elemente:
-
Input in den Stream Processor,
-
Verarbeitung (Processing) des Inputs und
-
Output aus dem Stream Processor an den Edge Broker.
Für die Konfiguration in der RoboGate Edge UI und Verbindung dieser Elemente bietet der Stream Processor einen Floweditor (Abbildung 1) mit folgenden Bestandteilen:
-
Editor-Menüleiste:
Zoom,
automatische Block-Anordnung auf der Zeichenfläche,
Leeren der Zeichenfläche,
Löschen eines Blocks
-
Panel "Settings": Konfigurieren von Input Edge Topics und Default Output Topic
-
Panel "Blocks": Sammlung verfügbarer Input-, Processing- und Output-Blöcke
-
Zeichenfläche: Konfiguration, Verbindung und Anordnung von Blöcken
-
Speichern: Speichern der Konfiguration

Um einen neuen Block anzulegen, klicken Sie das entsprechende Element im Panel "Blocks". Anschließend erscheint dieser Block auf der Zeichenfläche. Sie können den Block via drag&drop frei in der Zeichenfläche verschieben.
Um einen Block zu entfernen, klicken Sie entweder auf die Löschschaltfläche im Block in der Zeichenfläche oder wählen Sie den Block an und klicken Sie
"Delete Selected" in der Editor-Menüleiste.
Um alle Blöcke zu entfernen, klicken Sie auf
"Clear" in der Editor-Menüleiste.
Jeder Block hat Ein- und/oder Ausgangsknoten über die er sich verbinden lässt. Über diese Verbindungen werden Nachrichten weitergeleitet. Um Blöcke zu verbinden, klicken Sie auf einen Ausgangsknoten und ziehen eine Linie zu einem Eingangsknoten eines anderen Blocks. Um eine Verbindung zu entfernen, klicken Sie auf den Knoten am Ende der Verbindung und lösen Sie die Verbindung vom Eingangsknoten des Folgeblocks. Wird keine neue Verbindung anglegt, wird die Verbindung automatisch entfernt. Jeder Knotenpunkt kann über mehrere Verbindungen verfügen.
Die Verbindung der Input-, Processing- und Output-Blöcke ist Voraussetzung für die Verarbeitung von Nachrichten im Stream Processor!
Input
Über die Angabe von Input Edge Topics wird definiert, welche der im RoboGate Edge bzw. Edge Broker vorhandenen Topics im Stream Processor verarbeitet werden. Diese Auswahl dient somit als Eingangsfilter in den Stream Processor. Es können mehrere Einträge angelegt werden.
Über einen Input-Block lassen sich die konfigurierten Input Edge Topics nach Source und Scope filtern und an einen Processing-Block schicken. Diese Filterung ist durch Reguläre Ausdrücke (RegEx) realisiert. Stimmen die Header-Felder "Source" und "Scope" aus den einlaufenden Nachrichten mit der Konfiguration des Input-Blocks überein, wird die Nachricht an die nachfolgenden Blöcke gegeben. Es können mehrere Input-Blöcke angelegt werden.
Parameter | Beschreibung |
---|---|
Name |
Name des Input-Block |
Source (Filter) |
RegEx-Filterung des Header-Feld "Source" |
Scope (Filter) |
RegEx-Filterung des Header-Feld "Scope" |
Nach Konfiguration des Inputs in den Stream Processor (Input Edge Topics und Input-Block/Blöcke), müssen die notwendigen Processing-Blöcke konfiguriert und dem Verarbeitungsfluss entsprechend verbunden werden.
Konfiguration in der UI
Im Panel "Settings" können die Input Edge Topics konfiguriert werden. Das Hinzufügen eines Input Edge Topics erfolgt durch die Eingabe des Namen des Topics, das verarbeitet werden soll, im Feld "New Topic". Entfernt werden können Topics mit einem Klick auf das Kreuz .
Beispiel: Der zu verarbeitende Daten-Input wird von der „PollGroup a“ des OPC UA-Moduls bezogen. Abbildung 2 zeigt die nachfolgend beschriebene Konfiguration im OPC UA-Modul:
-
Poll Groups: „PollGroup a“ mit Interval 1000ms, Name: “a”, Output Topic: “PollGroup a”
-
Field Name: a und b

Die angelegte "PollGroup a" muss nun als Input Edge Topic im Stream Processor angelegt werden. Dies erfolgt durch Eingabe des Namens unter New Topic und einem Klick auf das Hinzufügen-Symbol "+" (Abbildung 3).

Anschließend muss ein Input-Block mit dem Filter angelegt werden. Hierzu klicken Sie auf das Element Input in der Auswahlliste des Panels "Blocks" und der Block erscheint in der Zeichenfläche des Stream Processor (Abbildung 4).
Mit einem Klick auf den Stift
im Input-Block auf der Zeichenfläche öffnet sich das Dialogfenster zum Konfigurieren des Input-Blocks. Im Beispiel erhält der Input Block den Namen "OPCUA" und filtert die Nachrichten nach Source ("OPCUA") und Scope ("^a$") weiter (Abbildung 5)


Um den Input zu verarbeiten, müssen nun den Anforderungen entsprechend Processing-Blöcke angelegt werden.
Processing
Die Processing-Blöcke verarbeiten die einlaufenden Nachrichten und können diese anschließend über Output-Blöcke zurück an den Edge Broker des RoboGate Edge geben.
Folgende Typen von Processing-Blöcken stehen zur Verfügung:
-
Aggregator
-
Dedup
-
Edge Method
-
Expression
-
Resend
-
SNCFilter
-
Timer
-
Router
-
RuleEngine
Um Nachrichten mit unterschiedlichen Source- und Scope-Werten (d.h. aus unterschiedlichen Datenquellen) unabhängig voneinander und parallel zueinander zu verarbeiten, bauen die Processing-Blöcke in der Regel für jede Kombination von Source und Scope unabhängige Zustände (z.B. eine Timerinstanz auf dotnet Ebene oder einen Puffer) auf.
Die verfügbaren Typen von Processing-Blöcken mit ihren spezifischen Konfigurationsparametern werden in den nachfolgenden Abschnitten beschrieben.
Konfiguration in der UI
Um einen Processing-Block zu erstellen, wählen Sie das entsprechende Element im Panel "Blocks" unter dem Bereich "Processing" aus. Anschließend erscheint der Block auf der Zeichenfläche des Stream Processor. In diesem Beispiel wird der Aggregator-Block gewählt (Abbildung 6).
Mit einem Klick auf den Stift öffnet sich das Dialogfenster zum Konfigurieren des Blocks. Nun lässt sich der Name des Blocks, im Beispiel "agg1", festlegen und die weiteren Parameter konfigurieren.


Damit die Nachrichten des Input Edge Topics weitergeleitet und verarbeitet können, muss eine Verbindung zwischen Input und Verarbeitung (Input Block und Processing Block) geschaffen werden. Hierzu klicken Sie auf den ausgehenden Knotenpunkt "Messages" des Input-Blocks und ziehen eine Verbindung zu einem Eingangsknotenpunkt am Processing-Block (Abbildung 8). Über diese Verbindung laufen die Nachrichten aus Source und Scope entsprechend des konfigurierten Filters im Input-Block in den Aggregator-Block ein.
Auch Processing-Blöcke können untereinander über Eingangs- und Ausgabeknoten miteinander verbunden werden.

Output
Um die verarbeiteten Nachrichten im Stream Processor in weiteren Modulen des RoboGate Edge zu verarbeiten, müssen diese an den Edge Broker zurückgegeben werden.
Hierzu können ein Default Output Topic und/oder mittels Output Block weitere Output Topics definiert werden. Das Default Output Topic ist optional. In das Modul einlaufende Nachrichten, die durch keine Filter-Konfiguration passen (in den Input-Blöcken), werden, falls konfiguriert, auf dieses Edge Broker Topic geleitet. Um eine Nachricht aus dem Stream Processor zurück an den Edge Broker zu geben, muss über einen Output-Block ein Output Topic konfiguriert werden. Dieses Topic kann zur Auswertung in einem Datenausgabemodul genutzt werden.
Es können mehrere Output-Blöcke angelegt werden.
Parameter | Beschreibung |
---|---|
Name |
Name des konfigurierten Output-Blocks, entspricht dem Namen des Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen |
Konfiguration in der UI
Zum Erstellen eines Output-Blocks klicken Sie, analog zum Erstellen eines Input-Blocks auf das Element Output in der Auswahlliste des Panels "Blocks" im Bereich "Output". Daraufhin erscheint der erstellte Output-Block in der Zeichenfläche (Abbildung 9).
Mit einem Klick aud den Stift
öffnet sich das Dialogfenster zum Konfigurieren des Output-Blocks (Abbildung 10). Im Beispiel erhält der Output-Block den Namen "out".
Anschließend kann der zuvor erstelle Aggregator-Block mit dem Output-Block verbunden werden. Hierzu klicken Sie auf den externen, ausgehenden Knotenpunkt "Result" des Aggregator-Blocks und ziehen die entstehende Verbindungslinie zum Knotenpunkt "Out" des Output-Blocks (Abbildung 11).



Processing Blocks
Im folgenden werden die Processing-Blöcke und ihre Konfiguration näher beschrieben.
Aggregator
Funktionsbeschreibung und Anwendung
Der Aggregator-Block sammelt einlaufende Nachrichten. Die in Nachrichten enthaltenen Felder können entsprechend der Konfiguration verschiedenartig behandelt werden. Die Nachrichten werden in sich optional überlappenden Gruppen (Windows) basierend auf Zeit oder Anzahl zusammengefasst und verarbeitet.

Ein Anwendungsbeispiel ist eine Minimum/Maximum-Aggregation, um das aufkommende Datenvolumen zu reduzieren. Beispielsweise laufen Temperaturdaten der Außentemperatur im Sekundentakt ein, aber statt 60 Momentan-Werte zu übertragen, können für jede Minute nur der Minimum- und Maximum-Wert übertragen werden.
Die in der folgenden Tabelle erläuterten Parameter sind für die Konfiguration notwendig.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Into |
Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Window - Based On |
Auswahl des Window-Typ
|
Window – Size |
Die Windowgröße bestimmt den vorhandenen Puffer je nach Window-Typ sample-basiert (Wert als Anzahl) oder zeitbasiert (Wert in Millisekunden). |
Window - HoppingEach |
Häufigkeit einer Berechnung und Ausgabe je nach Window-Typ sample-basiert (Wert als Anzahl) oder zeitbasiert (Wert in Millisekunden). Ein hoher Wert führt seltener zu einer Ausführung und Ausgabe als ein kleiner. |
Behaviour |
Die verarbeitende Funktion kann für jedes Feld der einlaufenden Nachricht angegeben werden. Sollte eine ausgehende Nachricht keinen Zeitstempel (Key timestamp) besitzen, so wird dieser mit der aktuellen Zeit erzeugt. |
Default |
Default-Wert, der für alle Felder gilt, die nicht explizit angegeben sind. |
Name |
Feldname (Variablenname) im JSON der einlaufenden Nachricht |
Value |
Funktionsauswahl für das jeweilige Feld. Folgende Werttypen stehen zur Verfügung:
|
Beispielkonfiguration Aggregator-Block
Folgende Beispielkonfiguration ist in Abbildung 13 dargestellt:
-
„a“ ist explizit konfiguriert und wird in einem Array gesammelt (Value: "Collect") und ausgegeben.
-
„b“ wird explizit ignoriert (Value: "Ignore") und entfällt für die Ausgabenachricht.
-
Da der Timestamp nicht konfiguriert ist, erhält die Default-Funktion die Konfiguration first.

Dedup
Funktionsbeschreibung und Anwendung
Der Deduplicate-Block (Dedup) verwirft Nachrichten, die gleiche Informationen zur vorherigen Nachricht aufweisen.
Die Prüfung kann mittels des Parameter Watch
maskiert, d.h. eingeschränkt werden.
Der Block wird verwendet, um redundante Daten/Nachrichten auszufiltern.
Die in der folgenden Tabelle erläuterten Parameter sind in [img-90] zu finden.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Into |
Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Overwrite Header Scope with |
string, Name für neue Scope Bezeichnung (optional). Dient dem Überschreiben des Header-Feldes Scope. Die Option ist inaktiv, wenn der String nicht vorhanden oder leer ist. |
Watch |
string oder string Array, Watch ist der Ausdruck für die Auswahl des Prüfkriteriums in der Nachricht. Wenn aufeinanderfolgende Nachrichten an der Stelle/den Stellen „x“ gleich sind, wird die Nachricht verworfen. Für den Zugriff auf Nachrichtenkeys mit einem Punkt „.“, bspw. Key Data.Value, ist der Zugriff über die Syntax ['Data.Value'] erforderlich. |
Beispielkonfiguration Dedup-Block
Die erläuterten Parameter sind in Abbildung 14 beispielhaft angewendet.

Edge Method
Funktionsbeschreibung und Anwendung
Der Edge Method-Block (Edge Method Invocation) ruft eine Edge Method auf und leitet deren Ergebnis in Abhängigkeit des Erfolgs an einen weiteren Processing- oder Output-Block weiter.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Method |
Name der Methode, die aufgerufen werden soll. |
Parameter |
Liste von konstanten Parametern zum Aufrufen der Methode |
Beispiel: Aktualisieren eines OPC UA Nodes
Im Beispiel wird ein OPC UA Node gesetzt, sobald eine Nachricht den Block auslöst.

Häufig verwendete Methoden
Im Folgenden werden Methoden des RoboGate Edge beschrieben, welche sich für Verwendung zusammen mit dem Edge Method-Block eignen.
robogate.opcua.writeNodeValue
Methode des OPC UA-Moduls. Setzt einen Node eines konfigurierten OPC UA Servers auf einen konstanten Wert.
Parameter | Beschreibung |
---|---|
server |
Name des konfigurierten Servers |
nodeId |
Node ID des zu setzenden Nodes |
value |
Wert des Nodes |
robogate.opcua.readNodeValue
Methode des OPC UA-Moduls. Liest den Wert eines Nodes eines konfigurierten OPC UA Servers.
Parameter | Beschreibung |
---|---|
server |
Name des konfigurierten Servers |
nodeId |
Node ID des zu setzenden Nodes |
value |
Wert des Nodes |
Property | Beschreibung |
---|---|
val |
Ausgelesener Wert |
robogate.rfc1006.readSignal
Methode des RFC1006-Moduls. Liest einen Wert aus einem Speicherbereich einer konfigurierten PLC.
Parameter | Beschreibung |
---|---|
plc |
Name der konfigurierten PLC |
dataType |
Speichertyp auf PLC
|
dataBlock |
Name des DataBlocks (Bei DataType DataBlock) |
startByte |
StartByte des auszulesendes Wertes im Speicherbereich |
count |
Anzahl der zu lesenden Bytes des Speicherbereiches |
bit |
Zu lesendes Bit innerhalb des zu lesenden Bytes (bei VarType = Bit) |
varType |
Datentyp in den der gelesene Wert konvertiert werden soll
|
Property | Beschreibung |
---|---|
value |
Ausgelesener Wert |
robogate.rfc1006.writeSignal
Methode des RFC1006-Moduls. Schreibt einen Wert in einen Speicherbereich einer konfigurierten PLC.
Parameter | Beschreibung |
---|---|
plc |
Name der konfigurierten PLC |
dataType |
Speichertyp auf PLC
|
dataBlock |
ID des DataBlocks |
startByte |
Start Byte des zu lesendes Wertes im Speicherbereich |
varType |
Datentyp in den der gelesene Wert konvertiert werden soll
|
value |
Zu schreibender Wert |
modbus-testread
Methode des Modbus-Moduls. Liest einen Wert aus einem konfigurierten Modbus Slave.
Parameter |
Beschreibung |
serverInstance |
Name des konfigurierten Modbus |
slaveId |
ID des auszulesenden Slaves im Modbus |
functionCode |
Modbus Function Code
|
registerStartAddress |
Startadresse des zu lesenden Speicherbereichs |
registerLength |
Länge des zu lesenden Speicherbereichs in Words (2 byte) bei FunctionCodes 3 und 4 |
type |
Datentyp in den der augelesene Wert konvertiert werden soll
Bei FunctionCodes 1 und 2: Bool, Länge 1 |
valueGain |
Konstanter Faktor des gelesenen Wertes, um richtigen Wert zu erhalten (Default: 1.0) |
valueOffset |
Konstante Verschiebung des gelesenen Wertes, um richtigen Wert zu erhalten (Default: 0.0) |
isLittleEndian |
Wenn true, interpretiert beim Lesen des Wertes das erste gelesende Byte als kleinstwertiges Byte. (Default: false ⇒ big-endian) |
Property | Beschreibung |
---|---|
resultCode |
Ergebnis Code ("Ok" oder "Error") |
data.Variable |
Ausgelesener Wert |
robogate.set_led
Diese Methode ist nur bei RoboGate Devices powered by Turck verfügbar. Konfiguriert das Blinkverhalten einer HMI LED am Gerät.
Parameter | Beschreibung |
---|---|
name |
Name der LED
|
modus |
Blink Modus
|
Expression
Funktionsbeschreibung und Anwendung
Der Expression Evaluator-Block (ExpEval) kann Berechnungen und sonstige mathematische oder logische Verknüpfungen auf Grundlage einlaufender Nachrichten durchführen. Der Processing-Block hat Zugriff auf einlaufende Nachrichten-Payloads.
Die folgenden Skizze veranschaulicht die Funktionsweise:

Im Beispiel der Skizee wird laut Konfiguration die Berechnung eines Wertes result
durchgeführt.
Dafür wird auf die einlaufenden Datenfelder a
und b
zugegriffen. Nach Verarbeitung, wird die einlaufende Nachricht mit dem Wert result
erweitert und ausgegeben.
In der folgenden Tabelle werden die Konfigurationsparameter erläutert.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Into |
Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Expressions - Name |
Name des Ergebnis (Key) für einen angegebenen Ausdruck. |
Value |
string, Auszuwertender Ausdruck. Die Syntax ist unter Ausdrücke beschrieben. |
Preserve input properties |
optional, boolean, default=false, |
Results in object |
optional, string, Wenn an dieser Stelle etwas angegeben wird, werden Ergebnisse in einem Unterobjekt der Ausgangsnachricht eingefügt. |
Ausdrücke
Folgende Operatoren und Ausdrücke stehen zur Verfügung und können ausgewertet werden:
-
mathematische Verknüpfungen mit den Operatoren
+, -, *, /
-
Modulo mit Operator
%
-
mathematische Funktionen aus .NET Math. Bspw.
Round(..), Abs(..), Pow(..), Sqrt(..), ..
-
binäre Verknüpfungen mit den Operatoren
| , &, ^, ~
-
logische Verknüpfungen mit den Operatoren
||, &&, or, and, !, not
-
mathematische Vergleiche mit den Operatoren
<, >, <=, >=, ==, !=
Neben numerischen Konstanten (wie 12
oder 7.91
) oder booleschen true
/false
kann auf die Properties der einlaufenden Nachricht per JSON Key zugegriffen werden.
Die folgende Tabelle zeigt einige Beispiele.
Expression (JSON Value) | Variables (aus einlaufender Nachricht) | Ergebnis (JSON Value) |
---|---|---|
"1.1*6" |
6.6 |
|
"20 % 10" |
0 |
|
"not true" |
false |
|
"true && true" |
true |
|
"true || false" |
true |
|
"not(2>8)" |
true |
|
"foo*bar" |
foo:20; bar:4 |
80 |
"foo-bar" |
foo:20; bar:4 |
16 |
"foo/bar" |
foo:20; bar:4 |
5 |
"foo+bar" |
foo:20; bar:4 |
24 |
"foo%bar" |
foo:20; bar:4 |
0 |
"foo-(bar * 100)" |
foo:20; bar:4 |
-380 |
Zu beachten ist, dass dieser Processing-Block über keinen Zustand verfügt. Jede Berechnung erfolgt anhand der Konfiguration und der einlaufenden Nachrichten!
.Net Math Operations Syntax: Auf die Funktionen aus dotnet math kann folgendermaßen zugegriffen werden:
-
Potenzieren:
y = x^a
⇒ "y": "Pow(x,a)" -
Runden:
y = x (auf zwei Nachkommastellen)
⇒ "y": "Round(x,2)" -
Quadratwurzel:
y = sqrt(x)
⇒ "y": "Sqrt(x)" -
Abrunden auf Ganzzahl: x ⇒ "y": "Floor(x)"
-
Aufrunden auf Ganzzahl: x ⇒ "y": "Ceiling(x)"
-
Betrag einer Zahl: x ⇒ "y": "Abs(x)"
Weitere Details sind in der Dokumentation der Math Klasse .Net Core zu finden: https://docs.microsoft.com/de-de/dotnet/api/system.math.sqrt?view=netcore-3.1
Beispielkonfiguration Expression-Block
Die Berechnung im Beispiel in Abbildung 16, ergibt sich aus dem Feld „a“ einer einlaufenden Nachricht und Addition mit einem konstanten Wert 1. Das Ergebnis wird in dem Feld „aPlus1“ abgelegt. Im Weiteren sollen alle vorhandenen Felder auf der Eingangsseite erhalten bleiben (→ Preserve input properties: true). Das Ergebnis soll direkt in das Top Level JSON-Objekt eingefügt werden (→ Results in object: null).

Resend
Funktionsbeschreibung und Anwendung
Der Resend-Block speichert jede einlaufende Nachricht in einem Cache und sendet diese in einem Intervall erneut aus. Jede einlaufende Nachricht wird dabei sofort wieder am Ausgang ausgesendet und der Timer für das erneute Aussenden neugestartet.
Der Block kann verwendet werden, um eine Datenübertragung in einem Zeitintervall zu erzwingen.
Beispielsweise wird ein Temperaturwert per Subscription gelesen und nur gesendet, wenn sich dieser ändert. In der Zielplattform möchte man jedoch mindestens einmal pro Tag ein Wert vom EdgeGateway erhalten, da sonst z.B. ein Alarm ausgelöst wird. Der Resend-Block kann nun vor der Übergabe an die Zielplattform platziert werden, um mindestens alle 24h erneut den letzten Wert zu übertragen.
In der folgenden Tabelle werden die Konfigurationsparameter erläutert.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Into |
Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
ResendInterval |
Millisekunden-Intervall, in dem die letzte eingelaufene Nachricht zyklisch erneut gesendet wird. |
Upate Timestamp On Resend |
default=false, true = Beim Resend wird der Zeitstempel mit der aktuellen Zeit überschrieben. |
Beispielkonfiguration Resend-Block

Router
Funktionsbeschreibung und Anwendung
Mit dem Router-Block können einlaufende Nachrichten bedingungsbezogen in bestimmte Processing-Blöcke weitergeleitet werden.
Hierfür gilt:
-
Jede Bedingung muss einen booleschen Zustand zurückgeben.
-
Jede Bedingung, die keinen booleschen Zustand zurückgibt, ist automatisch "false".
-
Sind alle Bedingungen einer Route erfüllt, wird die Nachricht an alle mit dem Block verbundenen Nchfolgerblöcke weitergeleitet.
-
Es werden immer alle konfigurierten Routes überprüft.
-
Wenn keine Route erfüllt wird, wird die Nachricht das defaultOutputTopic des stream processor weitergeleitet.
Folgende Ausdrücke sind in den Bedingungen zulässig:
-
Regular Expression
-
Syntax: RegEx(<value>,'<pattern>')
-
Hochkomma (''): Für Value kann ein statischer Wert eingetragen werden
-
Ohne Hochkomma (''): Value aus Parametern (Header & Body) wird geholt.
-
-
alle Ausdrücke, die auch im Expression Evaluator Block angewendet werden können
Der Zugriff auf Nachrichteninhalte, wie die Eigenschaften des Headers und des Bodys, kann über die Bedingungen umgesetzt werden. Dies funktioniert über den Prefix header_ oder body_ innerhalb der Bedingung.
In der folgenden Tabelle werden die Konfigurationsparameter erläutert. Die Route-Blöcke enthalten die Routes mit ihren Bedingungen (conditions). Werden mehrere Bedingungen angegeben, so werden diese per Und-Logik verbunden. Das Ergebnis einer Bedingung muss immer einen boolschen Wert ergeben.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz |
Route Name |
Name des Route-Blocks. Wurden mehrere Route-Blöcke angelegt, erscheinen diese als wählbare Reiter unter der Eingabezeile. |
Conditions - Name |
Name für die jeweilige Bedingung. |
Conditions - Value |
String, der die Logik für die Bewertung der keys enthält. |
Beispielkonfiguration Router-Block
Die Schrittweise Konfiguration des Stream Processors mit Block Routing wird am folgenden Beispiel erläutert:
-
Aggregate-Blöcke 1 bis 3 mit den gleichen Parametern anlegen (Abbildung 18)
-
Samples mit Size „1” und HoppingEach „1“
-
Behavior: ignore default and collect für „b”
-
-
Neuen Processing Block vom Type Router mit Name "Routing" anlegen (Abbildung 19)
-
Routes anlegen:
-
Route Name: b<10000
-
Conditions:
-
Name: toAggregate1
-
Value: RegEx(header_scope, '^a$') && body_b<10000
-
-
-


RuleEngine
Funktionsbeschreibung und Anwendung
Der RuleEngine-Block dient der Beschreibung von Zustandsübergängen anhand einlaufender Nachrichten in Abhängigkeit von Vorbedingungen. Dabei können Form und Inhalt der Ausgangsnachricht frei definiert werden.
-
Es existiert ein Kontext in dem Informationen abgelegt werden können, die bei der Auswertung der Bedingungen oder in der Ausgangsnachricht verwendet werden können. Der Kontext kann initial mit Informationen gefüllt werden.
-
Es gibt eine Menge von "RuleSets", welche (Vor-)Bedingungen beinhalten.
-
Jedes RuleSets kann eine Menge von "Rules" enthalten, die ebenfalls über Bedingungen verfügen.
-
Weiterhin können ein "defaultRuleSet" sowie in RuleSets "defaultRules" angelegt werden, die keine Bedingungen enthalten.
Jede Bedingung muss einen boolschen Zustand zurückgeben. Jede Bedingung, die keinen boolschen Zustand zurückgibt, ist automatisch "Falsch" (false). In den Bedingungen kann auf Inhalte des Kontextes und der eingehenden Nachricht (header & body & context) zugegriffen werden. Damit eine "Rule" zutrifft, müssen alle Bedingungen von dieser erfüllt sein.
Zuerst werden die Bedingungen der definierten RuleSets der Reihe nach, von RuleSet-1 ausgehend, geprüft. Sobald ein RuleSet die Bedingungen erfüllt, wird dieses RuleSet für die weitere Verarbeitung der Nachricht ausgewählt. Erfüllt kein RuleSet die Bedingung, wird das defaultRuleSet gewählt, sofern es angegeben ist, andernfalls erfolgt ein Abbruch.
Anschließend werden im ausgewählten RuleSet die Rules geprüft. Die Reihenfolge der Prüfung ist abhängig von der definierten „Evaluation Stategy“ (FirstMatch, BestMatch oder LeastMatch). Entsprechend kann eine Überprüfung der weiteren im RuleSet enthaltenen "Rules" übersprungen, die „zutreffendste“ oder die letzte zutreffende "Rule" everwendet werden. Erfüllt keine Rule die Bedingungen erfolgt ein Abbruch. Folglich wird im RuleEngine-Block nur ein RuleSet und darin nur eine Rule verarbeitet.
Das Ergebnis einer "Rule" wird in dem Kontext abgelegt, der Inhalt des Kontext wird gepatched. Es können gezielt Inhalte aus dem Kontext nach dem Patchen entfernt werden.
-
Wird das angegebene Message Template nicht gefunden, wird das Message Template "default" genommen.
-
Die Ausgangsnachricht wird anhand des Message Templates erzeugt.
-
Ist in dem Message Template kein Header definiert, werden die Informationen "source" und "scope" aus der Eingangsnachricht übernommen.
-
Ist in dem Message Template Header kein "source" und/oder "scope" definiert, werden die Informationen aus der Eingangsnachricht übernommen.
-
Ist in dem Message Template kein Body definiert, enthält die Ausgangsnachricht keine Informationen im Body.
-
Nach dem Erfüllen einer "Rule" wird eine Ausgangsnachricht an alle zugewiesenen Input bzw. Output Topics gesendet.
Bezeichnungen und Syntax
Die in den folgenden Tabellen erläuterten Parameter sind in einer beispielhaften Konfiguration und deren Abbildungen im folgenden Absatz zu finden.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz |
Context |
Optionaler Bereich in dem Variablen mit oder ohne initialen Inhalt initialisiert werden. Jedes Contextelement kann in den Bedingungen verwendet werden. |
RuleSets |
Zusammenstellung von Bedingungspfaden/Zustandsüberwachungen. Es können beliebig viele erstellt werden. Die angelegten RuleSets erscheinen als Reiter unterhalb der Eingabemöglichkeit. Die Namen "RuleSets-n" können nicht geändert werden, n steht hierbei für eine fortlaufende Zahl. Hier kann auch ein „DefaultRuleSet“ definiert werden. |
Message Templates |
Beliebig viele Vorlagen für die Ausgabe von Nachrichten und Daten. Es können hier alle Elemente aus dem Nachrichtenbereich Header und Body bearbeitet werden. |
Parameter | Beschreibung |
---|---|
Name |
optional, Name des Kontextelementes |
Value |
optional, Inhalt des Kontextelementes |
Parameter | Beschreibung |
---|---|
Evaluation Strategy |
Es kann zwischen FirstMatch, BestMatch oder LeastMatch, also erster, bester oder letzter Übereinstimmung, entschieden werden. Die Voreinstellung ist „FirstMatch“. |
DefaultRuleSet |
Zu befüllen wie ein normales RuleSet mit Rules – jedoch ohne Bedingungen. In diesem RuleSet können wiederum Rules angelegt werden, die diverse Bedingungen enthalten. |
Conditions |
optional, globale Bedingung(en) damit in diesem RuleSet die Rules auf Übereinstimmung geprüft werden. |
Conditions – Name |
optional, Bezeichnung der Bedingung eingeben |
Conditions – Value |
optional – sofern kein Name/ key definiert wurde, Wert aus dem Context, body, header oder selbstdefiniert |
Rules |
Es kann zwischen den Reitern per Pfeiltasten gewechselt werden. Zusammenstellung von Bedingungspfaden/ Zustandsüberwachungen zur Verfeinerung der Bedingungspfade. Es können beliebig viele erstellt werden. Die angelegten Rules erscheinen als Reiter unterhalb der Eingabemöglichkeit. Die Namen können nicht geändert werden – Rules-n, n steht hierbei für eine fortlaufende Zahl. Hier kann auch ein „DefaultRule“ definiert werden. |
Rules – Context |
Beliebige Anzahl von Contextelementen |
Rules – Context – Name |
Name des Contextelementes |
Rules – Context – Value |
Inhalt des Contextelementes |
Rules – Into |
Folgeblock bestimmen, wenn notwendig |
Rules – Obsolete context |
Hier können Elemente des Context entfernt werden. |
Rules – Message Template |
Optionale Eingabe eines Namens für das Message Template zur Erstellung der Output-Nachricht |
Parameter | Beschreibung |
---|---|
Header – Name |
optional, Name des zu erstellenden keys |
Header – Value |
optional, sofern kein Name/ key definiert wurde, Wert aus dem Context, Body, Header oder selbstdefiniert |
Body – Name |
optional, Name des zu erstellenden keys |
Body – Value |
optional, sofern kein Name definiert wurde, Wert aus dem Context, Body, Header oder selbstdefiniert |
Verwendung von Elementen aus Header, Body und Context:
-
Groß- und Kleinschreibung ist zu beachten
-
Um das Element source aus dem Nachrichtenbereich Header zu erhalten, muss header_ für den zu verwendenden Bereich und source für das Element eingegeben werden. → header_scope
Verwendung einer Zeichenkette/ String
-
Eine Zeichenkette wird mit einem vorangehenden „ ‘ „ und einem abschließenden „ ‘ „ gekennzeichnet. Bsp.: ‘Zeit verstrichen‘
-
Werden die Zeichen nicht verwendet, so wird die Eingabe wie eine Variable verwendet.
Beispielkonfiguration RuleEngine-Block
Die schrittweise Konfiguration im Stream Processor wird am folgenden Beispiel dargestellt. Es werden Daten eines OPC UA Servers verarbeitet. Die Reihenfolge sollte wie beschrieben erfolgen, da so alle angezeigten Auswahlmöglichkeiten im Zuge der Konfiguration zur Verfügung stehen. Auswahlwahlmöglichkeiten stehen immer erst nach einem Speichervorgang zur Verfügung.
Voraussetzung: Im Modul OPC UA wurde ein OPC UA Server mit dem Output Topic „OpcuaOut1“ konfiguriert (Abbildung 20).

Nun werden die grundlegenden Elemente im Stream Processor angelegt bzw. konfiguriert (Abbildung 21):
-
Topic "OpcuaOut1" unter "Settings" als Input Edge Topic hinzufügen
-
Input-Block "switch1" anlegen und Source (OPCUA) und Scope (OpcuaOut1$) konfigurieren
-
RuleEngine-Block anlegen mit Name "UsingRules"
-
Output-Block "OUTBlock" anlegen

Es folgt die Definition der Message Templates "default" und "Rule1" im RuleEngine-Block:
-
default: Änderung des Inhaltes des Headers von Source mit einer Zeichenfolge und des Inhaltes Zusatzinformation mit einer Zeichenfolge im Body (Abbildung 22)
-
Rule1: Änderung des Inhaltes des Headers von Source mit einer Zeichenfolge und Scope mit dem Inhalt eines Elementes vom Context. Der Body wird mit Werten aus dem ursprünglichen Body - statUint und dynUint – befüllt und mit während der Abarbeitung der Rules generierten Werten – Kontext1 und Zusatz (Abbildung 23).


Anschließend wird der Context des Block "UsingRules" konfiguriert, hier optional die Eingabe von Kontext mit Inhalten, und das default und ein weiteres RuleSet angelegt (Abbildung 24).

Es folgt die Konfiguration der RuleSets:
-
DefaultRuleSet (Abbildung 25)
-
Evaluation Strategy eintragen: FirstMatch
-
Rules: Add Default → DefaultRule erstellt
-
Context und Into wurden leer gelassen
-
Im Obsolete Context werden die Elemente Kontext2 und Kontext3 entfernt
-
Message Template "default" angeben
-
-
Ruleset-1
-
Evaluation Strategy eintragen: FirstMatch
-

Ist das RuleSet-1 erstellt, müssen die Rules mit ihren Bedingungen konfiguriert werden.
-
Add Rule → Rules-1 erstellt
-
Conditions
-
Name: "dynUint >= 35000"
-
Value: es wird die Bedingung "body_dynUint >= 30000" eingetragen.
-
-
Im Context werden zwei Elemente mit einem neuen Wert beschrieben:
-
RulesetTest mit der Zeichenfolge 'RuleSet1 Rule1'
-
Kontext1 mit der Zeichenfolge 'dynUint >= 35000'
-
-
Obsolete Context wird leer gelassen
-
Message Template mit Rule1 angeben
-

-
Add Rule → Rules-2 erstellt (Abbildung 27)
-
Conditions
-
Name: dynUint kleiner gleich 10000 – dies ist nur eine Beschreibung der Bedingung.
-
Value: 'body_dynUint < = 10000' – dies ist die Bedingung
-
-
Im Context werden zwei Elemente mit einem neuen Wert beschrieben:
-
RulesetTest mit der Zeichenfolge 'RuleSet1 Rule2'
-
Kontext1 mit der Zeichenfolge 'dynUint < = 10000'
-
-
Obsolete Context wird leer gelassen
-
Message Template mit Rule1 angeben
-

Abschließend müssen die konfigurierten Blöcke verbunden werden (Abbildung 28).

Mit dieser Konfiguration kann nun über den MQTT oder über andere Schnittstellen die fertige Nachricht ausgegeben werden.
SNCFilter
Funktionsbeschreibung und Anwendung
Der Significant-Numeric-Change-Filter (SNCFilter) dient der Reduzierung der Datenlast durch schwankende Sensor- oder Analogwerte. Der Filter sendet nur eine Nachricht, wenn:
-
Der Daten Key der zu filternden Variablen unter Thresholds konfiguriert ist
-
UND der zugehörige Value ggü. der zuletzt weitergeleiteten Nachricht eine Änderung betraglich größer/gleich des konfigurierten Thresholds hat.
Beim erstmaligen Eintreffen eines neuen Parameters, d.h. der Parameter ist im internen Cache noch nicht vorhanden, wird dieser angelegt und zukünftig immer durchgeleitet.
Folgende Parameter sind relevant für die Filterfunktion:
-
Value: aktueller Wert der Variable in der Eingangsnachricht
-
ValueCache: letzter durchgereichter Wert der Variable
-
Threshold: konfigurierte Schwelle
Abbildung 29 veranschaulicht das Verhalten des Filters.

Die in folgender Tabelle erläuterten Parameter sind Abbildung 30 zu finden.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Into |
Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Thresholds - Name |
Name des Daten Key (Variable) in der eingehenden Nachricht. Zugriff nur auf die oberste Objektebene im JSON. Die Variable muss einen numerischen Wert (Ganz- oder Fließkomma Typen) haben. |
Thresholds - Value |
double oder integer; Numerische Schwelle (einseitig) der Variable. |
Overwrite Header Scope with |
string; optional. Name für neue Scope Bezeichnung, zum Überschreiben des Header-Feldes Scope. Option ist inaktiv, wenn String nicht vorhanden oder leer ist. |
Issue Full Model |
boolean, default=true; |
Beispielkonfiguration SNCFilter-Block
Folgende Abbildung zeigt eine beispielhafte Konfiguration des SNCFilter.

Timer
Funktionsbeschreibung und Anwendung
Der Timer-Block kann einlaufende Nachrichten verzögern. Im Detail besitzt der Block die folgenden Events:
a) Start bzw. Neustart des Timers.
Startet den Timer oder startet einen laufenden Timer von vorn.
Es wird jedes mal eine Action (falls konfiguriert) OnStartEvent
ausgelöst.
Der Event Trigger ist über die Bedingungen in OnStartEvent.conditions
festzulegen.
b) Cancel Timer. Ein laufender Timer wird aufgrund einer Nachricht gestoppt und die Action OnCancelEvent
ausgeführt.
Der Event Trigger ist über die Bedingungen in OnCancelEvent.conditions
festzulegen.
c) Timer Expired. Ein Timer hat die konfigurierte Zeitspanne erreicht.
Das Event wird nur durch den Timer getriggert und kann nicht per Bedingung konfiguriert werden.
Das Event OnExpiredEvent
ausgelöst, wenn konfiguriert wird die entsprechende Action ausgeführt.
Die angegebenen conditions
müssen alle erfüllt sein (true), um den Trigger/die Aktion zu erhalten/auszulösen.
Der Zugriff auf die Nachrichten Properties der Timer Start Nachricht über body_<BODY_KEY>
und header_<HEADER_KEY>
ist ebenso wie in den Blöcken RuleEngine und Router möglich.
Als Aktion auf ein Event stehen die folgenden Möglichkeiten über das Property Type
zur Wahl:
-
SendMessage
⇒ Sende eine vordefinierte Nachricht. Der Nachrichteninhalt ist unter Data zu konfigurieren. -
SendStartMessage
⇒ Sende die (komplette) Nachricht aus, die zuletzt zum Start des Timers geführt hat. Dertimestamp
der Nachricht wird mit dem Aussenden aktualisiert.
Wird ein Timer bspw. mit den identischen Bedingungen für Start und Cancel konfiguriert und treffen diese mit einem Nachrichteneingang zu, so wird der Timer gestartet und sofort wieder gestoppt und die zugehörigen Aktionen ausgeführt.
Die folgende Tabelle erläutertet die Konfigurationsparameter des Timer-Blocks.
Parameter | Beschreibung |
---|---|
Name |
Name der Blockinstanz. |
Into |
Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. mit Folgeblöcken verbunden und Ausgabenachrichten an die entsprechend angegeben Blockinstanzen (Namen) weitergeleitet werden. |
Timespan(ms) |
Zeitspanne für den Timer in Millisekunden |
Konfiguration Timer |
Folgende Events sind im Block vorhanden
Die konfigurierbaren Actions OnStartEvent, OnCancelEvent und OnExpiredEvent sind nur bei Bedarf zu konfigurieren und können ggf. weggelassen werden. Bei Auftreten eines der Events und einer konfigurierten Aktion wird der aktuelle Timestamp im Message Body hinzugefügt (und ggf. überschrieben). Bei ausgehenden Nachrichten kann das Event per Header Property timer-was: {started, canceled, expired} für weitere Prüfung herangezogen werden. |
Conditions |
Alle in diesem Objekt angegebene Ausdrücke müssen mit true ausgewertet werden, um das Event auszuführen. |
Name |
Nutzerdefinierter Name der Bedingung. Der Wert kann frei gewählt werden, es bestehen keine Abhängigkeiten zu anderen Konfigurationsparametern. |
Value |
string, Ausdruck für die Prüfung der Regel. Für den Zugriff auf die Properties der zugehörigen Start-Nachricht ist Folgendes zu beachten: |
TimerType |
Als Aktion auf ein Event stehen folgende Möglichkeiten zur Wahl:
Hinweis: Wird ein Timer bspw. mit den identischen Bedinungen für Start und Cancel konfiguriert und treffen diese mit einem Nachrichteneingang zu, so wird der Timer gestart und sofort wieder gestoppt und die zugehörigen Aktionen ausgeführt. |
Header |
Nachrichten-Body der auszusendenden Nachricht bei diesem Event. |
Body |
Nachrichten-Header der auszusendenden Nachricht bei diesem Event. |
In der folgenden Darstellung ist ein beispielhaftes Szenario aufgezeigt.
Es läuft die Nachricht [1] ein. Diese wird vom Block ignoriert, da die Timer condition (state >=10
) nicht erfüllt ist.
Nachricht [2] folgt einlaufend und triggert den Start, da hier state >=10
. Der Timer startet und sendet eine Nachricht [3], die vielleicht für Debugging Zwecke konfiguriert ist, sofort heraus.
Der Timer erreicht die konfigurierte Zeit von 20 Sekunden und sendet die Nachricht [4], welche der Nachricht [2] mit aktuellem Zeitstempel entspricht, am Ausgang heraus.

Beispielkonfiguration Timer-Block
Folgende Abbildung zeigt eine beispielhafte Konfiguration des Timer-Blocks in der RoboGate Edge UI.
