Stream Processor
Mit dem Stream Processor können einlaufende Nachrichten durch Verschaltung verschiedener Verarbeitungsblöcke verarbeitet und anschließend zurück an den Edge Broker des EdgeGateways gegeben werden. Zu den Funktionen gehören z.B. Datenaggregation und mathematische Berechnungen auf Grundlage einlaufender Nachrichten. Mehrere unabhängige Datenströme können unabhängig voneinander und gleichartig verarbeitet werden, siehe dazu die nachfolgende Skizze.
+----------------------+ +---------------+ | +------------+ | | Pollgroup1 |----+-->| Aggregator |-----+->Aggregation(Pollgroup1) =>| | | | | | | Pollgroup2 |----+-->| n Buffers |-----+->Aggregation(Pollgroup2) | | | +------------+ | +---------------+ +----------------------+ OPC UA-Reader Stream Processor
Konfigurationsschema
Die Konfiguration des Stream Processor setzt sich aus der Definition der Edge Broker-Verbindungen, d.h. dem Input in den Stream Processor und dem Output an den Edge Broker, sowie den Verarbeitungsblöcken zusammen. Diese werden im folgenden beschrieben.
Input - Edge Topics und Switch
Über die Angabe von Input Edge Topics wird definiert, welche der im EdgeGateway vorhandenen Topics im Stream Processor verarbeitet werden sollen. Diese Auswahl dient somit als Eingangsfilter in den Stream Processor.
Über den Input Switch werden die einlaufenden Nachrichten weitergefiltert und Verarbeitungsblöcken zugeordnet. Diese Filterung ist durch Reguläre Ausdrücke (RegEx) realisiert. Stimmen die Header-Felder "Source" und "Scope" aus den einlaufenden Nachrichten mit der Konfiguration des Input Switch überein, wird die Nachricht an die unter "Target Blocks" zugewiesenen Blockinstanzen der Verarbeitungsblöcke übergegeben. Hierfür müssen diese Blockinstanzen zuvor angelegt worden sein.
Die Konfiguration des Input Switch, d.h. Zuweisung von Source und Scope zu einem Verarbeitungsblock, ist Voraussetzung für die weitere Verarbeitung von Nachrichten im Stream Processor!
Die folgende Tabelle gibt einen Überblick über die Konfigurationsparameter.
Parameter | Beschreibung |
---|---|
New Topic | Name der Topics, die verarbeitet werden sollen. |
Name | Name des Input Switch, welcher frei wählbar ist und eindeutig sein muss. Es können mehrere Einträge angelegt werden. Jeder Eintrag erhält einen Namen. Der Name spielt im weiteren Verarbeitungsprozess keine Rolle. |
Source / Scope Pattern | Filterung der einlaufenden Nachrichten durch reguläre Ausdrücke (RegEx). |
Target Blocks | In diese Blöcken laufen die Nachrichten aus Source und Scope entsprechend gewählter Filterung ein. Es können je Zeile mehrere Blöcke zugeordnet werden. Die Blöcke stehen erst nach erfolgter Konfiguration zur Auswahl. |
Beispiel: Der zu verarbeitende Daten-Input wird von der „PollGroup a“ des OPC UA-Moduls bezogen. Abbildung 1 zeigt die nachfolgend beschriebene Konfiguration im OPC UA-Modul:
Poll Groups: „PollGroup a“ mit Interval 1000ms, Name: “a”, Output Topic: “PollGroup a”
Field Name: a und b
Die angelegte "Pollgroup a" muss nun als Input Edge Topic im Stream Processor angelegt werden. Anschließend müssen die erläuterten Parameter für den Input Switch definiert werden. Im Beispiel werden die Nachrichten im Input Switch nach Source ("OPCUA") und Scope ("^a$") weitergefiltert und dem Verarbeitungsblock "ExpressionEvaluator" zugewiesen (Abbildung 2).


Processing Blocks
Die Verarbeitungsblöcke (Processing Blocks) verarbeiten die einlaufenden Nachrichten und geben die verarbeiteten Nachrichten anschließend über Output Topics zurück an den Edge Broker des EdgeGateways. Jeder Block kann mit Folgeblöcken verbunden werden, Ausgabenachrichten werden dann an die angegeben Blockinstanzen weitergeleitet (per JSON Property into: ["<BlockNameA>, <BlockNameB>"]).
Um Nachrichten mit unterschiedlichen Source- und Scope-Werten (d.h. aus unterschiedlichen Datenquellen) unabhängig voneinander und parallel zueinander zu verarbeiten, bauen die Verarbeitungsblöcke in der Regel für jede Kombination von Source und Scope unabhängige Zustände (z.B. eine Timerinstanz auf dotnet Ebene oder einen Puffer) auf.
Die grundlegenden Parameter der Verarbeitungsblöcke sind in der folgenden Tabelle erläutert und in der EdgeGateway UI, wie in Abbildung 3 dargestellt, zu konfigurieren. Die verfügbaren Typen von Verarbeitungsblöcken werden mit ihren spezifischen Konfigurationsparametern in den nachfolgenden Abschnitten beschrieben.
Die Blockinstanz eines Processing Block steht erst nach erfolgter Konfiguration für die Target Blocks im Input Switch zur Verfügung.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Type | Das Feld type legt den Typ des Verarbeitungsblocks fest. Folgende Typen stehen zur Verfügung:
|

