I'm home, and at Thonny, and it's now I discover the plot thickens!
It seems I already have an edited version of the library file we're discussing. I'm not sure now where I got it from because I haven't edited it. This is what it looks like (sorry, I can't give you a link as it's only on my laptop without hunting on the internet again)
Code: Select all
from machine import I2C, Pin
import time
__version__ = '0.2.3'
__author__ = 'Roberto Sánchez'
__license__ = "Apache License 2.0. https://www.apache.org/licenses/LICENSE-2.0"
# I2C address B 0x44 ADDR (pin 2) not connected to VDD
DEFAULT_I2C_ADDRESS = 0x44
# I2C address B 0x45 ADDR (pin 2) connected to VDD
#DEFAULT_I2C_ADDRESS = 0x45
class SHT30:
"""
SHT30 sensor driver in pure python based on I2C bus
References:
* https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/2_Humidity_Sensors/Sensirion_Humidity_Sensors_SHT3x_Datasheet_digital.pdf # NOQA
* https://www.wemos.cc/sites/default/files/2016-11/SHT30-DIS_datasheet.pdf
* https://github.com/wemos/WEMOS_SHT3x_Arduino_Library
* https://www.sensirion.com/fileadmin/user_upload/customers/sensirion/Dokumente/11_Sample_Codes_Software/Humidity_Sensors/Sensirion_Humidity_Sensors_SHT3x_Sample_Code_V2.pdf
"""
POLYNOMIAL = 0x131 # P(x) = x^8 + x^5 + x^4 + 1 = 100110001
ALERT_PENDING_MASK = 0x8000 # 15
HEATER_MASK = 0x2000 # 13
RH_ALERT_MASK = 0x0800 # 11
T_ALERT_MASK = 0x0400 # 10
RESET_MASK = 0x0010 # 4
CMD_STATUS_MASK = 0x0002 # 1
WRITE_STATUS_MASK = 0x0001 # 0
# MSB = 0x2C LSB = 0x06 Repeatability = High, Clock stretching = enabled
MEASURE_CMD = b'\x2C\x10'
STATUS_CMD = b'\xF3\x2D'
RESET_CMD = b'\x30\xA2'
CLEAR_STATUS_CMD = b'\x30\x41'
ENABLE_HEATER_CMD = b'\x30\x6D'
DISABLE_HEATER_CMD = b'\x30\x66'
def __init__(self, i2c=None, delta_temp=0, delta_hum=0, i2c_address=DEFAULT_I2C_ADDRESS):
if i2c is None:
raise ValueError('An I2C object is required.')
self.i2c = i2c
self.i2c_addr = i2c_address
self.set_delta(delta_temp, delta_hum)
time.sleep_ms(50)
def is_present(self):
"""
Return true if the sensor is correctly conneced, False otherwise
"""
return self.i2c_addr in self.i2c.scan()
def set_delta(self, delta_temp=0, delta_hum=0):
"""
Apply a delta value on the future measurements of temperature and/or humidity
The units are Celsius for temperature and percent for humidity (can be negative values)
"""
self.delta_temp = delta_temp
self.delta_hum = delta_hum
def _check_crc(self, data):
# calculates 8-Bit checksum with given polynomial
crc = 0xFF
for b in data[:-1]:
crc ^= b
for _ in range(8, 0, -1):
if crc & 0x80:
crc = (crc << 1) ^ SHT30.POLYNOMIAL
else:
crc <<= 1
crc_to_check = data[-1]
return crc_to_check == crc
def send_cmd(self, cmd_request, response_size=6, read_delay_ms=100):
"""
Send a command to the sensor and read (optionally) the response
The responsed data is validated by CRC
"""
try:
self.i2c.writeto(self.i2c_addr, cmd_request)
if not response_size:
return
time.sleep_ms(read_delay_ms)
data = self.i2c.readfrom(self.i2c_addr, response_size)
for i in range(response_size//3):
if not self._check_crc(data[i*3:(i+1)*3]): # pos 2 and 5 are CRC
raise SHT30Error(SHT30Error.CRC_ERROR)
if data == bytearray(response_size):
raise SHT30Error(SHT30Error.DATA_ERROR)
return data
except OSError as ex:
raise SHT30Error(SHT30Error.BUS_ERROR)
except Exception as ex:
raise ex
def clear_status(self):
"""
Clear the status register
"""
return self.send_cmd(SHT30.CLEAR_STATUS_CMD, None)
def reset(self):
"""
Send a soft-reset to the sensor
"""
return self.send_cmd(SHT30.RESET_CMD, None)
def status(self, raw=False):
"""
Get the sensor status register.
It returns a int value or the bytearray(3) if raw==True
"""
data = self.send_cmd(SHT30.STATUS_CMD, 3, read_delay_ms=20)
if raw:
return data
status_register = data[0] << 8 | data[1]
return status_register
def measure(self, raw=False):
"""
If raw==True returns a bytearrya(6) with sensor direct measurement otherwise
It gets the temperature (T) and humidity (RH) measurement and return them.
The units are Celsius and percent
"""
data = self.send_cmd(SHT30.MEASURE_CMD, 6)
if raw:
return data
t_celsius = (((data[0] << 8 | data[1]) * 175) / 0xFFFF) - 45 + self.delta_temp
rh = (((data[3] << 8 | data[4]) * 100.0) / 0xFFFF) + self.delta_hum
return t_celsius, rh
def measure_int(self, raw=False):
"""
Get the temperature (T) and humidity (RH) measurement using integers.
If raw==True returns a bytearrya(6) with sensor direct measurement otherwise
It returns a tuple with 4 values: T integer, T decimal, H integer, H decimal
For instance to return T=24.0512 and RH= 34.662 This method will return
(24, 5, 34, 66) Only 2 decimal digits are returned, .05 becomes 5
Delta values are not applied in this method
The units are Celsius and percent.
"""
data = self.send_cmd(SHT30.MEASURE_CMD, 6)
if raw:
return data
aux = (data[0] << 8 | data[1]) * 175
t_int = (aux // 0xffff) - 45
t_dec = (aux % 0xffff * 100) // 0xffff
aux = (data[3] << 8 | data[4]) * 100
h_int = aux // 0xffff
h_dec = (aux % 0xffff * 100) // 0xffff
return t_int, t_dec, h_int, h_dec
class SHT30Error(Exception):
"""
Custom exception for errors on sensor management
"""
BUS_ERROR = 0x01
DATA_ERROR = 0x02
CRC_ERROR = 0x03
def __init__(self, error_code=None):
self.error_code = error_code
super().__init__(self.get_message())
def get_message(self):
if self.error_code == SHT30Error.BUS_ERROR:
return "Bus error"
elif self.error_code == SHT30Error.DATA_ERROR:
return "Data error"
elif self.error_code == SHT30Error.CRC_ERROR:
return "CRC error"
else:
return "Unknown error"
You can see from this, that someone previously has already edited out the parts you guys were suggesting to me to do.
I'm going to now show you my own code for my SHT30 which I have running on my Pico W.
I've # out the following WDT stuff again:
Code: Select all
#wdt = wdt(timeout= 8000)
#wdt.feed()
#print("woof!...good boy!")
But other than this, this is what I have working right now:
Code: Select all
#Necessary library imports for this code to function
from secrets import SSID
from secrets import PASSWORD
from secrets import IP
from secrets import IP_mask
from secrets import IP_gateway
from secrets import IP_DNS
from secrets import MQTTSERVE
from umqtt.simple import MQTTClient
import time
import network
import ubinascii
import socket
from machine import I2C, Pin, #WDT
import sht30
from sht30 import SHT30
from sht30 import SHT30Error
led = machine.Pin("LED", machine.Pin.OUT)
#initialise LED to OFF
led.off()
def appendfile(message, error):
file=open("errorlog.txt","a")
file.write(str(message, error)+",")
file.close()
#file.flush()
def sht_error(ex):
print("something went wrong reading the SHT-30!")
print('Error:', ex)
message1 = ', SHT Error: '
ex2 = str(ex)
appendfile(message1, ex2) #message, error
#Define the interface of the SHT30 Temp/Humidity sensor with the Pico W
i2c=I2C(0, sda=Pin(4), scl=Pin(5), freq=5000)
sht=sht30.SHT30(i2c=i2c, i2c_address=68)
def readsensor():
global temperature
global humidity
try:
sensor = sht.measure()
except SHT30Error as ex:
sht_error(ex)
try:
restartsensor = SHT30()
time.sleep(2)
restartsensor.reset()
time.sleep(2)
sensor = sht.measure()
except SHT30Error as ex:
sht_error(ex)
machine.reset()
temperature, humidity = sensor
#wdt = WDT(timeout= 8000)
readsensor()
#wdt.feed()
#print("woof!...good boy!")
print('SHT Temperature: {:3.1f}ºC'.format(temperature))
print('SHT Humidity: {:3.1f}%'.format(humidity))
#Define the onboard Pico Temperature sensor and necessary conversion factor
sensor_temp = machine.ADC(4)
conversion_factor = 3.3 / (65535)
#Active the onboard Pico W Wi-Fi and connect to home network defined through "secrets.py"
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
print("Scanning Available Wifi Connections...")
#wdt.feed()
#print("woof!...good boy!")
print(wlan.scan())
time.sleep(2)
#wdt.feed()
#print("woof!...good boy!")
print("Connecting to Wifi...")
print()
led.on()
#wdt.feed()
#print("woof!...good boy!")
time.sleep(5)
#wdt.feed()
#print("woof!...good boy!")
try:
wlan.connect(SSID, PASSWORD)
except:
try:
wlan.active(False)
led.toggle()
time.sleep(5)
#wdt.feed()
#print("woof!...good boy!")
led.toggle()
time.sleep(5)
#wdt.feed()
#print("woof!...good boy!")
wlan.active(True)
time.sleep(5)
#wdt.feed()
#print("woof!...good boy!")
wlan.connect(SSID, PASSWORD)
except:
machine.reset()
#wdt.feed()
#print("woof!...good boy!")
#Configure this Pico W for a static IP address on router using the following
UseStaticIP = True
IP = IP
IP_mask = IP_mask
IP_gateway = IP_gateway
IP_DNS = IP_DNS
led.off()
#Read the MAC address from the Pico W itself
mac = ubinascii.hexlify(network.WLAN().config('mac'),':').decode()
print("MAC address for this Pico W:", mac)
# Wait for connect or fail (3mins)
max_wait = 90
while max_wait > 0:
if wlan.status() < 0 or wlan.status() >= 3:
#print(wlan.status)
break
led.on()
max_wait -= 1
print(wlan.status)
print('waiting for connection...')
time.sleep(1)
led.off()
time.sleep(1)
#wdt.feed()
#print("woof!...good boy!")
while max_wait == 0:
print("cannot connected to the wifi!")
print("rebooting...")
led.on()
time.sleep(5)
machine.reset()
# Handle connection error
if wlan.status() != 3:
print("Network connection attempt failed!")
#raise RuntimeError('network connection attempt failed')
time.sleep(1)
machine.reset()
else:
print("Pico W Wi-Fi is connected to: ",SSID)
status = wlan.ifconfig()
print( "ip address for this Pico W: " + status[0] )
print()
print("Channel: ", wlan.config('channel'))
print("SSID: ", wlan.config('essid'))
print("TXPOWER: ", wlan.config('txpower'))
#wdt.feed()
#print("woof!...good boy!")
#Configure a high power wi-fi connection
wlan.config(pm = 0xa11140)
print()
print("Flashing LEDs for success display")
print()
#wdt.feed()
#print("woof!...good boy!")
#Show some onboard LED flashing to show that attempting to connect to the Wi-Fi has been completed
time.sleep(2)
led.off()
time.sleep(2)
led.toggle()
print("3")
time.sleep(0.2)
led.toggle()
time.sleep(1)
#wdt.feed()
#print("woof!...good boy!")
led.toggle()
print("2")
time.sleep(0.2)
led.toggle()
time.sleep(1)
#wdt.feed()
#print("woof!...good boy!")
led.toggle()
print("1")
time.sleep(0.2)
led.toggle()
time.sleep(1)
led.toggle()
print("blast off!")
print()
#wdt.feed()
#print("woof!...good boy!")
time.sleep(2)
#Setup MQTT server, local client ID (not really required),
#and published data transmissions hierarchy
mqtt_server = MQTTSERVE
client_id = b'PICOW'
#topic_pub1 = b'PicoW/OS_Temp'
#topic_pub2 = b'PicoW/OS_Hum'
#topic_pub3 = b'PicoW/Pico_Temp'
topic_pub1 = b'PicoW/OS_Temp'
topic_pub2 = b'PicoW/OS_Hum'
topic_pub3 = b'PicoW/Pico_Temp'
#For the above topics, use "PicoW/+"
#at the subscriber side to receive all
#three messages at once or use the full
#titles individually i.e PicoW/OS_Temp for
#just the outside temperature variable
def mqtt_connect():
client = MQTTClient(client_id, mqtt_server, keepalive=3600)
client.connect()
print('Connected to %s MQTT Broker'%(mqtt_server))
return client
def reconnect():
print('Failed to connect to the MQTT Broker. Rebooting...')
time.sleep(5)
machine.reset()
print("Attempting to connect to MQTT Broker...")
print()
#wdt.feed()
#print("woof!...good boy!")
try:
client = mqtt_connect()
except OSError as mqtterror:
print(mqtterror)
message2 = ', MQTT Error: '
mqtterror2 = str(mqtterror)
appendfile(message2, mqtterror2)
reconnect()
#wdt.feed()
#print("woof!...good boy!")
#The main script of repeativeness for reading in from the connected sensor
#and taking in onboard temperature readings
while True: #do stuff here
led.on()
time.sleep(0.5)
#Collect readings from the SHT-30
readsensor()
#wdt.feed()
#print("woof!...good boy!")
#Collect temperature reading for the Pico W CPU
def cpumeasure():
global picotemp
try:
reading = sensor_temp.read_u16() * conversion_factor
picotemp = 27 - (reading - 0.706)/0.001721
except OSError as cpu:
print("something went wrong reading the onboard CPU Temperature!")
print(cpu)
print("rebooting...")
message3 = ', CPU Temp Error: '
cpu2 = str(cpu)
appendfile(message3, cpu2)
time.sleep(5)
machine.reset()
cpumeasure()
#make readings 1 decimal place
tpm1 = round(temperature,1)
tpm2 = round(humidity,1)
tpm3 = round(picotemp,1)
#convert float reading values into bytes ready for MQTT transmission
tpm4 = f"{tpm1}".encode()
tpm5 = f"{tpm2}".encode()
tpm6 = f"{tpm3}".encode()
topic_msg1 = tpm4
topic_msg2 = tpm5
topic_msg3 = tpm6
#Message displayed to the screen
print()
print("Publishing...")
print()
print('SHT Temperature: {:3.1f}ºC'.format(tpm1))
print('SHT Humidity: {:3.1f}%'.format(tpm2))
print('Pico W Temperature: {:3.1f}ºC'.format(tpm3))
print()
#Equivalent transmission of displayed readings above
#(reverse order to match above order on subscriber receipt)
try:
client.publish(topic_pub3, topic_msg3)
client.publish(topic_pub2, topic_msg2)
client.publish(topic_pub1, topic_msg1)
print("Publishing complete")
except:
print("Disconnected from MQTT Broker! Attempting Reconnection...")
print()
try:
client = mqtt_connect()
except OSError as mqtterror:
print(mqtterror)
message4 = ', MQTT2 Error: '
mqtterror3 = str(mqtterror)
appendfile(message4, mqtterror3)
reconnect()
led.off()
#wdt.feed()
#print("woof!...good boy!")
time.sleep(5)
#wdt.feed()
#print("woof!...good boy!")
time.sleep(5)
If I change the I2C address from 68 (correct) to 67 (wrong) in order to force an error, this is the results from this code:
Thonny Screen Error:
Code: Select all
something went wrong reading the SHT-30!
Error: Bus error
Traceback (most recent call last):
File "<stdin>", line 60, in <module>
File "<stdin>", line 49, in readsensor
File "sht30.py", line 45, in __init__
ValueError: An I2C object is required.
and the errorlog.txt gets this error message logged into it:
I have two things I'm asking for help with.
1. Why doesn't my restart sensor code work?
Code: Select all
from sht30 import SHT30
restartsensor = SHT30()
restartsensor.reset()
2. Why doesn't my SHTError code "ex" get printed to my errorlog.txt file along with the ", SHT Error: ,"?
Is anyone clever enough to show me what I need to do to fix these please?