import socket import threading import time import unittest import sys class TestBudgetChat(unittest.TestCase): SERVER_HOST = 'localhost' SERVER_PORT = 40000 def create_client(self): """Helper function to create a new client socket""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self.SERVER_HOST, self.SERVER_PORT)) return sock def receive_message(self, sock, timeout=2): """Helper function to receive a message with timeout""" sock.settimeout(timeout) try: return sock.recv(1024).decode('ascii').strip() except socket.timeout: return None def test_initial_connection(self): """Test the initial connection and welcome message""" client = self.create_client() welcome_msg = self.receive_message(client) self.assertIsNotNone(welcome_msg) self.assertTrue("Welcome" in welcome_msg) client.close() def test_invalid_username(self): """Test that invalid usernames are rejected""" invalid_names = ["", "user@name", "user name", "!invalid"] for name in invalid_names: with self.subTest(name=name): client = self.create_client() _ = self.receive_message(client) # Welcome message client.send(f"{name}\n".encode('ascii')) # Server should disconnect us with self.assertRaises((ConnectionResetError, ConnectionAbortedError, socket.error)): client.sendall("test\n".encode('ascii')) response = self.receive_message(client) response = self.receive_message(client) response = self.receive_message(client) client.close() def test_valid_username(self): """Test that valid usernames are accepted""" client = self.create_client() _ = self.receive_message(client) # Welcome message client.send("validuser123\n".encode('ascii')) room_info = self.receive_message(client) self.assertIsNotNone(room_info) self.assertTrue(room_info.startswith("*")) client.close() def test_chat_message_broadcast(self): """Test that chat messages are broadcast to other users""" # First client client1 = self.create_client() _ = self.receive_message(client1) # Welcome message client1.send("user1\n".encode('ascii')) _ = self.receive_message(client1) # Room info # Second client client2 = self.create_client() _ = self.receive_message(client2) # Welcome message client2.send("user2\n".encode('ascii')) _ = self.receive_message(client2) # Room info # User1 should see User2's join message join_msg = self.receive_message(client1) self.assertTrue("user2" in join_msg) # Send message from user1 test_message = "Hello, everyone!" client1.send(f"{test_message}\n".encode('ascii')) # User2 should receive the message received = self.receive_message(client2) self.assertEqual(f"[user1] {test_message}", received) client1.close() client2.close() def test_multiple_clients(self): """Test that the server can handle multiple clients""" clients = [] usernames = [f"user{i}" for i in range(10)] # Connect 10 clients for username in usernames: client = self.create_client() _ = self.receive_message(client) # Welcome message client.send(f"{username}\n".encode('ascii')) _ = self.receive_message(client) # Room info clients.append(client) # time.sleep(0.1) # Small delay to prevent race conditions # Send a message from the last client test_message = "Hello from last user!" clients[-1].send(f"{test_message}\n".encode('ascii')) # Check that all other clients received the message expected = f"[{usernames[-1]}] {test_message}" for i in range(len(clients)-1): received = self.receive_message(clients[i]) print(f"Received: {received}") self.assertEqual(expected, received) # Cleanup for client in clients: client.close() def test_user_departure(self): """Test that user departure is announced""" client1 = self.create_client() _ = self.receive_message(client1) client1.send("user1\n".encode('ascii')) _ = self.receive_message(client1) client2 = self.create_client() _ = self.receive_message(client2) client2.send("user2\n".encode('ascii')) _ = self.receive_message(client2) # Close client2 and check if client1 receives departure message client2.close() departure_msg = self.receive_message(client1) print(departure_msg) self.assertTrue("user2" in departure_msg) self.assertTrue(departure_msg.startswith("*")) client1.close() def list_tests(): """List all available tests""" print("\nAvailable tests:") test_methods = [method for method in dir(TestBudgetChat) if method.startswith('test_')] for i, test in enumerate(test_methods, 1): print(f"{i}. {test}") print("\n0. Run all tests") def run_selected_test(test_number): """Run a specific test based on user selection""" suite = unittest.TestSuite() test_methods = [method for method in dir(TestBudgetChat) if method.startswith('test_')] if test_number == 0: # Run all tests suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestBudgetChat)) else: try: selected_test = test_methods[test_number - 1] suite.addTest(TestBudgetChat(selected_test)) except IndexError: print("Invalid test number!") return runner = unittest.TextTestRunner(verbosity=2) runner.run(suite) if __name__ == '__main__': while True: list_tests() try: choice = input("\nEnter test number (or 'q' to quit): ") if choice.lower() == 'q': break test_number = int(choice) run_selected_test(test_number) except ValueError: print("Please enter a valid number!") input("\nPress Enter to continue...")