Output Blocks
Die in den Processing Blocks definierten Output Topics müssen über die Auflistung in "Output Blocks" an den EdgeGateway Broker zurückgegeben werden, um diese in weiteren Modulen zu verarbeiten.
Die in der folgenden Tabelle erläuterten Parameter sind im EdgeGateway konfigurierbar (Abbildung 4).
Parameter | Beschreibung |
---|---|
DefaultOutputTopic | Optional. In das Modul einlaufende Nachrichten, die durch keine Filter Konfiguration passen (inputs), werden, falls konfiguriert, auf dieses Edge Broker Topic geleitet. |
Name | Name des konfigurierten Output Topic des jeweiligen Blocks. |
Edge Topic | Um eine Nachricht aus dem Stream Processor zurück an den Edge Broker zu geben, muss ein Output Topic konfiguriert werden. Dieses Topic kann zur Auswertung in einem Datenausgabemodul genutzt werden. |

Verarbeitungsblöcke
Im folgenden werden die Verarbeitungsblöcke und ihre Konfiguration näher beschrieben.
Aggregator
Funktionsbeschreibung und Anwendung
Der Aggregator-Block sammelt einlaufende Nachrichten. Die in Nachrichten enthaltenen Felder können entsprechend der Konfiguration verschiedenartig behandelt werden.
Ein Anwendungsbeispiel ist eine Minimum/Maximum-Aggregation, um das aufkommende Datenvolumen zu reduzieren. Beispielsweise laufen Temperaturdaten der Außentemperatur im Sekundentakt ein, aber statt 60 Momentan-Werte zu übertragen, können für jede Minute nur der Minimum- und Maximum-Wert übertragen werden.
Die in der folgenden Tabelle erläuterten Parameter sind in Abbildung 6 und Abbildung 5 zu finden.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Into | Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Window - Based On | Auswahl des Window-Typ
|
Window – Size | Die Windowgröße bestimmt den vorhandenen Puffer je nach Window-Typ sample-basiert (Wert als Anzahl) oder zeitbasiert (Wert in Millisekunden). |
Window - HoppingEach | Häufigkeit einer Berechnung und Ausgabe je nach Window-Typ sample-basiert (Wert als Anzahl) oder zeitbasiert (Wert in Millisekunden). Ein hoher Wert führt seltener zu einer Ausführung und Ausgabe als ein kleiner. |
Behaviour | Die verarbeitende Funktion kann für jedes Feld der einlaufenden Nachricht angegeben werden. Sollte eine ausgehende Nachricht keinen Zeitstempel (Key timestamp) besitzen, so wird dieser mit der aktuellen Zeit erzeugt. |
Default | Default-Wert, der für alle Felder gilt, die nicht explizit angegeben sind. |
Name | Feldname (Variablenname) im JSON der einlaufenden Nachricht |
Value | Funktionsauswahl für das jeweilige Feld. Folgende Werttypen stehen zur Verfügung:
|
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
Beispielkonfiguration Aggregator-Block
Folgende Beispielkonfiguration ist in Abbildung 5 und Abbildung 6 dargestellt:
„a“ ist explizit konfiguriert und wird in einem Array gesammelt (Value: "Collect") und ausgegeben.
„b“ wird explizit ignoriert (Value: "Ignore") und entfällt für die Ausgabenachricht.
Da der Timestamp nicht konfiguriert ist, erhält die Default-Funktion die Konfiguration first.


