Robotron Datenbank-Software GmbH Firmenlogo
MIT DATEN MEHR BEWEGEN.

Stream Processor

Mit dem Stream Processor können einlaufende Nachrichten durch Verschaltung verschiedener Verarbeitungsblöcke verarbeitet und anschließend zurück an den Edge Broker des RoboGate EdgeGateways gegeben werden. Zu den Funktionen gehören z.B. Datenaggregation und mathematische Berechnungen auf Grundlage einlaufender Nachrichten. Mehrere unabhängige Datenströme können unabhängig voneinander und gleichartig verarbeitet werden, siehe dazu die nachfolgende Skizze.

                        +----------------------+
   +---------------+    |   +------------+     |
   |    Pollgroup1 |----+-->| Aggregator |-----+->Aggregation(Pollgroup1)
 =>|               |    |   |            |     |
   |    Pollgroup2 |----+-->|  n Buffers |-----+->Aggregation(Pollgroup2)
   |               |    |   +------------+     |
   +---------------+    +----------------------+
    OPC UA-Reader         Stream Processor

Konfigurationsschema

Die Konfiguration des Stream Processor setzt sich aus der Definition der Edge Broker-Verbindungen, d.h. dem Input in den Stream Processor und dem Output an den Edge Broker, sowie den Verarbeitungsblöcken zusammen. Diese werden im folgenden beschrieben.

Input - Edge Topics und Switch

Über die Angabe von Input Edge Topics wird definiert, welche der im RoboGate EdgeGateway vorhandenen Topics im Stream Processor verarbeitet werden sollen. Diese Auswahl dient somit als Eingangsfilter in den Stream Processor.

Über den Input Switch werden die einlaufenden Nachrichten weitergefiltert und Verarbeitungsblöcken zugeordnet. Diese Filterung ist durch Reguläre Ausdrücke (RegEx) realisiert. Stimmen die Header-Felder "Source" und "Scope" aus den einlaufenden Nachrichten mit der Konfiguration des Input Switch überein, wird die Nachricht an die unter "Target Blocks" zugewiesenen Blockinstanzen der Verarbeitungsblöcke übergeben. Hierfür müssen diese Blockinstanzen zuvor angelegt worden sein.

Die Konfiguration des Input Switch, d.h. Zuweisung von Source und Scope zu einem Verarbeitungsblock, ist Voraussetzung für die weitere Verarbeitung von Nachrichten im Stream Processor!

Die folgende Tabelle gibt einen Überblick über die Konfigurationsparameter.

Tabelle 1. Konfigurationsparameter Input Edge Topics und Input Switch
ParameterBeschreibung

New Topic

Name der Topics, die verarbeitet werden sollen.

Name

Name des Input Switch, welcher frei wählbar ist und eindeutig sein muss. Es können mehrere Einträge angelegt werden. Jeder Eintrag erhält einen Namen. Der Name spielt im weiteren Verarbeitungsprozess keine Rolle.

Source / Scope Pattern

Filterung der einlaufenden Nachrichten durch reguläre Ausdrücke (RegEx).

Target Blocks

In diese Blöcken laufen die Nachrichten aus Source und Scope entsprechend gewählter Filterung ein. Es können je Zeile mehrere Blöcke zugeordnet werden. Die Blöcke stehen erst nach erfolgter Konfiguration zur Auswahl.

Beispiel: Der zu verarbeitende Daten-Input wird von der „PollGroup a“ des OPC UA-Moduls bezogen. Abbildung 1 zeigt die nachfolgend beschriebene Konfiguration im OPC UA-Modul:

  • Poll Groups: „PollGroup a“ mit Interval 1000ms, Name: “a”, Output Topic: “PollGroup a”

  • Field Name: a und b

Die angelegte "Pollgroup a" muss nun als Input Edge Topic im Stream Processor angelegt werden. Anschließend müssen die erläuterten Parameter für den Input Switch definiert werden. Im Beispiel werden die Nachrichten im Input Switch nach Source ("OPCUA") und Scope ("^a$") weitergefiltert und dem Verarbeitungsblock "ExpressionEvaluator" zugewiesen (Abbildung 2).

83
Abbildung 1. Konfiguration OPCUA Modul-Input Edge Topic “PollGroup a” vom Stream Processor
84
Abbildung 2. Konfiguration Input Edge Topic und Input Switch

Processing Blocks

Die Verarbeitungsblöcke (Processing Blocks) verarbeiten die einlaufenden Nachrichten und geben die verarbeiteten Nachrichten anschließend über Output Topics zurück an den Edge Broker des EdgeGateways. Jeder Block kann mit Folgeblöcken verbunden werden, Ausgabenachrichten werden dann an die angegeben Blockinstanzen weitergeleitet (per JSON Property into: ["<BlockNameA>, <BlockNameB>"]).

Um Nachrichten mit unterschiedlichen Source- und Scope-Werten (d.h. aus unterschiedlichen Datenquellen) unabhängig voneinander und parallel zueinander zu verarbeiten, bauen die Verarbeitungsblöcke in der Regel für jede Kombination von Source und Scope unabhängige Zustände (z.B. eine Timerinstanz auf dotnet Ebene oder einen Puffer) auf.

Die grundlegenden Parameter der Verarbeitungsblöcke sind in der folgenden Tabelle erläutert und in der EdgeControl UI, wie in Abbildung 3 dargestellt, zu konfigurieren. Die verfügbaren Typen von Verarbeitungsblöcken werden mit ihren spezifischen Konfigurationsparametern in den nachfolgenden Abschnitten beschrieben.

