Code: Select all
def inet_aton(self, addr):
    """Convert a dotted-quad IPv4 string into its 4-byte packed form.

    Args:
        addr: IPv4 address as text, e.g. ``"239.255.255.250"``.

    Returns:
        A ``bytes`` object of length 4, one byte per octet, in the order
        the octets appear in ``addr``.

    Raises:
        ValueError: if ``addr`` does not consist of exactly four
            dot-separated integers in the range 0-255.
    """
    octets = addr.split(".")
    # Without this check a short address such as "1.2.3" would produce three
    # bytes, which a later "4s" struct pack would silently zero-pad into a
    # different address.
    if len(octets) != 4:
        raise ValueError("invalid IPv4 address: %r" % (addr,))
    # int() rejects non-numeric fields and bytes() rejects values outside
    # 0-255; both raise ValueError.
    return bytes(map(int, octets))
async def init_socket(self):
    """Create the UDP socket used to listen for UPnP multicast traffic.

    Sets ``self.ip`` / ``self.port`` to 239.255.255.250:1900, stores the
    multicast membership request in ``self.mreq``, then creates
    ``self.ssock``, binds it and joins the multicast group.

    A failed bind or a failed group join is logged and tolerated (the
    remaining steps still run); any other error aborts the setup.

    Note: although declared ``async``, this coroutine contains no ``await``
    and therefore runs to completion without yielding.

    Returns:
        ``False`` when socket setup fails outright. No other explicit
        return value is visible in this excerpt.
    """
    ok = True
    self.ip = "239.255.255.250"
    self.port = 1900
    sock = None
    try:
        # Membership request for IP_ADD_MEMBERSHIP: the 4-byte group address
        # followed by a 4-byte local interface address (all zeros = any).
        # Packing two fixed 4-byte fields keeps the request exactly 8 bytes;
        # the former "4sl" format depended on the native size and alignment
        # of a C long (16 bytes on LP64 platforms).
        self.mreq = struct.pack("4s4s", self.inet_aton(self.ip), bytes(4))

        # Set up server socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.ssock = sock
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        try:
            sock.bind((self.ip, self.port))  # Also tried ("", self.port)
        except Exception as e:
            dbg("WARNING: Failed to bind %s:%d: %s", (self.ip, self.port, e))
            ok = False

        try:
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, self.mreq)
        except Exception as e:
            dbg("WARNING: Failed to join multicast group!: " + str(e))
            ok = False
    except Exception as e:
        # Report the cause instead of discarding it, and do not leave a
        # half-configured socket open behind a hard failure.
        dbg("Failed to initialize UPnP sockets!: " + str(e))
        if sock is not None:
            sock.close()
        return False

    if ok:
        dbg("Listening for UPnP broadcasts")
    # NOTE(review): no explicit return is visible on this path, so callers
    # receive None even on success -- confirm whether a `return ok` is
    # intended here or follows outside this excerpt.