Dedup
Funktionsbeschreibung und Anwendung
Der Deduplicate-Block (Dedup) verwirft Nachrichten, die gleiche Informationen zur vorherigen Nachricht aufweisen.
Die Prüfung kann mittels des Parameter Watch
maskiert, d.h. eingeschränkt werden.
Der Block wird verwendet, um redundante Daten/Nachrichten auszufiltern.
Die in der folgenden Tabelle erläuterten Parameter sind in Abbildung 7 zu finden.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Into | Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
Overwrite Header Scope with | string, Name für neue Scope Bezeichnung (optional). Dient dem Überschreiben des Header-Feldes Scope. Die Option ist inaktiv, wenn der String nicht vorhanden oder leer ist. |
Watch | string oder string Array, Watch ist der Ausdruck für die Auswahl des Prüfkriteriums in der Nachricht. Wenn aufeinanderfolgende Nachrichten an der Stelle/den Stellen „x“ gleich sind, wird die Nachricht verworfen. Für den Zugriff auf Nachrichtenkeys mit einem Punkt „.“, bspw. Key Data.Value, ist der Zugriff über die Syntax ['Data.Value'] erforderlich. |
Beispielkonfiguration Dedup-Block
Die erläuterten Parameter sind in Abbildung 8 und Abbildung 7 beispielhaft angewendet.


ExpEval
Funktionsbeschreibung und Anwendung
Der Expression Evaluator-Block (ExpEval) kann Berechnungen und sonstige mathematische oder logische Verknüpfungen auf Grundlage einlaufender Nachrichten durchführen. Der Verarbeitungsblock hat Zugriff auf einlaufende Nachrichten-Payloads.
Die folgenden Skizze veranschaulicht die Funktionsweise:
+----------------+ | | in ====> | Expression | ====> out | Evaluator | Message in | | Message out +-----------------+ +----------------+ +----------------+ | | ^ | "a": 7, | | "a": 7, | | | "b": 2, | | "b": 2, =================================>| "result": 12, | | | | +----------------+ +-----------------+ | | Config | ----------------------- / "result": "a*b - 2" / -----------------------
Im Beispiel der Skizee wird laut Konfiguration die Berechnung eines Wertes result
durchgeführt.
Dafür wird auf die einlaufenden Datenfelder a
und b
zugegriffen. Nach Verarbeitung, wird die einlaufende Nachricht mit dem Wert result
erweitert und ausgegeben.
In der folgenden Tabelle werden die Konfigurationsparameter erläutert.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Into | Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Expressions - Name | Name des Ergebnis (Key) für einen angegebenen Ausdruck. |
Value | string, Auszuwertender Ausdruck. Die Syntax ist unter Ausdrücke beschrieben. |
Preserve input properties | optional, boolean, default=false, |
Results in object | optional, string, Wenn an dieser Stelle etwas angegeben wird, werden Ergebnisse in einem Unterobjekt der Ausgangsnachricht eingefügt. |
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
Ausdrücke
Folgende Operatoren und Ausdrücke stehen zur Verfügung und können ausgewertet werden:
mathematische Verknüpfungen mit den Operatoren
+, -, *, /
Modulo mit Operator
%
mathematische Funktionen aus .NET Math. Bspw.
Round(..), Abs(..), Pow(..), Sqrt(..), ..
binäre Verknüpfungen mit den Operatoren
| , &, ^, ~
logische Verknüpfungen mit den Operatoren
||, &&, or, and, !, not
mathematische Vergleiche mit den Operatoren
<, >, <=, >=, ==, !=
Neben numerischen Konstanten (wie 12
oder 7.91
) oder booleschen true
/false
kann auf die Properties der einlaufenden Nachricht per JSON Key zugegriffen werden.
Die folgende Tabelle zeigt einige Beispiele.
Expression (JSON Value) | Variables (aus einlaufender Nachricht) | Ergebnis (JSON Value) |
---|---|---|
"1.1*6" | 6.6 | |
"20 % 10" | 0 | |
"not true" | false | |
"true && true" | true | |
"true || false" | true | |
"not(2>8)" | true | |
"foo*bar" | foo:20; bar:4 | 80 |
"foo-bar" | foo:20; bar:4 | 16 |
"foo/bar" | foo:20; bar:4 | 5 |
"foo+bar" | foo:20; bar:4 | 24 |
"foo%bar" | foo:20; bar:4 | 0 |
"foo-(bar * 100)" | foo:20; bar:4 | -380 |
Zu beachten ist, dass dieser Verarbeitungsblock über keinen Zustand verfügt. Jede Berechnung erfolgt anhand der Konfiguration und der einlaufenden Nachrichten!
.Net Math Operations Syntax: Auf die Funktionen aus dotnet math kann folgendermaßen zugegriffen werden:
Potenzieren:
y = x^a
⇒ "y": "Pow(x,a)"Runden:
y = x (auf zwei Nachkommastellen)
⇒ "y": "Round(x,2)"Quadratwurzel:
y = sqrt(x)
⇒ "y": "Sqrt(x)"Abrunden auf Ganzzahl: x ⇒ "y": "Floor(x)"
Aufrunden auf Ganzzahl: x ⇒ "y": "Ceiling(x)"
Betrag einer Zahl: x ⇒ "y": "Abs(x)"
Weitere Details sind in der Dokumentation der Math Klasse .Net Core zu finden: https://docs.microsoft.com/de-de/dotnet/api/system.math.sqrt?view=netcore-3.1
Beispielkonfiguration Expression Evaluator-Block
Die Berechnung im Beispiel in Abbildung 10 und Abbildung 9, ergibt sich aus dem Feld „a“ einer einlaufenden Nachricht und Addition mit einem konstanten Wert 1. Das Ergebnis wird in dem Feld „aPlus1“ abgelegt. Im Weiteren sollen alle vorhandenen Felder auf der Eingangsseite erhalten bleiben (→ Preserve input properties: true). Das Ergebnis soll direkt in das Top Level JSON-Objekt eingefügt werden (→ Results in object: null).