Die Blockinstanz eines Processing Block steht erst nach erfolgter Konfiguration für die Target Blocks im Input Switch zur Verfügung.

Tabelle 2. Übersicht Parameter Processing Blocks
ParameterBeschreibung

Name

Name der Blockinstanz.

Type

Das Feld type legt den Typ des Verarbeitungsblocks fest. Folgende Typen stehen zur Verfügung:

  • Aggregator

  • Dedup

  • ExpEval

  • Resend

  • Router

  • Rule Engine

  • SNCFilter

  • Timer

85
Abbildung 3. Konfiguration Processing Blocks

Output Blocks

Die in den Processing Blocks definierten Output Topics müssen über die Auflistung in "Output Blocks" an den EdgeGateway Broker zurückgegeben werden, um diese in weiteren Modulen zu verarbeiten.

Die in der folgenden Tabelle erläuterten Parameter sind im EdgeGateway konfigurierbar (Abbildung 4).

Tabelle 3. Konfigurationsparameter Output Blocks
ParameterBeschreibung

DefaultOutputTopic

Optional. In das Modul einlaufende Nachrichten, die durch keine Filter Konfiguration passen (inputs), werden, falls konfiguriert, auf dieses Edge Broker Topic geleitet.

Name

Name des konfigurierten Output Topic des jeweiligen Blocks.

Edge Topic

Um eine Nachricht aus dem Stream Processor zurück an den Edge Broker zu geben, muss ein Output Topic konfiguriert werden. Dieses Topic kann zur Auswertung in einem Datenausgabemodul genutzt werden.

86
Abbildung 4. Konfiguration Output Block

Verarbeitungsblöcke

Im folgenden werden die Verarbeitungsblöcke und ihre Konfiguration näher beschrieben.

Aggregator

Funktionsbeschreibung und Anwendung

Der Aggregator-Block sammelt einlaufende Nachrichten. Die in Nachrichten enthaltenen Felder können entsprechend der Konfiguration verschiedenartig behandelt werden.

Ein Anwendungsbeispiel ist eine Minimum/Maximum-Aggregation, um das aufkommende Datenvolumen zu reduzieren. Beispielsweise laufen Temperaturdaten der Außentemperatur im Sekundentakt ein, statt aber 60 Momentan-Werte zu übertragen können für jede Minute nur der Minimum- und Maximum-Wert übertragen werden.

Die in der folgenden Tabelle erläuterten Parameter sind in Abbildung 6 und Abbildung 5 zu finden.

Tabelle 4. Konfigurationsparameter Aggregator-Block
ParameterBeschreibung

Name

Name der Blockinstanz.

Into

Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Window - Based On

Auswahl des Window-Typ

  • Sample-basiertes Window: nutzt die Sample-Anzahl im Puffer, unabhängig von der Zeit. Bei einem (voll) gefülltem Windowpuffer wird mit jedem Einlaufen einer neuen Nachricht eine alte gelöscht. Bei einem gefüllten Window liegt deshalb immer eine gleiche Anzahl von Samples für nachfolgende Berechnungen vor. Das Triggern erfolgt ausschließlich anhand einlaufender Nachrichten.

  • TimeMs-basiertes Window: nutzt die Zeit in Millisekunden (timestamp) als Grundlage für das Verwerfen alter Samples aus dem Puffer. Es kann passieren, dass ganz unterschiedliche Sample-Anzahlen für eine Berechnung vorliegen. Das Fenster und die Aggregatsbildung agieren getriggert durch die Systemzeit und unabhängig von einlaufenden Nachrichten. Die Eigenschaft timestamp ist für den Windowtyp TimeMs zwingend erforderlich!

Window – Size

Die Windowgröße bestimmt den vorhandenen Puffer je nach Window-Typ sample-basiert (Wert als Anzahl) oder zeitbasiert (Wert in Millisekunden).

Window - HoppingEach

Häufigkeit einer Berechnung und Ausgabe je nach Window-Typ sample-basiert (Wert als Anzahl) oder zeitbasiert (Wert in Millisekunden). Ein hoher Wert führt seltener zu einer Ausführung und Ausgabe als ein kleiner.

Behaviour

Die verarbeitende Funktion kann für jedes Feld der einlaufenden Nachricht angegeben werden. Sollte eine ausgehende Nachricht keinen Zeitstempel (Key timestamp) besitzen, so wird dieser mit der aktuellen Zeit erzeugt.

Default

Default-Wert, der für alle Felder gilt, die nicht explizit angegeben sind.

Name

Feldname (Variablenname) im JSON der einlaufenden Nachricht

Value

Funktionsauswahl für das jeweilige Feld. Folgende Werttypen stehen zur Verfügung:

  • ignore: Daten werden verworfen und bei der Ausgabe nicht beachtet

  • collect: Sammeln der Values der einlaufenden Nachrichten des zu bewertenden „Zeitraumes“ in einem Array

  • min: minimaler Datenwert (erfordert numerische Values)

  • max: maximaler Datenwert (erfordert numerische Values)

  • first: zuerst eingelaufener Datenwert

  • last: zuletzt eingelaufener Datenwert

  • sum: Summe der eingelaufen Datenwerte (erfordert numerische Values)

  • avg: mathematisch Durchschnitt (erfordert numerische Values)

  • median: Median (erfordert numerische Values)

  • stddev: Standardabweichung (erfordert numerische Values)

  • default: legt Default-Funktion für alle nicht explizit genannten Felder fest

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

