#32 FB_MQTT_PubSub Considerations

Milestone: MVP EoN
Status: closed
Owner: aliazzz
Labels: None
Updated: 2020-09-14
Created: 2020-05-24
Creator: aliazzz
Private: No

PubSub signals that a message has been received via .MessageReceived().
The message and its topic can then be obtained via .getmessage().

Example

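IF PubSub.MessageReceived() THEN
    // fetch the received message; the topic is returned via the wsTopic output
    myRecvMessage := PubSub.MyMessage(wsTopic => MyTopic);

    // pass the message to the payload decoder by pointer and size (no copy)
    // note: SIZEOF yields the declared size of myRecvMessage, not the length of the received data
    MyDecodedMessage := Payload.Decode(pBuf := ADR(myRecvMessage), udiBufSize := SIZEOF(myRecvMessage));
END_IF;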

=> Passing a pointer to the blob (the undecoded received message) together with its length is much faster and lighter on resources than passing the data itself, as nothing is physically copied around. Binding a dynamic buffer is under consideration for the future.
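
To illustrate the pointer/length idea, here is a minimal sketch of a consumer that reads a buffer in place. XorChecksum is a hypothetical helper made up for this example, not part of FB_Payload or of the CODESYS IIoT library; it only assumes the pBuf/udiBufSize parameter pair used in the example above.

~~~
// Hypothetical helper: XOR checksum over a buffer, read in place (no copy)
FUNCTION XorChecksum : BYTE
VAR_INPUT
    pBuf       : POINTER TO BYTE;   // start of the received blob
    udiBufSize : UDINT;             // number of valid bytes at pBuf
END_VAR
VAR
    udiIndex : UDINT;
END_VAR

XorChecksum := 0;

// guard against a null pointer and against UDINT underflow in the loop bound
IF pBuf = 0 OR udiBufSize = 0 THEN
    RETURN;
END_IF

FOR udiIndex := 0 TO udiBufSize - 1 DO
    XorChecksum := XorChecksum XOR pBuf[udiIndex];
END_FOR
~~~

A call would look like XorChecksum(pBuf := ADR(myRecvMessage), udiBufSize := SIZEOF(myRecvMessage)). The caller has to make sure the buffer stays valid and unchanged for as long as the consumer reads from it.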

=> Interaction/handshaking between PubSub and Payload works in the usual way (xBusy, xDone, xError); methods for this are provided.
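
For readers unfamiliar with this handshake, below is a rough sketch of the common edge-triggered pattern. FB_DemoJob and PRG_Caller are hypothetical names invented for this example and are not the actual PubSub or Payload interface; the demo job simply pretends to work for three cycles and never fails.

~~~
// Hypothetical job: starts on a rising edge of xExecute, resets when xExecute is FALSE
FUNCTION_BLOCK FB_DemoJob
VAR_INPUT
    xExecute : BOOL;
END_VAR
VAR_OUTPUT
    xBusy  : BOOL;
    xDone  : BOOL;
    xError : BOOL;      // never set here, the demo job cannot fail
END_VAR
VAR
    xExecuteOld : BOOL;
    uiCycles    : UINT;
END_VAR

IF NOT xExecute THEN
    // reset: all status outputs return to FALSE
    xBusy := FALSE;
    xDone := FALSE;
    xError := FALSE;
    uiCycles := 0;
ELSIF NOT xExecuteOld THEN
    // rising edge: start the job
    xBusy := TRUE;
ELSIF xBusy THEN
    // pretend the work takes three cycles
    uiCycles := uiCycles + 1;
    IF uiCycles >= 3 THEN
        xBusy := FALSE;
        xDone := TRUE;
    END_IF
END_IF
xExecuteOld := xExecute;
END_FUNCTION_BLOCK

// Hypothetical caller: one run per request
PROGRAM PRG_Caller
VAR
    fbJob    : FB_DemoJob;
    xRequest : BOOL;    // set elsewhere to ask for one run
    xExecute : BOOL;
END_VAR

fbJob(xExecute := xExecute);

IF fbJob.xDone OR fbJob.xError THEN
    // result seen: drop Execute so the next call resets the job
    xExecute := FALSE;
ELSIF xRequest AND NOT xExecute THEN
    // job is idle: the next call sees a rising edge
    xExecute := TRUE;
    xRequest := FALSE;
END_IF
END_PROGRAM
~~~

With this ordering the job always sees one cycle with xExecute = FALSE between two runs, which is why a restart costs an extra cycle.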

=> According to the documentation of the CODESYS IIoT library, there is a maximum message payload length.

In the CODESYS IIoT MQTT library, can a single subscriber subscribe to only one topic?
=> I assume so, as the provided example shows a single topic per subscriber instance.

Discussion

  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -8,5 +8,7 @@
     To mitigate send/receive issues, maybe we should implement a send/receive message buffer?
     This buffer then can act as LIFO on and ideally isn't filled (the contents is sent/received directly), but when many events occur at once, the buffer gets utilized.
    
    -If we implement this buffer, where should it be implemented?
    +Is such a buffer allready implemented within the MQTT lib?
    +If not, and we should implement this buffer, where should it be implemented?
     My guts say it should be in the vicinity of the Payload either before/after or embedded into.
    +Ideas?
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -12,3 +12,11 @@
     If not, and we should implement this buffer, where should it be implemented?
     My guts say it should be in the vicinity of the Payload either before/after or embedded into.
     Ideas?
    +
    +////
    +
    +Also at this moment, I have code which publishes arbitrary messages.
    +The initialisation of  sending a new message takes 2 cycles, as the first cycle resets the state machine (xDone = false, xBusy = false, Xerror = false, internal client state is dormant)
    +then a rising edge on exectue sends a new message.
    +
    +I guess there is no "shorter" way to handshake the process, as the stats output need to be digested after a send. When Execute is set to False, the statemachine is reset and dormant.
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -15,8 +15,28 @@
    
     ////
    
    +Part2
     Also at this moment, I have code which publishes arbitrary messages.
     The initialisation of  sending a new message takes 2 cycles, as the first cycle resets the state machine (xDone = false, xBusy = false, Xerror = false, internal client state is dormant)
     then a rising edge on exectue sends a new message.
    
     I guess there is no "shorter" way to handshake the process, as the stats output need to be digested after a send. When Execute is set to False, the statemachine is reset and dormant.
    +
    +EDIT => part2 Solved
    +
    +~~~
    +// publish
    +xPublishBusy := MQTT_PubSub.PublisherBusy();
    +xPublishDone := MQTT_PubSub.PublisherDone();
    +xPublishError := MQTT_PubSub.PublisherError( eError => ePublishError );
    +
    +MQTT_PubSub.Publish( Execute := xPublish,
    +                                                    PubTopic:= PublishTopic, 
    +                                                    PubMessage:= PublishMessage, 
    +                                                    PubQos:= PublishQos, 
    +                                                    PubRetain:= PublishRetain );
    +               
    +xPublish := xPublishBusy AND NOT(xPublishDone OR xPublishError);
    +~~
    +
    +
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -37,6 +37,6 @@
                                                         PubRetain:= PublishRetain );
    
     xPublish := xPublishBusy AND NOT(xPublishDone OR xPublishError);
    -~~
    +~~~
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -31,10 +31,10 @@
     xPublishError := MQTT_PubSub.PublisherError( eError => ePublishError );
    
     MQTT_PubSub.Publish( Execute := xPublish,
    -                                                    PubTopic:= PublishTopic, 
    -                                                    PubMessage:= PublishMessage, 
    -                                                    PubQos:= PublishQos, 
    -                                                    PubRetain:= PublishRetain );
    +                                PubTopic:= PublishTopic, 
    +                                PubMessage:= PublishMessage, 
    +                                PubQos:= PublishQos, 
    +                                PubRetain:= PublishRetain );
    
     xPublish := xPublishBusy AND NOT(xPublishDone OR xPublishError);
     ~~~
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -1,20 +1,52 @@
    -PubSub exposes the pointer to the memory area and the lenght of the buffer.
    +PubSub can inform us a message has been received via pubsub.IsMessageReceived().
    +PubSub can then be called to obtain the message via pubsub.getmessage().
     The blob data which is received should be consumed by FB_Payload.
    
    -=> interaction/handshaking between PubSub and Payload in the usual way (xbusy, xdone, xerror) methods for this are provided.
    -=> pubsub.receive() should be called continuously and is by design asynchronous (I wrote it in a non blocking style/ therefore penalty is that it should be called continuously)
    -=> as the memory buffer of pubsub is envisioned to be filled ad-hoc with fresh data, payload should be able to process the received data within a single cycle.
    +thinking aloud:
    +Passing a pointer/length of the blob is far superior (as no data is physically copied around)
    +Binding a dynamic buffer is also under my consideration as an option if needed/wanted or has some unforseen advantage.
    +Offcourse the simplest idea is the best and should be chosen.
    
    +=> interaction/handshaking between PubSub and Payload in the usual way (xbusy, xdone, xerror) methods for this are provided. However I thought of the following, simple handshake;
    +
    +~~~
    +IF pubsub.IsMessageReceived() THEN
    +        // pseudocode call, true implementation will probably differ
    +        pubsub.GetMessage( pBuf=>pBuf, udiBufSize=>udiBufSize);
    +        xDecode := TRUE;
    +END_IF;
    +IF xDecode THEN
    +        // GO GO GADGET DECODE
    +        MyDecodedMessage := Payload.Decode( pBuf:=pBuf, udiBufSize:=udiBufSize);
    +        xDecode := FALSE;
    +END_IF;
    +~~~
    +
    +
    +=> 
    +~~~pubsub.IsMessageReceived()
    +~~~
    +
    +should be called continuously and is asynchronous by design as I wrote it in a non blocking style. Therefore the penalty is that it should be called continuously. If It can be designed in aonther (simpeler) way please let me know!
    +
    +=> As the memory buffer of pubsub is envisioned to be filled ad-hoc with fresh data, payload should be able to process the received data within a single cycle (!) 
    +
    +**Questions:**
     To mitigate send/receive issues, maybe we should implement a send/receive message buffer?
    -This buffer then can act as LIFO on and ideally isn't filled (the contents is sent/received directly), but when many events occur at once, the buffer gets utilized.
    -
    -Is such a buffer allready implemented within the MQTT lib?
    +This buffer then can act as LIFO on and ideally isn't filled (the contents is sent/received directly), but when many events occur at once, the buffer gets utilized. Is such a buffer even necessary ?
    +Is such a buffer allready implemented within the CODESYS IIot MQTT library?
     If not, and we should implement this buffer, where should it be implemented?
     My guts say it should be in the vicinity of the Payload either before/after or embedded into.
    -Ideas?
    +Any ideas?
    
    -////
    +In the CODESYS IIot MQTT library, is a single subscriber only capable of just subscribing to a single topic?
    +=> The provided example shows a subscribed topic per subscriber instance.
    +I assume this this means that for every subscription, a new FB instance is needed. Correct?
    +How many topics subscriptions should the EoN node subscribe too?
    +Thus how many subscribers should I implement (offcourse with an accompanying subscription strategy)?
    
    +
    +////////////////////////////////////////////////////
     Part2
     Also at this moment, I have code which publishes arbitrary messages.
     The initialisation of  sending a new message takes 2 cycles, as the first cycle resets the state machine (xDone = false, xBusy = false, Xerror = false, internal client state is dormant)
    @@ -38,5 +70,4 @@
    
     xPublish := xPublishBusy AND NOT(xPublishDone OR xPublishError);
     ~~~
    -
    -
    +////////////////////////////////////////////////////
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -24,10 +24,9 @@
    
    
     => 
    -~~~pubsub.IsMessageReceived()
    -~~~
    -
    -should be called continuously and is asynchronous by design as I wrote it in a non blocking style. Therefore the penalty is that it should be called continuously. If It can be designed in aonther (simpeler) way please let me know!
    +pubsub.IsMessageReceived()
    +should be called continuously and is asynchronous by design as I wrote it in a non blocking style on purpose. 
    +Therefore the penalty is that it should be called continuously to detect the edge and react on it as that is the only chance to grab the data. If this can be designed in **simpeler AND safer** way, please share your thoughts..
    
     => As the memory buffer of pubsub is envisioned to be filled ad-hoc with fresh data, payload should be able to process the received data within a single cycle (!) 
    
     
  • aliazzz

    aliazzz - 2020-05-24
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -1,8 +1,8 @@
    -PubSub can inform us a message has been received via pubsub.IsMessageReceived().
    -PubSub can then be called to obtain the message via pubsub.getmessage().
    +PubSub can inform us that a message has been received via .IsMessageReceived().
    +PubSub can then be called to obtain the message via .getmessage().
     The blob data which is received should be consumed by FB_Payload.
    
    -thinking aloud:
    +Thinking aloud:
     Passing a pointer/length of the blob is far superior (as no data is physically copied around)
     Binding a dynamic buffer is also under my consideration as an option if needed/wanted or has some unforseen advantage.
     Offcourse the simplest idea is the best and should be chosen.
    @@ -22,8 +22,6 @@
     END_IF;
     ~~~
    
    -
    -=> 
     pubsub.IsMessageReceived()
     should be called continuously and is asynchronous by design as I wrote it in a non blocking style on purpose. 
     Therefore the penalty is that it should be called continuously to detect the edge and react on it as that is the only chance to grab the data. If this can be designed in **simpeler AND safer** way, please share your thoughts..
    
     
  • aliazzz

    aliazzz - 2020-05-31
    • labels: discussion about several points of attention -->
    • summary: Consumption of received messages by FB_Payload / FB_MQTT_PubSub --> FB_MQTT_PubSub Considerations
    • Description has changed:

    Diff:

    --- old
    +++ new
    @@ -1,70 +1,22 @@
    -PubSub can inform us that a message has been received via .IsMessageReceived().
    -PubSub can then be called to obtain the message via .getmessage().
    -The blob data which is received should be consumed by FB_Payload.
    +PubSub can inform us that a message has been received via .MessageReceived().
    +The message & topic then can be obtained via .getmessage().
    
    -Thinking aloud:
    -Passing a pointer/length of the blob is far superior (as no data is physically copied around)
    -Binding a dynamic buffer is also under my consideration as an option if needed/wanted or has some unforseen advantage.
    -Offcourse the simplest idea is the best and should be chosen.
    -
    -=> interaction/handshaking between PubSub and Payload in the usual way (xbusy, xdone, xerror) methods for this are provided. However I thought of the following, simple handshake;
    -
    +Example
     ~~~
    -IF pubsub.IsMessageReceived() THEN
    -        // pseudocode call, true implementation will probably differ
    -        pubsub.GetMessage( pBuf=>pBuf, udiBufSize=>udiBufSize);
    -        xDecode := TRUE;
    -END_IF;
    -IF xDecode THEN
    -        // GO GO GADGET DECODE
    -        MyDecodedMessage := Payload.Decode( pBuf:=pBuf, udiBufSize:=udiBufSize);
    -        xDecode := FALSE;
    +IF PubSub.MessageReceived() THEN
    +        myRecvMessage := PubSub.MyMessage( wsTopic => MyTopic);
    +        
    +        // pass the message to the payload decoder
    +        MyDecodedMessage := Payload.Decode( pBuf:=ADR(myRecvMessage),udiBufSize:=SIZEOF(myRecvMessage));
     END_IF;
     ~~~
    
    -pubsub.IsMessageReceived()
    -should be called continuously and is asynchronous by design as I wrote it in a non blocking style on purpose. 
    -Therefore the penalty is that it should be called continuously to detect the edge and react on it as that is the only chance to grab the data. If this can be designed in **simpeler AND safer** way, please share your thoughts..
    +=> Passing a pointer/length of the blob (the undecoded received message) is far superior in speed and resources as no data is physically copied around. Binding a dynamic buffer is under future consideration.
    
    -=> As the memory buffer of pubsub is envisioned to be filled ad-hoc with fresh data, payload should be able to process the received data within a single cycle (!) 
    +=> Interaction/handshaking between PubSub and Payload in the usual way (xbusy, xdone, xerror) and methods for this are provided. 
    
    -**Questions:**
    -To mitigate send/receive issues, maybe we should implement a send/receive message buffer?
    -This buffer then can act as LIFO on and ideally isn't filled (the contents is sent/received directly), but when many events occur at once, the buffer gets utilized. Is such a buffer even necessary ?
    -Is such a buffer allready implemented within the CODESYS IIot MQTT library?
    -If not, and we should implement this buffer, where should it be implemented?
    -My guts say it should be in the vicinity of the Payload either before/after or embedded into.
    -Any ideas?
    +=> according to the documentation of the CODESYTS IIoT lib, there is a maximum message payload length.
    
     In the CODESYS IIot MQTT library, is a single subscriber only capable of just subscribing to a single topic?
    -=> The provided example shows a subscribed topic per subscriber instance.
    -I assume this this means that for every subscription, a new FB instance is needed. Correct?
    -How many topics subscriptions should the EoN node subscribe too?
    -Thus how many subscribers should I implement (offcourse with an accompanying subscription strategy)?
    +=> I Assume so, as the provided example shows a single topic per subscriber instance.
    
    -
    -////////////////////////////////////////////////////
    -Part2
    -Also at this moment, I have code which publishes arbitrary messages.
    -The initialisation of  sending a new message takes 2 cycles, as the first cycle resets the state machine (xDone = false, xBusy = false, Xerror = false, internal client state is dormant)
    -then a rising edge on exectue sends a new message.
    -
    -I guess there is no "shorter" way to handshake the process, as the stats output need to be digested after a send. When Execute is set to False, the statemachine is reset and dormant.
    -
    -EDIT => part2 Solved
    -
    -~~~
    -// publish
    -xPublishBusy := MQTT_PubSub.PublisherBusy();
    -xPublishDone := MQTT_PubSub.PublisherDone();
    -xPublishError := MQTT_PubSub.PublisherError( eError => ePublishError );
    -
    -MQTT_PubSub.Publish( Execute := xPublish,
    -                                PubTopic:= PublishTopic, 
    -                                PubMessage:= PublishMessage, 
    -                                PubQos:= PublishQos, 
    -                                PubRetain:= PublishRetain );
    -               
    -xPublish := xPublishBusy AND NOT(xPublishDone OR xPublishError);
    -~~~
    -////////////////////////////////////////////////////
    
    • assigned_to: aliazzz
     
  • aliazzz

    aliazzz - 2020-06-20
    • status: open --> pending
     
  • i-campbell

    i-campbell - 2020-06-20

    You can use the wildcards defined by MQTT, but I don't see any way to add a list of topics.
    So '#' is the multi-level topic wildcard and '+' is the single-level topic wildcard.
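
    For reference, a small sketch of what the two wildcards match. The topic names and variable names are made up for illustration; the matching rules are those of the MQTT specification.

    ~~~
    VAR CONSTANT
        // '+' matches exactly one topic level
        //   matches:        plant/line1/temperature
        //   does not match: plant/line1/oven/temperature
        wsSingleLevel : WSTRING := "plant/+/temperature";

        // '#' matches any number of remaining levels and must be the last character
        //   matches:        plant/line1
        //   matches:        plant/line1/oven/temperature
        wsMultiLevel  : WSTRING := "plant/line1/#";
    END_VAR
    ~~~

    A wildcard filter still counts as one topic filter per subscriber, so unrelated topic trees would each need their own filter.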

     
  • aliazzz

    aliazzz - 2020-06-20

    I assume that the subscriber implemented in the IIoT suite MQTT library can only subscribe to a single topic.
    That is why I have already implemented two subscribers in FB_MQTT_PubSub in my private branch, as a single topic does not suffice for Sparkplug B.

    I checked the CODESYS documentation to see whether a list of topics can be added to a single subscriber, and concluded that it cannot. The provided example also implements two separate subscribers. Granted, I didn't try it, but the example would have shown this if it were a feature.

    I still have to merge it into the trunk and add the appropriate unit tests.

     
  • aliazzz

    aliazzz - 2020-06-27
    • status: pending --> closed
     