Resend
Funktionsbeschreibung und Anwendung
Der Resend-Block speichert jede einlaufende Nachricht in einem Cache und sendet diese in einem Intervall erneut aus. Jede einlaufende Nachricht wird dabei sofort wieder am Ausgang ausgesendet und der Timer für das erneute Aussenden neugestartet.
Der Block kann verwendet werden, um eine Datenübertragung in einem Zeitintervall zu erzwingen.
Beispielsweise wird ein Temperaturwert per Subscription gelesen und nur gesendet, wenn sich dieser ändert. In der Zielplattform möchte man jedoch mindestens einmal pro Tag ein Wert vom EdgeGateway erhalten, da sonst z.B. ein Alarm ausgelöst wird. Der Resend-Block kann nun vor der Übergabe an die Zielplattform platziert werden, um mindestens alle 24h erneut den letzten Wert zu übertragen.
In der folgenden Tabelle werden die Konfigurationsparameter erläutert.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Into | Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Output Topic | Name des Output Topic des jeweiligen Blocks |
ResendInterval | Millisekunden-Intervall, in dem die letzte eingelaufene Nachricht zyklisch erneut gesendet wird. |
Upate Timestamp On Resend | default=false, true = Beim Resend wird der Zeitstempel mit der aktuellen Zeit überschrieben. |
Beispielkonfiguration Resend-Block


Router
Funktionsbeschreibung und Anwendung
Mit dem Router-Block können einlaufende Nachrichten bedingungsbezogen in bestimmte Verarbeitungsblöcke weitergeleitet werden.
Hierfür gilt:
Jede Bedingung muss einen booleschen Zustand zurückgeben.
Jede Bedingung, die keinen booleschen Zustand zurückgibt, ist automatisch "false".
Sind alle Bedingungen einer Route erfüllt, wird die Nachricht an alle Blöcke, die in "into" aufgelistet sind und/oder an das Output Topic weitergeleitet.
Es werden immer alle konfigurierten Routes überprüft.
Wenn keine Route erfüllt wird, wird die Nachricht an den defaultInto-Block und/oder an das defaultOutputTopic weitergeleitet.
Folgende Ausdrücke sind in den Bedingungen zulässig:
Regular Expression
Syntax: RegEx(<value>,'<pattern>')
Hochkomma (''): Für Value kann ein statischer Wert eingetragen werden
Ohne Hochkomma (''): Value aus Parametern (Header & Body) wird geholt.
alle Ausdrücke, die auch im Expression Evaluator Block angewendet werden können
Der Zugriff auf Nachrichteninhalte, wie die Eigenschaften des Headers und des Bodys, kann über die Bedingungen umgesetzt werden. Dies funktioniert über den Prefix header_ oder body_ innerhalb der Bedingung.
In der folgenden Tabelle werden die Konfigurationsparameter erläutert. Die Route-Blöcke enthalten die Routes mit ihren Bedingungen (conditions). Werden mehrere Bedingungen angegeben, so werden diese per Und-Logik verbunden. Das Ergebnis einer Bedingung muss immer einen boolschen Wert ergeben.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz |
Default Output Topic | Name des Output Topic. Wird verwendet, wenn kein OutputTopic in den Routes angegeben wurde. |
Default Into | Optional. Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Route Name | Name des Route-Blocks. Wurden mehrere Route-Blöcke angelegt, erscheinen diese als wählbare Reiter unter der Eingabezeile. |
Conditions - Name | Name für die jeweilige Bedingung. |
Conditions - Value | String, der die Logik für die Bewertung der keys enthält. |
Into | stringarray, optional. Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
Beispielkonfiguration Router-Block
Die schrittweise Konfiguration des Stream Processors mit Block Routing wird am folgenden Beispiel erläutert (vgl. Abbildung 15):
Aggregate-Blöcke 1 bis 3 mit dem Output Topic „out“ und den gleichen Parametern anlegen (Abbildung 13)
Samples mit Size „1” und HoppingEach „1“
Behavior: ignore default and collect für „b”
Neuen Processing Block vom Type Router mit Name "Routing" anlegen (Abbildung 14)
Routes anlegen:
Route Name: b<10000
Conditions:
Name: toAggregate1
Value: RegEx(header_scope, '^a$') && body_b<10000
into: aggregate1



