diff --git a/.gitignore b/.gitignore index 11641b7..5778749 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ +# ---> system & emacs +.DS_Store +.dumbjump + # ---> C # Prerequisites *.d diff --git a/lib/threadpool/threadpool.c b/lib/threadpool/threadpool.c new file mode 100644 index 0000000..952de23 --- /dev/null +++ b/lib/threadpool/threadpool.c @@ -0,0 +1,240 @@ +/** + * @file threadpool.c + * @brief File di implementazione dell'interfaccia Threadpool + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +/** + * @function void *threadpool_thread(void *threadpool) + * @brief funzione eseguita dal thread worker che appartiene al pool + */ +static void *workerpool_thread(void *threadpool) { + threadpool_t *pool = (threadpool_t *)threadpool; // cast + taskfun_t task; // generic task + pthread_t self = pthread_self(); + int myid = -1; + + // non efficiente, si puo' fare meglio..... + do { + for (int i=0;inumthreads;++i) + if (pthread_equal(pool->threads[i], self)) { + myid = i; + break; + } + } while (myid < 0); + + LOCK_RETURN(&(pool->lock), NULL); + for (;;) { + + // in attesa di un messaggio, controllo spurious wakeups. + while((pool->count == 0) && (!pool->exiting)) { + pthread_cond_wait(&(pool->cond), &(pool->lock)); + } + + if (pool->exiting > 1) break; // exit forzato, esco immediatamente + // devo uscire ma ci sono messaggi pendenti + if (pool->exiting == 1 && !pool->count) break; + + // nuovo task + task.fun = pool->pending_queue[pool->head].fun; + task.arg = pool->pending_queue[pool->head].arg; + + pool->head++; pool->count--; + pool->head = (pool->head == abs(pool->queue_size)) ? 0 : pool->head; + + pool->taskonthefly++; + UNLOCK_RETURN(&(pool->lock), NULL); + + // eseguo la funzione + (*(task.fun))(task.arg); + + LOCK_RETURN(&(pool->lock), NULL); + pool->taskonthefly--; + } + UNLOCK_RETURN(&(pool->lock), NULL); + + fprintf(stderr, "thread %d exiting\n", myid); + return NULL; +} + + + +static int freePoolResources(threadpool_t *pool) { + if(pool->threads) { + free(pool->threads); + free(pool->pending_queue); + + pthread_mutex_destroy(&(pool->lock)); + pthread_cond_destroy(&(pool->cond)); + } + free(pool); + return 0; +} + +threadpool_t *createThreadPool(int numthreads, int pending_size) { + if(numthreads <= 0 || pending_size < 0) { + errno = EINVAL; + return NULL; + } + + threadpool_t *pool = (threadpool_t *)malloc(sizeof(threadpool_t)); + if (pool == NULL) return NULL; + + // condizioni iniziali + pool->numthreads = 0; + pool->taskonthefly = 0; + pool->queue_size = (pending_size == 0 ? -1 : pending_size); + pool->head = pool->tail = pool->count = 0; + pool->exiting = 0; + + /* Allocate thread and task queue */ + pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * numthreads); + if (pool->threads == NULL) { + free(pool); + return NULL; + } + pool->pending_queue = (taskfun_t *)malloc(sizeof(taskfun_t) * abs(pool->queue_size)); + if (pool->pending_queue == NULL) { + free(pool->threads); + free(pool); + return NULL; + } + if ((pthread_mutex_init(&(pool->lock), NULL) != 0) || + (pthread_cond_init(&(pool->cond), NULL) != 0)) { + free(pool->threads); + free(pool->pending_queue); + free(pool); + return NULL; + } + for(int i = 0; i < numthreads; i++) { + if(pthread_create(&(pool->threads[i]), NULL, + workerpool_thread, (void*)pool) != 0) { + /* errore fatale, libero tutto forzando l'uscita dei threads */ + destroyThreadPool(pool, 1); + errno = EFAULT; + return NULL; + } + pool->numthreads++; + } + return pool; +} + + +int destroyThreadPool(threadpool_t *pool, int force) { + if(pool == NULL || force < 0) { + errno = EINVAL; + return -1; + } + + LOCK_RETURN(&(pool->lock), -1); + + pool->exiting = 1 + force; + + if (pthread_cond_broadcast(&(pool->cond)) != 0) { + UNLOCK_RETURN(&(pool->lock),-1); + errno = EFAULT; + return -1; + } + UNLOCK_RETURN(&(pool->lock), -1); + + for(int i = 0; i < pool->numthreads; i++) { + if (pthread_join(pool->threads[i], NULL) != 0) { + errno = EFAULT; + UNLOCK_RETURN(&(pool->lock),-1); + return -1; + } + } + freePoolResources(pool); + return 0; +} + +int addToThreadPool(threadpool_t *pool, void (*f)(void *), void *arg) { + if(pool == NULL || f == NULL) { + errno = EINVAL; + return -1; + } + + LOCK_RETURN(&(pool->lock), -1); + int queue_size = abs(pool->queue_size); + int nopending = (pool->queue_size == -1); // non dobbiamo gestire messaggi pendenti + + // coda piena o in fase di uscita + if (pool->count >= queue_size || pool->exiting) { + UNLOCK_RETURN(&(pool->lock),-1); + return 1; // esco con valore "coda piena" + } + + if (pool->taskonthefly >= pool->numthreads) { + if (nopending) { + // tutti i thread sono occupati e non si gestiscono task pendenti + assert(pool->count == 0); + + UNLOCK_RETURN(&(pool->lock),-1); + return 1; // esco con valore "coda piena" + } + } + + pool->pending_queue[pool->tail].fun = f; + pool->pending_queue[pool->tail].arg = arg; + pool->count++; + pool->tail++; + if (pool->tail >= queue_size) pool->tail = 0; + + int r; + if((r=pthread_cond_signal(&(pool->cond))) != 0) { + UNLOCK_RETURN(&(pool->lock),-1); + errno = r; + return -1; + } + + UNLOCK_RETURN(&(pool->lock),-1); + return 0; +} + + +/** + * @function void *thread_proxy(void *argl) + * @brief funzione eseguita dal thread worker che non appartiene al pool + */ +static void *proxy_thread(void *arg) { + taskfun_t *task = (taskfun_t*)arg; + // eseguo la funzione + (*(task->fun))(task->arg); + + free(task); + return NULL; +} + +// fa lo spawn di un thread in modalità detached +int spawnThread(void (*f)(void*), void* arg) { + if (f == NULL) { + errno = EINVAL; + return -1; + } + + taskfun_t *task = malloc(sizeof(taskfun_t)); // la memoria verra' liberata dal proxy + if (!task) return -1; + task->fun = f; + task->arg = arg; + + pthread_t thread; + pthread_attr_t attr; + if (pthread_attr_init(&attr) != 0) return -1; + if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) return -1; + if (pthread_create(&thread, &attr, + proxy_thread, (void*)task) != 0) { + free(task); + errno = EFAULT; + return -1; + } + return 0; +} diff --git a/lib/threadpool/threadpool.h b/lib/threadpool/threadpool.h new file mode 100644 index 0000000..3696999 --- /dev/null +++ b/lib/threadpool/threadpool.h @@ -0,0 +1,81 @@ +#ifndef THREADPOOL_H_ +#define THREADPOOL_H_ + +#include + +/** + * @file threadpool.h + * @brief Interfaccia per il ThreadPool + */ + +/** + * @struct threafun_t + * @brief generico task che un thread del threadpool deve eseguire + * + * @var fun Puntatore alla funzione da eseguire + * @var arg Argomento della funzione + */ +typedef struct taskfun_t { + void (*fun)(void *); + void *arg; +} taskfun_t; + +/** + * @struct threadpool + * @brief Rappresentazione dell'oggetto threadpool + */ +typedef struct threadpool_t { + pthread_mutex_t lock; // mutua esclusione nell'accesso all'oggetto + pthread_cond_t cond; // usata per notificare un worker thread + pthread_t * threads; // array di worker id + int numthreads; // numero di thread (size dell'array threads) + taskfun_t *pending_queue; // coda interna per task pendenti + int queue_size; // massima size della coda, puo' essere anche -1 ad indicare che non si vogliono gestire task pendenti + int taskonthefly; // numero di task attualmente in esecuzione + int head, tail; // riferimenti della coda + int count; // numero di task nella coda dei task pendenti + int exiting; // se > 0 e' iniziato il protocollo di uscita, se 1 il thread aspetta che non ci siano piu' lavori in coda +} threadpool_t; + +/** + * @function createThreadPool + * @brief Crea un oggetto thread pool. + * @param numthreads è il numero di thread del pool + * @param pending_size è la size delle richieste che possono essere pendenti. Questo parametro è 0 se si vuole utilizzare un modello per il pool con 1 thread 1 richiesta, cioe' non ci sono richieste pendenti. + * + * @return un nuovo thread pool oppure NULL ed errno settato opportunamente + */ +threadpool_t *createThreadPool(int numthreads, int pending_size); + +/** + * @function destroyThreadPool + * @brief stoppa tutti i thread e distrugge l'oggetto pool + * @param pool oggetto da liberare + * @param force se 1 forza l'uscita immediatamente di tutti i thread e libera subito le risorse, se 0 aspetta che i thread finiscano tutti e soli i lavori pendenti (non accetta altri lavori). + * + * @return 0 in caso di successo <0 in caso di fallimento ed errno viene settato opportunamente + */ +int destroyThreadPool(threadpool_t *pool, int force); + +/** + * @function addTaskToThreadPool + * @brief aggiunge un task al pool, se ci sono thread liberi il task viene assegnato ad uno di questi, se non ci sono thread liberi e pending_size > 0 allora si cerca di inserire il task come task pendente. Se non c'e' posto nella coda interna allora la chiamata fallisce. + * @param pool oggetto thread pool + * @param fun funzione da eseguire per eseguire il task + * @param arg argomento della funzione + * @return 0 se successo, 1 se non ci sono thread disponibili e/o la coda è piena, -1 in caso di fallimento, errno viene settato opportunamente. + */ +int addToThreadPool(threadpool_t *pool, void (*fun)(void *),void *arg); + + +/** + * @function spawnThread + * @brief lancia un thread che esegue la funzione fun passata come parametro, il thread viene lanciato in modalità detached e non fa parte del pool. + * @param fun funzione da eseguire per eseguire il task + * @param arg argomento della funzione + * @return 0 se successo, -1 in caso di fallimento, errno viene settato opportunamente. + */ +int spawnThread(void (*f)(void*), void* arg); + +#endif /* THREADPOOL_H_ */ + diff --git a/lib/utils/conn.h b/lib/utils/conn.h new file mode 100644 index 0000000..e427094 --- /dev/null +++ b/lib/utils/conn.h @@ -0,0 +1,68 @@ +#ifndef CONN_H +#define CONN_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#if !defined(BUFSIZE) +#define BUFSIZE 256 +#endif +#if !defined(SOCKNAME) +#define SOCKNAME "./cs_sock" +#endif +#if !defined(MAXBACKLOG) +#define MAXBACKLOG 32 +#endif + +/** Evita letture parziali + * + * \retval -1 errore (errno settato) + * \retval 0 se durante la lettura da fd leggo EOF + * \retval size se termina con successo + */ +static inline int readn(long fd, void *buf, size_t size) { + size_t left = size; + int r; + char *bufptr = (char*)buf; + while(left>0) { + if ((r=read((int)fd ,bufptr,left)) == -1) { + if (errno == EINTR) continue; + return -1; + } + if (r == 0) return 0; // EOF + left -= r; + bufptr += r; + } + return size; +} + +/** Evita scritture parziali + * + * \retval -1 errore (errno settato) + * \retval 0 se durante la scrittura la write ritorna 0 + * \retval 1 se la scrittura termina con successo + */ +static inline int writen(long fd, void *buf, size_t size) { + size_t left = size; + int r; + char *bufptr = (char*)buf; + while(left>0) { + if ((r=write((int)fd ,bufptr,left)) == -1) { + if (errno == EINTR) continue; + return -1; + } + if (r == 0) return 0; + left -= r; + bufptr += r; + } + return 1; +} + + +#endif /* CONN_H */ diff --git a/lib/utils/message.h b/lib/utils/message.h new file mode 100644 index 0000000..b81c8d9 --- /dev/null +++ b/lib/utils/message.h @@ -0,0 +1,12 @@ +#ifndef _MESSAGE_H +#define _MESSAGE_H + +/** + * tipo del messaggio + */ +typedef struct msg { + int len; + char *str; +} msg_t; + +#endif diff --git a/lib/utils/util.h b/lib/utils/util.h new file mode 100644 index 0000000..5ead69f --- /dev/null +++ b/lib/utils/util.h @@ -0,0 +1,151 @@ +#ifndef _UTIL_H +#define _UTIL_H + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if !defined(BUFSIZE) +#define BUFSIZE 256 +#endif + +#if !defined(EXTRA_LEN_PRINT_ERROR) +#define EXTRA_LEN_PRINT_ERROR 512 +#endif + +#define SYSCALL_EXIT(name, r, sc, str, ...) \ + if ((r=sc) == -1) { \ + perror(#name); \ + int errno_copy = errno; \ + print_error(str, __VA_ARGS__); \ + exit(errno_copy); \ + } + +#define SYSCALL_PRINT(name, r, sc, str, ...) \ + if ((r=sc) == -1) { \ + perror(#name); \ + int errno_copy = errno; \ + print_error(str, __VA_ARGS__); \ + errno = errno_copy; \ + } + +#define SYSCALL_RETURN(name, r, sc, str, ...) \ + if ((r=sc) == -1) { \ + perror(#name); \ + int errno_copy = errno; \ + print_error(str, __VA_ARGS__); \ + errno = errno_copy; \ + return r; \ + } + +#define CHECK_EQ_EXIT(name, X, val, str, ...) \ + if ((X)==val) { \ + perror(#name); \ + int errno_copy = errno; \ + print_error(str, __VA_ARGS__); \ + exit(errno_copy); \ + } + +#define CHECK_NEQ_EXIT(name, X, val, str, ...) \ + if ((X)!=val) { \ + perror(#name); \ + int errno_copy = errno; \ + print_error(str, __VA_ARGS__); \ + exit(errno_copy); \ + } + +/** + * \brief Procedura di utilita' per la stampa degli errori + * + */ +static inline void print_error(const char * str, ...) { + const char err[]="ERROR: "; + va_list argp; + char * p=(char *)malloc(strlen(str)+strlen(err)+EXTRA_LEN_PRINT_ERROR); + if (!p) { + perror("malloc"); + fprintf(stderr,"FATAL ERROR nella funzione 'print_error'\n"); + return; + } + strcpy(p,err); + strcpy(p+strlen(err), str); + va_start(argp, str); + vfprintf(stderr, p, argp); + va_end(argp); + free(p); +} + + +/** + * \brief Controlla se la stringa passata come primo argomento e' un numero. + * \return 0 ok 1 non e' un numbero 2 overflow/underflow + */ +static inline int isNumber(const char* s, long* n) { + if (s==NULL) return 1; + if (strlen(s)==0) return 1; + char* e = NULL; + errno=0; + long val = strtol(s, &e, 10); + if (errno == ERANGE) return 2; // overflow/underflow + if (e != NULL && *e == (char)0) { + *n = val; + return 0; // successo + } + return 1; // non e' un numero +} + +#define LOCK(l) if (pthread_mutex_lock(l)!=0) { \ + fprintf(stderr, "ERRORE FATALE lock\n"); \ + pthread_exit((void*)EXIT_FAILURE); \ + } +#define LOCK_RETURN(l, r) if (pthread_mutex_lock(l)!=0) { \ + fprintf(stderr, "ERRORE FATALE lock\n"); \ + return r; \ + } + +#define UNLOCK(l) if (pthread_mutex_unlock(l)!=0) { \ + fprintf(stderr, "ERRORE FATALE unlock\n"); \ + pthread_exit((void*)EXIT_FAILURE); \ + } +#define UNLOCK_RETURN(l,r) if (pthread_mutex_unlock(l)!=0) { \ + fprintf(stderr, "ERRORE FATALE unlock\n"); \ + return r; \ + } +#define WAIT(c,l) if (pthread_cond_wait(c,l)!=0) { \ + fprintf(stderr, "ERRORE FATALE wait\n"); \ + pthread_exit((void*)EXIT_FAILURE); \ + } +/* ATTENZIONE: t e' un tempo assoluto! */ +#define TWAIT(c,l,t) { \ + int r=0; \ + if ((r=pthread_cond_timedwait(c,l,t))!=0 && r!=ETIMEDOUT) { \ + fprintf(stderr, "ERRORE FATALE timed wait\n"); \ + pthread_exit((void*)EXIT_FAILURE); \ + } \ + } +#define SIGNAL(c) if (pthread_cond_signal(c)!=0) { \ + fprintf(stderr, "ERRORE FATALE signal\n"); \ + pthread_exit((void*)EXIT_FAILURE); \ + } +#define BCAST(c) if (pthread_cond_broadcast(c)!=0) { \ + fprintf(stderr, "ERRORE FATALE broadcast\n"); \ + pthread_exit((void*)EXIT_FAILURE); \ + } +static inline int TRYLOCK(pthread_mutex_t* l) { + int r=0; + if ((r=pthread_mutex_trylock(l))!=0 && r!=EBUSY) { + fprintf(stderr, "ERRORE FATALE unlock\n"); + pthread_exit((void*)EXIT_FAILURE); + } + return r; +} + +#endif /* _UTIL_H */ diff --git a/src/server.c b/src/server.c new file mode 100644 index 0000000..614f0b7 --- /dev/null +++ b/src/server.c @@ -0,0 +1,226 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +/** + * @struct sigHandlerArgs_t + * @brief struttura contenente le informazioni da passare + * al signal handler thread + * + */ +typedef struct { + sigset_t *set; // set dei segnali da gestire (mascherati) + int signal_pipe; // descrittore di scrittura di una pipe senza nome +} sigHandler_t; + + +// funzione eseguita dal signal handler thread +static void *sigHandler(void *arg) { + sigset_t *set = ((sigHandler_t*)arg)->set; + int fd_pipe = ((sigHandler_t*)arg)->signal_pipe; + + while(1) { + int sig; + int r = sigwait(set, &sig); + if (r != 0) { + errno = r; + // TODO logging utility + perror("FATAL ERROR 'sigwait'"); + return NULL; + } + + switch(sig) { + case SIGINT: + case SIGTERM: + case SIGQUIT: + close(fd_pipe); // notifico il listener thread della ricezione del segnale + return NULL; + default: ; + } + } + return NULL; +} + +static void usage(const char *argv0) { + // TODO change this + fprintf(stderr, "use: %s threads-in-the-pool\n", argv0); +} + +static void checkargs(int argc, char* argv[]) { + // TODO change all this + if (argc != 2) { + usage(argv[0]); + _exit(EXIT_FAILURE); + } + if ((int) strtol(argv[1], NULL, 10) <= 0) { + fprintf(stderr, "ERROR: threads-in-the-pool must be greater than zero\n\n"); + usage(argv[0]); + _exit(EXIT_FAILURE); + } +} + +int main(int argc, char *argv[]) { + // TODO read config file + checkargs(argc, argv); + int threadsInPool = (int) strtol(argv[1], NULL, 10); + + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGQUIT); + sigaddset(&mask, SIGTERM); + + if (pthread_sigmask(SIG_BLOCK, &mask, NULL) != 0) { + // TODO logging utility + fprintf(stderr, "ERROR setting mask\n"); + goto _cleanup; + } + + // ignoro SIGPIPE per evitare di essere terminato da una scrittura su un socket + struct sigaction s; + memset(&s, 0, sizeof(s)); + s.sa_handler = SIG_IGN; + if ( (sigaction(SIGPIPE,&s,NULL)) == -1 ) { + // TODO logging utility + perror("sigaction"); + goto _cleanup; + } + + /* + * La pipe viene utilizzata come canale di comunicazione tra il signal handler thread ed il + * thread lisener per notificare la terminazione. + * Una alternativa è quella di utilizzare la chiamata di sistema + * 'signalfd' ma non e' POSIX. + */ + int signal_pipe[2]; + if (pipe(signal_pipe) == -1) { + // TODO logging utility + perror("pipe"); + goto _cleanup; + } + + pthread_t sighandler_thread; + sigHandler_t handlerArgs = { &mask, signal_pipe[1] }; + if (pthread_create(&sighandler_thread, NULL, sigHandler, &handlerArgs) != 0) { + // TODO logging utility + fprintf(stderr, "ERROR creating signal handler thread\n"); + goto _cleanup; + } + + int listenfd; + if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { + // TODO logging utility + perror("socket"); + goto _cleanup; + } + + struct sockaddr_un serv_addr; + memset(&serv_addr, '0', sizeof(serv_addr)); + serv_addr.sun_family = AF_UNIX; + strncpy(serv_addr.sun_path, SOCKNAME, strlen(SOCKNAME)+1); + + if (bind(listenfd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) == -1) { + // TODO logging utility + perror("bind"); + goto _cleanup; + } + if (listen(listenfd, MAXBACKLOG) == -1) { + // TODO logging utility + perror("listen"); + goto _cleanup; + } + + threadpool_t *pool = NULL; + + pool = createThreadPool(threadsInPool, threadsInPool); + if (!pool) { + // TODO logging utility + fprintf(stderr, "ERROR creating thread pool\n"); + goto _cleanup; + } + + fd_set set, tmpset; + FD_ZERO(&set); + FD_ZERO(&tmpset); + + FD_SET(listenfd, &set); // aggiungo il listener fd al master set + FD_SET(signal_pipe[0], &set); // aggiungo il descrittore di lettura della signal_pipe + + // tengo traccia del file descriptor con id piu' grande + int fdmax = (listenfd > signal_pipe[0]) ? listenfd : signal_pipe[0]; + + volatile long termina=0; + while(!termina) { + // copio il set nella variabile temporanea per la select + tmpset = set; + if (select(fdmax+1, &tmpset, NULL, NULL, NULL) == -1) { + perror("select"); + goto _cleanup; + } + // cerchiamo di capire da quale fd abbiamo ricevuto una richiesta + for(int i=0; i <= fdmax; ++i) { + if (FD_ISSET(i, &tmpset)) { + long connfd; + if (i == listenfd) { // e' una nuova richiesta di connessione + if ((connfd = accept(listenfd, (struct sockaddr*)NULL ,NULL)) == -1) { + // TODO logging utility + perror("accept"); + goto _cleanup; + } + + long* args = malloc(2*sizeof(long)); + if (!args) { + // TODO logging utility + perror("ERROR FATAL malloc"); + goto _cleanup; + } + args[0] = connfd; + args[1] = (long)&termina; + + int r = addToThreadPool(pool, threadF, (void*)args); + if (r == 0) + continue; // aggiunto con successo + if (r < 0)// errore interno + fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); + // TODO logging utility + else // coda dei pendenti piena + fprintf(stderr, "ERROR SERVER TOO BUSY\n"); + // TODO logging utility + free(args); + close(connfd); + continue; + } + if (i == signal_pipe[0]) { + // ricevuto un segnale, esco ed inizio il protocollo di terminazione + termina = 1; + break; + } + } + } + } + + destroyThreadPool(pool, 0); // notifico che i thread dovranno uscire + + // aspetto la terminazione de signal handler thread + pthread_join(sighandler_thread, NULL); + + unlink(SOCKNAME); + return 0; + + +_cleanup: + unlink(SOCKNAME); + return -1; +} diff --git a/src/serverWorker.c b/src/serverWorker.c new file mode 100644 index 0000000..fa18d59 --- /dev/null +++ b/src/serverWorker.c @@ -0,0 +1,80 @@ +#include +#include +#include +#include +#include + +#include +#include + +// converte tutti i carattere minuscoli in maiuscoli +static void toup(char *str) { + char *p = str; + while(*p != '\0') { + *p = (islower(*p)?toupper(*p):*p); + ++p; + } +} + +// funzione eseguita dal Worker thread del pool +// gestisce una intera connessione di un client +// +void threadF(void *arg) { + assert(arg); + long* args = (long*)arg; + long connfd = args[0]; + long* termina = (long*)(args[1]); + free(arg); + fd_set set, tmpset; + FD_ZERO(&set); + FD_SET(connfd, &set); + + do { + tmpset=set; + int r; + // ogni tanto controllo se devo terminare + struct timeval timeout={0, 100000}; // 100 milliseconds + if ((r=select(connfd+1, &tmpset, NULL, NULL, &timeout)) < 0) { + perror("select"); + break; + } + if (r==0) { + if (*termina) break; + continue; + } + msg_t str; + int n; + if ((n=readn(connfd, &str.len, sizeof(int))) == -1) { + perror("read1"); + break; + } + + if (n==0) break; + str.str = calloc((str.len), sizeof(char)); + if (!str.str) { + perror("calloc"); + fprintf(stderr, "Memoria esaurita....\n"); + break; + } + if ((n=readn(connfd, str.str, str.len * sizeof(char))) == -1) { + perror("read2"); + free(str.str); + break; + } + + toup(str.str); + + if ((n=writen(connfd, &str.len, sizeof(int))) == -1) { + perror("write1"); + free(str.str); + break; + } + if ((n=writen(connfd, str.str, str.len*sizeof(char))) == -1) { + perror("write2"); + free(str.str); + break; + } + free(str.str); + } while(*termina == 0); + close(connfd); +} diff --git a/src/serverWorker.h b/src/serverWorker.h new file mode 100644 index 0000000..f00b615 --- /dev/null +++ b/src/serverWorker.h @@ -0,0 +1,7 @@ +#ifndef SERVERWORKER +#define SERVERWORKER + +// funzione eseguita dal generico Worker del pool di thread +void threadF(void *arg); + +#endif /* SERVERWORKER */