[r810]: / trunk / mqttsparkplug / mqttsparkplug / SparkplugB / Function Blocks / FB_PrimaryHost / FB_PrimaryHost / svnobj  Maximize  Restore  History

Download this file

113 lines (111 with data), 34.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Components
_MQTT.IASyncPropertyProvider( IASyncPropertyProvider );
_MQTT.ITLSContextProvider( ITLSContextProvider );
_MQTT();
    _RecvMesg REF= _MQTT.GetSubscriberMessage(MetricList := THIS^._AllMetricsList, MessageTopic => _RecvTopic );
    //onSubscribeMessage( pTopic := ADR( _RecvTopic ),
    //                    pMessage := _RecvMesg );
        SparkplugMessageTypes.NBIRTH: //Birth certificate for MQTT EoN nodes.
                                        EdgeId := _TopicParser.EdgeId,
                    _MyGroupId REF= THIS^.allocGroupId( GroupId := _TopicParser.GroupId,
                        _MyEdge REF= THIS^.allocEdgetoGroupId( EdgeId := _TopicParser.EdgeId,
                                                               eError => eError );
                        IF eError = ERROR.NO_ERROR THEN
                                                   pMessage := _RecvMesg,
                                                   eError => eError );
                        ELSE
                            xError := TRUE;
                            //eError := eError;
                        END_IF
                    ELSE
                        xError := TRUE;
                        //eError := eError;
                    END_IF
                ERROR.EdgeIDNotFound: // ==> NEW EDGE!
                                                      eError => eError );
                ERROR.NO_ERROR: // ==> UPDATE EXISTING EDGE
                                           pMessage := _RecvMesg,
                                           xError => xError,
                                           eError => eError );
            ELSE
                ; //{info 'TODO: maybe some sort of error??'}
            END_CASE
            
        SparkplugMessageTypes.NDATA: //Node data message.
                                          pMessage := _RecvMesg,
                                          xError => xError,
                                          eError => eError );
                xError := TRUE;
                eError := Error.EdgeIDNotFound;
                                'PrimaryHost process NDATA metric error: %s',
                                TO_STRING( _HandledError ) );  
            END_CASE                             
                ERROR.NO_ERROR: // ==> UPDATE EXISTING EDGE              
                    _MyEdge.ProcessNDEATH();
                eError := Error.OhMy_ThatShouldNotHaveHappened;
        SparkplugMessageTypes.DBIRTH: //Birth certificate for Devices.
            _MyDevice REF= THIS^.GetDevice( GroupId := _TopicParser.GroupId,
                                            DeviceId := _TopicParser.DeviceId,
                ERROR.GroupIDNotFound:  // ==> NEW GROUP! must rebirth the edge
					 eError := ERROR.NotImplemented;
					 xError := TRUE;
                                     'DBIRTH Received with no existing groupID, but haven$'t implemented NCMD Rebirth yet %s',
                                     '' );  
                                     'DBIRTH Received with no existing edge, but haven$'t implemented NCMD Rebirth yet %s',
                                     '' );
                ERROR.DeviceIDNotFound: // ==> NEW DEVICE!
                                                 eError => eError );
                                                               itfEdge := _MyEdge.itfRemoteEoN,
                ERROR.NO_ERROR:         // ==> UPDATE EXISTING DEVICE
					_MyDevice.ProcessDBIRTH( itfPrimaryHost := THIS^,
                                'PrimaryHost process DDEATH metric error: %s',
        SparkplugMessageTypes.DDATA: //Device data message. 
                                            pMessage := _RecvMesg,
                                            xError => xError,
                                            eError => eError );
                eError := Error.DeviceIDNotFound;
        SparkplugMessageTypes.NCMD,      //Node command message.
        SparkplugMessageTypes.DCMD,  //Device command message.
            ;                            //do nothing, ignore these
        ; //{info 'TODO: maybe some sort of error??'}
    {endregion}
_ExponentialBackOff( Enable := (Connect AND NOT Connected),
                     LockTimesArray := _LockTimesArray,
                     OutTimeRemaining => _OutTimeRemaining,
_StateMachine( Connect := _ExponentialBackOff.out,
               pMQTT := ADR( _MQTT ),
               pPrimaryHost := THIS,
Diag();
{a9ed5b7e-75c5-4651-af16-d2c27e98cb94}
    Primary Host Node
    Allows you to concentrate Edge of Network Data sent by various Edges or other Sparkplug compatible nodes
{attribute 'reflection'}
VAR_INPUT
    ServerUrl     : STRING;          // eg:'test.mosquitto.org';
    PrimaryHostID : WSTRING := "";   // eg:'PrimaryHost';
    KeepAlive     : UINT    := 60;   // eg: 60 (Value is in seconds)
    //  * The client has limited resources. 
    //  * You want the broker to store the subscription information of the client and restore the interrupted communication quickly.
    //  * The client needs to resume all QoS 1 and 2 publish messages after a reconnect.
    //  Clean session (TRUE)
    //  * The client needs only to publish messages to topics, the client does not need to subscribe to topics. 
    //  * You don't want the broker to store session information or retry transmission of QoS 1 and 2 messages.
    //  * The client does not need to get messages that it misses offline. 
    CleanSession : BOOL    := FALSE;
    Username     : WSTRING := "";           // specify a username if any (optional)
    Password     : WSTRING := "";           // specify the password for the username (optional)
    UseTLS       : BOOL;                    // Enables usage of the TLS encryption (optional)
    ITLSContextProvider : MQTT.NBS.ITLSContext
    ; //Encapsulates all the data necessary to handle encrypted TCP connections
    //Static initialization, as shown in the following code snippet;
    //    commonName : STRING := 'MyRasPi';
    //    ciCertInfo : NBS.CERT_INFO := (psInfo:=ADR(commonName), udiSize:=LEN(commonName));
    //        ePurpose:=NBS.PURPOSE.CLIENT_SIDE,
    //        sTLSVersion:='1.3',
    //        udiVerificationMode:=2
    //END_VAR
    IASyncPropertyProvider : MQTT.NBS.IAsyncProperty; // Runs the connect process in its own background task. Use this property if the connection setup takes longer than one task cycle (e.g. TLS connections)
VAR_OUTPUT
    SessionState   : SparkplugSessionStateType;
    eError         : ERROR;
VAR