Hardware: ESP32 (either SMD module or DevBoard); firmware: esp32-idf4-20200902-v1.13.bin
Video https://youtu.be/TezDU61fiCw
Code https://github.com/divergentti/airquali ... en/main.py
Network part of the code:
Code: Select all
class ConnectWiFi:
    """Station-mode WiFi connection helper that also owns the MQTT client.

    The network to use may be predefined in parameters.py (SSID1/PASSWORD1 and
    SSID2/PASSWORD2). When nothing is predefined, an access point is picked
    from the scan result by signal strength.

    Note: the attribute names ``wifi_strenth`` and ``searh_list`` are
    misspelled but kept as-is, because code outside this class may read them.
    """

    # Position of the RSSI field in the tuples returned by WLAN.scan(), which
    # the MicroPython docs describe as
    # (ssid, bssid, channel, RSSI, security, hidden).
    _RSSI_INDEX = 3

    def __init__(self):
        """Record the current connection state, or start a scan if not connected."""
        self.ip_address = None
        self.wifi_strenth = None
        self.timeset = False
        self.webrepl_started = False
        self.searh_list = []
        self.ssid_list = []
        self.mqttclient = None
        # Defaults for every attribute read later, so that no branch below can
        # leave one undefined (e.g. when connected to a non-predefined network).
        self.password = None
        self.use_password = None  # Password may be from parameters.py or from user input
        self.use_ssid = None  # SSID to be used is decided later even if it is in parameters.py
        self.predefined = False
        self.network_connected = False

        wlan = network.WLAN(network.STA_IF)
        essid = wlan.config('essid')
        if essid != '':
            # A non-empty ESSID is treated as "already connected".
            display.row_by_row_text("Connected to network %s" % essid, 'fuschia')
            self.use_ssid = essid
            self.ip_address = wlan.ifconfig()[0]
            display.row_by_row_text('IP-address: %s' % self.ip_address, 'fuschia')
            self.wifi_strenth = wlan.status('rssi')
            display.row_by_row_text("WiFi-signal strength %s" % self.wifi_strenth, 'fuschia')
            self.network_connected = True
            # Resolve whether the ESSID is one of the networks in parameters.py
            if self.use_ssid == SSID1:
                self.use_password = PASSWORD1
                self.predefined = True
            elif self.use_ssid == SSID2:
                self.use_password = PASSWORD2
                self.predefined = True
        else:
            # Not connected: check whether SSID1 or SSID2 is predefined, then scan
            self.predefined = (SSID1 is not None) or (SSID2 is not None)
            self.search_wifi_networks()

    @staticmethod
    def _strongest_ap(ap_list):
        """Return the scan tuple with the highest RSSI from a non-empty list."""
        return max(ap_list, key=lambda ap: ap[ConnectWiFi._RSSI_INDEX])

    def _password_for(self, ssid):
        """Return the parameters.py password that belongs to a predefined SSID."""
        return PASSWORD1 if ssid == SSID1 else PASSWORD2

    def start_webrepl(self):
        """Start WebREPL once, with WEBREPL_PASSWORD when it is defined.

        Returns False if WebREPL could not be started, None otherwise
        (including when it was already started).
        """
        if self.webrepl_started:
            return None
        try:
            if WEBREPL_PASSWORD is not None:
                webrepl.start(password=WEBREPL_PASSWORD)
            else:
                webrepl.start()
        except OSError as e:
            # Report the failure on both paths instead of swallowing it silently.
            print("WebREPL do not start. Error %s" % e)
            display.row_by_row_text("WebREPL do not start. Error %s" % e, 'red')
            return False
        self.webrepl_started = True
        return None

    def set_time(self):
        """Set the clock from NTP (once) and show the current local time.

        Returns False when the NTP query fails, None otherwise.
        """
        if self.timeset is False:
            try:
                ntptime.settime()
            except OSError as e:
                print("No time from NTP server %s! Error %s" % (NTPSERVER, e))
                display.row_by_row_text("No time from NTP server %s! Error %s" % (NTPSERVER, e), 'red')
                # The time was NOT set; keep the flag False so a later call retries.
                self.timeset = False
                return False
            self.timeset = True
        now = str(utime.localtime(utime.time()))
        print("Time: %s " % now)
        display.row_by_row_text("Time: %s " % now, 'white')

    def search_wifi_networks(self):
        """Scan for access points and choose ``use_ssid`` / ``use_password``.

        With predefined networks, the strongest access point whose SSID equals
        SSID1 or SSID2 is chosen together with its matching password. Without
        predefined networks, the strongest access point in range is chosen and
        the password is left untouched.

        Returns False when the scan fails, finds nothing, or finds none of the
        predefined SSIDs; returns None otherwise.
        """
        wlan = network.WLAN(network.STA_IF)
        # Begin with adapter reset
        wlan.active(False)
        utime.sleep(1)
        wlan.active(True)
        utime.sleep(2)
        if DHCP_NAME is not None:
            wlan.config(dhcp_hostname=DHCP_NAME)
        if NTPSERVER is not None:
            ntptime.host = NTPSERVER
        display.row_by_row_text("Check what hotspots we see", 'white')
        try:
            # Generate list of WiFi hotspots in range
            self.ssid_list = wlan.scan()
            utime.sleep(3)
        except OSError:
            return False
        if not self.ssid_list:
            # An empty scan is a normal result, not an exception, so test for it.
            print("No WiFi-networks within range!")
            display.row_by_row_text("No WiFi-networks within range!", 'red')
            utime.sleep(10)
            return False

        display.row_by_row_text("Found following hotspots:", 'white')
        for ap in self.ssid_list:
            display.row_by_row_text(ap[0].decode(), 'white')

        if self.predefined:
            # Network to be connected is in parameters.py. Check if SSID1 or SSID2 is in the list
            print("Checking if paramaters.py networks are in the list...")
            display.row_by_row_text("Checking predefined networks...", 'white')
            self.searh_list = [ap for ap in self.ssid_list if ap[0].decode() in (SSID1, SSID2)]
            if not self.searh_list:
                # SSID not found within signal range
                print("Parameters.py SSIDs not found in the signal range!")
                display.row_by_row_text("Parameters.py SSIDs not found in the signal range!", 'red')
                utime.sleep(10)
                return False
            # Several matches are possible; take the one with the highest RSSI
            # and pick the password by SSID, not by list position.
            best = self._strongest_ap(self.searh_list)
            self.use_ssid = best[0].decode()
            self.use_password = self._password_for(self.use_ssid)
            display.row_by_row_text("Using hotspot: %s" % self.use_ssid, 'yellow')
        else:
            # Networks not defined in parameters.py: choose by signal strength.
            # ToDo: rebuild, ask user input which hotspot we want to connect!
            # Ascending RSSI order, so the strongest access point ends up last.
            self.ssid_list.sort(key=lambda ap: ap[self._RSSI_INDEX])
            no_link = (wlan.ifconfig()[0] == '0.0.0.0') and (self.network_connected is False)
            if len(self.ssid_list) == 1 or no_link:
                self.use_ssid = self.ssid_list[-1][0].decode()

    def connect_to_network(self):
        """Connect to ``use_ssid`` and record the resulting connection details.

        On success the time is set, WebREPL is started and True is returned.
        When no IP address was obtained, the device is reset after 10 seconds.
        """
        wlan = network.WLAN(network.STA_IF)
        print("Connecting to AP %s ..." % self.use_ssid)
        display.row_by_row_text("Connecting to AP %s ..." % self.use_ssid, 'white')
        if self.use_ssid is not None:
            try:
                wlan.connect(self.use_ssid, self.use_password)
                utime.sleep(5)
            except OSError as e:
                # Not fatal by itself: the IP check below decides the outcome.
                print("Connecting to AP %s failed. Error %s" % (self.use_ssid, e))

        if wlan.ifconfig()[0] != '0.0.0.0':
            self.set_time()
            self.start_webrepl()
            self.use_ssid = wlan.config('essid')
            display.row_by_row_text("Connected to network %s" % self.use_ssid, 'white')
            self.ip_address = wlan.ifconfig()[0]
            display.row_by_row_text('IP-address: %s' % self.ip_address, 'white')
            self.wifi_strenth = wlan.status('rssi')
            display.row_by_row_text("WiFi-signal strength %s" % self.wifi_strenth, 'white')
            self.network_connected = True
            return True

        print("No IP address!")
        display.row_by_row_text("No network connection! Soft rebooting in 10s...", 'red')
        self.network_connected = False
        utime.sleep(10)
        reset()

    async def update_status_loop(self):
        """Refresh SSID, IP address and RSSI once per second while connected."""
        while True:
            if self.network_connected is True:
                wlan = network.WLAN(network.STA_IF)
                self.use_ssid = wlan.config('essid')
                self.ip_address = wlan.ifconfig()[0]
                self.wifi_strenth = wlan.status('rssi')
            await asyncio.sleep(1)

    def mqtt_init(self):
        """Fill the shared MQTT ``config`` dict and create the client object."""
        config['server'] = MQTT_SERVER
        config['ssid'] = self.use_ssid
        config['wifi_pw'] = self.use_password
        config['user'] = MQTT_USER
        config['password'] = MQTT_PASSWORD
        config['port'] = MQTT_PORT
        config['client_id'] = CLIENT_ID
        config['subs_cb'] = self.update_mqtt_status
        config['connect_coro'] = self.mqtt_subscribe
        # Communication object
        self.mqttclient = MQTTClient(config)

    async def mqtt_up_loop(self):
        """Keep the MQTT connection busy by publishing a counter every 5 seconds."""
        await self.mqtt_subscribe()
        n = 0
        while True:
            await asyncio.sleep(5)
            print('mqtt-publish', n)
            await self.mqttclient.publish('result', '{}'.format(n), qos=1)
            n += 1

    async def mqtt_subscribe(self, client=None):
        """Coroutine run on (re)connect; currently it only waits one second.

        ``client`` is optional so the coroutine works both when called directly
        and when used as ``connect_coro``, which, per the mqtt_as
        documentation, is invoked with the client instance as its argument.
        """
        await asyncio.sleep(1)
        # await self.mqttclient.subscribe(TOPIC_OUTDOOR, 0)

    def update_mqtt_status(self, topic, msg, retained):
        """Subscription callback: interpret the message as an integer status."""
        # print("Topic: %s, message %s" % (topic, msg))
        try:
            status = int(msg)
        except ValueError:
            # A malformed payload must not raise out of the callback.
            print("Ignoring non-numeric MQTT message %s on topic %s" % (msg, topic))
            return
        if status == 1:
            print("daa")
        elif status == 0:
            print("kaa")