RuleEngine
Funktionsbeschreibung und Anwendung
Der RuleEngine-Block dient der Beschreibung von Zustandsübergängen anhand einlaufender Nachrichten in Abhängigkeit von Vorbedingungen. Dabei können Form und Inhalt der Ausgangsnachricht frei definiert werden.
Es existiert ein Kontext in dem Informationen abgelegt werden können, die bei der Auswertung der Bedingungen oder in der Ausgangsnachricht verwendet werden können. Der Kontext kann initial mit Informationen gefüllt werden.
Es gibt eine Menge von "RuleSets", welche (Vor-)Bedingungen beinhalten.
Jedes RuleSets kann eine Menge von "Rules" enthalten, die ebenfalls über Bedingungen verfügen.
Weiterhin können ein "defaultRuleSet" sowie in RuleSets "defaultRules" angelegt werden, die keine Bedingungen enthalten.
Jede Bedingung muss einen boolschen Zustand zurückgeben. Jede Bedingung, die keinen boolschen Zustand zurückgibt, ist automatisch "Falsch" (false). In den Bedingungen kann auf Inhalte des Kontextes und der eingehenden Nachricht (header & body & context) zugegriffen werden. Damit eine "Rule" zutrifft, müssen alle Bedingungen von dieser erfüllt sein.
Zuerst werden die Bedingungen der definierten RuleSets der Reihe nach, von RuleSet-1 ausgehend, geprüft. Sobald ein RuleSet die Bedingungen erfüllt, wird dieses RuleSet für die weitere Verarbeitung der Nachricht ausgewählt. Erfüllt kein RuleSet die Bedingung, wird das defaultRuleSet gewählt, sofern es angegeben ist, andernfalls erfolgt ein Abbruch.
Anschließend werden im ausgewählten RuleSet die Rules geprüft. Die Reihenfolge der Prüfung ist abhängig von der definierten „Evaluation Stategy“ (FirstMatch, BestMatch oder LeastMatch). Entsprechend kann eine Überprüfung der weiteren im RuleSet enthaltenen "Rules" übersprungen, die „zutreffendste“ oder die letzte zutreffende "Rule" everwendet werden. Erfüllt keine Rule die Bedingungen erfolgt ein Abbruch. Folglich wird im Rule Engine-Block nur ein RuleSet und darin nur eine Rule verarbeitet.
Das Ergebnis einer "Rule" wird in dem Kontext abgelegt, der Inhalt des Kontext wird gepatched. Es können gezielt Inhalte aus dem Kontext nach dem Patchen entfernt werden.
Wird das angegebene Message Template nicht gefunden, wird das Message Template "default" genommen.
Die Ausgangsnachricht wird anhand des Message Templates erzeugt.
Ist in dem Message Template kein Header definiert, werden die Informationen "source" und "scope" aus der Eingangsnachricht übernommen.
Ist in dem Message Template Header kein "source" und/oder "scope" definiert, werden die Informationen aus der Eingangsnachricht übernommen.
Ist in dem Message Template kein Body definiert, enthält die Ausgangsnachricht keine Informationen im Body.
Nach dem Erfüllen einer "Rule" wird eine Ausgangsnachricht an alle zugewiesenen Input bzw. Output Topics gesendet.
Bezeichnungen und Syntax
Die in der folgenden Tabelle erläuterten Parameter sind in einer beispielhaften Konfiguration und deren Abbildungen im folgenden Absatz zu finden.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz |
Context | Optionaler Bereich, in dem Variablen mit oder ohne initialen Inhalt initialisiert werden. Jedes Contextelement kann in den Bedingungen verwendet werden. |
Context – Name | optional, Name des Contextelementes |
Context – Value | optional, Inhalt des Contextelementes |
RuleSets | Zusammenstellung von Bedingungspfaden/ Zustandsüberwachungen. Es können beliebig viele erstellt werden. Die angelegten RuleSets erscheinen als Reiter unterhalb der Eingabemöglichkeit. Die Namen können nicht geändert werden – RuleSetsn, n steht hierbei für eine fortlaufende Zahl. Hier kann auch ein „DefaultRuleSet“ definiert werden. |
RuleSets – Evaluation Strategy | Es kann zwischen FirstMatch, BestMatch oder LeastMatch, also erster, bester oder letzter Übereinstimmung, entschieden werden. Die Voreinstellung ist „FirstMatch“. |
RuleSets – DefaultRuleSet | Zu befüllen wie ein normales RuleSet mit Rules – jedoch ohne Bedingungen. In diesem RuleSet können wiederum Rules angelegt werden, die diverse Bedingungen enthalten. |
RuleSets – conditions | optional, Globale Bedingung(en) damit in diesem RuleSet die Rules auf Übereinstimmung geprüft werden. |
RuleSets – conditions – Name | optional, Bezeichnung der Bedingung eingeben |
RuleSets – conditions – Value | optional – sofern kein Name/ key definiert wurde, Wert aus dem Context, body, header oder selbstdefiniert |
RuleSets – Rules | Es kann zwischen den Reitern per Pfeiltasten gewechselt werden. Zusammenstellung von Bedingungspfaden/ Zustandsüberwachungen zur Verfeinerung der Bedingungspfade. Es können beliebig viele erstellt werden. Die angelegten Rules erscheinen als Reiter unterhalb der Eingabemöglichkeit. Die Namen können nicht geändert werden – Rules-n, n steht hierbei für eine fortlaufende Zahl. Hier kann auch ein „DefaultRule“ definiert werden. |
RuleSets – Rules – Context | Beliebige Anzahl von Contextelementen |
RuleSets – Rules – Context – Name | Name des Contextelementes |
RuleSets – Rules – Context – Value | Inhalt des Contextelementes |
RuleSets – Rules – Into | Folgeblock bestimmen, wenn notwendig |
RuleSets – Rules – Obsolete context | Hier können Elemente des Context entfernt werden. |
RuleSets – Rules – outputTopic | Optionale Eingabe eines Outputtopics, bei keiner Angabe, werden alle Informationen, bis auf den Context verworfen. |
RuleSets – Rules – into | Optionale Eingabe eines Blocknamens zur internen Weiterverarbeitung |
RuleSets – Rules – Message Template | Optionale Eingabe eines Namens für das Messagetemplate zur Erstellung der Outputnachricht |
Message Templates | Beliebig viele Vorlagen für die Ausgabe von Nachrichten und Daten. Es können hier alle Elemente aus dem Nachrichtenbereich Header und Body bearbeitet werden. |
Message Templates – Header – Name | optional, Name des zu erstellenden keys |
Message Templates – Header – Value | optional, sofern kein Name/ key definiert wurde, Wert aus dem Context, Body, Header oder selbstdefiniert |
Message Templates – Body – Name | optional, Name des zu erstellenden keys |
Message Templates – Body – Value | optional, sofern kein Name definiert wurde, Wert aus dem Context, Body, Header oder selbstdefiniert |
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
Verwendung von Elementen aus Header, Body und Context:
case sensitive Namensverwendung
Um das Element source aus dem Nachrichtenbereich Header zu erhalten, muss header_ für den zu verwendenden Bereich und source für das Element eingegeben werden. → header_scope
Verwendung einer Zeichenkette/ String
Eine Zeichenkette wird mit einem vorangehenden „ ‘ „ und einem abschließenden „ ‘ „ gekennzeichnet. Bsp.: ‘Zeit verstrichen‘
Werden die Zeichen nicht verwendet, so wird die Eingabe wie eine Variable verwendet.
Beispielkonfiguration Rule Engine-Block
Die schrittweise Konfiguration des Stream Processors mit dem Block Rule Engine wird am folgenden Beispiel dargestellt. Es werden Daten eines OPC UA Servers verarbeitet. Die Reihenfolge sollte wie beschrieben erfolgen, da so alle angezeigten Auswahlmöglichkeiten im Zuge der Konfiguration zur Verfügung stehen. Auswahlwahlmöglichkeiten stehen immer erst nach einem Speichervorgang zur Verfügung.
Voraussetzung: Im Modul OPC UA wurde ein OPC UA Server vollständig mit dem Output Topic „OpcuaOut1“ konfiguriert (Abbildung 16).

