// Components
_MQTT.IASyncPropertyProvider( IASyncPropertyProvider );
_MQTT.ITLSContextProvider( ITLSContextProvider );
_MQTT();
_SparkplugB( AllmetricsList := _AllMetricsList );
IF _MQTT.SubscriberMessageReceived() THEN
{region "onSubscribeMessage"}
_TopicParser( pTopic := ADR(_RecvTopic) ); //parses the topic
WriteDeviceLog( UDINT_TO_DWORD( CmpLog.LogClass.LOG_INFO ),
'PrimaryHost Received Topic: %s',
TO_STRING( _TopicParser.MessageType ) );
CASE _TopicParser.MessageType OF //SparkplugMessageTypes
_MyEdge REF= THIS^.GetEdge( GroupId := _TopicParser.GroupId,
eError => _HandledError );
CASE _HandledError OF
ERROR.GroupIDNotFound: // ==> NEW GROUP!
eError => eError );
IF eError = ERROR.NO_ERROR THEN
itfGroupID := _MyGroupId.itfGroupID,
_MyEdge.ProcessNBIRTH( itfPrimaryHost := THIS^,
xError => xError,
_MyGroupId REF= THIS^.GetGroupID( GroupId := _TopicParser.GroupId,
_MyEdge.ProcessNBIRTH( itfPrimaryHost := THIS^,
_MyEdge.ProcessNDATA( itfPrimaryHost := THIS^,
WriteDeviceLog( UDINT_TO_DWORD( CmpLog.LogClass.LOG_INFO ),
SparkplugMessageTypes.NDEATH: //Death certificate for MQTT EoN nodes.
'PrimaryHost process NDEATH metric error: %s',
END_CASE
//TODO: Check if edge is online (isQualityOK). If not OK, DO NOT process the DBIRTH, instead just send a NCMD/Rebirth
EdgeId := _TopicParser.EdgeId,
eError => _HandledError );
WriteDeviceLog( UDINT_TO_DWORD( CmpLog.LogClass.LOG_INFO ),
ERROR.EdgeIDNotFound: // ==> NEW EDGE! must rebirth the edge
_MyEdge REF= THIS^.GetEdge( GroupId := _TopicParser.GroupId,
EdgeId := _TopicParser.EdgeId,
_MyDevice REF= THIS^.allocDevicetoEdge( DeviceName := _TopicParser.DeviceId,
_MyDevice.ProcessDBIRTH( itfPrimaryHost := THIS^,
SparkplugMessageTypes.DDEATH: //Death certificate for Devices.
_MyDevice.ProcessDDEATH();
_MyDevice.ProcessDDATA( itfPrimaryHost := THIS^,
'PrimaryHost process DDATA metric error: %s',
SparkplugMessageTypes.STATE: //Critical application state message
; //{info 'TODO: maybe some sort of error??'}
{endregion}
_ExponentialBackOff( Enable := (Connect AND NOT Connected),
LockTimesArray := _LockTimesArray,
OutTimeRemaining => _OutTimeRemaining,
_StateMachine( Connect := _ExponentialBackOff.out,
pMQTT := ADR( _MQTT ),
pPrimaryHost := THIS,
Diag();
{a9ed5b7e-75c5-4651-af16-d2c27e98cb94}
Primary Host Node
Allows you to concentrate Edge of Network data sent by various Edges or other Sparkplug-compatible nodes
{attribute 'reflection'}
VAR_INPUT
    ServerPort : UINT := 1883; // e.g. 1883
    KeepAlive : UINT := 60; // e.g. 60 (value is in seconds)
    // * The client has limited resources.
    // * You want the broker to store the subscription information of the client and restore the interrupted communication quickly.
    // * The client needs to resume all QoS 1 and 2 publish messages after a reconnect.
    // Clean session (TRUE)
    // * The client needs only to publish messages to topics; it does not need to subscribe to topics.
    // * You don't want the broker to store session information or retry transmission of QoS 1 and 2 messages.
    // * The client does not need to get messages that it misses while offline.
    CleanSession : BOOL := FALSE;
    Username : WSTRING := ""; // Username, if any (optional)
    Password : WSTRING := ""; // Password for the username (optional)
    UseTLS : BOOL; // Enables TLS encryption (optional)
    hCert : SysTypes.RTS_IEC_HANDLE; // Handle to the client certificate (optional); only used if UseTLS is TRUE
    ITLSContextProvider : MQTT.NBS.ITLSContext; // Encapsulates all the data necessary to handle encrypted TCP connections
    // Static initialization, as shown in the following code snippet:
    // commonName : STRING := 'MyRasPi';
    // ciCertInfo : NBS.CERT_INFO := (psInfo:=ADR(commonName), udiSize:=LEN(commonName));
    // ePurpose:=NBS.PURPOSE.CLIENT_SIDE,
    // sTLSVersion:='1.3',
    // udiVerificationMode:=2
    //END_VAR
    IASyncPropertyProvider : MQTT.NBS.IAsyncProperty; // Runs the connect process in its own background task. Use this property if the connection setup takes longer than one task cycle (e.g. TLS connections)
VAR_OUTPUT
    SessionState : SparkplugSessionStateType;
    eError : ERROR;
VAR