Compleated main logic

This commit is contained in:
elvis
2022-03-27 00:20:34 +01:00
parent c1977248d7
commit e86f981972
3 changed files with 110 additions and 19 deletions

View File

@ -36,7 +36,7 @@ static void *sigHandler(void *arg);
static void usage(const char *argv0) { static void usage(const char *argv0) {
// TODO change this // TODO change this
fprintf(stderr, "use: %s config file location\n", argv0); fprintf(stderr, "use: %s <config file location>\n", argv0);
} }
static void checkargs(int argc, char* argv[]) { static void checkargs(int argc, char* argv[]) {
@ -130,6 +130,14 @@ int main(int argc, char *argv[]) {
if( taglia_log(taglia, buf) < 0) if( taglia_log(taglia, buf) < 0)
goto _cleanup; goto _cleanup;
// creo lock per operazioni su file
pthread_mutex_t lock;
if (pthread_mutex_init(&lock, NULL) != 0) {
perror("pthread_mutex_init lock");
goto _cleanup;
}
// creo socket // creo socket
int listenfd; int listenfd;
if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
@ -169,6 +177,7 @@ int main(int argc, char *argv[]) {
// creo la lista dei client in attesa a una lock // creo la lista dei client in attesa a una lock
waiting_t *waiting; waiting_t *waiting;
// creo la threadpool
threadpool_t *pool = NULL; threadpool_t *pool = NULL;
pool = createThreadPool(threadsInPool, pendingSize); pool = createThreadPool(threadsInPool, pendingSize);
@ -253,14 +262,40 @@ int main(int argc, char *argv[]) {
goto _cleanup; goto _cleanup;
} }
// scrivo sul log
char buf[512];
int n;
n = snprintf(buf, sizeof(buf), "New client: %l\n", *connfd);
if( n<0 || m<n ) {
perror("snprintf");
goto _cleanup;
}
if( taglia_log(taglia, buf) < 0)
goto _cleanup;
// creo gli argomenti da passare al thread // creo gli argomenti da passare al thread
void* args = NULL; threadT* args = calloc(1, sizeof(threadT));
if(!args) {
perror("ERROR FATAL calloc");
goto _cleanup;
}
args->connfd = connfd;
args->quit = &quit;
args->request_pipe = request_pipe[1];
args->q = &queue;
args->taglia = &taglia;
args->pool = pool;
args->lock = &lock;
args->waiting = &waiting;
// aggiungo al threadpool // aggiungo al threadpool
int r = addToThreadPool(pool, threadF, args); int r = addToThreadPool(pool, threadF, args);
if (r == 0) if (r == 0) {
numberOfConnections++; numberOfConnections++;
continue; // aggiunto con successo continue; // aggiunto con successo
}
if (r < 0) // errore interno if (r < 0) // errore interno
fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); fprintf(stderr, "ERROR FATAL adding to the thread pool\n");
else // coda dei pendenti piena else // coda dei pendenti piena
@ -270,6 +305,32 @@ int main(int argc, char *argv[]) {
free(connfd); free(connfd);
continue; continue;
} }
if (i == request_pipe[0]) { // un worker ha finito di servire un client
int pdr; // ottengo il descrittore della pipe
if (readn(requestPipe[0], &pdr, sizeof(int)) == -1) {
perror("readn");
break;
}
switch (pdr) {
case -1: // client disconnected
--numberOfConnections;
if (stopNewConnections && numberOfConnections <= 0) {
quit = 1;
// termino il signalThread
pthread_cancel(sighandler_thread);
break;
}
continue;
default: // client served but not disconnected
FD_SET(pdr, &set);
if (pdr > fdmax) {
fdmax = fdr;
}
break;
}
continue;
}
if (i == signal_pipe[0]) { // controllo se devo terminare if (i == signal_pipe[0]) { // controllo se devo terminare
int code; int code;
if (readn(signal_pipe[0], &code, sizeof(int)) == -1) { if (readn(signal_pipe[0], &code, sizeof(int)) == -1) {
@ -279,15 +340,23 @@ int main(int argc, char *argv[]) {
switch (code) { switch (code) {
case 0: { // stop alle connessioni case 0: { // stop alle connessioni
stopNewConnections = 1; stopNewConnections = 1;
// TODO log
// scrivo sul log
if( taglia_log(taglia, "Stop new connections\n") < 0)
goto _cleanup;
if (numberOfConnections == 0) { if (numberOfConnections == 0) {
quit = 1; quit = 1;
// pthread_cancel(st); // termino il signalThread // termino il signalThread
pthread_cancel(sighandler_thread);
} }
break; break;
} }
case 1: { // stop immediato case 1: { // stop immediato
quit = 1; quit = 1;
// scrivo sul log
if( taglia_log(taglia, "Immediate quit\n") < 0)
goto _cleanup;
break; break;
} }
default: default:
@ -302,17 +371,33 @@ int main(int argc, char *argv[]) {
fdmax = (i>fdmax)?i:fdmax; fdmax = (i>fdmax)?i:fdmax;
// creo gli argomenti da passare al thread // creo gli argomenti da passare al thread
threadT* args = calloc(1, sizeof(threadT));
if(!args) {
perror("ERROR FATAL calloc");
goto _cleanup;
}
args->connfd = connfd;
args->quit = &quit;
args->request_pipe = request_pipe[1];
args->q = &queue;
args->taglia = &taglia;
args->pool = pool;
args->lock = &lock;
args->waiting = &waiting;
/* // aggiungo al threadpool
int r = addToThreadPool(pool, serverThread, (void*) t); int r = addToThreadPool(pool, threadF, args);
if (r == 0) if (r == 0) {
continue; numberOfConnections++;
continue; // aggiunto con successo
}
if (r < 0) // errore interno if (r < 0) // errore interno
fprintf(stderr, "ERROR FATAL adding to the thread pool\n"); fprintf(stderr, "ERROR FATAL adding to the thread pool\n");
else // coda dei pendenti piena else // coda dei pendenti piena
fprintf(stderr, "ERROR SERVER TOO BUSY\n"); fprintf(stderr, "ERROR SERVER TOO BUSY\n");
*/ free(args);
close(*connfd);
free(connfd);
continue; continue;
} }
} }
@ -320,9 +405,10 @@ int main(int argc, char *argv[]) {
} }
destroyThreadPool(pool, 0); // notifico che i thread dovranno uscire destroyThreadPool(pool, 0); // notifico che i thread dovranno uscire
clearWaiting(&waiting);
taglia_stats(taglia); // print stats
// TODO print statistiche // print all files in storage during shutdown
if (printQueue(queue) == -1) { if (printQueue(queue) == -1) {
perror("printQueue"); perror("printQueue");
return 1; return 1;
@ -332,13 +418,14 @@ int main(int argc, char *argv[]) {
// aspetto la terminazione de signal handler thread // aspetto la terminazione de signal handler thread
pthread_join(sighandler_thread, NULL); pthread_join(sighandler_thread, NULL);
taglia_del(taglia);
unlink(socketName); unlink(socketName);
printf("File Storage Server terminato.\n"); printf("File Storage Server terminato.\n");
fflush(stdout); fflush(stdout);
return 0; return 0;
_cleanup: _cleanup:
unlink(socketName); unlink(socketName);
return -1; return -1;

View File

@ -18,7 +18,8 @@ void threadF(void *arg) {
} }
// TODO add necessary variables from main // TODO add necessary variables from main
long* connfd = (long*)arg[0]; long* connfd = ;
fd_set set, tmpset; fd_set set, tmpset;
FD_ZERO(&set); FD_ZERO(&set);
@ -38,7 +39,7 @@ void threadF(void *arg) {
goto _cleanup; goto _cleanup;
continue; continue;
} }
break; // r==0 and quit==0 break; // r!=0 and quit==0
} }
@ -90,7 +91,7 @@ _cleanup:
} }
int parser(int len, char *command, queueT *queue, long fd_c, logT* logFileT, pthread_mutex_t *lock, waitingT **waiting) { int parser(int len, char *command, queueT *queue, long fd_c, taglia_t* taglia, pthread_mutex_t *lock, waitingT **waiting) {
if(len<0 || !command || !queue || !logFileT || !waiting) { if(len<0 || !command || !queue || !logFileT || !waiting) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;

View File

@ -4,12 +4,15 @@
#include <apiFile.h> #include <apiFile.h>
#include <fileQueue.h> #include <fileQueue.h>
#include <taglialegna.h>
// struttura dati che contiene gli argomenti da passare ai worker threads // struttura dati che contiene gli argomenti da passare ai worker threads
typedef struct struct_thread { typedef struct struct_thread {
long *args; int connfd;
int *quit;
int request_pipe;
queueT *q; // puntatore alla queue dei file queueT *q; // puntatore alla queue dei file
// logT *logFileT; // puntatore alla struct del file di log taglia_t *taglia; // puntatore alla struct del file di log
threadpool_t *pool; // puntatore alla threadpool threadpool_t *pool; // puntatore alla threadpool
pthread_mutex_t *lock; pthread_mutex_t *lock;
waitingT **waiting; // puntatore ai client in attesa di ottenere la lock su un file waitingT **waiting; // puntatore ai client in attesa di ottenere la lock su un file