Added multithreaded server02 for handling query and insert requests, utility functions for byte operations in data.c, updated tests, and extended project configuration.
This commit is contained in:
7
.idea/dictionaries/project.xml
generated
Normal file
7
.idea/dictionaries/project.xml
generated
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
<component name="ProjectDictionaryState">
|
||||||
|
<dictionary name="project">
|
||||||
|
<words>
|
||||||
|
<w>mintime</w>
|
||||||
|
</words>
|
||||||
|
</dictionary>
|
||||||
|
</component>
|
||||||
@@ -22,3 +22,7 @@ add_executable(server01 server01.c
|
|||||||
${COMMON_SOURCES})
|
${COMMON_SOURCES})
|
||||||
|
|
||||||
target_link_libraries(server01 PRIVATE wsock32 ws2_32 json-c::json-c)
|
target_link_libraries(server01 PRIVATE wsock32 ws2_32 json-c::json-c)
|
||||||
|
|
||||||
|
|
||||||
|
add_executable(server02 server02.c server02.h ${COMMON_SOURCES})
|
||||||
|
target_link_libraries(server02 wsock32 ws2_32)
|
||||||
34
data.c
34
data.c
@@ -10,9 +10,7 @@
|
|||||||
SOCKET get_listen_socket() {
|
SOCKET get_listen_socket() {
|
||||||
WSADATA WSAData;
|
WSADATA WSAData;
|
||||||
|
|
||||||
SOCKET client;
|
SOCKADDR_IN serverAddr;
|
||||||
|
|
||||||
SOCKADDR_IN serverAddr, clientAddr;
|
|
||||||
|
|
||||||
WSAStartup(MAKEWORD(2,0), &WSAData);
|
WSAStartup(MAKEWORD(2,0), &WSAData);
|
||||||
SOCKET server = socket(AF_INET, SOCK_STREAM, 0);
|
SOCKET server = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
@@ -89,13 +87,27 @@ char *char_array_get_until_char(char_array_t *array, char c) {
|
|||||||
char *ret = malloc(idx+1);
|
char *ret = malloc(idx+1);
|
||||||
memcpy(ret, array->data, idx);
|
memcpy(ret, array->data, idx);
|
||||||
ret[idx] = '\0';
|
ret[idx] = '\0';
|
||||||
char *temp = calloc(array->capacity, sizeof(char));
|
char_array_shift_bytes(array, idx+1);
|
||||||
size_t new_i = 0;
|
|
||||||
for (size_t i = idx+1; i < array->size; i++) {
|
|
||||||
temp[new_i++] = array->data[i];
|
|
||||||
}
|
|
||||||
free(array->data);
|
|
||||||
array->data = temp;
|
|
||||||
array->size = new_i;
|
|
||||||
return ret;
|
return ret;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
char *char_array_get_bytes(char_array_t *array, size_t length) {
|
||||||
|
if (length < array->size) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *ret = malloc(length);
|
||||||
|
memcpy(ret, array->data, length);
|
||||||
|
char_array_shift_bytes(array, length);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
void char_array_shift_bytes(char_array_t *array, size_t length) {
|
||||||
|
if (length >= array->size) {
|
||||||
|
array->size = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
memmove(array->data, array->data + length, array->size - length);
|
||||||
|
array->size -= length;
|
||||||
|
|
||||||
|
}
|
||||||
2
data.h
2
data.h
@@ -14,5 +14,7 @@ void char_array_print(const char_array_t *array);
|
|||||||
void char_array_wipe(char_array_t *array);
|
void char_array_wipe(char_array_t *array);
|
||||||
bool char_array_has_char(char_array_t *array, char c);
|
bool char_array_has_char(char_array_t *array, char c);
|
||||||
char *char_array_get_until_char(char_array_t *array, char c);
|
char *char_array_get_until_char(char_array_t *array, char c);
|
||||||
|
char *char_array_get_bytes(char_array_t *array, size_t length);
|
||||||
|
void char_array_shift_bytes(char_array_t *array, size_t length);
|
||||||
|
|
||||||
SOCKET get_listen_socket();
|
SOCKET get_listen_socket();
|
||||||
@@ -49,6 +49,7 @@ void *handle_connection(void *args) {
|
|||||||
}
|
}
|
||||||
memset(buffer, 0, sizeof(buffer));
|
memset(buffer, 0, sizeof(buffer));
|
||||||
}
|
}
|
||||||
|
closesocket(handleArgs->client);
|
||||||
free(handleArgs);
|
free(handleArgs);
|
||||||
pthread_exit(NULL);
|
pthread_exit(NULL);
|
||||||
}
|
}
|
||||||
|
|||||||
127
server02.c
Normal file
127
server02.c
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
//
|
||||||
|
// Created by Ajurna on 28/07/2025.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include "server02.h"
|
||||||
|
#include <winsock2.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "data.h"
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
SOCKET server = get_listen_socket();
|
||||||
|
SOCKADDR_IN clientAddr;
|
||||||
|
SOCKET client;
|
||||||
|
int clientAddrSize = sizeof(clientAddr);
|
||||||
|
int connection_number = 1;
|
||||||
|
printf("Listening for incoming connections...\n");
|
||||||
|
while((client = accept(server, (SOCKADDR *)&clientAddr, &clientAddrSize)) != INVALID_SOCKET)
|
||||||
|
{
|
||||||
|
handle_args_t *args = malloc(sizeof(handle_args_t));
|
||||||
|
args->client = client;
|
||||||
|
args->connection = connection_number++;
|
||||||
|
pthread_t thread;
|
||||||
|
pthread_create(&thread, nullptr, handle_connection, args);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
void *handle_connection(void *args) {
|
||||||
|
handle_args_t *handleArgs = args;
|
||||||
|
char buffer[1024] = {0};
|
||||||
|
int bytesReceived;
|
||||||
|
char_array_t *data = char_array_create(1024);
|
||||||
|
price_array_t *prices = price_array_create(1024);
|
||||||
|
while ((bytesReceived = recv(handleArgs->client, buffer, sizeof(buffer), 0)) > 0) {
|
||||||
|
printf("Client sent {%d}: |%d| \n", handleArgs->connection, bytesReceived);
|
||||||
|
char_array_append(data, buffer, bytesReceived);
|
||||||
|
query_or_insert_t *request;
|
||||||
|
while ((request = fix_message(char_array_get_bytes(data, 9))) != NULL) {
|
||||||
|
//parse_request(handleArgs, request, data);
|
||||||
|
switch (request->insert.type) {
|
||||||
|
case INSERT:
|
||||||
|
printf("{%d} timestamp: %d, price: %d\n", handleArgs->connection, request->insert.timestamp, request->insert.price);
|
||||||
|
price_array_append(prices, &request->insert);
|
||||||
|
break;
|
||||||
|
case QUERY:
|
||||||
|
printf("{%d} Query min: %d, max: %d\n", handleArgs->connection, request->query.mintime, request->query.maxtime);
|
||||||
|
int average = price_array_query(prices, &request->query);
|
||||||
|
printf("{%d} Query min: %d, max: %d avg %d\n", handleArgs->connection, request->query.mintime, request->query.maxtime, average);
|
||||||
|
int32_t response = htonl(average);
|
||||||
|
send(handleArgs->client, (char*)&response, sizeof(response), 0);
|
||||||
|
default:
|
||||||
|
closesocket(handleArgs->client);
|
||||||
|
free(handleArgs);
|
||||||
|
pthread_exit(NULL);
|
||||||
|
exit(3);
|
||||||
|
}
|
||||||
|
// free(request);
|
||||||
|
}
|
||||||
|
memset(buffer, 0, sizeof(buffer));
|
||||||
|
}
|
||||||
|
|
||||||
|
closesocket(handleArgs->client);
|
||||||
|
free(handleArgs);
|
||||||
|
pthread_exit(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
query_or_insert_t *fix_message(char *message) {
|
||||||
|
if (message == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
query_or_insert_t *ret = malloc(sizeof(query_or_insert_t));
|
||||||
|
ret->insert.type = message[0];
|
||||||
|
ret->query.mintime = message[4] | (message[3] << 8) | (message[2] << 16) | (message[1] << 24);
|
||||||
|
ret->query.maxtime = message[8] | (message[7] << 8) | (message[6] << 16) | (message[5] << 24);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
price_array_t *price_array_create(int capacity) {
|
||||||
|
price_array_t *array = malloc(sizeof(price_array_t));
|
||||||
|
array->capacity = capacity;
|
||||||
|
array->size = 0;
|
||||||
|
array->data = calloc(capacity, sizeof(price_t));
|
||||||
|
return array;
|
||||||
|
};
|
||||||
|
void price_array_free(price_array_t *array) {
|
||||||
|
free(array->data);
|
||||||
|
free(array);
|
||||||
|
};
|
||||||
|
void price_array_append(price_array_t *array, insert_t *value) {
|
||||||
|
if (array->data == NULL) {
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
size_t new_size = array->size + 1;
|
||||||
|
if (new_size > array->capacity) {
|
||||||
|
array->capacity = array->capacity+1024;
|
||||||
|
price_t *new_array = realloc(array->data, array->capacity);
|
||||||
|
array->data = new_array;
|
||||||
|
if (array->data == NULL) {
|
||||||
|
printf("Failed to allocate memory for array\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
price_t *temp = malloc(sizeof(price_t));
|
||||||
|
temp->timestamp = value->timestamp;
|
||||||
|
temp->price = value->price;
|
||||||
|
array->data[array->size] = *temp;
|
||||||
|
array->size = new_size;
|
||||||
|
};
|
||||||
|
int price_array_query(price_array_t *array, query_t *query) {
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < array->size; i++) {
|
||||||
|
if (array->data[i].timestamp >= query->mintime && array->data[i].timestamp <= query->maxtime) {
|
||||||
|
count++;
|
||||||
|
total += array->data[i].price;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("{%d} Found %d prices\n", query->mintime, count);
|
||||||
|
int ret;
|
||||||
|
if (count == 0) {
|
||||||
|
ret = 0;
|
||||||
|
} else {
|
||||||
|
ret = total/count;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
};
|
||||||
56
server02.h
Normal file
56
server02.h
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
//
|
||||||
|
// Created by Ajurna on 28/07/2025.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef SERVER02_H
|
||||||
|
#define SERVER02_H
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#endif //SERVER02_H
|
||||||
|
|
||||||
|
#include <winsock2.h>
|
||||||
|
typedef struct handleArgs {
|
||||||
|
int connection;
|
||||||
|
SOCKET client;
|
||||||
|
} handle_args_t;
|
||||||
|
|
||||||
|
typedef enum MessageType {
|
||||||
|
QUERY = 'Q',
|
||||||
|
INSERT = 'I'
|
||||||
|
} message_type_t;
|
||||||
|
|
||||||
|
typedef struct Insert {
|
||||||
|
char type;
|
||||||
|
int32_t timestamp;
|
||||||
|
int32_t price;
|
||||||
|
} insert_t;
|
||||||
|
|
||||||
|
typedef struct Query {
|
||||||
|
char type;
|
||||||
|
int32_t mintime;
|
||||||
|
int32_t maxtime;
|
||||||
|
} query_t;
|
||||||
|
|
||||||
|
typedef union QueryOrInsert {
|
||||||
|
query_t query;
|
||||||
|
insert_t insert;
|
||||||
|
} query_or_insert_t;
|
||||||
|
|
||||||
|
typedef struct Price {
|
||||||
|
int32_t timestamp;
|
||||||
|
int32_t price;
|
||||||
|
} price_t;
|
||||||
|
|
||||||
|
typedef struct PriceArray {
|
||||||
|
size_t size;
|
||||||
|
size_t capacity;
|
||||||
|
price_t *data;
|
||||||
|
} price_array_t;
|
||||||
|
|
||||||
|
void *handle_connection(void *args);
|
||||||
|
query_or_insert_t *fix_message(char *message);
|
||||||
|
|
||||||
|
price_array_t *price_array_create(int capacity);
|
||||||
|
void price_array_free(price_array_t *array);
|
||||||
|
void price_array_append(price_array_t *array, insert_t *value);
|
||||||
|
int price_array_query(price_array_t *array, query_t *query);
|
||||||
66
server02_socket_communication_test.py
Normal file
66
server02_socket_communication_test.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
import socket
|
||||||
|
import struct
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
def create_insert_message(timestamp: int, price: int) -> bytes:
|
||||||
|
"""Create a 9-byte insert message."""
|
||||||
|
return struct.pack('!cii', b'I', timestamp, price)
|
||||||
|
|
||||||
|
def create_query_message(mintime: int, maxtime: int) -> bytes:
|
||||||
|
"""Create a 9-byte query message."""
|
||||||
|
return struct.pack('!cii', b'Q', mintime, maxtime)
|
||||||
|
|
||||||
|
def receive_response(sock: socket.socket) -> int:
|
||||||
|
"""Receive and decode a 4-byte response."""
|
||||||
|
data = sock.recv(4)
|
||||||
|
return struct.unpack('!i', data)[0]
|
||||||
|
|
||||||
|
def run_test():
|
||||||
|
# Connect to the server
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.connect(('localhost', 40000)) # Using the same port as your other servers
|
||||||
|
|
||||||
|
# Test case 1: Insert some values
|
||||||
|
test_data = [
|
||||||
|
(12345, 101), # (timestamp, price)
|
||||||
|
(12346, 102),
|
||||||
|
(12347, 100),
|
||||||
|
(40960, 5),
|
||||||
|
]
|
||||||
|
|
||||||
|
print("Inserting test data...")
|
||||||
|
for timestamp, price in test_data:
|
||||||
|
message = create_insert_message(timestamp, price)
|
||||||
|
sock.send(message)
|
||||||
|
print(f"Inserted: timestamp={timestamp}, price={price}")
|
||||||
|
sleep(0.1) # Small delay to avoid overwhelming the server
|
||||||
|
|
||||||
|
# Test case 2: Query for mean price
|
||||||
|
mintime = 12288
|
||||||
|
maxtime = 16384
|
||||||
|
print(f"\nQuerying mean price between {mintime} and {maxtime}...")
|
||||||
|
|
||||||
|
query = create_query_message(mintime, maxtime)
|
||||||
|
sock.send(query)
|
||||||
|
|
||||||
|
# Get response
|
||||||
|
mean_price = receive_response(sock)
|
||||||
|
print(f"Received mean price: {mean_price}")
|
||||||
|
|
||||||
|
# Test case 3: Query with no data in range
|
||||||
|
print("\nTesting query with no data in range...")
|
||||||
|
query = create_query_message(1, 1000)
|
||||||
|
sock.send(query)
|
||||||
|
mean_price = receive_response(sock)
|
||||||
|
print(f"Received mean price (should be 0): {mean_price}")
|
||||||
|
|
||||||
|
# Clean up
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
run_test()
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
print("Could not connect to server. Make sure it's running on port 40000.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An error occurred: {e}")
|
||||||
Reference in New Issue
Block a user