Beispielkonfiguration Aggregator-Block

Folgende Beispielkonfiguration ist in Abbildung 5 und Abbildung 6 dargestellt:

  • „a“ ist explizit konfiguriert und wird in einem Array gesammelt (Value: "Collect") und ausgegeben.

  • „b“ wird explizit ignoriert (Value: "Ignore") und entfällt für die Ausgabenachricht.

  • Da der Timestamp nicht konfiguriert ist, erhält die Default-Funktion die Konfiguration first.

88
Abbildung 5. Konfiguration des Aggregator-Block
87
Abbildung 6. Gesamtkonfiguration Stream Processor mit Aggregator-Block

Dedup

Funktionsbeschreibung und Anwendung

Der Deduplicate-Block (Dedup) verwirft Nachrichten, die gleiche Informationen zur vorherigen Nachricht aufweisen. Die Prüfung kann mittels des Parameter Watch maskiert, d.h. eingeschränkt werden.

Der Block wird verwendet, um redundante Daten/Nachrichten auszufiltern.

Die in der folgenden Tabelle erläuterten Parameter sind in Abbildung 7 zu finden.

Tabelle 5. Konfigurationsparameter Dedup-Block
ParameterBeschreibung

Name

Name der Blockinstanz.

Into

Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

Overwrite Header Scope with

string, Name für neue Scope Bezeichnung (optional). Dient dem Überschreiben des Header-Feldes Scope. Die Option ist inaktiv, wenn der String nicht vorhanden oder leer ist.

Watch

string oder string Array, Watch ist der Ausdruck für die Auswahl des Prüfkriteriums in der Nachricht. Wenn aufeinanderfolgende Nachrichten an der Stelle/den Stellen „x“ gleich sind, wird die Nachricht verworfen.

Für den Zugriff auf Nachrichtenkeys mit einem Punkt „.“, bspw. Key Data.Value, ist der Zugriff über die Syntax ['Data.Value'] erforderlich.

Beispielkonfiguration Dedup-Block

Die erläuterten Parameter sind in Abbildung 8 und Abbildung 7 beispielhaft angewendet.

90
Abbildung 7. Konfiguration des Dedup-Block
89
Abbildung 8. Gesamtkonfiguration Stream Processor mit Dedup-Block

ExpEval

Funktionsbeschreibung und Anwendung

Der Expression Evaluator-Block (ExpEval) kann Berechnungen und sonstige mathematische oder logische Verknüpfungen auf Grundlage einlaufender Nachrichten durchführen. Der Verarbeitungsblock hat Zugriff auf einlaufende Nachrichten-Payloads.

Die folgenden Skizze veranschaulicht die Funktionsweise:

                         +----------------+
                         |                |
                in ====> |   Expression   | ====> out
                         |     Evaluator  |
   Message in            |                |       Message out
  +-----------------+    +----------------+      +----------------+
  |                 |       ^                    | "a": 7,        |
  | "a": 7,         |       |                    | "b": 2,        |
  | "b": 2,    =================================>| "result": 12,  |
  |                 |       |                    +----------------+
  +-----------------+       |
                            |
           Config           |
            -----------------------
           / "result": "a*b - 2"  /
           -----------------------

Im Beispiel der Skizee wird laut Konfiguration die Berechnung eines Wertes result durchgeführt. Dafür wird auf die einlaufenden Datenfelder a und b zugegriffen. Nach Verarbeitung, wird die einlaufende Nachricht mit dem Wert result erweitert und ausgegeben.

In der folgenden Tabelle werden die Konfigurationsparameter erläutert.

Tabelle 6. Konfigurationsparameter Evaluator-Block
ParameterBeschreibung

Name

Name der Blockinstanz.

Into

Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Expressions - Name

Name des Ergebnis (Key) für einen angegebenen Ausdruck.

Value

string, Auszuwertender Ausdruck. Die Syntax ist unter Ausdrücke beschrieben.

Preserve input properties

optional, boolean, default=false,
Bei Wert true bleiben die einlaufenden Nachrichten-Properties in den ausgehenden Nachrichten erhalten.

Results in object

optional, string, Wenn an dieser Stelle etwas angegeben wird, werden Ergebnisse in einem Unterobjekt der Ausgangsnachricht eingefügt.

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

Ausdrücke

Folgende Operatoren und Ausdrücke stehen zur Verfügung und können ausgewertet werden:

  • mathematische Verknüpfungen mit den Operatoren +, -, *, /

  • Modulo mit Operator %

  • mathematische Funktionen aus .NET Math. Bspw. Round(..), Abs(..), Pow(..), Sqrt(..), ..

  • binäre Verknüpfungen mit den Operatoren | , &, ^, ~

  • logische Verknüpfungen mit den Operatoren ||, &&, or, and, !, not

  • mathematische Vergleiche mit den Operatoren <, >, <=, >=, ==, !=

Neben numerischen Konstanten (wie 12 oder 7.91) oder booleschen true/false kann auf die Properties der einlaufenden Nachricht per JSON Key zugegriffen werden. Die folgende Tabelle zeigt einige Beispiele.

Tabelle 7. Beispiel-Expressions
Expression (JSON Value)Variables (aus einlaufender Nachricht)Ergebnis (JSON Value)

"1.1*6"

6.6

"20 % 10"

0

"not true"

false

"true && true"

true

"true || false"

true

"not(2>8)"

true

"foo*bar"

foo:20; bar:4

80

"foo-bar"

foo:20; bar:4

16

"foo/bar"

foo:20; bar:4

5

"foo+bar"

