blob: 52ae2b47b47199a8a41a29266ea6b9add38c77aa [file] [log] [blame]
# Copyright (c) 2023, Bjarki Arge Andreasen
# SPDX-License-Identifier: Apache-2.0
import socket
import threading
import select
import time
import copy
class TEUDPReceiveSession():
def __init__(self, address, timeout = 1):
self.address = address
self.last_packet_received_at = time.monotonic()
self.timeout = timeout
self.packets_received = 0
self.packets_dropped = 0
def get_address(self):
return self.address
def on_packet_received(self, data):
if self._validate_packet_(data):
self.packets_received += 1
else:
self.packets_dropped += 1
self.last_packet_received_at = time.monotonic()
def update(self):
if (time.monotonic() - self.last_packet_received_at) > self.timeout:
return (self.packets_received, self.packets_dropped)
return None
def _validate_packet_(self, data: bytes) -> bool:
prng_state = 1234
for b in data:
prng_state = ((1103515245 * prng_state) + 12345) % (1 << 31)
if prng_state & 0xFF != b:
return False
return True
class TEUDPReceive():
def __init__(self):
self.running = True
self.thread = threading.Thread(target=self._target_)
self.sessions = []
def start(self):
self.thread.start()
def stop(self):
self.running = False
self.thread.join(1)
def _target_(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('0.0.0.0', 7781))
while self.running:
try:
ready_to_read, _, _ = select.select([sock], [sock], [], 0.5)
if not ready_to_read:
self._update_sessions_(sock)
continue
data, address = sock.recvfrom(4096)
print(f'udp received {len(data)} bytes -> {address[0]}:{address[1]}')
session = self._get_session_by_address_(address)
session.on_packet_received(data)
except Exception as e:
print(e)
break
sock.close()
def _get_session_by_address_(self, address) -> TEUDPReceiveSession:
# Search for existing session
for session in self.sessions:
if session.get_address() == address:
return session
# Create and return new session
print(f'Created session for {address[0]}:{address[1]}')
self.sessions.append(TEUDPReceiveSession(address, 2))
return self.sessions[-1]
def _update_sessions_(self, sock):
sessions = copy.copy(self.sessions)
for session in sessions:
result = session.update()
if result is None:
continue
response = bytes([result[0], result[1]])
print(f'Sending result {response} to address {session.get_address()}')
sock.sendto(response, session.get_address())
print(f'Removing session for address {session.get_address()}')
self.sessions.remove(session)