#define _XOPEN_SOURCE 700 #include #include #include #include #include #include #include #include #include #include "threadpool.h" #include "conn.h" #include "util.h" #include "serverWorker.h" #include "ini.h" #include "serverUtil.h" #include "fileQueue.h" #include "taglialegna.h" typedef struct { sigset_t *set; /* set of signals to handle (masked) */ int signal_pipe; /* unnamed pipe's descriptor */ } sigHandler_t; /* signal handler thread function */ static void *sigHandler(void *arg); static void usage(const char *argv0) { fprintf(stderr, "Uso: %s \n", argv0); } static void checkargs(int argc, char* argv[]) { if (argc != 2) { usage(argv[0]); _exit(EXIT_FAILURE); } ini_t *config = ini_load(argv[1]); if (config == NULL) { fprintf(stderr, "Error: reading config file.\n"); usage(argv[0]); _exit(EXIT_FAILURE); } ini_free(config); } int main(int argc, char *argv[]) { /* read config file */ checkargs(argc, argv); ini_t *config = ini_load(argv[1]); int threadsInPool; CONFGETINT(threadsInPool, config, "threadpool", "quantity", NULL, 10); int pendingSize; CONFGETINT(pendingSize, config, "threadpool", "pending", NULL, 10); int maxFiles; CONFGETINT(maxFiles, config, "files", "MaxFiles", NULL, 10); int maxSize; CONFGETINT(maxSize, config, "files", "MaxSize", NULL, 10); const char *buff; char *logFile=NULL; CONFGETSTR(logFile, config, "log", "logFile", buff); char *socketName=NULL; CONFGETSTR(socketName, config, "socket", "name", buff); int maxBacklog; CONFGETINT(maxBacklog, config, "socket", "backlog", NULL, 10); ini_free(config); queueT *queue = NULL; taglia_t *taglia = NULL; sigset_t mask; sigfillset(&mask); sigdelset(&mask, SIGPIPE); /* only sigpipe is excluded */ if (pthread_sigmask(SIG_SETMASK, &mask, NULL) != 0) { fprintf(stderr, "Error: setting mask.\n"); goto _cleanup_beforesigthread; } struct sigaction s; memset(&s, 0, sizeof(s)); s.sa_handler = SIG_IGN; if ( (sigaction(SIGPIPE,&s,NULL)) == -1 ) { perror("main: sigaction"); goto _cleanup_beforesigthread; } (void) unlink(socketName); /* create structure for logging */ taglia = taglia_init(logFile, 0); free(logFile); /* free the name of the file */ if(taglia==NULL) { perror("main: taglia_init"); goto _cleanup_beforesigthread; } /* buffer for loggin */ char buf[2048]; int n; /* pipes for comunication between main thread, sigHandler thread * and worker threads */ int signal_pipe[2]; int request_pipe[2]; if (pipe(signal_pipe) == -1) { perror("main: pipe, signal_pipe"); goto _cleanup_beforesigthread; } if (pipe(request_pipe) == -1) { perror("main: pipe, request_pipe"); goto _cleanup_beforesigthread; } /* thread for handling interruptions */ pthread_t sighandler_thread; sigHandler_t handlerArgs = { &mask, signal_pipe[1] }; if (pthread_create(&sighandler_thread, NULL, sigHandler, &handlerArgs) != 0) { fprintf(stderr, "Error: creating signal handler thread.\n"); goto _cleanup; } /* write to logfile */ n = snprintf(buf, sizeof(buf), "Creato signal thread con id: %ld\n", (long) sighandler_thread); if( n<0 ) { perror("main: snprintf"); goto _cleanup; } if(taglia_log(taglia, buf) < 0) goto _cleanup; /* lock for operations on files */ pthread_mutex_t lock; if (pthread_mutex_init(&lock, NULL) != 0) { perror("main: pthread_mutex_init"); goto _cleanup; } /* create socket */ int listenfd; if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("main: socket"); goto _cleanup; } struct sockaddr_un serv_addr; memset(&serv_addr, '0', sizeof(serv_addr)); serv_addr.sun_family = AF_UNIX; strncpy(serv_addr.sun_path, socketName, strlen(socketName)+1); if (bind(listenfd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) == -1) { perror("main: bind"); goto _cleanup; } if (listen(listenfd, maxBacklog) == -1) { perror("main: listen"); goto _cleanup; } /* write to logfile */ n = snprintf(buf, sizeof(buf), "Creato socket: %s\n", socketName); if( n<0 ) { perror("main: snprintf"); goto _cleanup; } if( taglia_log(taglia, buf) < 0) goto _cleanup; /* create queue */ queue = createQueue(maxFiles, maxSize); /* create queue for clients waiting on a lock */ waiting_t *waiting = NULL; /* create threadpool */ threadpool_t *pool = NULL; pool = createThreadPool(threadsInPool, pendingSize); if (!pool) { fprintf(stderr, "Error: creating thread pool.\n"); goto _cleanup; } /* write to logfile */ n = snprintf(buf, sizeof(buf), "Creato threadpool di dimensione %d e pending size %d\n", threadsInPool, pendingSize); if(n<0) { perror("main: snprintf"); goto _cleanup; } if(taglia_log(taglia, buf) < 0) goto _cleanup; /* selector */ fd_set set, tmpset; FD_ZERO(&set); FD_ZERO(&tmpset); /* add fd for socket, signal pipe and request pipe. */ FD_SET(listenfd, &set); FD_SET(signal_pipe[0], &set); FD_SET(request_pipe[0], &set); /* get max file descriptor */ int fdmax = (listenfd > signal_pipe[0]) ? listenfd : signal_pipe[0]; fdmax = (fdmax > request_pipe[0]) ? fdmax : request_pipe[0]; /* write to logfile */ n = snprintf(buf, sizeof(buf), "File Server ready.\n\tMaxFiles: %d\n\tMaxSize: %d\n", maxFiles, maxSize); if( n<0 ) { perror("main: snprintf"); goto _cleanup; } if( taglia_log(taglia, buf) < 0) goto _cleanup; printf("File Server ready.\n"); fflush(stdout); volatile int quit = 0; volatile int stopNewConnections = 0; volatile int numberOfConnections = 0; while(!quit) { /* copy the set in the tmp variable for the select */ tmpset = set; if (select(fdmax+1, &tmpset, NULL, NULL, NULL) == -1) { perror("main: select"); goto _cleanup; } /* search for the ready fd */ for(long i=0; i <= fdmax; ++i) { if (FD_ISSET(i, &tmpset)) { long* connfd = malloc(sizeof(long)); if (!connfd) { perror("main: malloc"); goto _cleanup; } if (i == listenfd) { /* new request for connection */ if(stopNewConnections) { /* no new connections allowed */ /* write to logfile */ if( taglia_log(taglia, "Nuova connessione rifiutata, server in terminazione\n") < 0) { free(connfd); goto _cleanup; } FD_CLR(i, &set); close(i); free(connfd); continue; } /* accept new connection */ if ((*connfd = accept(listenfd, (struct sockaddr*)NULL ,NULL)) == -1) { perror("main: accept"); free(connfd); goto _cleanup; } /* write to logfile */ n = snprintf(buf, sizeof(buf), "Nuovo client: %ld\n", *connfd); if( n<0 ) { perror("main: snprintf"); free(connfd); goto _cleanup; } if(taglia_log(taglia, buf) < 0) { free(connfd); goto _cleanup; } /* create args to pass to the worker */ threadT* args = calloc(1, sizeof(threadT)); if(!args) { perror("main: calloc"); free(connfd); goto _cleanup; } args->connfd = *connfd; args->quit = &quit; args->request_pipe = request_pipe[1]; args->q = queue; args->taglia = taglia; args->pool = pool; args->lock = &lock; args->waiting = &waiting; /* add to threadpool */ int r = addToThreadPool(pool, threadF, args); if (r == 0) { /* no errors */ numberOfConnections++; free(connfd); continue; } if (r < 0) /* internal error */ fprintf(stderr, "Error: adding to the thread pool.\n"); else /* pending queue full */ fprintf(stderr, "Error: server too busy.\n"); close(*connfd); free(connfd); continue; } if (i == request_pipe[0]) { /* worker finished task */ long pdr; /* get pipe descriptor */ if (readn(request_pipe[0], &pdr, sizeof(long)) == -1) { perror("main: readn"); free(connfd); break; } switch (pdr) { case -1: /* client disconnected */ --numberOfConnections; if (stopNewConnections && numberOfConnections <= 0) { /* quitting and terminating signalThread */ quit = 1; pthread_cancel(sighandler_thread); break; } free(connfd); continue; default: /* client served but not disconnected */ FD_SET(pdr, &set); if (pdr > fdmax) { fdmax = pdr; } break; } free(connfd); continue; } if (i == signal_pipe[0]) { /* check if we need to terminate */ int code; if (readn(signal_pipe[0], &code, sizeof(int)) == -1) { perror("main: readn"); free(connfd); break; } switch (code) { case 0: { /* stop to new connections */ stopNewConnections = 1; /* write to logfile */ if(taglia_log(taglia, "Stop new connections\n") < 0){ free(connfd); goto _cleanup; } if(numberOfConnections == 0) { quit = 1; /* stop signalThread */ pthread_cancel(sighandler_thread); } break; } case 1: { /* immediate stop */ quit = 1; /* write to logfile */ if( taglia_log(taglia, "Immediate quit\n") < 0) { free(connfd); goto _cleanup; } break; } default: perror(NULL); fprintf(stderr, "Error: invalid code recived from sigThread.\n"); break; } free(connfd); break; } else { /* request from an already connected client */ FD_CLR(i, &set); fdmax = (i>fdmax)?i:fdmax; /* create args for worker thread */ threadT* args = calloc(1, sizeof(threadT)); if(!args) { perror("main: calloc"); free(connfd); goto _cleanup; } args->connfd = i; args->quit = &quit; args->request_pipe = request_pipe[1]; args->q = queue; args->taglia = taglia; args->pool = pool; args->lock = &lock; args->waiting = &waiting; /* add to the threadpool */ int r = addToThreadPool(pool, threadF, args); if (r == 0) { free(connfd); continue; } if (r < 0) /* internal error */ fprintf(stderr, "Error: adding to the thread pool.\n"); else /* pending queue full */ fprintf(stderr, "Error: server too busy.\n"); close(*connfd); free(connfd); continue; } } } } fprintf(stdout, "\n"); destroyThreadPool(pool, 0); /* notify the threads that need to stop */ clearWaiting(&waiting); /* destroy waiting list */ taglia_stats(taglia, stdout); /* print stats */ /* print all files in storage during shutdown */ if (printQueue(stdout, queue) == -1) { perror("main: printQueue"); return 1; } destroyQueue(queue); /* wait for sigHandler thread */ pthread_join(sighandler_thread, NULL); /* free log file structure */ taglia_del(taglia); /* unlink socket */ unlink(socketName); free(socketName); printf("File Storage Server terminato.\n"); fflush(stdout); return 0; _cleanup: pthread_cancel(sighandler_thread); pthread_join(sighandler_thread, NULL); _cleanup_beforesigthread: if(queue) { destroyQueue(queue); } if(taglia) { taglia_del(taglia); } unlink(socketName); free(socketName); return -1; } /* signal handler thread function */ static void *sigHandler(void *arg) { sigset_t *set = ((sigHandler_t*)arg)->set; int fd_pipe = ((sigHandler_t*)arg)->signal_pipe; if(pthread_sigmask(SIG_SETMASK, set, NULL)) { fprintf(stderr, "Error: setting mask.\n"); return (void*) 1; } while (1) { int sig; int code; int r = sigwait(set, &sig); if (r != 0) { errno = r; perror("sigHandler: sigwait"); return NULL; } switch (sig) { case SIGHUP: code = 0; /* notify main thread to stop new connections */ if (writen(fd_pipe, &code, sizeof(int)) == -1) { perror("sigHandler: writen"); } break; case SIGINT: case SIGQUIT: code = 1; /* notify main thread to stop immediatly */ if (writen(fd_pipe, &code, sizeof(int)) == -1) { perror("sigHandler: writen"); } return NULL; default: break; } } return NULL; }