foo:20; bar:4

24

"foo%bar"

foo:20; bar:4

0

"foo-(bar * 100)"

foo:20; bar:4

-380

Zu beachten ist, dass dieser Verarbeitungsblock über keinen Zustand verfügt. Jede Berechnung erfolgt anhand der Konfiguration und der einlaufenden Nachrichten!

.Net Math Operations Syntax: Auf die Funktionen aus dotnet math kann folgendermaßen zugegriffen werden:

  • Potenzieren: y = x^a ⇒ "y": "Pow(x,a)"

  • Runden: y = x (auf zwei Nachkommastellen) ⇒ "y": "Round(x,2)"

  • Quadratwurzel: y = sqrt(x) ⇒ "y": "Sqrt(x)"

  • Abrunden auf Ganzzahl: x ⇒ "y": "Floor(x)"

  • Aufrunden auf Ganzzahl: x ⇒ "y": "Ceiling(x)"

  • Betrag einer Zahl: x ⇒ "y": "Abs(x)"

Weitere Details sind in der Dokumentation der Math Klasse .Net Core zu finden: https://docs.microsoft.com/de-de/dotnet/api/system.math.sqrt?view=netcore-3.1

Beispielkonfiguration Expression Evaluator-Block

Die Berechnung im Beispiel in Abbildung 10 und Abbildung 9, ergibt sich aus dem Feld „a“ einer einlaufenden Nachricht und Addition mit einem konstanten Wert 1. Das Ergebnis wird in dem Feld „aPlus1“ abgelegt. Im Weiteren sollen alle vorhandenen Felder auf der Eingangsseite erhalten bleiben (→ Preserve input properties: true). Das Ergebnis soll direkt in das Top Level JSON-Objekt eingefügt werden (→ Results in object: null).

92
Abbildung 9. Konfiguration des Expression Evaluator-Block
91
Abbildung 10. Gesamtkonfiguration Stream Processor mit Expression Evaluator-Block

Resend

Funktionsbeschreibung und Anwendung

Der Resend-Block speichert jede einlaufende Nachricht in einem Cache und sendet diese in einem Intervall erneut aus. Jede einlaufende Nachricht wird dabei sofort wieder am Ausgang ausgesendet und der Timer für das erneute Aussenden neugestartet.

Der Block kann verwendet werden, um eine Datenübertragung in einem Zeitintervall zu erzwingen.

Beispielsweise wird ein Temperaturwert per Subscription gelesen und nur gesendet, wenn sich dieser ändert. In der Zielplattform möchte man jedoch mindestens einmal pro Tag ein Wert vom EdgeGateway erhalten, da sonst z.B. ein Alarm ausgelöst wird. Der Resend-Block kann nun vor der Übergabe an die Zielplattform platziert werden, um mindestens alle 24h erneut den letzten Wert zu übertragen.

In der folgenden Tabelle werden die Konfigurationsparameter erläutert.

Tabelle 8. Konfigurationsparameter Resend-Block
ParameterBeschreibung

Name

Name der Blockinstanz.

Into

Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Output Topic

Name des Output Topic des jeweiligen Blocks

ResendInterval

Millisekunden-Intervall, in dem die letzte eingelaufene Nachricht zyklisch erneut gesendet wird.

Upate Timestamp On Resend

default=false, true = Beim Resend wird der Zeitstempel mit der aktuellen Zeit überschrieben.

Beispielkonfiguration Resend-Block

109
Abbildung 11. Gesamtkonfiguration Stream Processor mit Resend-Block
110
Abbildung 12. Konfiguration des Resend-Block

Router

Funktionsbeschreibung und Anwendung

Mit dem Router-Block können einlaufende Nachrichten bedingungsbezogen in bestimmte Verarbeitungsblöcke weitergeleitet werden.

Hierfür gilt:

  • Jede Bedingung muss einen booleschen Zustand zurückgeben.

  • Jede Bedingung, die keinen booleschen Zustand zurückgibt, ist automatisch "false".

  • Sind alle Bedingungen einer Route erfüllt, wird die Nachricht an alle Blöcke, die in "into" aufgelistet sind und/oder an das Output Topic weitergeleitet.

  • Es werden immer alle konfigurierten Routes überprüft.

  • Wenn keine Route erfüllt wird, wird die Nachricht an den defaultInto-Block und/oder an das defaultOutputTopic weitergeleitet.

Folgende Ausdrücke sind in den Bedingungen zulässig:

  • Regular Expression

    • Syntax: RegEx(<value>,'<pattern>')

    • Hochkomma (''): Für Value kann ein statischer Wert eingetragen werden

    • Ohne Hochkomma (''): Value aus Parametern (Header & Body) wird geholt.

  • alle Ausdrücke, die auch im Expression Evaluator Block angewendet werden können

Der Zugriff auf Nachrichteninhalte, wie die Eigenschaften des Headers und des Bodys, kann über die Bedingungen umgesetzt werden. Dies funktioniert über den Prefix header_ oder body_ innerhalb der Bedingung.

In der folgenden Tabelle werden die Konfigurationsparameter erläutert. Die Route-Blöcke enthalten die Routes mit ihren Bedingungen (conditions). Werden mehrere Bedingungen angegeben, so werden diese per Und-Logik verbunden. Das Ergebnis einer Bedingung muss immer einen boolschen Wert ergeben.

Tabelle 9. Konfigurationsparameter Router-Block
ParameterBeschreibung

Name

Name der Blockinstanz

Default Output Topic