Nun werden die Grundeinstellungen im Stream Processor konfiguriert - Input Edge Topics, Input Switch mit den zugehörigen Topics, Output Topics und ein RuleEngine-Block mit dem Namen "UsingRules" (Abbildung 17)

Es folgt die Definition der Message Templates "default" und "Rule1":
default: Änderung des Inhaltes des Headers von Source mit einer Zeichenfolge und des Inhaltes Zusatzinformation mit einer Zeichenfolge im Body (Abbildung 18)
Rule1: Änderung des Inhaltes des Headers von Source mit einer Zeichenfolge und Scope mit dem Inhalt eines Elementes vom Context. Der Body wird mit Werten aus dem ursprünglichen Body - statUint und dynUint – befüllt und mit während der Abarbeitung der Rules generierten Werten – Kontext1 und Zusatz (Abbildung 19).


Anschließend wird der Context des Block "UsingRules" konfiguriert: hier optional die Eingabe von Kontext mit Inhalten (Abbildung 20).

Es folgt die Konfiguration der RuleSets:
DefaultRuleSet (Abbildung 21)
Evaluation Strategy eintragen: FirstMatch
Rules: Add Default → DefaultRule erstellt
Context und Into wurden leer gelassen
Im Obsolete Context werden die Elemente Kontext2 und Kontext3 entfernt
OutputTopic mit OUTBlock und Message Template mit default angeben

