Code:
import uasyncio as asyncio
l = Lock()
async def test():
    async with l:
        print("hi")
        return False
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(test())
hi
>>> print(l.locked())
True
This is my implementation (based on the Lock class in mqtt_as by @pythoncoder):
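Code:
_DEFAULT_MS = 20  # polling interval in ms; not shown in the post, value assumed here

class Lock():
    def __init__(self):
        self._locked = False

    async def __aenter__(self):
        # Poll until the lock is free, then take it.
        while True:
            if self._locked:
                await asyncio.sleep_ms(_DEFAULT_MS)
            else:
                self._locked = True
                break

    async def __aexit__(self, *args):
        self._locked = False
        # Yield so that a waiting task gets a chance to take the lock.
        await asyncio.sleep_ms(_DEFAULT_MS)

    def locked(self):
        return self._locked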
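For comparison, here is a rough sketch of the same Lock ported to CPython's asyncio, where a return inside `async with` still runs `__aexit__`, so the lock ends up released. Assumptions: `asyncio.sleep` with a hypothetical `_POLL_S` constant stands in for `asyncio.sleep_ms(_DEFAULT_MS)`, and the names `return_inside`, `return_after` and `run_demo` are made up for illustration. `return_after` leaves the block normally before returning, which might serve as a workaround if the problem is limited to returning from inside the block; I have not verified that on MicroPython.

```python
import asyncio  # CPython's asyncio; the post itself uses uasyncio on MicroPython

_POLL_S = 0.02  # hypothetical polling interval in seconds (stands in for _DEFAULT_MS)


class Lock:
    def __init__(self):
        self._locked = False

    async def __aenter__(self):
        # Poll until the lock is free, then take it.
        while self._locked:
            await asyncio.sleep(_POLL_S)
        self._locked = True

    async def __aexit__(self, *args):
        self._locked = False
        # Yield so that a waiting task gets a chance to take the lock.
        await asyncio.sleep(_POLL_S)

    def locked(self):
        return self._locked


async def return_inside(lock):
    async with lock:
        return False  # same shape as test() in the post


async def return_after(lock):
    async with lock:
        result = False
    return result  # leave the block normally, then return


def run_demo():
    lock = Lock()
    r1 = asyncio.run(return_inside(lock))
    locked_after_inside = lock.locked()
    r2 = asyncio.run(return_after(lock))
    locked_after_outside = lock.locked()
    return r1, locked_after_inside, r2, locked_after_outside


print(run_demo())
```

On CPython both calls leave `lock.locked()` as False, which is the behaviour I expected from the uasyncio version as well.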