Name des Output Topic. Wird verwendet, wenn kein OutputTopic in den Routes angegeben wurde.

Default Into

Optional. Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Route Name

Name des Route-Blocks. Wurden mehrere Route-Blöcke angelegt, erscheinen diese als wählbare Reiter unter der Eingabezeile.

Conditions - Name

Name für die jeweilige Bedingung.

Conditions - Value

String, der die Logik für die Bewertung der keys enthält.

Into

stringarray, optional. Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

Beispielkonfiguration Router-Block

Die schrittweise Konfiguration des Stream Processors mit Block Routing wird am folgenden Beispiel erläutert (vgl. Abbildung 15):

  1. Aggregate-Blöcke 1 bis 3 mit dem Output Topic „out“ und den gleichen Parametern anlegen (Abbildung 13)

    • Samples mit Size „1” und HoppingEach „1“

    • Behavior: ignore default and collect für „b”

  2. Neuen Processing Block vom Type Router mit Name "Routing" anlegen (Abbildung 14)

    • Routes anlegen:

      • Route Name: b<10000

      • Conditions:

        • Name: toAggregate1

        • Value: RegEx(header_scope, '^a$') && body_b<10000

      • into: aggregate1

94
Abbildung 13. Konfiguration des Aggregator-Blocks "aggregate1"
95
Abbildung 14. Konfiguration des Route-Block "Routing"
93
Abbildung 15. Gesamtkonfiguration Stream Processor mit Router-Block

RuleEngine

Funktionsbeschreibung und Anwendung

Der RuleEngine-Block dient der Beschreibung von Zustandsübergängen anhand einlaufender Nachrichten in Abhängigkeit von Vorbedingungen. Dabei können Form und Inhalt der Ausgangsnachricht frei definiert werden.

  • Es existiert ein Kontext in dem Informationen abgelegt werden können, die bei der Auswertung der Bedingungen oder in der Ausgangsnachricht verwendet werden können.

  • Der Kontext kann initial mit Informationen gefüllt werden.

  • Es gibt eine Menge von "RuleSets", wobei jede einzelne eine Menge von "Rules" enthalten kann.

  • Jedes "RuleSet" hat Bedingungen.

  • Jede "Rule" in einem "RuleSet" hat Bedingungen.

  • Jedes "RuleSet" kann eine "defaultRule" beinhalten, diese besitzt keine Bedingung.

  • Es kann neben den "RuleSets" auch ein "defaultRuleSet" existieren, wobei dieses keine Bedingungen enthält.

  • Das "defaultRuleSet" kann wiederrum eine Menge von "Rule" sowie eine "defaultRule" beinhalten.

  • Jede Bedingung muss einen booleschen Zustand zurückgeben.

  • Jede Bedingung, die keinen booleschen Zustand zurückgibt, ist automatisch "Falsch" (false).

  • In den Bedingungen kann auf Inhalte des Kontextes und der eingehenden Nachricht (header & body & context) zugegriffen werden.

  • Damit eine "Rule" zutrifft, müssen alle Bedingungen von dieser erfüllt sein.

  • Je nach Einstellung „Evaluation Stategy“: FirstMatch, BestMatch oder LeastMatch kann eine Überprüfung der weiteren "Rules" übersprungen, die „zutreffendste“ oder die letzte zutreffende "Rule" eines "RuleSets" verwendet werden.

  • Das Ergebnis einer "Rule" wird in dem Kontext abgelegt, der Inhalt des Kontext wird gepatched.

  • Es können gezielt Inhalte aus dem Kontext nach dem Patchen entfernt werden.

  • Wird das angegebene MessageTemplate nicht gefunden, wird das MessageTemplate "default" genommen.

  • Die Ausgangsnachricht wird anhand des MessageTemplates erzeugt.

  • Ist in dem MessageTemplate kein Header definiert, werden die Informationen "source" und "scope" aus der Eingangsnachricht übernommen.

  • Ist in dem MessageTemplate Header kein "source" und/oder "scope" definiert, werden die Informationen aus der Eingangsnachricht übernommen.

  • Ist in dem MessageTemplate kein Body definiert, enthält die Ausgangsnachricht keine Informationen im Body.

  • Nach dem Erfüllen einer "Rule" wird eine Ausgangsnachricht an alle "Into" und/oder das Output Topic gesendet.

Bezeichnungen und Syntax

Die in der folgenden Tabelle erläuterten Parameter sind in einer beispielhaften Konfiguration und deren Abbildungen im folgenden Absatz zu finden.

Tabelle 10. Konfigurationsparameter Rule Engine-Block
ParameterBeschreibung

Name

Name der Blockinstanz

Context

Optionaler Bereich, in dem Variablen mit oder ohne initialen Inhalt initialisiert werden. Jedes Contextelement kann in den Bedingungen verwendet werden.

Context – Name

optional, Name des Contextelementes

Context – Value

optional, Inhalt des Contextelementes

RuleSets

Zusammenstellung von Bedingungspfaden/ Zustandsüberwachungen. Es können beliebig viele erstellt werden. Die angelegten RuleSets erscheinen als Reiter unterhalb der Eingabemöglichkeit. Die Namen können nicht geändert werden – RuleSetsn, n steht hierbei für eine fortlaufende Zahl. Hier kann auch ein „DefaultRuleSet“ definiert werden.

RuleSets – Evaluation Strategy

Es kann zwischen FirstMatch, BestMatch oder LeastMatch, also erster, bester oder letzter Übereinstimmung, entschieden werden. Die Voreinstellung ist „FirstMatch“.

