MQTT memory leak problem

gustavocsw
2024-09-09
2024-09-09
  • gustavocsw - 2024-09-09

    Hello everyone,
    I'm using the IoT Library to implement the MQTT communication with my local broker server in order to publish and subscribe at specifics topics to share and consume information about my application. But, it seems that are occurring some memory leak problem in a "high" frequency (more than 10 Hz) subscribe process.
    I follow the same method as in IoT Lib exemples, and at first looks perfect but my PLC was rebooting frequently and when I check its memory usage that was increasing as fast as the subscribe massage was sent.
    I'm using a WEG PLC410 and a WEG PLC500, and this error occurred in both of them (including in CODESYS Control Win x64). The application sends to the system a message JSON with the float payload Ex. {"data" : 0.8500}, but this happens with a INT, or BOL as well.

    I use the follow code in my application to find the value:

    //FindFirstValueByKey VARs
    PROGRAM JSON_VELO
    VAR
    //------Setting the JSON Subscriber to Set the Relay Value
      jsonDataVelo          : JSON.JSONData;
      jsonByteArrayReaderVelo      : JSON.JSONByteArrayReader;
      xST1okVelo            : BOOL;
      FindFirstValueByKeyVelo      : JSON.FindFirstValueByKey;
      jsonElementVelo          : JSON.JSONElement;
      xDoneReaderVelo          : BOOL;
      xDoneFindVelo          : BOOL;
    //STRING and WSTRING for Subscribe the massage  
      sPayloadJsonVelo   : STRING := 'opa';
      psPayloadJsonVelo  : POINTER TO BYTE := ADR(sPayloadJsonVelo);  
      //wsPayloadJsonRelaySet   : WSTRING := "opa";
      wsPayloadJsonVelo   : WSTRING := STRING_TO_WSTRING('opa');
      pwsPayloadJsonVelo  : POINTER TO WORD := ADR(wsPayloadJsonVelo);
      lrVelo        : LREAL;
      xKeepAliveVelo    : BOOL;
      xSetVelo      : BOOL;
      RSSet        : RS;
      LIMPAR        : STRING;
      //Find the msg end
      sFindVelo       : STRING := '}';
      psFindVelo       : POINTER TO STRING := ADR(sFindVelo);
      iLenVelo      : INT;
      iSizeVelo       : INT := 12;

    udiContMsg      : UDINT;
    END_VAR

    // FindFirstValueByKey CODE
    // Relay Set configuration
    xSetVelo := MQTT_SUBSCRIBER.RSVelo.Q1;
    IF xSetVelo THEN
      xKeepAliveVelo := TRUE;
    END_IF
    IF xKeepAliveVelo THEN
      udiContMsg := udiContMsg + 1;
      iLenVelo := TO_INT(StrLenA(psPayloadJsonVelo));
      iSizeVelo := iLenVelo - TO_INT(MQTT_SUBSCRIBER.udiPayloadSizeVelo);
      StrDeleteA(psPayloadJsonVelo,iSizeVelo,iLenVelo);
      wsPayloadJsonVelo := STRING_TO_WSTRING(sPayloadJsonVelo);
      pwsPayloadJsonVelo := ADR(wsPayloadJsonVelo);

    //MQTT.ConvertUTF8toUTF16(sourceStart:= ADR(sPayloadJsonVelo), targetStart:= ADR(wsPayloadJsonVelo), dwTargetBufferSize:= TAM, bStrictConversion:= 1);
      //Reset jsonByteArrayReader
      jsonByteArrayReaderVelo (
                    xExecute   := TRUE,
                    pwData     := pwsPayloadJsonVelo,
                    jsonData   := jsonDataVelo,
                    xDone     => xDoneReaderVelo
                    );
      FindFirstValueByKeyVelo(
                    xExecute   := xDoneReaderVelo,
                    wsKey     := "data",
                    diStartIndex:= 0,
                    jsonData   := jsonDataVelo,
                    jsonElement => jsonElementVelo,
                    xDone    => xDoneFindVelo
                    );                
      IF   xDoneFindVelo THEN
        lrVelo := jsonElementVelo.value.lrValue;
        //Reset jsonByteArrayReader
        jsonByteArrayReaderVelo (
                  xExecute   := FALSE,
                  pwData     := pwsPayloadJsonVelo,
                  jsonData   := jsonDataVelo,
                  xDone     => xDoneReaderVelo
                  );
        FindFirstValueByKeyVelo(
                    xExecute   := FALSE,
                    wsKey     := "data",
                    diStartIndex:= 1,
                    jsonData   := jsonDataVelo,
                    jsonElement => jsonElementVelo,
                    xDone    => xDoneFindVelo
                    );  
        xKeepAliveVelo := FALSE;
        GVL.xSetVeloRead := TRUE;
      END_IF
    END_IF

    And this to subscribe at the topic:

    //SUBSCRIBE VAR:
    //----------------- Subscribe Velocity -----------------------
      MQTTSubscribeVelo    : MQTT.MQTTSubscribe;//Variable MQTTSubscriber block -X - function-X
      wsTopicSubscribeVelo   : WSTRING(1024) := "CORE/odometry/GET/data/simp"; // Topic to publish a message
      sSubscribeMassageVelo  : STRING;
      udiPayloadSizeVelo    : UDINT;
      xSDoneVelo   : BOOL;
      xSErrorVelo   : BOOL;
      xReceiveVelo      : BOOL;
      eSTypeVelo   : MQTT.MQTT_ERROR;
      eSMQTTErrorVelo      : MQTT.MQTT_ERROR;
      RSVelo          : RS;  
      udiCont          : UDINT;

    //SUBSCRIBE CODE:
    MQTTSubscribeVelo(
            xEnable:= MQTT_CLIENT.xConnection_Broker AND NOT xSErrorVelo AND NOT JSON_VELO.xKeepAliveVelo,
            pbPayload:= JSON_VELO.psPayloadJsonVelo,
            udiMaxPayloadSize:= SIZEOF(JSON_VELO.sPayloadJsonVelo),
            udiPayloadSize => udiPayloadSizeVelo,
            mqttClient:= MQTT_CLIENT.ClientMQTT,
            wsTopicFilter:=wsTopicSubscribeVelo,
            xDone => xSDoneVelo,
            xError=> xSErrorVelo,
            xReceived => xReceiveVelo,
            eMQTTError=> eSMQTTErrorVelo
            );
            RSVelo(SET := xReceiveVelo, RESET1 := JSON_VELO.xKeepAliveVelo);

     
  • eschwellinger

    eschwellinger - 2024-09-09

    which version are you using? is this latest available version?

     
    • gustavocsw - 2024-09-09

      I'm using the V3.5 PS19 Patch4. That is a problem with that version with IoT Lib?

       

Log in to post a comment.