[r825]: / branches / FeatureBranches / refactorpayload / SparkplugB / Function Blocks / FB_PrimaryHost / FB_PrimaryHost / svnobj  Maximize  Restore  History

Download this file

88 lines (84 with data), 34.4 kB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Components
_MQTT.IASyncPropertyProvider( IASyncPropertyProvider );
_MQTT.ITLSContextProvider( ITLSContextProvider );
_MQTT();
_SparkplugB( AllmetricsList := _AllMetricsList );
IF _MQTT.SubscriberMessageReceived() THEN
	{region "onSubscribeMessage"}
    _TopicParser( pTopic := ADR(_RecvTopic) ); //parses the topic
    
    WriteDeviceLog( UDINT_TO_DWORD( CmpLog.LogClass.LOG_INFO ),
                    'PrimaryHost Received Topic: %s',
                    TO_STRING( _TopicParser.MessageType ) );
    CASE _TopicParser.MessageType OF  //SparkplugMessageTypes
            _MyEdge REF= THIS^.GetEdge( GroupId := _TopicParser.GroupId,
                                        eError => _HandledError );
            CASE _HandledError OF
                ERROR.GroupIDNotFound: // ==> NEW GROUP!
                                                        eError => eError );
                    IF eError = ERROR.NO_ERROR THEN
                                                               itfGroupID := _MyGroupId.itfGroupID,
                            _MyEdge.ProcessNBIRTH( itfPrimaryHost := THIS^,
                                                   xError => xError,
                    _MyGroupId REF= THIS^.GetGroupID( GroupId := _TopicParser.GroupId,
                    _MyEdge.ProcessNBIRTH( itfPrimaryHost := THIS^,
                    _MyEdge.ProcessNDATA( itfPrimaryHost := THIS^,
                WriteDeviceLog( UDINT_TO_DWORD( CmpLog.LogClass.LOG_INFO ),
        SparkplugMessageTypes.NDEATH: //Death certificate for MQTT EoN nodes.
                                'PrimaryHost process NDEATH metric error: %s',
            END_CASE 
        
            //TODO: Check if edge is online (isQualityOK).  If not OK, DO NOT process the DBIRTH, instead just send a NCMD/Rebirth
 
                                            EdgeId := _TopicParser.EdgeId,
                                            eError => _HandledError );
					 WriteDeviceLog( UDINT_TO_DWORD( CmpLog.LogClass.LOG_INFO ),
                ERROR.EdgeIDNotFound:   // ==> NEW EDGE! must rebirth the edge
					 _MyEdge REF= THIS^.GetEdge( GroupId := _TopicParser.GroupId,
							                     EdgeId := _TopicParser.EdgeId,
                        _MyDevice REF= THIS^.allocDevicetoEdge( DeviceName := _TopicParser.DeviceId,
                            _MyDevice.ProcessDBIRTH( itfPrimaryHost := THIS^,
        SparkplugMessageTypes.DDEATH:    //Death certificate for Devices.
                    _MyDevice.ProcessDDEATH();
                    _MyDevice.ProcessDDATA( itfPrimaryHost := THIS^,
                                'PrimaryHost process DDATA metric error: %s',
        SparkplugMessageTypes.STATE: //Critical application state message
        ; //{info 'TODO: maybe some sort of error??'}
    {endregion}
_ExponentialBackOff( Enable := (Connect AND NOT Connected),
                     LockTimesArray := _LockTimesArray,
                     OutTimeRemaining => _OutTimeRemaining,
_StateMachine( Connect := _ExponentialBackOff.out,
               pMQTT := ADR( _MQTT ),
               pPrimaryHost := THIS,
Diag();
{a9ed5b7e-75c5-4651-af16-d2c27e98cb94}
    Primary Host Node
    Allows you to concentrate Edge of Network Data sent by various Edges or other Sparkplug-compatible nodes
{attribute 'reflection'}
VAR_INPUT
    ServerPort : UINT := 1883; // eg: 1883
    KeepAlive : UINT := 60; // eg: 60 (Value is in seconds)
    //  * The client has limited resources. 
    //  * You want the broker to store the subscription information of the client and restore the interrupted communication quickly.
    //  * The client needs to resume all QoS 1 and 2 publish messages after a reconnect.
    //  Clean session (TRUE)
    //  * The client needs only to publish messages to topics, the client does not need to subscribe to topics. 
    //  * You don't want the broker to store session information or retry transmission of QoS 1 and 2 messages.
    //  * The client does not need to get messages that it misses offline. 
    CleanSession : BOOL := FALSE;
    Username : WSTRING := ""; // specify a username if any (optional)
    Password : WSTRING := ""; // specify the password for the username (optional)
    UseTLS : BOOL; // Enables usage of the TLS encryption (optional)
    hCert : SysTypes.RTS_IEC_HANDLE; // Handle to the client certificate (optional) and only used if UseTLS is TRUE
    ITLSContextProvider : MQTT.NBS.ITLSContext; // Encapsulates all the data necessary to handle encrypted TCP connections
    //Static initialization, as shown in the following code snippet;
    //    commonName : STRING := 'MyRasPi';
    //    ciCertInfo : NBS.CERT_INFO := (psInfo:=ADR(commonName), udiSize:=LEN(commonName));
    //        ePurpose:=NBS.PURPOSE.CLIENT_SIDE,
    //        sTLSVersion:='1.3',
    //        udiVerificationMode:=2
    //END_VAR
    IASyncPropertyProvider : MQTT.NBS.IAsyncProperty; // Runs the connect process in its own background task. Use this property if the connection setup takes longer than one task cycle (e.g. TLS connections)
VAR_OUTPUT
    SessionState   : SparkplugSessionStateType;
    eError         : ERROR;
VAR