// Components
_MQTT.IASyncPropertyProvider( IASyncPropertyProvider );
_MQTT.ITLSContextProvider( ITLSContextProvider );
_MQTT();
_RecvMesg REF= _MQTT.GetSubscriberMessage(MetricList := THIS^._AllMetricsList, MessageTopic => _RecvTopic );
//onSubscribeMessage( pTopic := ADR( _RecvTopic ),
//                    pMessage := _RecvMesg );
SparkplugMessageTypes.NBIRTH: //Birth certificate for MQTT EoN nodes.
    EdgeId := _TopicParser.EdgeId,
_MyGroupId REF= THIS^.allocGroupId( GroupId := _TopicParser.GroupId,
_MyEdge REF= THIS^.allocEdgetoGroupId( EdgeId := _TopicParser.EdgeId,
    eError => eError );
IF eError = ERROR.NO_ERROR THEN
    pMessage := _RecvMesg,
    eError => eError );
ELSE
    xError := TRUE;
    //eError := eError;
END_IF
ELSE
    xError := TRUE;
    //eError := eError;
END_IF
ERROR.EdgeIDNotFound: // ==> NEW EDGE!
    eError => eError );
ERROR.NO_ERROR: // ==> UPDATE EXISTING EDGE
    pMessage := _RecvMesg,
    xError => xError,
    eError => eError );
ELSE
    ; //{info 'TODO: maybe some sort of error??'}
END_CASE
SparkplugMessageTypes.NDATA: //Node data message.
    pMessage := _RecvMesg,
    xError => xError,
    eError => eError );
xError := TRUE;
eError := Error.EdgeIDNotFound;
    'PrimaryHost process NDATA metric error: %s',
    TO_STRING( _HandledError ) );
END_CASE
ERROR.NO_ERROR: // ==> UPDATE EXISTING EDGE
_MyEdge.ProcessNDEATH();
eError := Error.OhMy_ThatShouldNotHaveHappened;
SparkplugMessageTypes.DBIRTH: //Birth certificate for devices.
_MyDevice REF= THIS^.GetDevice( GroupId := _TopicParser.GroupId,
    DeviceId := _TopicParser.DeviceId,
ERROR.GroupIDNotFound: // ==> NEW GROUP!
// must rebirth the edge
eError := ERROR.NotImplemented;
xError := TRUE;
    'DBIRTH Received with no existing groupID, but haven$'t implemented NCMD Rebirth yet %s',
    '' );
    'DBIRTH Received with no existing edge, but haven$'t implemented NCMD Rebirth yet %s',
    '' );
ERROR.DeviceIDNotFound: // ==> NEW DEVICE!
    eError => eError );
    itfEdge := _MyEdge.itfRemoteEoN,
ERROR.NO_ERROR: // ==> UPDATE EXISTING DEVICE
_MyDevice.ProcessDBIRTH( itfPrimaryHost := THIS^,
    'PrimaryHost process DDEATH metric error: %s',
SparkplugMessageTypes.DDATA: //Device data message.
    pMessage := _RecvMesg,
    xError => xError,
    eError => eError );
eError := Error.DeviceIDNotFound;
SparkplugMessageTypes.NCMD, //Node command message.
SparkplugMessageTypes.DCMD, //Device command message.
    ; //do nothing, ignore these
    ; //{info 'TODO: maybe some sort of error??'}
{endregion}
_ExponentialBackOff( Enable := (Connect AND NOT Connected),
    LockTimesArray := _LockTimesArray,
    OutTimeRemaining => _OutTimeRemaining,
_StateMachine( Connect := _ExponentialBackOff.out,
    pMQTT := ADR( _MQTT ),
    pPrimaryHost := THIS,
Diag();
{a9ed5b7e-75c5-4651-af16-d2c27e98cb94}
Primary Host Node
Allows you to concentrate Edge of Network data sent by various Edges or other Sparkplug-compatible nodes
{attribute 'reflection'}
VAR_INPUT
    ServerUrl : STRING; // e.g. 'test.mosquitto.org'
    PrimaryHostID : WSTRING := ""; // e.g. 'PrimaryHost'
    KeepAlive : UINT := 60; // e.g. 60 (value is in seconds)
    // * The client has limited resources.
    // * You want the broker to store the subscription information of the client and restore the interrupted communication quickly.
    // * The client needs to resume all QoS 1 and 2 publish messages after a reconnect.
    // Clean session (TRUE)
    // * The client needs only to publish messages to topics; the client does not need to subscribe to topics.
    // * You don't want the broker to store session information or retry transmission of QoS 1 and 2 messages.
    // * The client does not need to get messages that it misses offline.
    CleanSession : BOOL := FALSE;
    Username : WSTRING := ""; // specify a username if any (optional)
    Password : WSTRING := ""; // specify the password for the username (optional)
    UseTLS : BOOL; // Enables TLS encryption (optional)
    ITLSContextProvider : MQTT.NBS.ITLSContext ; // Encapsulates all the data necessary to handle encrypted TCP connections
    // Static initialization, as shown in the following code snippet:
    // commonName : STRING := 'MyRasPi';
    // ciCertInfo : NBS.CERT_INFO := (psInfo:=ADR(commonName), udiSize:=LEN(commonName));
    // ePurpose:=NBS.PURPOSE.CLIENT_SIDE,
    // sTLSVersion:='1.3',
    // udiVerificationMode:=2
    //END_VAR
    IASyncPropertyProvider : MQTT.NBS.IAsyncProperty; // Runs the connect process in its own background task. Use this property if the connection setup takes longer than one task cycle (e.g. TLS connections)
VAR_OUTPUT
    SessionState : SparkplugSessionStateType;
    eError : ERROR;
VAR