- Interrupt is being executed during outages, which is good so I can build a watchdog
- I get this log from time to time
Code: Select all
Coroutine <generator object '_handle_msg' at 3fff6e50> took 4294968ms
On the other hand that log would actually mean that no other coroutine was executed for 1h 10 minutes as only the execution of a coroutine until the next yield is meassured. But as I log something else every 10 minutes I know that this did not happen and there were coroutines scheduled during that time.
This makes me wonder if my changes to uasyncio to log the execution time of coroutines work correctly. Changes are covered in ##########
Code: Select all
def run_forever(self):
cur_task = [0, 0, 0]
while True:
# Expire entries in waitq and move them to runq
tnow = self.time()
while self.waitq:
t = self.waitq.peektime()
delay = time.ticks_diff(t, tnow)
if delay > 0:
break
self.waitq.pop(cur_task)
if __debug__ and DEBUG:
log.debug("Moving from waitq to runq: %s", cur_task[1])
self.call_soon(cur_task[1], *cur_task[2])
# Process runq
l = len(self.runq)
if __debug__ and DEBUG:
log.debug("Entries in runq: %d", l)
while l:
cb = self.runq.popleft()
l -= 1
args = ()
if not isinstance(cb, type_gen):
args = self.runq.popleft()
l -= 1
if __debug__ and DEBUG:
log.info("Next callback to run: %s", (cb, args))
#############################
st = time.time()
cb(*args)
et = time.time()
if (et - st) > 10000:
global_log.error("Callback {!s} took {!s}ms".format(cb, et - st))
#############################
continue
if __debug__ and DEBUG:
log.info("Next coroutine to run: %s", (cb, args))
self.cur_task = cb
delay = 0
##############################
st = time.ticks_ms()
#############################
try:
if args is ():
ret = next(cb)
else:
ret = cb.send(*args)
if __debug__ and DEBUG:
log.info("Coroutine %s yield result: %s", cb, ret)
if isinstance(ret, SysCall1):
arg = ret.arg
if isinstance(ret, SleepMs):
delay = arg
elif isinstance(ret, IORead):
cb.pend_throw(False)
self.add_reader(arg, cb)
#############################
et = time.ticks_ms()
if (et - st) > 10000:
global_log.error("Coroutine {!s} took {!s}ms".format(cb, et - st))
##############################
continue
elif isinstance(ret, IOWrite):
cb.pend_throw(False)
self.add_writer(arg, cb)
#############################
et = time.ticks_ms()
if (et - st) > 10000:
global_log.error("Coroutine {!s} took {!s}ms".format(cb, et - st))
##############################
continue
elif isinstance(ret, IOReadDone):
self.remove_reader(arg)
elif isinstance(ret, IOWriteDone):
self.remove_writer(arg)
elif isinstance(ret, StopLoop):
return arg
else:
assert False, "Unknown syscall yielded: %r (of type %r)" % (ret, type(ret))
elif isinstance(ret, type_gen):
self.call_soon(ret)
elif isinstance(ret, int):
# Delay
delay = ret
elif ret is None:
# Just reschedule
pass
elif ret is False:
# Don't reschedule
#############################
et = time.ticks_ms()
if (et - st) > 10000:
global_log.error("Coroutine {!s} took {!s}ms".format(cb, et - st))
##############################
continue
else:
assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret))
except StopIteration as e:
if __debug__ and DEBUG:
log.debug("Coroutine finished: %s", cb)
#############################
et = time.ticks_ms()
if (et - st) > 10000:
global_log.error("Coroutine {!s} took {!s}ms".format(cb, et - st))
##############################
continue
except CancelledError as e:
if __debug__ and DEBUG:
log.debug("Coroutine cancelled: %s", cb)
#############################
et = time.ticks_ms()
if (et - st) > 10000:
global_log.error("Coroutine {!s} took {!s}ms".format(cb, et - st))
##############################
continue
#############################
et = time.ticks_ms()
if (et - st) > 10000:
global_log.error("Coroutine {!s} took {!s}ms".format(cb, et - st))
##############################
# Currently all syscalls don't return anything, so we don't
# need to feed anything to the next invocation of coroutine.
# If that changes, need to pass that value below.
if delay:
self.call_later_ms(delay, cb)
else:
self.call_soon(cb)
# Wait until next waitq task or I/O availability
delay = 0
if not self.runq:
delay = -1
if self.waitq:
tnow = self.time()
t = self.waitq.peektime()
delay = time.ticks_diff(t, tnow)
if delay < 0:
delay = 0
self.wait(delay)
A reconnected log happens from time to time but never close to "Coroutine <generator object '_handle_msg' at 3fff6e50> took 4294968ms" but sometimes close to the log about the outage (which makes sense of course as it was offline).
So I'm actually quite confused about the information I got. Maybe @pythoncoder can help me out with this as it's mostly about your mqtt library and you know asyncio very well?