diff --git a/src/server.c b/src/server.c index ec44a9e..9243ec1 100644 --- a/src/server.c +++ b/src/server.c @@ -36,7 +36,7 @@ static void *sigHandler(void *arg); static void usage(const char *argv0) { // TODO change this - fprintf(stderr, "use: %s config file location\n", argv0); + fprintf(stderr, "use: %s \n", argv0); } static void checkargs(int argc, char* argv[]) { @@ -130,6 +130,14 @@ int main(int argc, char *argv[]) { if( taglia_log(taglia, buf) < 0) goto _cleanup; + + // creo lock per operazioni su file + pthread_mutex_t lock; + if (pthread_mutex_init(&lock, NULL) != 0) { + perror("pthread_mutex_init lock"); + goto _cleanup; + } + // creo socket int listenfd; if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { @@ -169,6 +177,7 @@ int main(int argc, char *argv[]) { // creo la lista dei client in attesa a una lock waiting_t *waiting; + // creo la threadpool threadpool_t *pool = NULL; pool = createThreadPool(threadsInPool, pendingSize); @@ -253,14 +262,40 @@ int main(int argc, char *argv[]) { goto _cleanup; } + + // scrivo sul log + char buf[512]; + int n; + n = snprintf(buf, sizeof(buf), "New client: %l\n", *connfd); + if( n<0 || mconnfd = connfd; + args->quit = &quit; + args->request_pipe = request_pipe[1]; + args->q = &queue; + args->taglia = &taglia; + args->pool = pool; + args->lock = &lock; + args->waiting = &waiting; // aggiungo al threadpool int r = addToThreadPool(pool, threadF, args); - if (r == 0) + if (r == 0) { numberOfConnections++; continue; // aggiunto con successo + } if (r < 0) // errore interno fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); else // coda dei pendenti piena @@ -270,6 +305,32 @@ int main(int argc, char *argv[]) { free(connfd); continue; } + if (i == request_pipe[0]) { // un worker ha finito di servire un client + int pdr; // ottengo il descrittore della pipe + if (readn(requestPipe[0], &pdr, sizeof(int)) == -1) { + perror("readn"); + break; + } + + switch (pdr) { + case -1: // client disconnected + --numberOfConnections; + if (stopNewConnections && numberOfConnections <= 0) { + quit = 1; + // termino il signalThread + pthread_cancel(sighandler_thread); + break; + } + continue; + default: // client served but not disconnected + FD_SET(pdr, &set); + if (pdr > fdmax) { + fdmax = fdr; + } + break; + } + continue; + } if (i == signal_pipe[0]) { // controllo se devo terminare int code; if (readn(signal_pipe[0], &code, sizeof(int)) == -1) { @@ -279,15 +340,23 @@ int main(int argc, char *argv[]) { switch (code) { case 0: { // stop alle connessioni stopNewConnections = 1; - // TODO log + + // scrivo sul log + if( taglia_log(taglia, "Stop new connections\n") < 0) + goto _cleanup; + if (numberOfConnections == 0) { quit = 1; - // pthread_cancel(st); // termino il signalThread + // termino il signalThread + pthread_cancel(sighandler_thread); } break; } case 1: { // stop immediato quit = 1; + // scrivo sul log + if( taglia_log(taglia, "Immediate quit\n") < 0) + goto _cleanup; break; } default: @@ -302,17 +371,33 @@ int main(int argc, char *argv[]) { fdmax = (i>fdmax)?i:fdmax; // creo gli argomenti da passare al thread + threadT* args = calloc(1, sizeof(threadT)); + if(!args) { + perror("ERROR FATAL calloc"); + goto _cleanup; + } + args->connfd = connfd; + args->quit = &quit; + args->request_pipe = request_pipe[1]; + args->q = &queue; + args->taglia = &taglia; + args->pool = pool; + args->lock = &lock; + args->waiting = &waiting; - /* - int r = addToThreadPool(pool, serverThread, (void*) t); - if (r == 0) - continue; + // aggiungo al threadpool + int r = addToThreadPool(pool, threadF, args); + if (r == 0) { + numberOfConnections++; + continue; // aggiunto con successo + } if (r < 0) // errore interno fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); else // coda dei pendenti piena fprintf(stderr, "ERROR SERVER TOO BUSY\n"); - */ - + free(args); + close(*connfd); + free(connfd); continue; } } @@ -320,9 +405,10 @@ int main(int argc, char *argv[]) { } destroyThreadPool(pool, 0); // notifico che i thread dovranno uscire + clearWaiting(&waiting); + taglia_stats(taglia); // print stats - // TODO print statistiche - + // print all files in storage during shutdown if (printQueue(queue) == -1) { perror("printQueue"); return 1; @@ -332,13 +418,14 @@ int main(int argc, char *argv[]) { // aspetto la terminazione de signal handler thread pthread_join(sighandler_thread, NULL); + taglia_del(taglia); + unlink(socketName); printf("File Storage Server terminato.\n"); fflush(stdout); return 0; - _cleanup: unlink(socketName); return -1; diff --git a/src/serverWorker.c b/src/serverWorker.c index 8871cad..654360a 100644 --- a/src/serverWorker.c +++ b/src/serverWorker.c @@ -18,7 +18,8 @@ void threadF(void *arg) { } // TODO add necessary variables from main - long* connfd = (long*)arg[0]; + long* connfd = ; + fd_set set, tmpset; FD_ZERO(&set); @@ -38,7 +39,7 @@ void threadF(void *arg) { goto _cleanup; continue; } - break; // r==0 and quit==0 + break; // r!=0 and quit==0 } @@ -90,7 +91,7 @@ _cleanup: } -int parser(int len, char *command, queueT *queue, long fd_c, logT* logFileT, pthread_mutex_t *lock, waitingT **waiting) { +int parser(int len, char *command, queueT *queue, long fd_c, taglia_t* taglia, pthread_mutex_t *lock, waitingT **waiting) { if(len<0 || !command || !queue || !logFileT || !waiting) { errno = EINVAL; return -1; diff --git a/src/serverWorker.h b/src/serverWorker.h index decbdac..71b73f7 100644 --- a/src/serverWorker.h +++ b/src/serverWorker.h @@ -4,12 +4,15 @@ #include #include +#include // struttura dati che contiene gli argomenti da passare ai worker threads typedef struct struct_thread { - long *args; + int connfd; + int *quit; + int request_pipe; queueT *q; // puntatore alla queue dei file - // logT *logFileT; // puntatore alla struct del file di log + taglia_t *taglia; // puntatore alla struct del file di log threadpool_t *pool; // puntatore alla threadpool pthread_mutex_t *lock; waitingT **waiting; // puntatore ai client in attesa di ottenere la lock su un file