import socket import threading import time import random from typing import Optional, Dict, List, Tuple class DatabaseTestClient: def __init__(self, host: str = 'localhost', server_port: int = 40000): self.host = host self.server_port = server_port self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Bind to any available interface (empty string) on an ephemeral port (0) self.sock.bind(('', 0)) # Get the actual port assigned self.client_port = self.sock.getsockname()[1] self.sock.settimeout(1.0) # 1 second timeout for receives print(f"Client bound to port {self.client_port}") def send_insert(self, key: str, value: str) -> None: """Send an insert request.""" message = f"{key}={value}" if len(message) >= 1000: raise ValueError("Message too long") self.sock.sendto(message.encode(), (self.host, self.server_port)) def send_retrieve(self, key: str) -> Optional[str]: """Send a retrieve request and return the response.""" if len(key) >= 1000: raise ValueError("Key too long") self.sock.sendto(key.encode(), (self.host, self.server_port)) try: data, addr = self.sock.recvfrom(1000) # Changed from recvmsg to recvfrom return data.decode() except socket.timeout: return None def close(self): self.sock.close() def run_basic_tests(client: DatabaseTestClient) -> List[Tuple[str, bool, str]]: """Run basic functionality tests.""" results = [] # Test 1: Basic insert and retrieve client.send_insert("test1", "value1") time.sleep(0.1) # Give server time to process response = client.send_retrieve("test1") results.append(("Basic insert/retrieve", response == "test1=value1", f"Expected 'test1=value1', got '{response}'")) # Test 2: Update existing key client.send_insert("test1", "value2") time.sleep(0.1) response = client.send_retrieve("test1") results.append(("Update existing key", response == "test1=value2", f"Expected 'test1=value2', got '{response}'")) # Test 3: Empty key client.send_insert("", "empty_key_value") time.sleep(0.1) response = client.send_retrieve("") results.append(("Empty key", response == "=empty_key_value", f"Expected '=empty_key_value', got '{response}'")) # Test 4: Empty value client.send_insert("empty_value", "") time.sleep(0.1) response = client.send_retrieve("empty_value") results.append(("Empty value", response == "empty_value=", f"Expected 'empty_value=', got '{response}'")) # Test 5: Version check response = client.send_retrieve("version") results.append(("Version check", response is not None and response.startswith("version=") and len(response) > 8, f"Version response invalid: '{response}'")) # Test 6: Version modification attempt client.send_insert("version", "HACKED") time.sleep(0.1) response = client.send_retrieve("version") results.append(("Version modification prevention", response is not None and not response.endswith("HACKED"), f"Version should not be modifiable, got: '{response}'")) return results def run_edge_case_tests(client: DatabaseTestClient) -> List[Tuple[str, bool, str]]: """Run edge case tests.""" results = [] # Test 1: Key with multiple equals signs client.send_insert("multi=equals", "value=with=equals") time.sleep(0.1) response = client.send_retrieve("multi=equals") results.append(("Multiple equals in key", response is None or not response.startswith("multi=equals"), "Key with equals sign should not be stored")) # Test 2: Value with equals signs client.send_insert("test2", "value=with=equals") time.sleep(0.1) response = client.send_retrieve("test2") results.append(("Equals in value", response == "test2=value=with=equals", f"Expected 'test2=value=with=equals', got '{response}'")) # Test 3: Non-existent key response = client.send_retrieve("nonexistent") results.append(("Non-existent key", response is None or response == "nonexistent=", f"Expected None or 'nonexistent=', got '{response}'")) return results def run_concurrent_tests(num_clients: int = 5) -> List[Tuple[str, bool, str]]: """Run concurrent access tests.""" results = [] clients = [DatabaseTestClient() for _ in range(num_clients)] def concurrent_ops(client_id: int): client = clients[client_id] key = f"concurrent_{client_id}" for i in range(10): value = f"value_{i}" client.send_insert(key, value) time.sleep(0.05) response = client.send_retrieve(key) if response != f"{key}={value}": results.append((f"Concurrent client {client_id}", False, f"Expected '{key}={value}', got '{response}'")) return results.append((f"Concurrent client {client_id}", True, "All operations successful")) threads = [threading.Thread(target=concurrent_ops, args=(i,)) for i in range(num_clients)] for thread in threads: thread.start() for thread in threads: thread.join() for client in clients: client.close() return results def main(): # MESSAGE = "Hello, UDP!" # sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP # sock.sendto(bytes(MESSAGE, "utf-8"), ("localhost", 40000)) # quit() # Allow command line arguments for host and port import argparse parser = argparse.ArgumentParser(description='Test UDP Database Server') parser.add_argument('--host', default='localhost', help='Server host') parser.add_argument('--port', type=int, default=40000, help='Server port') args = parser.parse_args() print(f"Starting Unusual Database Program Tests on {args.host}:{args.port}\n") client = DatabaseTestClient(args.host, args.port) # ... rest of the main function remains the same ... print("Running basic functionality tests...") basic_results = run_basic_tests(client) print("\nRunning edge case tests...") edge_results = run_edge_case_tests(client) client.close() print("\nRunning concurrent access tests...") concurrent_results = run_concurrent_tests() # Print results all_results = basic_results + edge_results + concurrent_results passed = sum(1 for _, success, _ in all_results if success) total = len(all_results) print("\nTest Results:") print("=" * 60) for test_name, success, message in all_results: status = "✓" if success else "✗" print(f"{status} {test_name}") if not success: print(f" {message}") print("\nSummary:") print(f"Passed: {passed}/{total} tests") print(f"Success rate: {(passed/total)*100:.1f}%") if __name__ == "__main__": main()