RuleSets – DefaultRuleSet

Zu befüllen wie ein normales RuleSet mit Rules – jedoch ohne Bedingungen. In diesem RuleSet können wiederum Rules angelegt werden, die diverse Bedingungen enthalten.

RuleSets – conditions

optional, Globale Bedingung(en) damit in diesem RuleSet die Rules auf Übereinstimmung geprüft werden.

RuleSets – conditions – Name

optional, Bezeichnung der Bedingung eingeben

RuleSets – conditions – Value

optional – sofern kein Name/ key definiert wurde, Wert aus dem Context, body, header oder selbstdefiniert

RuleSets – Rules

Es kann zwischen den Reitern per Pfeiltasten gewechselt werden. Zusammenstellung von Bedingungspfaden/ Zustandsüberwachungen zur Verfeinerung der Bedingungspfade. Es können beliebig viele erstellt werden. Die angelegten Rules erscheinen als Reiter unterhalb der Eingabemöglichkeit. Die Namen können nicht geändert werden – Rules-n, n steht hierbei für eine fortlaufende Zahl. Hier kann auch ein „DefaultRule“ definiert werden.

RuleSets – Rules – Context

Beliebige Anzahl von Contextelementen

RuleSets – Rules – Context – Name

Name des Contextelementes

RuleSets – Rules – Context – Value

Inhalt des Contextelementes

RuleSets – Rules – Into

Folgeblock bestimmen, wenn notwendig

RuleSets – Rules – Obsolete context

Hier können Elemente des Context entfernt werden.

RuleSets – Rules – outputTopic

Optionale Eingabe eines Outputtopics, bei keiner Angabe, werden alle Informationen, bis auf den Context verworfen.

RuleSets – Rules – into

Optionale Eingabe eines Blocknamens zur internen Weiterverarbeitung

RuleSets – Rules – Message Template

Optionale Eingabe eines Namens für das Messagetemplate zur Erstellung der Outputnachricht

Message Templates

Beliebig viele Vorlagen für die Ausgabe von Nachrichten und Daten. Es können hier alle Elemente aus dem Nachrichtenbereich Header und Body bearbeitet werden.

Message Templates – Header – Name

optional, Name des zu erstellenden keys

Message Templates – Header – Value

optional, sofern kein Name/ key definiert wurde, Wert aus dem Context, Body, Header oder selbstdefiniert

Message Templates – Body – Name

optional, Name des zu erstellenden keys

Message Templates – Body – Value

optional, sofern kein Name definiert wurde, Wert aus dem Context, Body, Header oder selbstdefiniert

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

Verwendung von Elementen aus Header, Body und Context:

  • case sensitive Namensverwendung

  • Um das Element source aus dem Nachrichtenbereich Header zu erhalten, muss header_ für den zu verwendenden Bereich und source für das Element eingegeben werden. → header_scope

Verwendung einer Zeichenkette/ String

  • Eine Zeichenkette wird mit einem vorangehenden „ ‘ „ und einem abschließenden „ ‘ „ gekennzeichnet. Bsp.: ‘Zeit verstrichen‘

  • Werden die Zeichen nicht verwendet, so wird die Eingabe wie eine Variable verwendet.

Beispielkonfiguration Rule Engine-Block

Die schrittweise Konfiguration des Stream Processors mit dem Block Rule Engine wird am folgenden Beispiel dargestellt. Es werden Daten eines OPC UA Servers verarbeitet. Die Reihenfolge sollte wie beschrieben erfolgen, da so alle angezeigten Auswahlmöglichkeiten im Zuge der Konfiguration zur Verfügung stehen. Auswahlwahlmöglichkeiten stehen immer erst nach einem Speichervorgang zur Verfügung.

Voraussetzung: Im Modul OPC UA wurde ein OPC UA Server vollständig mit dem Output Topic „OpcuaOut1“ konfiguriert (Abbildung 16).

96
Abbildung 16. Pollgroup im OPC UA-Modul

Nun werden die Grundeinstellungen im Stream Processor konfiguriert - Input Edge Topics, Input Switch mit den zugehörigen Topics, Output Topics und ein RuleEngine-Block mit dem Namen "UsingRules" (Abbildung 17)

97
Abbildung 17. Grundeinstellungen im Stream Processor

Es folgt die Definition der Message Templates "default" und "Rule1":

  • default: Änderung des Inhaltes des Headers von Source mit einer Zeichenfolge und des Inhaltes Zusatzinformation mit einer Zeichenfolge im Body (Abbildung 18)

  • Rule1: Änderung des Inhaltes des Headers von Source mit einer Zeichenfolge und Scope mit dem Inhalt eines Elementes vom Context. Der Body wird mit Werten aus dem ursprünglichen Body - statUint und dynUint – befüllt und mit während der Abarbeitung der Rules generierten Werten – Kontext1 und Zusatz (Abbildung 19).

98
Abbildung 18. Einstellungen in Message Template "default"
99
Abbildung 19. Einstellungen in Message Template "Rule1"

Anschließend wird der Context des Block "UsingRules" konfiguriert: hier optional die Eingabe von Kontext mit Inhalten (Abbildung 20).

100
Abbildung 20. befüllter Context, erstelltes DefaultRuleSet und RuleSet-2

Es folgt die Konfiguration der RuleSets:

  • DefaultRuleSet (Abbildung 21)

    • Evaluation Strategy eintragen: FirstMatch

    • Rules: Add Default → DefaultRule erstellt

    • Context und Into wurden leer gelassen

    • Im Obsolete Context werden die Elemente Kontext2 und Kontext3 entfernt

    • OutputTopic mit OUTBlock und Message Template mit default angeben

