# blob: dbf9cd3aa98bf00d7db611a420082f81bceca11d [file] [log] [blame]
# Copyright (c) 2023, Bjarki Arge Andreasen
# SPDX-License-Identifier: Apache-2.0
import socket
import threading
import select
class TEUDPEcho():
    """UDP echo server that runs on a background thread.

    Every datagram received on the bound address is sent back unchanged to
    the address it came from. Call ``start()`` to launch the worker thread
    and ``stop()`` to ask it to exit.
    """

    def __init__(self, host='0.0.0.0', port=7780):
        """Prepare the worker thread without starting it.

        Args:
            host: Local address to bind the UDP socket to.
            port: Local UDP port to bind the socket to.
        """
        # Polled by the worker loop; cleared by stop() to end the loop.
        self.running = True
        self.host = host
        self.port = port
        self.thread = threading.Thread(target=self._target_)

    def start(self):
        """Start the background echo thread."""
        self.thread.start()

    def stop(self):
        """Ask the echo thread to exit and wait up to one second for it."""
        self.running = False
        # Joining a thread that was never started raises RuntimeError, so
        # only wait when the worker is actually running.
        if self.thread.is_alive():
            self.thread.join(1)

    def _target_(self):
        """Thread body: echo datagrams back until stopped or an error occurs."""
        # The context manager closes the socket on every exit path,
        # including a failed bind().
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setblocking(False)
            sock.bind((self.host, self.port))

            while self.running:
                try:
                    # Wait for readability only. Listing the socket as a
                    # writer would make select() return immediately (a UDP
                    # socket is almost always writable), turning this loop
                    # into a busy spin that ignores the timeout.
                    ready_to_read, _, _ = select.select([sock], [], [], 0.5)

                    if not ready_to_read:
                        # Timed out: loop around to re-check self.running.
                        continue

                    try:
                        data, address = sock.recvfrom(4096)
                    except BlockingIOError:
                        # select() may report readiness spuriously; with a
                        # non-blocking socket that is not a fatal error.
                        continue

                    print(f'udp echo {len(data)} bytes to {address[0]}:{address[1]}')
                    sock.sendto(data, address)

                except Exception as e:
                    # Any other failure ends the echo service.
                    print(e)
                    break