diff --git a/.idea/dictionaries/project.xml b/.idea/dictionaries/project.xml new file mode 100644 index 0000000..d602d26 --- /dev/null +++ b/.idea/dictionaries/project.xml @@ -0,0 +1,7 @@ + + + + mintime + + + \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 980bd4e..b554a4b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,4 +21,8 @@ add_executable(server01 server01.c server01.h ${COMMON_SOURCES}) -target_link_libraries(server01 PRIVATE wsock32 ws2_32 json-c::json-c) \ No newline at end of file +target_link_libraries(server01 PRIVATE wsock32 ws2_32 json-c::json-c) + + +add_executable(server02 server02.c server02.h ${COMMON_SOURCES}) +target_link_libraries(server02 wsock32 ws2_32) \ No newline at end of file diff --git a/data.c b/data.c index 5e6caae..9de6e52 100644 --- a/data.c +++ b/data.c @@ -10,9 +10,7 @@ SOCKET get_listen_socket() { WSADATA WSAData; - SOCKET client; - - SOCKADDR_IN serverAddr, clientAddr; + SOCKADDR_IN serverAddr; WSAStartup(MAKEWORD(2,0), &WSAData); SOCKET server = socket(AF_INET, SOCK_STREAM, 0); @@ -89,13 +87,27 @@ char *char_array_get_until_char(char_array_t *array, char c) { char *ret = malloc(idx+1); memcpy(ret, array->data, idx); ret[idx] = '\0'; - char *temp = calloc(array->capacity, sizeof(char)); - size_t new_i = 0; - for (size_t i = idx+1; i < array->size; i++) { - temp[new_i++] = array->data[i]; - } - free(array->data); - array->data = temp; - array->size = new_i; + char_array_shift_bytes(array, idx+1); return ret; -}; \ No newline at end of file +}; + +char *char_array_get_bytes(char_array_t *array, size_t length) { + if (length < array->size) { + return NULL; + } + + char *ret = malloc(length); + memcpy(ret, array->data, length); + char_array_shift_bytes(array, length); + return ret; +} +void char_array_shift_bytes(char_array_t *array, size_t length) { + if (length >= array->size) { + array->size = 0; + return; + } + + memmove(array->data, array->data + length, array->size - length); + array->size -= length; + +} \ No newline at end of file diff --git a/data.h b/data.h index e005199..81e5a42 100644 --- a/data.h +++ b/data.h @@ -14,5 +14,7 @@ void char_array_print(const char_array_t *array); void char_array_wipe(char_array_t *array); bool char_array_has_char(char_array_t *array, char c); char *char_array_get_until_char(char_array_t *array, char c); +char *char_array_get_bytes(char_array_t *array, size_t length); +void char_array_shift_bytes(char_array_t *array, size_t length); SOCKET get_listen_socket(); \ No newline at end of file diff --git a/server01.c b/server01.c index 103d2c1..706b7c3 100644 --- a/server01.c +++ b/server01.c @@ -49,6 +49,7 @@ void *handle_connection(void *args) { } memset(buffer, 0, sizeof(buffer)); } + closesocket(handleArgs->client); free(handleArgs); pthread_exit(NULL); } diff --git a/server02.c b/server02.c new file mode 100644 index 0000000..142bafc --- /dev/null +++ b/server02.c @@ -0,0 +1,127 @@ +// +// Created by Ajurna on 28/07/2025. +// + +#include "server02.h" +#include +#include +#include "data.h" +#include + +int main() { + SOCKET server = get_listen_socket(); + SOCKADDR_IN clientAddr; + SOCKET client; + int clientAddrSize = sizeof(clientAddr); + int connection_number = 1; + printf("Listening for incoming connections...\n"); + while((client = accept(server, (SOCKADDR *)&clientAddr, &clientAddrSize)) != INVALID_SOCKET) + { + handle_args_t *args = malloc(sizeof(handle_args_t)); + args->client = client; + args->connection = connection_number++; + pthread_t thread; + pthread_create(&thread, nullptr, handle_connection, args); + } + return 0; +} +void *handle_connection(void *args) { + handle_args_t *handleArgs = args; + char buffer[1024] = {0}; + int bytesReceived; + char_array_t *data = char_array_create(1024); + price_array_t *prices = price_array_create(1024); + while ((bytesReceived = recv(handleArgs->client, buffer, sizeof(buffer), 0)) > 0) { + printf("Client sent {%d}: |%d| \n", handleArgs->connection, bytesReceived); + char_array_append(data, buffer, bytesReceived); + query_or_insert_t *request; + while ((request = fix_message(char_array_get_bytes(data, 9))) != NULL) { + //parse_request(handleArgs, request, data); + switch (request->insert.type) { + case INSERT: + printf("{%d} timestamp: %d, price: %d\n", handleArgs->connection, request->insert.timestamp, request->insert.price); + price_array_append(prices, &request->insert); + break; + case QUERY: + printf("{%d} Query min: %d, max: %d\n", handleArgs->connection, request->query.mintime, request->query.maxtime); + int average = price_array_query(prices, &request->query); + printf("{%d} Query min: %d, max: %d avg %d\n", handleArgs->connection, request->query.mintime, request->query.maxtime, average); + int32_t response = htonl(average); + send(handleArgs->client, (char*)&response, sizeof(response), 0); + default: + closesocket(handleArgs->client); + free(handleArgs); + pthread_exit(NULL); + exit(3); + } + // free(request); + } + memset(buffer, 0, sizeof(buffer)); + } + + closesocket(handleArgs->client); + free(handleArgs); + pthread_exit(NULL); +} + +query_or_insert_t *fix_message(char *message) { + if (message == NULL) { + return NULL; + } + query_or_insert_t *ret = malloc(sizeof(query_or_insert_t)); + ret->insert.type = message[0]; + ret->query.mintime = message[4] | (message[3] << 8) | (message[2] << 16) | (message[1] << 24); + ret->query.maxtime = message[8] | (message[7] << 8) | (message[6] << 16) | (message[5] << 24); + return ret; +} + +price_array_t *price_array_create(int capacity) { + price_array_t *array = malloc(sizeof(price_array_t)); + array->capacity = capacity; + array->size = 0; + array->data = calloc(capacity, sizeof(price_t)); + return array; +}; +void price_array_free(price_array_t *array) { + free(array->data); + free(array); +}; +void price_array_append(price_array_t *array, insert_t *value) { + if (array->data == NULL) { + exit(1); + } + size_t new_size = array->size + 1; + if (new_size > array->capacity) { + array->capacity = array->capacity+1024; + price_t *new_array = realloc(array->data, array->capacity); + array->data = new_array; + if (array->data == NULL) { + printf("Failed to allocate memory for array\n"); + exit(1); + } + } + price_t *temp = malloc(sizeof(price_t)); + temp->timestamp = value->timestamp; + temp->price = value->price; + array->data[array->size] = *temp; + array->size = new_size; +}; +int price_array_query(price_array_t *array, query_t *query) { + + int count = 0; + int total = 0; + for (int i = 0; i < array->size; i++) { + if (array->data[i].timestamp >= query->mintime && array->data[i].timestamp <= query->maxtime) { + count++; + total += array->data[i].price; + } + } + printf("{%d} Found %d prices\n", query->mintime, count); + int ret; + if (count == 0) { + ret = 0; + } else { + ret = total/count; + } + return ret; +}; \ No newline at end of file diff --git a/server02.h b/server02.h new file mode 100644 index 0000000..2d80487 --- /dev/null +++ b/server02.h @@ -0,0 +1,56 @@ +// +// Created by Ajurna on 28/07/2025. +// + +#ifndef SERVER02_H +#define SERVER02_H +#include + +#endif //SERVER02_H + +#include +typedef struct handleArgs { + int connection; + SOCKET client; +} handle_args_t; + +typedef enum MessageType { + QUERY = 'Q', + INSERT = 'I' +} message_type_t; + +typedef struct Insert { + char type; + int32_t timestamp; + int32_t price; +} insert_t; + +typedef struct Query { + char type; + int32_t mintime; + int32_t maxtime; +} query_t; + +typedef union QueryOrInsert { + query_t query; + insert_t insert; +} query_or_insert_t; + +typedef struct Price { + int32_t timestamp; + int32_t price; +} price_t; + +typedef struct PriceArray { + size_t size; + size_t capacity; + price_t *data; +} price_array_t; + +void *handle_connection(void *args); +query_or_insert_t *fix_message(char *message); + +price_array_t *price_array_create(int capacity); +void price_array_free(price_array_t *array); +void price_array_append(price_array_t *array, insert_t *value); +int price_array_query(price_array_t *array, query_t *query); \ No newline at end of file diff --git a/server02_socket_communication_test.py b/server02_socket_communication_test.py new file mode 100644 index 0000000..9c7a647 --- /dev/null +++ b/server02_socket_communication_test.py @@ -0,0 +1,66 @@ +import socket +import struct +from time import sleep + +def create_insert_message(timestamp: int, price: int) -> bytes: + """Create a 9-byte insert message.""" + return struct.pack('!cii', b'I', timestamp, price) + +def create_query_message(mintime: int, maxtime: int) -> bytes: + """Create a 9-byte query message.""" + return struct.pack('!cii', b'Q', mintime, maxtime) + +def receive_response(sock: socket.socket) -> int: + """Receive and decode a 4-byte response.""" + data = sock.recv(4) + return struct.unpack('!i', data)[0] + +def run_test(): + # Connect to the server + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('localhost', 40000)) # Using the same port as your other servers + + # Test case 1: Insert some values + test_data = [ + (12345, 101), # (timestamp, price) + (12346, 102), + (12347, 100), + (40960, 5), + ] + + print("Inserting test data...") + for timestamp, price in test_data: + message = create_insert_message(timestamp, price) + sock.send(message) + print(f"Inserted: timestamp={timestamp}, price={price}") + sleep(0.1) # Small delay to avoid overwhelming the server + + # Test case 2: Query for mean price + mintime = 12288 + maxtime = 16384 + print(f"\nQuerying mean price between {mintime} and {maxtime}...") + + query = create_query_message(mintime, maxtime) + sock.send(query) + + # Get response + mean_price = receive_response(sock) + print(f"Received mean price: {mean_price}") + + # Test case 3: Query with no data in range + print("\nTesting query with no data in range...") + query = create_query_message(1, 1000) + sock.send(query) + mean_price = receive_response(sock) + print(f"Received mean price (should be 0): {mean_price}") + + # Clean up + sock.close() + +if __name__ == "__main__": + try: + run_test() + except ConnectionRefusedError: + print("Could not connect to server. Make sure it's running on port 40000.") + except Exception as e: + print(f"An error occurred: {e}")