From b893a483b5e6f82262ccf3e214ad5bc515406764 Mon Sep 17 00:00:00 2001 From: elvis Date: Wed, 16 Mar 2022 23:44:53 +0100 Subject: [PATCH] Added worker logic and started apiFile --- config.ini | 1 + src/server.c | 187 +++++++++++++++++++++++++-------------- src/serverWorker.c | 212 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 282 insertions(+), 118 deletions(-) diff --git a/config.ini b/config.ini index b7f2f21..024c6c7 100644 --- a/config.ini +++ b/config.ini @@ -1,6 +1,7 @@ [threadpool] quantity = 10 +pending = 20 [files] diff --git a/src/server.c b/src/server.c index f34cddc..19c55c0 100644 --- a/src/server.c +++ b/src/server.c @@ -31,31 +31,7 @@ typedef struct { // funzione eseguita dal signal handler thread -static void *sigHandler(void *arg) { - sigset_t *set = ((sigHandler_t*)arg)->set; - int fd_pipe = ((sigHandler_t*)arg)->signal_pipe; - - while(1) { - int sig; - int r = sigwait(set, &sig); - if (r != 0) { - errno = r; - // TODO logging utility - perror("FATAL ERROR 'sigwait'"); - return NULL; - } - - switch(sig) { - case SIGINT: - case SIGTERM: - case SIGQUIT: - close(fd_pipe); // notifico il listener thread della ricezione del segnale - return NULL; - default: ; - } - } - return NULL; -} +static void *sigHandler(void *arg); static void usage(const char *argv0) { // TODO change this @@ -82,6 +58,7 @@ int main(int argc, char *argv[]) { checkargs(argc, argv); ini_t *config = ini_load(argv[1]); int threadsInPool; CONFGETINT(threadsInPool, config, "threadpool", "quantity", NULL, 10); + int pendingSize; CONFGETINT(pendingSize, config, "threadpool", "pending", NULL, 10); int maxFiles; CONFGETINT(maxFiles, config, "files", "MaxFiles", NULL, 10); int maxSize; CONFGETINT(maxSize, config, "files", "MaxSize", NULL, 10); ini_free(config); @@ -119,6 +96,7 @@ int main(int argc, char *argv[]) { + // thread per la gestione delle interruzioni pthread_t sighandler_thread; sigHandler_t handlerArgs = { &mask, signal_pipe[1] }; if (pthread_create(&sighandler_thread, NULL, sigHandler, &handlerArgs) != 0) { @@ -150,14 +128,10 @@ int main(int argc, char *argv[]) { // creo la queue queueT *queue = createQueue(maxFiles, maxSize); - - - threadpool_t *pool = NULL; - pool = createThreadPool(threadsInPool, threadsInPool); + pool = createThreadPool(threadsInPool, pendingSize); if (!pool) { - // TODO logging utility fprintf(stderr, "ERROR creating thread pool\n"); goto _cleanup; } @@ -172,28 +146,13 @@ int main(int argc, char *argv[]) { // tengo traccia del file descriptor con id piu' grande int fdmax = (listenfd > signal_pipe[0]) ? listenfd : signal_pipe[0]; - serverStatus* status = malloc(sizeof(serverStatus)); - if (!status) { - // TODO logging utility - perror("ERROR FATAL malloc"); - goto _cleanup; - } - if (pthread_mutex_init(status->mtx, NULL) != 0) { - fprintf(stderr, "ERRORE: creazione mutex"); - goto _cleanup; - } - if (pthread_cond_init(status->cond, NULL) != 0) { - fprintf(stderr, "ERRORE: creazione condition variable"); - goto _cleanup; - } - status->max_space_occupied = 100000000; // in bytes - status->cur_space_occupied = 0; - status->max_files = 100; - status->cur_files = 0; - status->exiting = 0; - while(!status->exiting) { + volatile int quit = 0; + sig_atomic_t stopNewConnections = 0; // true to stop new connections + sig_atomic_t numberOfConnections = 0; + + while(!quit) { // copio il set nella variabile temporanea per la select tmpset = set; if (select(fdmax+1, &tmpset, NULL, NULL, NULL) == -1) { @@ -210,52 +169,102 @@ int main(int argc, char *argv[]) { goto _cleanup; } if (i == listenfd) { // e' una nuova richiesta di connessione + if(stopNewConnections) { // non vogliamo nuove connessioni + // TODO log + FD_CLR(i, &set); + close(i); + continue; + } + + // accetto la connessione nuova if ((*connfd = accept(listenfd, (struct sockaddr*)NULL ,NULL)) == -1) { - // TODO logging utility perror("accept"); goto _cleanup; } - void** args = NULL; - *args = malloc(2*sizeof(void*)); - if (!args) { - // TODO logging utility - perror("ERROR FATAL malloc"); - goto _cleanup; - } - args[0] = connfd; - args[1] = status; + // creo gli argomenti da passare al thread + void* args = NULL; + // aggiungo al threadpool int r = addToThreadPool(pool, threadF, args); if (r == 0) + numberOfConnections++; continue; // aggiunto con successo - if (r < 0)// errore interno + if (r < 0) // errore interno fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); - // TODO logging utility else // coda dei pendenti piena fprintf(stderr, "ERROR SERVER TOO BUSY\n"); - // TODO logging utility free(args); close(*connfd); free(connfd); continue; } - if (i == signal_pipe[0]) { - // ricevuto un segnale, esco ed inizio il protocollo di terminazione - status->exiting = 1; + if (i == signal_pipe[0]) { // controllo se devo terminare + int code; + if (readn(signal_pipe[0], &code, sizeof(int)) == -1) { + perror("readn"); + break; + } + switch (code) { + case 0: { // stop alle connessioni + stopNewConnections = 1; + // TODO log + if (numberOfConnections == 0) { + quit = 1; + // pthread_cancel(st); // termino il signalThread + } + break; + } + case 1: { // stop immediato + quit = 1; + break; + } + default: + perror("ERROR codice inviato dal sigThread invalido.\n"); + break; + } break; } + else { // richiesta di un client già connesso + FD_CLR(i, &set); + + fdmax = (i>fdmax)?i:fdmax; + + // creo gli argomenti da passare al thread + + /* + int r = addToThreadPool(pool, serverThread, (void*) t); + if (r == 0) + continue; + if (r < 0) // errore interno + fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); + else // coda dei pendenti piena + fprintf(stderr, "ERROR SERVER TOO BUSY\n"); + */ + + continue; + } } } } - free(status); destroyThreadPool(pool, 0); // notifico che i thread dovranno uscire + // TODO print statistiche + + if (printQueue(queue) == -1) { + perror("printQueue"); + return 1; + } + destroyQueue(queue); + // aspetto la terminazione de signal handler thread pthread_join(sighandler_thread, NULL); unlink(SOCKNAME); + + printf("File Storage Server terminato.\n"); + fflush(stdout); return 0; @@ -263,3 +272,47 @@ _cleanup: unlink(SOCKNAME); return -1; } + +// funzione eseguita dal signal handler thread +static void *sigHandler(void *arg) { + sigset_t *set = ((sigHandler_t*)arg)->set; + int fd_pipe = ((sigHandler_t*)arg)->signal_pipe; + + if(pthread_sigmask(SIG_SETMASK, &set, NULL)) { + fprintf(stderr, "ERROR setting mask\n"); + return (void*) 1; + } + + while (1) { + int sig; + int code; + int r = sigwait(&set, &sig); + + if (r != 0) { + errno = r; + perror("FATAL ERROR 'sigwait'"); + return NULL; + } + + switch (sig) { + case SIGHUP: + code = 0; + // notifico il thread manager di smettere di accettare nuove connessioni in entrata + if (writen(fd_pipe, &code, sizeof(int)) == -1) { + perror("writen"); + } + break; + case SIGINT: + case SIGQUIT: + code = 1; + // notifico il thread manager di terminare il server il prima possibile + if (writen(fd_pipe, &code, sizeof(int)) == -1) { + perror("writen"); + } + return NULL; + default: + break; + } + } + return NULL; +} diff --git a/src/serverWorker.c b/src/serverWorker.c index 0e12f57..c5eb423 100644 --- a/src/serverWorker.c +++ b/src/serverWorker.c @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -6,31 +7,24 @@ #include #include -#include - -// converte tutti i carattere minuscoli in maiuscoli -static void toup(char *str) { - char *p = str; - while(*p != '\0') { - *p = (islower(*p)?toupper(*p):*p); - ++p; - } -} +#include +#include // funzione eseguita dal Worker thread del pool -// gestisce una intera connessione di un client -// void threadF(void *arg) { - assert(arg); + if(!arg){ + errno = EINVAL; + return; + } + // TODO add necessary variables from main long* connfd = (long*)arg[0]; - serverStatus* status = (serverStatus*)(arg[1]); fd_set set, tmpset; FD_ZERO(&set); FD_SET(*connfd, &set); - do { + while(*quit == 0) { tmpset=set; int r; // ogni tanto controllo se devo terminare @@ -40,47 +34,163 @@ void threadF(void *arg) { break; } if (r==0) { - if ((status->exiting) != 0) - break; + if (*quit) + goto _cleanup; continue; } + break; // r==0 and quit==0 + } - // comunicate with the client - msg_t str; - long n; - if ((n=readn(connfd, &str.len, sizeof(long))) == -1) { - perror("read1"); - break; - } + // comunicate with the client + msg_t str; + long n; + if ((n=readn(connfd, &str.len, sizeof(long))) == -1) { + perror("read1"); + break; + } - if (n==0) - break; - str.str = calloc(str.len, sizeof(char)); - if (!str.str) { - perror("calloc"); - fprintf(stderr, "Memoria esaurita....\n"); - break; - } - if ((n=readn(connfd, str.str, str.len * sizeof(char))) == -1) { - perror("read2"); - free(str.str); - break; - } - - toup(str.str); - - if ((n=writen(connfd, &str.len, sizeof(int))) == -1) { - perror("write1"); - free(str.str); - break; - } - if ((n=writen(connfd, str.str, str.len*sizeof(char))) == -1) { - perror("write2"); - free(str.str); - break; - } + if (n==0) + break; + str.str = calloc(str.len+1, sizeof(char)); + if (!str.str) { + perror("calloc"); + fprintf(stderr, "Calloc.\n"); + break; + } + if ((n=readn(connfd, str.str, str.len * sizeof(char))) == -1) { + perror("read2"); free(str.str); - } while(status->exiting == 0); + break; + } + str.str[str.len+1] = '\0'; + + if(strncmp(str.str, "quit", 5)) { // il client vuole chiudere la connessione + // comunico al manager che ho chiuso la connessione + // ... + // log chiusura connessione + // ... + goto _cleanup; + } + + // eseguo quello che mi chiede il client di fare + if (parser(str.len, str.str) == -1) { + // str.str non è più valido perchè parser fa free + + } + + // comunico al manager che è stata servita la richiesta + + // log + +_cleanup: + if(arg) + free(arg); close(connfd); } + + +int parser(int len, char *command, queueT *queue, long fd_c, logT* logFileT, pthread_mutex_t *lock, waitingT **waiting) { + if(len<0 || !command || !queue || !logFileT || !waiting) { + errno = EINVAL; + return -1; + } + + char *save = command; + char *token = NULL; + char *token2 = NULL; + char *token3 = NULL; + + token = strsep(&string, "|"); + token2 = strsep(&string, "|"); + token3 = strsep(&string, "|"); + + if(!token) + goto _parser_cleanup; + + if(strcmp(token, "openFile") == 0) { + if(!token3 || !token2) + goto _parser_cleanup; + + int arg = (int) strtol(token3, NULL, 10); + + openFile(token2, arg, queue, fd_c, logFileT); + goto _parser_end; + } + + if (strcmp(token, "readFile") == 0) { + if(!token2) + goto _parser_cleanup; + + readFile(token2, queue, fd_c, logFileT); + goto _parser_end; + } + + if (strcmp(token, "readNFiles") == 0) { + if(!token2) + goto _parser_cleanup; + + readNFiles(token2, queue, fd_c, logFileT); + goto _parser_end; + } + + if (strcmp(token, "writeFile") == 0) { + if(!token3 || !token2) + goto _parser_cleanup; + size_t sz = (size_t) strtol(token3, NULL, 10); + + writeFile(token2, sz, queue, fd_c, logFileT, 0); + goto _parser_end; + } + + if (strcmp(token, "appendToFile") == 0) { + if(!token3 || !token2) + goto _parser_cleanup; + size_t sz = (size_t) strtol(token3, NULL, 10); + + writeFile(token2, sz, queue, fd_c, logFileT, 1); + goto _parser_end; + } + + if (strcmp(token, "lockFile") == 0) { + if(!token2) + goto _parser_cleanup; + + lockFile(token2, queue, fd_c, logFileT, lock, waiting); + goto _parser_end; + } + + if (strcmp(token, "unlockFile") == 0) { + if(!token2) + goto _parser_cleanup; + + unlockFile(token2, queue, fd_c, logFileT, lock, waiting); + goto _parser_end; + } + + if (strcmp(token, "closeFile") == 0) { + if(!token2) + goto _parser_cleanup; + + closeFile(token2, queue, fd_c, logFileT, lock, waiting); + goto _parser_end; + } + + if (strcmp(token, "removeFile") == 0) { + if(!token2) + goto _parser_cleanup; + + removeFile(token2, queue, fd_c, logFileT, lock, waiting); + goto _parser_end; + } + + goto _parser_cleanup; // nessun comando riconosciuto + +_parser_end: + free(command); + return 0; + +_parser_cleanup: + free(command); + return -1; +}