136 lines
5.0 KiB
Python
136 lines
5.0 KiB
Python
import socket
|
|
import threading
|
|
import time
|
|
import unittest
|
|
|
|
class TestBudgetChat(unittest.TestCase):
    """Integration tests for a line-oriented TCP chat server.

    The suite talks to a server that must already be listening on
    ``SERVER_HOST:SERVER_PORT``.  If nothing accepts connections there the
    whole class is skipped (with the connection error in the skip reason)
    instead of erroring once per test; treat an unexpected skip in CI as a
    sign that the server did not start.

    Behaviour the tests expect from the server, all as newline-terminated
    ASCII lines:

    * on connect, a greeting containing ``Welcome``;
    * after a valid name, a reply line starting with ``*``;
    * after an invalid name, the connection is closed;
    * ``*``-prefixed notices mentioning a user's name when that user joins
      or leaves, delivered to the users already present;
    * chat lines relayed to the other users as ``[<name>] <text>``.
    """

    SERVER_HOST = 'localhost'
    SERVER_PORT = 40000

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when no server is reachable."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(2)
            try:
                probe.connect((cls.SERVER_HOST, cls.SERVER_PORT))
            except OSError as exc:
                raise unittest.SkipTest(
                    f"no chat server reachable at "
                    f"{cls.SERVER_HOST}:{cls.SERVER_PORT}: {exc}"
                ) from exc

    def create_client(self):
        """Return a new TCP socket connected to the server.

        The socket is registered for cleanup before connecting, so it is
        closed at the end of the test even if ``connect`` or a later
        assertion fails.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        sock.connect((self.SERVER_HOST, self.SERVER_PORT))
        return sock

    def _buffer_for(self, sock):
        """Return the bytearray of received-but-unconsumed bytes for *sock*."""
        buffers = getattr(self, '_recv_buffers', None)
        if buffers is None:
            # Created lazily so the helper also works on an instance whose
            # setUp was never run.
            buffers = self._recv_buffers = {}
        return buffers.setdefault(sock, bytearray())

    def receive_message(self, sock, timeout=2):
        """Return the next line sent to *sock*, stripped of whitespace.

        TCP is a byte stream, so a single ``recv`` may deliver half a line
        or several lines at once.  Bytes are therefore accumulated per
        socket and handed out one line at a time.

        Args:
            sock: connected socket to read from.
            timeout: seconds to wait in each individual read.

        Returns:
            The next line; ``None`` if the read timed out with nothing
            buffered; the unterminated remainder if the read timed out or
            the peer closed mid-line; ``''`` once the peer has closed and
            nothing is left.
        """
        pending = self._buffer_for(sock)
        sock.settimeout(timeout)
        while b'\n' not in pending:
            try:
                chunk = sock.recv(1024)
            except socket.timeout:
                if not pending:
                    return None
                break  # hand back the partial text rather than lose it
            if not chunk:
                break  # peer closed the connection
            pending.extend(chunk)
        line, _, rest = bytes(pending).partition(b'\n')
        pending[:] = rest
        return line.decode('ascii').strip()

    def _join(self, username):
        """Connect, send *username*, and return ``(sock, room_info_line)``."""
        sock = self.create_client()
        welcome = self.receive_message(sock)
        self.assertIsNotNone(welcome, "no welcome message received")
        sock.sendall(f"{username}\n".encode('ascii'))
        room_info = self.receive_message(sock)
        self.assertTrue(room_info, f"no room info received for {username!r}")
        return sock, room_info

    def _next_chat_message(self, sock, timeout=2):
        """Return the next line on *sock* that is not a ``*`` notice.

        Returns ``None`` on timeout or ``''`` if the peer closed first.
        """
        line = self.receive_message(sock, timeout)
        while line and line.startswith("*"):
            line = self.receive_message(sock, timeout)
        return line

    def _assert_disconnected(self, sock, timeout=2):
        """Fail unless the server closes *sock* within *timeout* seconds.

        Reading until end-of-stream is used because a ``send`` issued right
        after the peer closes usually still succeeds; the error only shows
        up on a later call, which makes send-based checks flaky.
        """
        sock.settimeout(timeout)
        try:
            while sock.recv(1024):
                pass  # discard any error text sent before the close
        except socket.timeout:
            self.fail("server left the connection open")
        except (ConnectionResetError, ConnectionAbortedError):
            pass  # an abrupt close is still a disconnect

    def test_initial_connection(self):
        """Test the initial connection and welcome message"""
        client = self.create_client()
        welcome_msg = self.receive_message(client)
        self.assertIsNotNone(welcome_msg)
        self.assertIn("Welcome", welcome_msg)

    def test_invalid_username(self):
        """Test that invalid usernames are rejected"""
        invalid_names = ["", "user@name", "user name", "!invalid"]

        for name in invalid_names:
            with self.subTest(name=name):
                client = self.create_client()
                self.receive_message(client)  # Welcome message
                client.sendall(f"{name}\n".encode('ascii'))
                # Server should disconnect us
                self._assert_disconnected(client)
                client.close()

    def test_valid_username(self):
        """Test that valid usernames are accepted"""
        _, room_info = self._join("validuser123")
        self.assertTrue(room_info.startswith("*"))

    def test_chat_message_broadcast(self):
        """Test that chat messages are broadcast to other users"""
        client1, _ = self._join("user1")
        client2, _ = self._join("user2")

        # User1 should see User2's join message
        join_msg = self.receive_message(client1)
        self.assertIsNotNone(join_msg, "user1 was not told that user2 joined")
        self.assertIn("user2", join_msg)

        # Send message from user1
        test_message = "Hello, everyone!"
        client1.sendall(f"{test_message}\n".encode('ascii'))

        # User2 should receive the message
        received = self.receive_message(client2)
        self.assertEqual(f"[user1] {test_message}", received)

    def test_multiple_clients(self):
        """Test that the server can handle multiple clients"""
        usernames = [f"user{i}" for i in range(10)]

        # Each join waits for the server's room-info reply, so the joins
        # are already serialised and no sleep is needed between them.
        clients = [self._join(username)[0] for username in usernames]

        # Send a message from the last client
        test_message = "Hello from last user!"
        clients[-1].sendall(f"{test_message}\n".encode('ascii'))

        # Every earlier client has join notices for the later users queued
        # ahead of the chat line, so those notices are skipped first.
        expected = f"[{usernames[-1]}] {test_message}"
        for username, client in zip(usernames[:-1], clients[:-1]):
            with self.subTest(recipient=username):
                self.assertEqual(expected, self._next_chat_message(client))

    def test_user_departure(self):
        """Test that user departure is announced"""
        client1, _ = self._join("user1")
        client2, _ = self._join("user2")

        # Consume user2's join notice first; otherwise it would be mistaken
        # for the departure notice (it also mentions user2 and starts "*").
        join_msg = self.receive_message(client1)
        self.assertIsNotNone(join_msg, "user1 was not told that user2 joined")
        self.assertIn("user2", join_msg)

        # Close client2 and check if client1 receives departure message
        client2.close()
        departure_msg = self.receive_message(client1)
        self.assertIsNotNone(departure_msg, "no departure notice received")
        self.assertIn("user2", departure_msg)
        self.assertTrue(departure_msg.startswith("*"))
|
|
|
|
# Script entry point: hand control to the unittest runner when this file is
# executed directly; importing the module does not start the tests.
if __name__ == '__main__':
    unittest.main()
|