101
Abbildung 21. DefaultRuleSet ausgefüllt
  • Ruleset-2 (Abbildung 22)

    • Evaluation Strategy eintragen: FirstMatch

    • Rules: Add Rule → Rule-1 erstellt

      • In Conditions wird der Name mit – dynUint >= 35000 – eingetragen.

      • In Conditions wird die Bedingung mit – body_dynUint >= 30000 – eingetragen.

      • Im Context werden zwei Elemente mit einem neuen Wert beschrieben:

        • RulesetTest mit der Zeichenfolge 'RuleSet2 Rule1'

        • Kontext1 mit der Zeichenfolge 'dynUint >= 35000'

      • Into wird leer gelassen, es soll keine Folgeverarbeitung in anderen Blöcken stattfinden

      • Obsolete Context wird leer gelassen

      • OutputTopic mit OUTBlock und Message Template mit Rule1 angeben

102
Abbildung 22. Ruleset-2 Rule1 ausgefüllt

Sind die RuleSets erstellt, müssen die Rules mit ihren Bedingungen konfiguriert werden.

  • Rules: Add Rule → Rule-2 erstellt (Abbildung 23)

    • Conditions

      • Name: dynUint kleiner gleich 10000 – dies ist nur eine Beschreibung der Bedingung.

      • Value: body_dynUint ⇐ 10000 – dies ist die Bedingung

    • Im Context werden zwei Elemente mit einem neuen Wert beschrieben:

      • RulesetTest mit der Zeichenfolge 'RuleSet2 Rule2'

      • Kontext1 mit der Zeichenfolge 'dynUint ⇐ 10000'

    • Into wird leer gelassen, es soll keine Folgeverarbeitung in anderen Blöcken stattfinden

    • Obsolete Context wird leer gelassen

    • OutputTopic mit OUTBlock und Message Template mit Rule1 angeben

103
Abbildung 23. Ruleset-2 Rule2 ausgefüllt

Mit dieser Konfiguration kann nun über den MQTT oder über andere Schnittstellen die fertige Nachricht ausgegeben werden.

SNCFilter

Funktionsbeschreibung und Anwendung

Der Significant-Numeric-Change-Filter (SNCFilter) dient der Reduzierung der Datenlast durch schwankende Sensor- oder Analogwerte. Der Filter sendet nur eine Nachricht, wenn:

  • Der Daten Key der zu filternden Variablen unter Thresholds konfiguriert ist

  • UND der zugehörige Value ggü. der zuletzt weitergeleiteten Nachricht eine Änderung betraglich größer/gleich des konfigurierten Thresholds hat.

Beim erstmaligen Eintreffen eines neuen Parameters, d.h. der Parameter ist im internen Cache noch nicht vorhanden, wird dieser angelegt und zukünftig immer durchgeleitet.

Folgende Parameter sind relevant für die Filterfunktion:

  • Value: aktueller Wert der Variable in der Eingangsnachricht

  • ValueCache: letzter durchgereichter Wert der Variable

  • Threshold: konfigurierte Schwelle

Abbildung 24 veranschaulicht das Verhalten des Filters.

104
Abbildung 24. Skizze SNC-Filterverhalten

Die in folgender Tabelle erläuterten Parameter sind Abbildung 26 zu finden.

Tabelle 11. Konfigurationsparameter SNCFilter-Block
ParameterBeschreibung

Name

Name der Blockinstanz.

Into

Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen.

Thresholds - Name

Name des Daten Key (Variable) in der eingehenden Nachricht. Zugriff nur auf die oberste Objektebene im JSON. Die Variable muss einen numerischen Wert (Ganz- oder Fließkomma Typen) haben.

Thresholds - Value

double oder integer; Numerische Schwelle (einseitig) der Variable.
Genauigkeitsverlust: Die Variable kann ein Bereich von knapp 2*Threshold überschreiten, ohne ein Nachrichten-Update aus Ausgang.

Overwrite Header Scope with

string; optional. Name für neue Scope Bezeichnung, zum Überschreiben des Header-Feldes Scope. Option ist inaktiv, wenn String nicht vorhanden oder leer ist.

Issue Full Model

boolean, default=true;
IssueFullModel=false: Sendet jede Änderung einer Variablen separat in einer Nachricht (mit timestamp) an den Broker. Es entstehen ggf. mehrere Nachrichten aus einer einlaufenden Nachricht, wenn mehrere konfigurierte Schwellen überschritten wurden.
IssueFullModel: true: Sendet die gesamte einlaufende Nachricht mit allen Parametern, sobald eine Schwelle überschritten wurde.

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

Beispielkonfiguration SNCFilter-Block

Folgende Abbildungen zeigen eine beispielhafte Konfiguration des SNCFilter.

105
Abbildung 25. Gesamtkonfiguration Stream Processor mit SNCFilter Block
106
Abbildung 26. Konfiguration des SNCFilter-Block

Timer

Funktionsbeschreibung und Anwendung

Der Timer-Block kann einlaufende Nachrichten verzögern. Im Detail besitzt der Block die folgenden Events:

a) Start bzw. Neustart des Timers.
Startet den Timer oder startet einen laufenden Timer von vorn. Es wird jedes mal eine Action (falls konfiguriert) OnStartEvent ausgelöst. Der Event Trigger ist über die Bedingungen in OnStartEvent.conditions festzulegen.

