# File: protohackersc/server03_chat_server_tests.py
# Snapshot: 2025-07-30 12:56:32 +01:00 (178 lines, 6.4 KiB, Python)

import socket
import threading
import time
import unittest
import sys
class TestBudgetChat(unittest.TestCase):
    """Integration tests for a Budget Chat server.

    Every test opens plain TCP connections to an already-running server at
    ``SERVER_HOST:SERVER_PORT``; nothing here starts or stops that server.
    Messages are exchanged as newline-terminated ASCII lines.
    """

    SERVER_HOST = 'localhost'
    SERVER_PORT = 40000

    def setUp(self):
        """Reset the per-socket receive buffers used by receive_message."""
        # Bytes read from a socket but not yet handed out as a complete line,
        # keyed by the socket object they came from.
        self._recv_buffers = {}

    def create_client(self):
        """Open a TCP connection to the server and return the socket.

        The socket is registered for cleanup, so it is closed when the test
        ends even if an assertion fails before an explicit close().
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Registered before connect() so a failed connect does not leak it.
        self.addCleanup(sock.close)
        sock.connect((self.SERVER_HOST, self.SERVER_PORT))
        return sock

    def receive_message(self, sock, timeout=2):
        """Return the next newline-terminated line from sock, stripped.

        TCP is a byte stream: one recv() may carry several lines or only part
        of one, so incoming bytes are buffered per socket and exactly one line
        is handed out per call.

        Args:
            sock: connected client socket.
            timeout: seconds to wait on each recv() call.

        Returns:
            The decoded line without surrounding whitespace; None if a recv()
            timed out before a full line arrived; whatever partial text was
            left (possibly "") if the peer closed the connection.
        """
        pending = self._recv_buffers.setdefault(sock, bytearray())
        sock.settimeout(timeout)
        while b"\n" not in pending:
            try:
                chunk = sock.recv(1024)
            except socket.timeout:
                return None
            if not chunk:
                # Peer closed: flush the unterminated remainder, if any.
                leftover = bytes(pending)
                pending.clear()
                return leftover.decode('ascii').strip()
            pending += chunk
        newline_at = pending.index(b"\n")
        line = bytes(pending[:newline_at])
        del pending[:newline_at + 1]
        return line.decode('ascii').strip()

    def receive_chat_message(self, sock, timeout=2):
        """Return the next line that is not a "*" notice (None on timeout)."""
        while True:
            message = self.receive_message(sock, timeout)
            if message is None or not message.startswith("*"):
                return message

    def assert_disconnected(self, sock, timeout=2):
        """Fail unless the server closes sock within the timeout.

        Any data sent before the close (e.g. an error line) is discarded; a
        connection reset/abort counts as a disconnect just like a clean EOF.
        """
        sock.settimeout(timeout)
        try:
            while sock.recv(1024):
                pass
        except socket.timeout:
            self.fail("server kept the connection open")
        except ConnectionError:
            pass

    def test_initial_connection(self):
        """Test the initial connection and welcome message"""
        client = self.create_client()
        welcome_msg = self.receive_message(client)
        self.assertIsNotNone(welcome_msg)
        self.assertIn("Welcome", welcome_msg)

    def test_invalid_username(self):
        """Test that invalid usernames are rejected"""
        invalid_names = ["", "user@name", "user name", "!invalid"]
        for name in invalid_names:
            with self.subTest(name=name):
                client = self.create_client()
                _ = self.receive_message(client)  # Welcome message
                client.sendall(f"{name}\n".encode('ascii'))
                # Server should disconnect us. Observing the close directly is
                # deterministic, unlike waiting for a later send to fail.
                self.assert_disconnected(client)

    def test_valid_username(self):
        """Test that valid usernames are accepted"""
        client = self.create_client()
        _ = self.receive_message(client)  # Welcome message
        client.sendall("validuser123\n".encode('ascii'))
        room_info = self.receive_message(client)
        self.assertIsNotNone(room_info)
        self.assertTrue(room_info.startswith("*"))

    def test_chat_message_broadcast(self):
        """Test that chat messages are broadcast to other users"""
        # First client
        client1 = self.create_client()
        _ = self.receive_message(client1)  # Welcome message
        client1.sendall("user1\n".encode('ascii'))
        _ = self.receive_message(client1)  # Room info
        # Second client
        client2 = self.create_client()
        _ = self.receive_message(client2)  # Welcome message
        client2.sendall("user2\n".encode('ascii'))
        _ = self.receive_message(client2)  # Room info
        # User1 should see User2's join message
        join_msg = self.receive_message(client1)
        self.assertIsNotNone(join_msg)
        self.assertIn("user2", join_msg)
        # Send message from user1
        test_message = "Hello, everyone!"
        client1.sendall(f"{test_message}\n".encode('ascii'))
        # User2 should receive the message
        received = self.receive_message(client2)
        self.assertEqual(f"[user1] {test_message}", received)

    def test_multiple_clients(self):
        """Test that the server can handle multiple clients"""
        clients = []
        usernames = [f"user{i}" for i in range(10)]
        # Connect 10 clients
        for username in usernames:
            client = self.create_client()
            _ = self.receive_message(client)  # Welcome message
            client.sendall(f"{username}\n".encode('ascii'))
            _ = self.receive_message(client)  # Room info
            clients.append(client)
        # Send a message from the last client
        test_message = "Hello from last user!"
        clients[-1].sendall(f"{test_message}\n".encode('ascii'))
        # Check that all other clients received the message. Earlier clients
        # still have "*" join notices queued for everyone who connected after
        # them, so those are skipped to reach the chat line.
        expected = f"[{usernames[-1]}] {test_message}"
        for client in clients[:-1]:
            received = self.receive_chat_message(client)
            self.assertEqual(expected, received)

    def test_user_departure(self):
        """Test that user departure is announced"""
        client1 = self.create_client()
        _ = self.receive_message(client1)  # Welcome message
        client1.sendall("user1\n".encode('ascii'))
        _ = self.receive_message(client1)  # Room info
        client2 = self.create_client()
        _ = self.receive_message(client2)  # Welcome message
        client2.sendall("user2\n".encode('ascii'))
        _ = self.receive_message(client2)  # Room info
        # Drain user2's join notice first: it also starts with "*" and names
        # user2, so it would otherwise satisfy the departure assertions below.
        join_msg = self.receive_message(client1)
        self.assertIsNotNone(join_msg)
        self.assertIn("user2", join_msg)
        # Close client2 and check if client1 receives departure message
        client2.close()
        departure_msg = self.receive_message(client1)
        self.assertIsNotNone(departure_msg)
        self.assertIn("user2", departure_msg)
        self.assertTrue(departure_msg.startswith("*"))
def list_tests():
    """Print a numbered menu of the test methods on TestBudgetChat.

    Entries are the ``test_``-prefixed attribute names in ``dir()`` order,
    numbered from 1, followed by a final "0" entry for running everything.
    """
    print("\nAvailable tests:")
    available = filter(lambda attr: attr.startswith('test_'), dir(TestBudgetChat))
    for position, test_name in enumerate(available, start=1):
        print(f"{position}. {test_name}")
    print("\n0. Run all tests")
def run_selected_test(test_number):
    """Run one test chosen by its menu number, or every test for 0.

    Args:
        test_number: 0 to run the whole TestBudgetChat case, or the 1-based
            position of a test among the ``test_``-prefixed names returned by
            ``dir(TestBudgetChat)``.

    Any other number prints "Invalid test number!" and runs nothing.
    """
    test_methods = [method for method in dir(TestBudgetChat) if method.startswith('test_')]
    # Explicit range check: relying on IndexError alone lets negative numbers
    # through, because negative list indexes silently count from the end.
    if not 0 <= test_number <= len(test_methods):
        print("Invalid test number!")
        return
    suite = unittest.TestSuite()
    if test_number == 0:
        # Run all tests
        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestBudgetChat))
    else:
        suite.addTest(TestBudgetChat(test_methods[test_number - 1]))
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
if __name__ == '__main__':
    # Interactive menu: list the tests, run the chosen one, repeat until 'q'.
    while True:
        list_tests()
        try:
            choice = input("\nEnter test number (or 'q' to quit): ").strip()
        except EOFError:
            # stdin was closed (Ctrl-D or an exhausted pipe): exit quietly
            # instead of dying with a traceback.
            break
        if choice.lower() == 'q':
            break
        # Only the int() conversion is guarded, so a ValueError raised while
        # running tests is not misreported as bad menu input.
        try:
            test_number = int(choice)
        except ValueError:
            print("Please enter a valid number!")
        else:
            run_selected_test(test_number)
        try:
            input("\nPress Enter to continue...")
        except EOFError:
            break