Files
protohackersc/speed_ticket_monitoring_system.py
2025-08-01 08:47:25 +01:00

142 lines
4.8 KiB
Python

import socket
import struct
import time
from threading import Thread
from typing import List, Optional
class MessageType:
ERROR = 0x10
PLATE = 0x20
TICKET = 0x21
WANT_HEARTBEAT = 0x40
HEARTBEAT = 0x41
I_AM_CAMERA = 0x80
I_AM_DISPATCHER = 0x81
class SpeedDaemonClient:
def __init__(self, host: str = 'localhost', port: int = 12345):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))
self.running = True
self.received_tickets = []
def send_bytes(self, data: bytes):
self.socket.send(data)
def receive_message(self) -> Optional[tuple]:
try:
msg_type = self.socket.recv(1)
if not msg_type:
return None
msg_type = msg_type[0]
if msg_type == MessageType.ERROR:
length = struct.unpack('!B', self.socket.recv(1))[0]
message = self.socket.recv(length).decode('ascii')
return (MessageType.ERROR, message)
elif msg_type == MessageType.TICKET:
# Read plate length
plate_len = struct.unpack('!B', self.socket.recv(1))[0]
plate = self.socket.recv(plate_len).decode('ascii')
# Read other fields
data = self.socket.recv(16)
road, mile1, timestamp1, mile2, timestamp2, speed = struct.unpack('!HHIHHI', data)
return (MessageType.TICKET, {
'plate': plate,
'road': road,
'mile1': mile1,
'timestamp1': timestamp1,
'mile2': mile2,
'timestamp2': timestamp2,
'speed': speed
})
elif msg_type == MessageType.HEARTBEAT:
return (MessageType.HEARTBEAT, None)
except Exception as e:
print(f"Error receiving message: {e}")
return None
def close(self):
self.running = False
self.socket.close()
class Camera(SpeedDaemonClient):
def __init__(self, road: int, mile: int, limit: int, host: str = 'localhost', port: int = 12345):
super().__init__(host, port)
self.road = road
self.mile = mile
self.limit = limit
self.register()
def register(self):
# Send IAmCamera message
msg = struct.pack('!BHHH', MessageType.I_AM_CAMERA, self.road, self.mile, self.limit)
self.send_bytes(msg)
def report_plate(self, plate: str, timestamp: int):
# Send Plate message
msg = struct.pack(f'!BB{len(plate)}sI', MessageType.PLATE, len(plate),
plate.encode('ascii'), timestamp)
self.send_bytes(msg)
class Dispatcher(SpeedDaemonClient):
def __init__(self, roads: List[int], host: str = 'localhost', port: int = 12345):
super().__init__(host, port)
self.roads = roads
self.register()
# Start listening for tickets
self.listener_thread = Thread(target=self._listen_for_tickets)
self.listener_thread.start()
def register(self):
# Send IAmDispatcher message
roads_data = struct.pack(f'!{len(self.roads)}H', *self.roads)
msg = struct.pack('!BB', MessageType.I_AM_DISPATCHER, len(self.roads)) + roads_data
self.send_bytes(msg)
def _listen_for_tickets(self):
while self.running:
message = self.receive_message()
if message and message[0] == MessageType.TICKET:
self.received_tickets.append(message[1])
def run_speed_test():
# Create a dispatcher for road 123
dispatcher = Dispatcher([123])
time.sleep(0.1) # Give dispatcher time to register
# Create two cameras on road 123
camera1 = Camera(123, 8, 60) # Camera at mile 8, 60mph limit
camera2 = Camera(123, 9, 60) # Camera at mile 9, same limit
time.sleep(0.1) # Give cameras time to register
# Report a speeding car (same as in example session)
camera1.report_plate("UN1X", 0)
camera2.report_plate("UN1X", 45) # 1 mile in 45 seconds = 80mph
# Wait for ticket processing
time.sleep(1)
# Check received tickets
if dispatcher.received_tickets:
print("Test passed! Received tickets:")
for ticket in dispatcher.received_tickets:
print(f"Plate: {ticket['plate']}")
print(f"Road: {ticket['road']}")
print(f"Speed: {ticket['speed']/100.0} mph")
else:
print("Test failed: No tickets received")
# Cleanup
dispatcher.close()
camera1.close()
camera2.close()
if __name__ == "__main__":
run_speed_test()