// // Created by Ajurna on 28/07/2025. // #include "server02.h" #include #include #include "data.h" #include int main() { SOCKET server = get_listen_socket(); SOCKADDR_IN clientAddr; SOCKET client; int clientAddrSize = sizeof(clientAddr); int connection_number = 1; printf("Listening for incoming connections...\n"); while((client = accept(server, (SOCKADDR *)&clientAddr, &clientAddrSize)) != INVALID_SOCKET) { handle_args_t *args = malloc(sizeof(handle_args_t)); args->client = client; args->connection = connection_number++; pthread_t thread; pthread_create(&thread, nullptr, handle_connection, args); } return 0; } void *handle_connection(void *args) { handle_args_t *handleArgs = args; uint8_t buffer[1024] = {0}; int bytesReceived; byte_array_t *data = byte_array_create(1024); price_array_t *prices = price_array_create(1024); while ((bytesReceived = recv(handleArgs->client, buffer, sizeof(buffer), 0)) > 0) { printf("{%d} Client sent: |%d| \n", handleArgs->connection, bytesReceived); byte_array_append(data, buffer, bytesReceived); // byte_array_print(data); uint8_t *raw_request; while ((raw_request = byte_array_get_bytes(data, 9)) != NULL) { //parse_request(handleArgs, request, data); query_or_insert_t *request = fix_message(raw_request); switch (request->insert.type) { case INSERT: // printf("{%d} timestamp: %d, price: %d\n", handleArgs->connection, request->insert.timestamp, request->insert.price); price_array_append(prices, &request->insert); break; case QUERY: printf("{%d} Query min: %d, max: %d\n", handleArgs->connection, request->query.mintime, request->query.maxtime); int average = price_array_query(prices, &request->query); printf("{%d} Query min: %d, max: %d avg %d\n", handleArgs->connection, request->query.mintime, request->query.maxtime, average); int32_t response = htonl(average); send(handleArgs->client, (char*)&response, sizeof(response), 0); break; default: closesocket(handleArgs->client); free(handleArgs); pthread_exit(NULL); exit(3); } } memset(buffer, 0, sizeof(buffer)); } closesocket(handleArgs->client); free(handleArgs); pthread_exit(NULL); } query_or_insert_t *fix_message(const uint8_t *message) { if (message == NULL) { return NULL; } query_or_insert_t *ret = malloc(sizeof(query_or_insert_t)); ret->insert.type = (char)message[0]; ret->query.mintime = message[4] | (message[3] << 8) | (message[2] << 16) | (message[1] << 24); ret->query.maxtime = message[8] | (message[7] << 8) | (message[6] << 16) | (message[5] << 24); return ret; } price_array_t *price_array_create(int capacity) { price_array_t *array = malloc(sizeof(price_array_t)); array->capacity = capacity; array->size = 0; array->data = calloc(capacity, sizeof(price_t)); return array; }; void price_array_free(price_array_t *array) { free(array->data); free(array); }; void price_array_append(price_array_t *array, insert_t *value) { if (array->data == NULL) { exit(1); } size_t new_size = array->size + 1; if (new_size > array->capacity) { array->capacity = array->capacity+1024; price_t *new_array = realloc(array->data, array->capacity * sizeof(price_t)); array->data = new_array; if (array->data == NULL) { printf("Failed to allocate memory for array\n"); exit(1); } printf("array resized to %llu\n", array->capacity); } array->data[array->size].timestamp = value->timestamp; array->data[array->size].price = value->price; array->size = new_size; }; int price_array_query(price_array_t *array, query_t *query) { int count = 0; long long total = 0; for (int i = 0; i < array->size; i++) { if (array->data[i].timestamp >= query->mintime && array->data[i].timestamp <= query->maxtime) { count++; total += array->data[i].price; } } printf("{%d} Found %d prices\n", query->mintime, count); int ret; if (count == 0) { ret = 0; } else { ret = total/count; } return ret; };