Ruleset-2 (Abbildung 22)
Evaluation Strategy eintragen: FirstMatch
Rules: Add Rule → Rule-1 erstellt
In Conditions wird der Name mit – dynUint >= 35000 – eingetragen.
In Conditions wird die Bedingung mit – body_dynUint >= 30000 – eingetragen.
Im Context werden zwei Elemente mit einem neuen Wert beschrieben:
RulesetTest mit der Zeichenfolge 'RuleSet2 Rule1'
Kontext1 mit der Zeichenfolge 'dynUint >= 35000'
Into wird leer gelassen, es soll keine Folgeverarbeitung in anderen Blöcken stattfinden
Obsolete Context wird leer gelassen
OutputTopic mit OUTBlock und Message Template mit Rule1 angeben

Sind die RuleSets erstellt, müssen die Rules mit ihren Bedingungen konfiguriert werden.
Rules: Add Rule → Rule-2 erstellt (Abbildung 23)
Conditions
Name: dynUint kleiner gleich 10000 – dies ist nur eine Beschreibung der Bedingung.
Value: body_dynUint ⇐ 10000 – dies ist die Bedingung
Im Context werden zwei Elemente mit einem neuen Wert beschrieben:
RulesetTest mit der Zeichenfolge 'RuleSet2 Rule2'
Kontext1 mit der Zeichenfolge 'dynUint ⇐ 10000'
Into wird leer gelassen, es soll keine Folgeverarbeitung in anderen Blöcken stattfinden
Obsolete Context wird leer gelassen
OutputTopic mit OUTBlock und Message Template mit Rule1 angeben

Mit dieser Konfiguration kann nun über den MQTT oder über andere Schnittstellen die fertige Nachricht ausgegeben werden.
SNCFilter
Funktionsbeschreibung und Anwendung
Der Significant-Numeric-Change-Filter (SNCFilter) dient der Reduzierung der Datenlast durch schwankende Sensor- oder Analogwerte. Der Filter sendet nur eine Nachricht, wenn:
Der Daten Key der zu filternden Variablen unter Thresholds konfiguriert ist
UND der zugehörige Value ggü. der zuletzt weitergeleiteten Nachricht eine Änderung betraglich größer/gleich des konfigurierten Thresholds hat.
Beim erstmaligen Eintreffen eines neuen Parameters, d.h. der Parameter ist im internen Cache noch nicht vorhanden, wird dieser angelegt und zukünftig immer durchgeleitet.
Folgende Parameter sind relevant für die Filterfunktion:
Value: aktueller Wert der Variable in der Eingangsnachricht
ValueCache: letzter durchgereichter Wert der Variable
Threshold: konfigurierte Schwelle
Abbildung 24 veranschaulicht das Verhalten des Filters.

Die in folgender Tabelle erläuterten Parameter sind Abbildung 26 zu finden.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Into | Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. |
Thresholds - Name | Name des Daten Key (Variable) in der eingehenden Nachricht. Zugriff nur auf die oberste Objektebene im JSON. Die Variable muss einen numerischen Wert (Ganz- oder Fließkomma Typen) haben. |
Thresholds - Value | double oder integer; Numerische Schwelle (einseitig) der Variable. |
Overwrite Header Scope with | string; optional. Name für neue Scope Bezeichnung, zum Überschreiben des Header-Feldes Scope. Option ist inaktiv, wenn String nicht vorhanden oder leer ist. |
Issue Full Model | boolean, default=true; |
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
Beispielkonfiguration SNCFilter-Block
Folgende Abbildungen zeigen eine beispielhafte Konfiguration des SNCFilter.


