OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Questions and discussion about The WiPy 1.0 board and CC3200 boards.
Target audience: Users with a WiPy 1.0 or CC3200 board.
gatorchu
Posts: 20
Joined: Sun Sep 22, 2019 3:50 pm


Post by gatorchu » Tue Nov 26, 2019 10:28 pm

Hi,

I have a LoPy board that collects sensor data over serial, logs it to the local SD card, and then publishes it to the database using the MQTT protocol. Most of the time MQTT works properly, but sometimes it fails with an error (I don't know exactly what the error message means). Below is the error output. I print the raw data on the REPL every second and log the 5-minute average. The error often happens right after the data is logged to the SD card.

Code: Select all

Raw:b'000.010,2.0,+27.2,031,1016.3,00,*01528\r\n', length:45
Raw:b'000.010,2.0,+27.2,031,1016.3,00,*01528\r\n', length:45
LOGGED: 2019-11-26 16:30:00,EST,9,0,2.0,27.2,31,1016.3,00,V
Traceback (most recent call last):
  File "main.py", line 223, in <module>
  File "/flash/lib/mqtt.py", line 122, in publish
OSError: [Errno 104] ECONNRESET
Pycom MicroPython 1.20.0.rc13 [v1.9.4-94bb382] on 2019-08-22; LoPy4 with ESP32
Type "help()" for more information.
>>>
>>>
Line 122 of my MQTT library is:

Code: Select all

self.sock.write(pkt, i + 1)
I tried printing "pkt"; sometimes I got

Code: Select all

0x4 b'30:4f:00:00'
and sometimes

Code: Select all

0x4 b'30:52:00:00'
(I don't know what these mean).
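Those dumps can be read against the MQTT 3.1.1 fixed-header layout. The leading 0x4 is just hex(len(pkt)), the size of the 4-byte scratch buffer; only the first i + 1 bytes (here two) are actually written, so the trailing 00:00 is unused. A minimal sketch that decodes such a header (decode_fixed_header is a hypothetical helper, not part of the library): packet type in the high nibble of the first byte, flags in the low nibble, then the variable-length "remaining length" encoded 7 bits per byte, the inverse of the loop in publish().

```python
def decode_fixed_header(buf):
    """Decode an MQTT 3.1.1 fixed header from the start of buf.

    Returns (packet_type, flags, remaining_length, header_size_in_bytes).
    """
    packet_type = buf[0] >> 4  # 3 == PUBLISH
    flags = buf[0] & 0x0F      # PUBLISH: DUP bit 3, QoS bits 2-1, RETAIN bit 0
    # Remaining length: 7 bits per byte, least significant group first;
    # bit 7 set means another length byte follows (at most 4 length bytes).
    remaining = 0
    shift = 0
    i = 1
    while True:
        b = buf[i]
        remaining |= (b & 0x7F) << shift
        i += 1
        if not b & 0x80:
            return packet_type, flags, remaining, i
        if i > 4:
            raise ValueError("malformed remaining length")
        shift += 7


# The two headers printed above:
print(decode_fixed_header(bytes([0x30, 0x4F, 0x00, 0x00])))  # (3, 0, 79, 2)
print(decode_fixed_header(bytes([0x30, 0x52, 0x00, 0x00])))  # (3, 0, 82, 2)
```

Both decode to packet type 3 (PUBLISH), flags 0 (QoS 0, no retain) and a remaining length of 79 or 82 bytes (2 + topic + payload), with a 2-byte header. The headers look well-formed, which suggests the reset is not caused by a bad packet; since the exception is raised on the very first write in publish(), the connection was probably already gone before publishing started.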

I tried adding a 50 ms delay to see if it solved the issue, but it didn't help.

If you have any clues or instructions that can address this issue, please let me know! Thank you in advance.

Below is a copy of the MQTT library I use in my project.

Code: Select all

#!/usr/bin/env python
#
# Copyright (c) 2019, Pycom Limited.
#
# This software is licensed under the GNU GPL version 3 or any
# later version, with permitted additional terms. For more information
# see the Pycom Licence v1.0 document supplied with this file, or
# available at https://www.pycom.io/opensource/licensing
#

import usocket as socket
import ustruct as struct
import time
from ubinascii import hexlify

class MQTTException(Exception):
    pass

class MQTTClient:

    def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
                 ssl=False, ssl_params={}):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.addr = socket.getaddrinfo(server, port)[0][-1]
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    def _send_str(self, s):
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        self.sock = socket.socket()
        self.sock.connect(self.addr)
        if self.ssl:
            import ussl
            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
        msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
        msg[1] = 10 + 2 + len(self.client_id)
        msg[9] = clean_session << 1
        if self.user is not None:
            msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
            msg[9] |= 0xC0
        if self.keepalive:
            assert self.keepalive < 65536
            msg[10] |= self.keepalive >> 8
            msg[11] |= self.keepalive & 0x00FF
        if self.lw_topic:
            msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[9] |= self.lw_retain << 5
        self.sock.write(msg)
        #print(hex(len(msg)), hexlify(msg, ":"))
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user is not None:
            self._send_str(self.user)
            self._send_str(self.pswd)
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1

    def disconnect(self):
        self.sock.write(b"\xe0\0")
        self.sock.close()

    def ping(self):
        self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7f:
            pkt[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        #time.sleep_ms(50)   # add sleep for test
        self.sock.write(pkt, i + 1)
        #time.sleep_ms(50)   # add sleep for test
        self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, 'little'))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                #print(resp)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = self.sock.read(1)
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xf0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
        self.sock.setblocking(False)
        return self.wait_msg()
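The library above accepts a keepalive value (in seconds) and has a ping() method, but it never sends PINGREQ on its own. Under MQTT 3.1.1 the broker is expected to disconnect a client that sends nothing for one and a half times the keepalive period, so if a keepalive is configured and publishes are spaced further apart (for example, one per 5-minute average), the next publish() could hit a connection the broker has already closed. Whether that applies here depends on how main.py creates the client, which isn't shown. A minimal sketch, assuming the MQTTClient above and assuming check_msg() returns None when nothing has arrived; KeepalivePinger is a hypothetical helper, and the clock is injectable so it can be tried on a PC:

```python
import time


def _default_clock_ms():
    # MicroPython provides time.ticks_ms(); CPython (for testing) does not.
    if hasattr(time, "ticks_ms"):
        return time.ticks_ms()
    return int(time.monotonic() * 1000)


def _elapsed_ms(now, then):
    # ticks_ms() wraps around, so MicroPython code should use ticks_diff().
    if hasattr(time, "ticks_diff"):
        return time.ticks_diff(now, then)
    return now - then


class KeepalivePinger:
    """Hypothetical helper: send PINGREQ if nothing has been sent for interval_ms.

    interval_ms should be comfortably below the keepalive passed to MQTTClient
    (keepalive is in seconds; the broker may drop the client after 1.5x that).
    """

    def __init__(self, client, interval_ms, clock_ms=_default_clock_ms):
        self.client = client
        self.interval_ms = interval_ms
        self.clock_ms = clock_ms
        self.last_sent = clock_ms()

    def sent(self):
        """Call after every successful publish()."""
        self.last_sent = self.clock_ms()

    def poll(self):
        """Call often from the main loop; returns True if a ping was sent."""
        now = self.clock_ms()
        if _elapsed_ms(now, self.last_sent) < self.interval_ms:
            return False
        self.client.ping()       # writes PINGREQ (0xC0 0x00)
        self.client.check_msg()  # non-blocking; consumes PINGRESP if it already arrived
        self.last_sent = now
        return True
```

A PINGRESP that arrives later stays in the socket buffer and is consumed by the next check_msg() or wait_msg() call, which skip it.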

gatorchu
Posts: 20
Joined: Sun Sep 22, 2019 3:50 pm

Re: OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Post by gatorchu » Mon Dec 09, 2019 4:04 am

Could anyone please give me some clues? :geek:

pythoncoder
Posts: 3716
Joined: Fri Jul 18, 2014 8:01 am
Location: UK

Re: OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Post by pythoncoder » Mon Dec 09, 2019 11:22 am

The LoPy is a Pycom board with their own fork of MicroPython. You may get a better response in the Pycom forum where users are more likely to own that board.
Peter Hinch

gatorchu
Posts: 20
Joined: Sun Sep 22, 2019 3:50 pm

Re: OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Post by gatorchu » Tue Dec 10, 2019 2:04 am

pythoncoder wrote:
Mon Dec 09, 2019 11:22 am
The LoPy is a Pycom board with their own fork of MicroPython. You may get a better response in the Pycom forum where users are more likely to own that board.
Thank you, pythoncoder!

I posted the question there several months ago, but I haven't received any reply yet.

MostlyHarmless
Posts: 65
Joined: Thu Nov 21, 2019 6:25 pm
Location: Pennsylvania, USA

Re: OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Post by MostlyHarmless » Tue Dec 10, 2019 3:33 am

In standard TCP/IP networking, ECONNRESET (Connection reset by peer) means one of:
  1. your local TCP/IP stack determined, via keepalive or a timeout, that the remote is not responding;
  2. the remote process has closed the socket without consuming the data you sent, so the remote kernel is generating an RST packet;
  3. or a router in between considered the connection dead and was kind enough to generate an RST packet instead of silently dropping it. (Most consumer-grade routers aren't that kind: they usually drop the connection silently, especially with port forwarding, after whatever timeout their programmer thought appropriate.)
It will be hard to tell what exactly happened unless you are quite familiar with TCP on the wire and can intercept traffic at various network points via packet sniffers.
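From the application's side, all three cases look the same: an OSError whose first argument is the error code, and the TCP connection is unusable either way. A minimal sketch of recognising that situation, assuming the firmware's uerrno module provides these constants (CPython's errno is used as a fallback so the code can be tried on a PC). The names and the choice of which codes count as "dead" are illustrative assumptions; the -1 entry covers the library in the first post, which raises OSError(-1) when a read returns b"" because the broker closed the socket.

```python
try:
    import uerrno as errno  # MicroPython / Pycom firmware
except ImportError:
    import errno            # CPython fallback for testing on a PC

# Hypothetical set of codes treated as "this TCP connection is gone; reconnect".
DEAD_CONNECTION_ERRNOS = (
    errno.ECONNRESET,    # RST from the peer or a middlebox (104 on the LoPy4)
    errno.ECONNABORTED,  # connection aborted by the local stack
    errno.ETIMEDOUT,     # local stack gave up waiting for the peer
    errno.ENOTCONN,      # socket is no longer connected
    -1,                  # raised by the MQTT library when read() returns b""
)


def oserror_code(exc):
    """Return the numeric error code carried by an OSError, or None."""
    # MicroPython raises OSError(code); CPython uses OSError(code, strerror).
    # In both cases the code is the first argument.
    if exc.args and isinstance(exc.args[0], int):
        return exc.args[0]
    return None


def connection_is_dead(exc):
    """True if the error means the socket must be closed and reopened."""
    return oserror_code(exc) in DEAD_CONNECTION_ERRNOS
```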

@pythoncoder: Am I guessing right that the uPy MQTT implementation is propagating the error up because it doesn't have a reliable way to queue messages for later retransmit attempts?


Regards, Jan

pythoncoder
Posts: 3716
Joined: Fri Jul 18, 2014 8:01 am
Location: UK
Contact:

Re: OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Post by pythoncoder » Tue Dec 10, 2019 8:51 am

MostlyHarmless wrote:
Tue Dec 10, 2019 3:33 am
...
@pythoncoder: Am I guessing right that the uPy MQTT implementation is propagating the error up because it doesn't have a reliable way to queue messages for later retransmit attempts?...
The code in the OP looks like a copy of the official MQTT implementation. This has poor error recovery which is why I wrote a resilient MQTT module. This aims to honour qos==1 communication in the face of WiFi and/or broker outages. Inevitably communication will be delayed for the duration of an outage, but the qos==1 guarantee will be met once connectivity resumes. The official version can throw exceptions or even hang under these circumstances.
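With the official-style client, recovery is left to the caller. The following is a much simpler stopgap than a resilient module and is not Peter Hinch's code or API: a hypothetical publish_with_retry helper that, assuming the MQTTClient from the first post, closes the dead socket, waits, reconnects and retries a bounded number of times. It does not queue messages across longer outages, and around the moment of a reset a message may be delivered twice or not at all.

```python
import time


def _sleep_ms(ms):
    # MicroPython has time.sleep_ms(); CPython only has time.sleep() in seconds.
    if hasattr(time, "sleep_ms"):
        time.sleep_ms(ms)
    else:
        time.sleep(ms / 1000)


def publish_with_retry(client, topic, msg, qos=0, retries=3, delay_ms=2000):
    """Publish via an MQTTClient, reconnecting after socket errors.

    Returns the number of attempts used; re-raises the last OSError if
    every attempt fails.
    """
    for attempt in range(1, retries + 1):
        try:
            client.publish(topic, msg, qos=qos)
            return attempt
        except OSError:
            if attempt == retries:
                raise
            # After a reset the old socket is unusable: close it (ignoring
            # errors) and open a fresh connection before the next attempt.
            try:
                client.sock.close()
            except Exception:
                pass
            _sleep_ms(delay_ms)
            try:
                client.connect()
            except OSError:
                # Network or broker still down; the next publish() will fail
                # and we retry after another delay.
                pass
```

In the main loop one would call publish_with_retry(client, topic, payload) instead of client.publish(topic, payload) and, if it still raises, keep the reading on the SD card for a later attempt. MQTTException from connect() (for example, refused credentials) is deliberately not caught, since retrying would not help.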
Peter Hinch

MostlyHarmless
Posts: 65
Joined: Thu Nov 21, 2019 6:25 pm
Location: Pennsylvania, USA

Re: OSError: [Errno 104] ECONNRESET in publish function in MQTT library

Post by MostlyHarmless » Tue Dec 10, 2019 1:59 pm

pythoncoder wrote:
Tue Dec 10, 2019 8:51 am
The code in the OP looks like a copy of the official MQTT implementation. This has poor error recovery which is why I wrote a resilient MQTT module. This aims to honour qos==1 communication in the face of WiFi and/or broker outages.
Thanks for both the code and the explanation.


Regards, Jan