b) Cancel Timer. Ein laufender Timer wird aufgrund einer Nachricht gestoppt und die Action OnCancelEvent ausgeführt.
Der Event Trigger ist über die Bedingungen in OnCancelEvent.conditions festzulegen.

c) Timer Expired. Ein Timer hat die konfigurierte Zeitspanne erreicht.
Das Event wird nur durch den Timer getriggert und kann nicht per Bedingung konfiguriert werden. Das Event OnExpiredEvent ausgelöst, wenn konfiguriert wird die entsprechende Action ausgeführt.

Die angegebenen conditions müssen alle erfüllt sein (true), um den Trigger/die Aktion zu erhalten/auszulösen. Der Zugriff auf die Nachrichten Properties der Timer Start Nachricht über body_<BODY_KEY> und header_<HEADER_KEY> ist ebenso wie in den Blöcken RuleEngine und Router möglich.

Als Aktion auf ein Event stehen die folgenden Möglichkeiten über das Property Type zur Wahl:

  • SendMessage ⇒ Sende eine vordefinierte Nachricht. Der Nachrichteninhalt ist unter Data zu konfigurieren.

  • SendStartMessage ⇒ Sende die (komplette) Nachricht aus, die zuletzt zum Start des Timers geführt hat. Der timestamp der Nachricht wird mit dem Aussenden aktualisiert.

Wird ein Timer bspw. mit den identischen Bedingungen für Start und Cancel konfiguriert und treffen diese mit einem Nachrichteneingang zu, so wird der Timer gestartet und sofort wieder gestoppt und die zugehörigen Aktionen ausgeführt.

Die folgende Tabelle erläutertet die Konfigurationsparameter des Timer-Blocks.

Tabelle 12. Konfigurationsparameter Timer Block
ParameterBeschreibung

Name

Name der Blockinstanz.

Into

Name der Blockinstanz des Folgeblocks an welchen Ausgabenachrichten weitergeleitet werden sollen. mit Folgeblöcken verbunden und Ausgabenachrichten an die entsprechend angegeben Blockinstanzen (Namen) weitergeleitet werden.

Timespan(ms)

Zeitspanne für den Timer in Millisekunden

Konfiguration Timer

Folgende Events sind im Block vorhanden

  • OnStartEvent

  • OnCancelEvent

  • OnExpiredEvent

Die konfigurierbaren Actions OnStartEvent, OnCancelEvent und OnExpiredEvent sind nur bei Bedarf zu konfigurieren und können ggf. weggelassen werden. Bei Auftreten eines der Events und einer konfigurierten Aktion wird der aktuelle Timestamp im Message Body hinzugefügt (und ggf. überschrieben). Bei ausgehenden Nachrichten kann das Event per Header Property timer-was: {started, canceled, expired} für weitere Prüfung herangezogen werden.

Conditions

Alle in diesem Objekt angegebene Ausdrücke müssen mit true ausgewertet werden, um das Event auszuführen.

Name

Nutzerdefinierter Name der Bedingung. Der Wert kann frei gewählt werden, es bestehen keine Abhängigkeiten zu anderen Konfigurationsparametern.

Value

string, Ausdruck für die Prüfung der Regel. Für den Zugriff auf die Properties der zugehörigen Start-Nachricht ist Folgendes zu beachten:
Der Zugriff auf die Nachrichten-Properties der Timer Start-Nachricht ist über body_<BODY_KEY> und header_<HEADER_KEY> möglich.

Header

Nachrichten-Body der auszusendenden Nachricht bei diesem Event.

Body

Nachrichten-Header der auszusendenden Nachricht bei diesem Event.

Output Topic

Name des modulinternen Output Topic, um Output-Nachrichten auf die Edge Broker Topic(s) zu mappen.

In der folgenden Darstellung ist ein beispielhaftes Szenario aufgezeigt. Es läuft die Nachricht [1] ein. Diese wird vom Block ignoriert, da die Timer condition (state >=10) nicht erfüllt ist. Nachricht [2] folgt einlaufend und triggert den Start, da hier state >=10. Der Timer startet und sendet eine Nachricht [3], die vielleicht für Debugging Zwecke konfiguriert ist, sofort heraus. Der Timer erreicht die konfigurierte Zeit von 20 Sekunden und sendet die Nachricht [4], welche der Nachricht [2] mit aktuellem Zeitstempel entspricht, am Ausgang heraus.

                         +----------------+
                in ----->|                |------> out
                         |    MyTimer     |
                         |                |
                         +----------------+


     Timeline:

Message:   [1]                   [2][3]                             [4]
Direction: in                    in out                             out
          -------------------------------------------------------------------> time
                                  |========Timer running============>|
Event:     /                      start                              expired
Action:                            onStart                           onExpired

Message Payloads:

           [1]                   [2]                                [4]
           +--------------+      +--------------+                   +--------------+
           | a:7,         |      | a:8,         |                   | a:8,         |
           | state: 2,    |      | state: 12,   |                   | state: 12,   |
           | timestamp:.. |      | timestamp:.. |                   | timestamp:.. |
           +--------------+      +--------------+                   +--------------+
                                   [3]
                                   +---------------------------+
                                   | info: "timer was started",|
                                   | timestamp:..              |
                                   +---------------------------+

Beispielkonfiguration Timer-Block

Folgende Abbildungen zeigen eine beispielhafte Konfiguration des Timer-Blocks in der EdgeControl UI.

107
Abbildung 27. Gesamtkonfiguration Stream Processor mit Timer-Block
108
Abbildung 28. Konfiguration des Timer-Block