Timer
Funktionsbeschreibung und Anwendung
Der Timer-Block kann einlaufende Nachrichten verzögern. Im Detail besitzt der Block die folgenden Events:
a) Start bzw. Neustart des Timers.
Startet den Timer oder startet einen laufenden Timer von vorn.
Es wird jedes mal eine Action (falls konfiguriert) OnStartEvent
ausgelöst.
Der Event Trigger ist über die Bedingungen in OnStartEvent.conditions
festzulegen.
b) Cancel Timer. Ein laufender Timer wird aufgrund einer Nachricht gestoppt und die Action OnCancelEvent
ausgeführt.
Der Event Trigger ist über die Bedingungen in OnCancelEvent.conditions
festzulegen.
c) Timer Expired. Ein Timer hat die konfigurierte Zeitspanne erreicht.
Das Event wird nur durch den Timer getriggert und kann nicht per Bedingung konfiguriert werden.
Das Event OnExpiredEvent
ausgelöst, wenn konfiguriert wird die entsprechende Action ausgeführt.
Die angegebenen conditions
müssen alle erfüllt sein (true), um den Trigger/die Aktion zu erhalten/auszulösen.
Der Zugriff auf die Nachrichten Properties der Timer Start Nachricht über body_<BODY_KEY>
und header_<HEADER_KEY>
ist ebenso wie in den Blöcken RuleEngine und Router möglich.
Als Aktion auf ein Event stehen die folgenden Möglichkeiten über das Property Type
zur Wahl:
SendMessage
⇒ Sende eine vordefinierte Nachricht. Der Nachrichteninhalt ist unter Data zu konfigurieren.SendStartMessage
⇒ Sende die (komplette) Nachricht aus, die zuletzt zum Start des Timers geführt hat. Dertimestamp
der Nachricht wird mit dem Aussenden aktualisiert.
Wird ein Timer bspw. mit den identischen Bedingungen für Start und Cancel konfiguriert und treffen diese mit einem Nachrichteneingang zu, so wird der Timer gestartet und sofort wieder gestoppt und die zugehörigen Aktionen ausgeführt.
Die folgende Tabelle erläutertet die Konfigurationsparameter des Timer-Blocks.
Parameter | Beschreibung |
---|---|
Name | Name der Blockinstanz. |
Into | Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. mit Folgeblöcken verbunden und Ausgabenachrichten an die entsprechend angegeben Blockinstanzen (Namen) weitergeleitet werden. |
Timespan(ms) | Zeitspanne für den Timer in Millisekunden |
Konfiguration Timer | Folgende Events sind im Block vorhanden
Die konfigurierbaren Actions OnStartEvent, OnCancelEvent und OnExpiredEvent sind nur bei Bedarf zu konfigurieren und können ggf. weggelassen werden. Bei Auftreten eines der Events und einer konfigurierten Aktion wird der aktuelle Timestamp im Message Body hinzugefügt (und ggf. überschrieben). Bei ausgehenden Nachrichten kann das Event per Header Property timer-was: {started, canceled, expired} für weitere Prüfung herangezogen werden. |
Conditions | Alle in diesem Objekt angegebene Ausdrücke müssen mit true ausgewertet werden, um das Event auszuführen. |
Name | Nutzerdefinierter Name der Bedingung. Der Wert kann frei gewählt werden, es bestehen keine Abhängigkeiten zu anderen Konfigurationsparametern. |
Value | string, Ausdruck für die Prüfung der Regel. Für den Zugriff auf die Properties der zugehörigen Start-Nachricht ist Folgendes zu beachten: |
Header | Nachrichten-Body der auszusendenden Nachricht bei diesem Event. |
Body | Nachrichten-Header der auszusendenden Nachricht bei diesem Event. |
Output Topic | Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen. |
In der folgenden Darstellung ist ein beispielhaftes Szenario aufgezeigt.
Es läuft die Nachricht [1] ein. Diese wird vom Block ignoriert, da die Timer condition (state >=10
) nicht erfüllt ist.
Nachricht [2] folgt einlaufend und triggert den Start, da hier state >=10
. Der Timer startet und sendet eine Nachricht [3], die vielleicht für Debugging Zwecke konfiguriert ist, sofort heraus.
Der Timer erreicht die konfigurierte Zeit von 20 Sekunden und sendet die Nachricht [4], welche der Nachricht [2] mit aktuellem Zeitstempel entspricht, am Ausgang heraus.
+----------------+ in ----->| |------> out | MyTimer | | | +----------------+ Timeline: Message: [1] [2][3] [4] Direction: in in out out -------------------------------------------------------------------> time |========Timer running============>| Event: / start expired Action: onStart onExpired Message Payloads: [1] [2] [4] +--------------+ +--------------+ +--------------+ | a:7, | | a:8, | | a:8, | | state: 2, | | state: 12, | | state: 12, | | timestamp:.. | | timestamp:.. | | timestamp:.. | +--------------+ +--------------+ +--------------+ [3] +---------------------------+ | info: "timer was started",| | timestamp:.. | +---------------------------+
Beispielkonfiguration Timer-Block
Folgende Abbildungen zeigen eine beispielhafte Konfiguration des Timer-Blocks in der EdgeGateway UI.

