Refactored project structure, added multithreaded echo server and prime checking server, updated tests.

This commit is contained in:
2025-07-28 17:31:05 +01:00
parent 4926cfe0d3
commit fec3f555e9
14 changed files with 621 additions and 110 deletions

163
prime_time_server_tests.py Normal file
View File

@@ -0,0 +1,163 @@
import socket
import json
import threading
import time
import random
def send_request(sock, request):
"""Send a request and get response"""
request_str = json.dumps(request) + '\n'
sock.sendall(request_str.encode())
response = sock.recv(1024).decode()
try:
return json.loads(response.strip())
except json.JSONDecodeError:
return response.strip()
def batch_test_client():
"""Run a test with 50 problems in quick succession"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.connect(('localhost', 40000))
print("Batch Test: Connected")
# Generate 50 test cases with known prime and non-prime numbers
test_numbers = (
# Known primes
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47] +
# Non-primes
[4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20, 21, 22, 24, 25] +
# Edge cases
[0, 1, -1, 1.5, 2.0] +
# Larger numbers
[97, 100, 121, 169, 289, 361, 529, 841, 961, 1001] +
# Random numbers to complete 50
[random.randint(1000, 10000) for _ in range(5)]
)
success_count = 0
start_time = time.time()
for i, number in enumerate(test_numbers):
request = {"method": "isPrime", "number": number}
try:
response = send_request(sock, request)
expected_prime = (
isinstance(number, int) and
number > 1 and
all(number % i != 0 for i in range(2, int(number ** 0.5) + 1))
)
expected = {"method": "isPrime", "prime": expected_prime}
if response == expected:
success_count += 1
print(f"✓ Test {i+1}/50: {number} -> {response['prime']}")
else:
print(f"✗ Test {i+1}/50: {number} Expected: {expected}, Got: {response}")
except Exception as e:
print(f"Error on test {i+1}/50 with number {number}: {e}")
break
end_time = time.time()
duration = end_time - start_time
print(f"\nBatch Test Results:")
print(f"Total tests: 50")
print(f"Successful: {success_count}")
print(f"Failed: {50 - success_count}")
print(f"Time taken: {duration:.2f} seconds")
print(f"Average time per request: {(duration/50)*1000:.2f} ms")
except ConnectionRefusedError:
print("Batch Test: Connection failed! Is the server running?")
def test_client(test_cases, client_id):
"""Run a series of test cases"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
try:
sock.connect(('localhost', 40000))
print(f"Client {client_id}: Connected")
for i, test in enumerate(test_cases):
request = test['request']
expected = test['expected']
desc = test.get('desc', 'No description')
print(f"\nClient {client_id}: Test {i + 1} - {desc}")
print(f"Request: {request}")
try:
response = send_request(sock, request)
print(f"Response: {response}")
if test.get('should_disconnect', False):
try:
send_request(sock, {"method": "isPrime", "number": 2})
print("ERROR: Server didn't disconnect when it should have")
except:
print("SUCCESS: Server correctly disconnected")
break
if response == expected:
print("SUCCESS: Response matches expected")
else:
print(f"FAILURE: Expected {expected}, got {response}")
except Exception as e:
print(f"Error during test: {e}")
if test.get('should_disconnect', False):
print("SUCCESS: Expected disconnect occurred")
else:
print("FAILURE: Unexpected error")
break
except ConnectionRefusedError:
print(f"Client {client_id}: Connection failed! Is the server running?")
def run_all_tests():
print("Starting Prime Time Server tests...")
# Test 1: Basic functionality
print("\nTest 1: Single client with valid requests")
test_client([{
'desc': "Basic prime test",
'request': {"method": "isPrime", "number": 7},
'expected': {"method": "isPrime", "prime": True}
}], 0)
time.sleep(1)
# Test 2: Malformed request
print("\nTest 2: Single client with malformed request")
test_client([{
'desc': "Malformed request test",
'request': {"method": "wrong", "number": 7},
'expected': "malformed",
'should_disconnect': True
}], 0)
time.sleep(1)
# Test 3: Batch test with 50 problems
print("\nTest 3: Batch test with 50 problems")
batch_test_client()
time.sleep(1)
# Test 4: Multiple parallel clients
print("\nTest 4: Multiple parallel clients")
threads = []
for i in range(5):
thread = threading.Thread(target=batch_test_client)
threads.append(thread)
thread.start()
time.sleep(0.1)
for thread in threads:
thread.join()
if __name__ == "__main__":
run_all_tests()