/* Copyright (c) 1996, Ruslan R. Laishev (@RRL) */

#include "nntp.h"
#include "nntp_log.h"
#include "nntp_rms.h"
#include "nntp_worker.h"
#include "nntp_commands.h"
#include "nntp_suck.h"

/* Older socket headers do not define the inet_addr() failure value. */
#ifndef INADDR_NONE
#define INADDR_NONE 0xffffffff
#endif

/* Table of live client worker contexts, guarded by Wctxm0. */
WorkerContext   *Wctxp0 [ NNTP_ClientWorker ];
pthread_mutex_t Wctxm0;

/* Context of the single suck worker, guarded by Wctxm1. */
WorkerContext   *Wctxp1;
pthread_mutex_t Wctxm1;

int pthread_sleep (int);

enum {
    WORKER_IDLE_SECS = 300,     /* added to "now" while waiting for a command   */
    WORKER_BUSY_SECS = 6000,    /* added to "now" while a command is executed   */
    WORKER_CMD_MAX   = 512,     /* size handed to nntp_cmd_get()                */
    BOSS_RETRY_SECS  = 5,       /* pause before the boss retries                */
    LISTEN_BACKLOG   = 5        /* backlog passed to listen()                   */
};

/*
 * Store "current time + secs" in the context's time field under the
 * context mutex.
 * NOTE(review): presumably a watchdog elsewhere compares this stamp with
 * the clock - nothing in this file reads it back.
 */
static void worker_set_deadline (WorkerContext *Wctxp, long secs)
{
    pthread_mutex_lock(&Wctxp->mutex);
    time (&Wctxp->time);
    Wctxp->time += secs;
    pthread_mutex_unlock(&Wctxp->mutex);
}

/*
 * Thread body of a client worker.
 *
 * Sends the "200" greeting, then loops: read one command with
 * nntp_cmd_get(), look it up with nntp_cmd_parse() and run the handler.
 * The thread terminates itself (NNTP_WorkerKill + pthread_exit) when a
 * send/receive fails or a handler returns non-zero.  Unknown commands are
 * answered with a "500" line.  This function never returns normally.
 */
void NNTP_WorkerClient (WorkerContext *Wctxp)
{
    int (*nntp_fun) (WorkerContext *);

    nntp_log(logd,(stdout,"[Th:%d,Ch:%d]Started.",Wctxp->indx,Wctxp->chan));

    sprintf(Wctxp->bufp,"200 %s is ready (Posing Ok)\r\n",ID$IDver);
    if ( 0 > send(Wctxp->chan,Wctxp->bufp,strlen(Wctxp->bufp), 0) )
        {
        NNTP_WorkerKill (Wctxp,T_Clnt);
        pthread_exit ((pthread_addr_t) 0);
        }

    while (1)
        {
        worker_set_deadline (Wctxp,WORKER_IDLE_SECS);

        if ( 0 > nntp_cmd_get(Wctxp->chan,Wctxp->bufp,WORKER_CMD_MAX) )
            {
            nntp_log(logw,(stdout,"[Th:%d,Ch:%d]'recv' error.",
                Wctxp->indx,Wctxp->chan));
            NNTP_WorkerKill (Wctxp,T_Clnt);
            pthread_exit ((pthread_addr_t) 0);
            }

        worker_set_deadline (Wctxp,WORKER_BUSY_SECS);

        if ( NULL != (nntp_fun = nntp_cmd_parse (Wctxp)) )
            {
            if ( nntp_fun (Wctxp) )
                {
                NNTP_WorkerKill (Wctxp,T_Clnt);
                pthread_exit ((pthread_addr_t) 0);
                }
            }
        else
            {
            sprintf(Wctxp->bufp,"500 UnRecognized command\r\n");
            /* A failed reply is treated like a failed greeting. */
            if ( 0 > send(Wctxp->chan,Wctxp->bufp,strlen(Wctxp->bufp),0) )
                {
                NNTP_WorkerKill (Wctxp,T_Clnt);
                pthread_exit ((pthread_addr_t) 0);
                }
            }
        }
}

/*
 * Release everything owned by a worker context and free it.
 *
 *  Wctxp       context returned by NNTP_WorkerInit(); NULL is ignored.
 *  WorkerType  T_Clnt or T_Suck; selects which resources exist.
 *
 * Only client workers own a slot in Wctxp0[], so the slot is cleared only
 * for T_Clnt, only when the index is in range, and only when the slot
 * still refers to this context.  The socket is shut only when a channel
 * was actually assigned (chan >= 0).
 */
void NNTP_WorkerKill (WorkerContext *Wctxp,int WorkerType)
{
    if ( Wctxp == NULL )
        return;

    if ( WorkerType == T_Clnt
        && Wctxp->indx >= 0 && Wctxp->indx < NNTP_ClientWorker )
        {
        pthread_mutex_lock(&Wctxm0);
        if ( Wctxp0 [ Wctxp->indx ] == Wctxp )
            Wctxp0 [ Wctxp->indx ] = NULL;
        pthread_mutex_unlock(&Wctxm0);
        }

    pthread_mutex_lock(&Wctxp->mutex);
    DBclose_stream (&Wctxp->Msgrab);
    DBclose_stream (&Wctxp->Grprab);
    if ( WorkerType == T_Suck )
        DBclose_stream (&Wctxp->Suckrab);
    if ( Wctxp->chan >= 0 )
        NNTP_SockShut(Wctxp->chan);
    pthread_mutex_unlock(&Wctxp->mutex);

    nntp_log(logw,(stdout,"[Th:%d,Ch:%d]End.",
        Wctxp->indx,Wctxp->chan));

    pthread_mutex_destroy (&Wctxp->mutex);
    free(Wctxp);
}

/*
 * Allocate and prepare a worker context.
 *
 *  WorkerType  T_Suck additionally opens the suck stream.
 *
 * Returns the new zero-filled context (mutex initialised, message and
 * group streams open, chan set to -1 meaning "no socket yet") or NULL on
 * any failure, in which case everything acquired so far is released.
 */
void *NNTP_WorkerInit (int WorkerType)
{
    WorkerContext *Wctxp;

    if ( NULL == (Wctxp = calloc(1,sizeof (WorkerContext))) )
        {
        nntp_log(loge,(stdout,"[WorkerInit]'calloc' memory-Error."));
        return NULL;
        }

    /* Success is reported at debug level; sizeof is cast to match %d. */
    nntp_log(logd,(stdout,"[WorkerInit]'calloc' memory (%d b)-Ok.",
        (int) sizeof(WorkerContext)));

    /* 0 is a valid descriptor, so mark the channel as unassigned. */
    Wctxp->chan = -1;

    if ( 0 != pthread_mutex_init (&Wctxp->mutex,pthread_mutexattr_default) )
        {
        nntp_log(loge,(stdout,"[WorkerInit]Init mutex-Error."));
        free (Wctxp);
        return NULL;
        }

    if ( MsgDBopen_stream (&Wctxp->Msgrab) )
        {
        nntp_log(loge,(stdout,"[WorkerInit]open MsgStream-Error."));
        goto fail_mutex;
        }

    if ( GrpDBopen_stream (&Wctxp->Grprab) )
        {
        nntp_log(loge,(stdout,"[WorkerInit]open GrpStream-Error."));
        goto fail_msg;
        }

    if ( WorkerType == T_Suck && SuckDBopen_stream (&Wctxp->Suckrab) )
        {
        nntp_log(loge,(stdout,"[SuckInit]open SuckStream-Error."));
        goto fail_grp;
        }

    nntp_log(logd,(stdout,"[WorkerInit]-Ok."));
    return (void *) Wctxp;

fail_grp:
    DBclose_stream(&Wctxp->Grprab);
fail_msg:
    DBclose_stream(&Wctxp->Msgrab);
fail_mutex:
    pthread_mutex_destroy (&Wctxp->mutex);
    free (Wctxp);
    return NULL;
}

/*
 * Shut down the receive side of a socket and close it.
 */
void NNTP_SockShut (int sock)
{
    shutdown(sock,0);
    close(sock);
}

/*
 * Thread body of the client "boss": opens the listening socket on
 * nntp_conf.LocalPort and, for every accepted connection, creates a
 * worker context and a thread running NNTP_WorkerClient.
 *
 * When all NNTP_ClientWorker slots are busy, or a context cannot be
 * created, the boss sleeps BOSS_RETRY_SECS and retries.  Fatal set-up
 * errors end the thread with pthread_exit(); the loop itself never ends.
 */
void NNTP_ClientBoss (void)
{
    long    status;
    int     chan;
    int     indx;
    int     reuse = 1;
    int     namelength;
    int     MainSocketHnd;
    WorkerContext *Wctxp;
    struct sockaddr_in MainSocket, SockAddr;
    struct hostent *pHostEnt;

    status = pthread_mutex_init (&Wctxm0,pthread_mutexattr_default);
    if ( 0 != status )
        {
        nntp_log_err (errno);
        nntp_log(loge,(stdout,"[Th:ClientBoss]Init Wctxm mutex-Error."));
        pthread_exit ((pthread_addr_t) 0);
        }

    if ( -1 == (MainSocketHnd = socket (AF_INET, SOCK_STREAM, 0)) )
        {
        nntp_log_err (errno);
        nntp_log(logf,(stdout,"[Th:ClientBoss]'socket' error-End."));
        pthread_exit ((pthread_addr_t) 0);
        }

    /* The resolver result is static data, so it is only touched under the global lock. */
    pthread_lock_global_np ();
    if ( NULL == (pHostEnt = gethostbyname (nntp_conf.LocalHost)) )
        {
        nntp_log_err (errno);
        nntp_log(logf,(stdout,"[Th:ClientBoss]'gethostbyname'error-End."));
        pthread_unlock_global_np ();
        NNTP_SockShut (MainSocketHnd);
        pthread_exit ((pthread_addr_t) 0);
        }
    nntp_log(logi,(stdout,"[Th:ClientBoss]Local Host =%s",pHostEnt->h_name));
    pthread_unlock_global_np ();

    memset (&MainSocket,0,sizeof (MainSocket));
    MainSocket.sin_family      = AF_INET;
    MainSocket.sin_port        = htons(nntp_conf.LocalPort);
    MainSocket.sin_addr.s_addr = INADDR_ANY;

    /* The option value must be a non-zero int to switch SO_REUSEADDR on. */
    if ( -1 == setsockopt(MainSocketHnd,SOL_SOCKET,SO_REUSEADDR,(char *)&reuse,
        sizeof (reuse)) )
        {
        nntp_log_err (errno);
        nntp_log(logf,(stdout,"[Th:ClientBoss]'setsockopt' error-End."));
        NNTP_SockShut (MainSocketHnd);
        pthread_exit ((pthread_addr_t) 0);
        }

    /*
     * Bind MainSocket to MainSocketHnd
     */
    if ( -1 == bind (MainSocketHnd,(struct sockaddr *) &MainSocket,
        sizeof (MainSocket)) )
        {
        nntp_log_err (errno);
        nntp_log(logf,(stdout,"[Th:ClientBoss]'bind' error-End."));
        NNTP_SockShut (MainSocketHnd);
        pthread_exit ((pthread_addr_t) 0);
        }

    if ( -1 == listen (MainSocketHnd, LISTEN_BACKLOG) )
        {
        nntp_log_err (errno);
        nntp_log(logf,(stdout,"[Th:ClientBoss]'listen' error-End."));
        NNTP_SockShut (MainSocketHnd);
        pthread_exit ((pthread_addr_t) 0);
        }

    while (1)
        {
        nntp_log(logd,(stdout,"[Th:ClientBoss]Check free Wctx."));

        pthread_mutex_lock(&Wctxm0);
        for (indx = 0;indx < NNTP_ClientWorker;indx++)
            if ( NULL == Wctxp0[indx] )
                break;
        pthread_mutex_unlock(&Wctxm0);

        if ( indx >= NNTP_ClientWorker )
            {
            nntp_log(logd,(stdout,"[Th:ClientBoss]No free Wctx-Wait 5 sec."));
            pthread_sleep(BOSS_RETRY_SECS);
            continue;
            }

        nntp_log(logd,(stdout,"[Th:ClientBoss]Check free Wctx-Found."));

        /*
         * accept() reads the length argument as the buffer size, so it is
         * reset before every call; the peer address goes into its own
         * structure rather than over the listening address.
         */
        namelength = sizeof (SockAddr);
        if ( 0 > (chan = accept(MainSocketHnd,
            (struct sockaddr *) &SockAddr,&namelength)) )
            {
            nntp_log_err (errno);
            nntp_log(logf,(stdout,"[Th:ClientBoss]'accept' error-Skip request."));
            continue;
            }

        nntp_log(logd,(stdout,"[Th:ClientBoss]Incoming connection request,chan=%d",chan));

        if ( !(Wctxp = NNTP_WorkerInit(T_Clnt)) )
            {
            NNTP_SockShut(chan);
            nntp_log(logd,(stdout,"[Th:ClientBoss]No free Wctx-Wait 5 sec."));
            pthread_sleep(BOSS_RETRY_SECS);
            continue;
            }

        nntp_log(logd,(stdout,"[Th:ClientBoss]Fill Wctx [Th:%d,Ch:%d].",indx,chan));
        Wctxp->type = T_Clnt;
        Wctxp->chan = chan;
        Wctxp->indx = indx;
        Wctxp->time = nntp_conf.ClientTimeOut;

        /*
         * Claim the slot BEFORE the thread exists.  A worker may finish
         * and free its context at any moment after pthread_create();
         * registering afterwards could store a pointer to freed memory.
         */
        pthread_mutex_lock(&Wctxm0);
        Wctxp0[indx] = Wctxp;
        pthread_mutex_unlock(&Wctxm0);

        nntp_log(logd,(stdout,"[Th:ClientBoss]Start Worker [Th:%d,Ch:%d].",
            indx,chan));

        status = pthread_create(&Wctxp->tid,pthread_attr_default,
            NNTP_WorkerClient, (pthread_addr_t) Wctxp);
        if ( 0 != status )
            {
            nntp_log_err (errno);
            NNTP_WorkerKill (Wctxp,T_Clnt);     /* also gives the slot back */
            continue;
            }

        /* Wctxp belongs to the worker now - log from the local copies. */
        nntp_log(logw,(stdout,"[Th:ClientBoss]Start Worker [Th:%d,Ch:%d]-Ok.",
            indx,chan));
        }
}

/*
 * Withdraw a suck context from Wctxp1 (if it is still the published one)
 * and release it with NNTP_WorkerKill().
 */
static void suck_release (WorkerContext *ctx)
{
    pthread_mutex_lock(&Wctxm1);
    if ( Wctxp1 == ctx )
        Wctxp1 = NULL;
    pthread_mutex_unlock(&Wctxm1);

    NNTP_WorkerKill (ctx,T_Suck);
}

/*
 * Thread body of the "suck" boss: walks the '%'-separated entries of
 * nntp_conf.Suck, each of the form  host:grouplist:port , connects to the
 * host and hands the connection to nntp_suck().
 *
 * A fresh context is created for every entry and released again before
 * the next one, including every "skip" path.  The thread ends when the
 * list is exhausted or on a fatal error.
 * NOTE(review): the context is released after nntp_suck() returns on the
 * assumption that nntp_suck() leaves it alive - confirm.
 */
void NNTP_SuckBoss (void)
{
    long    status;
    char    *cp0,*cp1;
    char    *host;
    char    *next;
    char    *SuckR;
    size_t  len;
    int     sock_1;
    int     resolved;
    WorkerContext *ctx;
    struct sockaddr_in sock2_name;
    struct hostent *pHostEnt;

    if ( !(*nntp_conf.Suck) )
        {
        nntp_log(logi,(stdout,"[Th:Suck]No Suck defined-End."));
        pthread_exit ((pthread_addr_t) 0);
        }

    status = pthread_mutex_init (&Wctxm1,pthread_mutexattr_default);
    if ( 0 != status )
        {
        nntp_log_err (errno);
        nntp_log(loge,(stdout,"[Th:Suck]Init Wctxm mutex error-End."));
        pthread_exit ((pthread_addr_t) 0);
        }

    SuckR = nntp_conf.Suck;

    while (1)
        {
        if ( !(ctx = NNTP_WorkerInit(T_Suck)) )
            {
            /* Nothing was created, so there is nothing to kill. */
            nntp_log(loge,(stdout,"[Th:Suck]No free Wctx-End."));
            pthread_exit ((pthread_addr_t) 0);
            }

        pthread_mutex_lock(&Wctxm1);
        Wctxp1 = ctx;
        pthread_mutex_unlock(&Wctxm1);

        nntp_log(logd,(stdout,"[Th:Suck]Check free Wctx-Found."));

        if ( !(*SuckR) )
            {
            nntp_log(logw,(stdout,"[Th:Suck]All hosts is polled-End."));
            suck_release (ctx);
            pthread_exit ((pthread_addr_t) 0);
            }

        nntp_log(logd,(stdout,"[Th:Suck]Get next suck host."));

        /* Cut the next '%'-terminated entry out of the list. */
        cp0 = strchr (SuckR,'%');
        if ( NULL != cp0 )
            {
            len  = (size_t) (cp0 - SuckR);
            next = cp0 + 1;
            }
        else
            {
            len  = strlen (SuckR);
            next = SuckR + len;
            }

        /*
         * bufp is an array inside the context (it is written right after
         * calloc with no separate allocation), so sizeof bounds the copy.
         */
        if ( len >= sizeof (ctx->bufp) )
            {
            nntp_log(loge,(stdout,"[Th:Suck]Suck entry too long-Skip."));
            SuckR = next;
            suck_release (ctx);
            continue;
            }
        memcpy (&ctx->bufp[0],SuckR,len);
        ctx->bufp[len] = 0;
        SuckR = next;

        /* Split "host:grouplist:port"; both separators are required. */
        host = &ctx->bufp[0];
        cp0  = strchr (host,':');
        cp1  = (NULL != cp0) ? strchr (cp0 + 1,':') : NULL;
        if ( NULL == cp0 || NULL == cp1 )
            {
            nntp_log(loge,(stdout,"[Th:Suck]Bad suck entry '%s'-Skip.",host));
            suck_release (ctx);
            continue;
            }
        *cp0++ = 0;
        *cp1++ = 0;

        nntp_log(logd,(stdout,"[Th:Suck]Try connect to '%s'-Ok.",host));

        memset (&sock2_name,0,sizeof (sock2_name));
        resolved = 0;

        /*
         * The resolver result is static data: copy what is needed while
         * the global lock is still held.
         */
        pthread_lock_global_np ();
        if ( NULL != (pHostEnt = gethostbyname (host)) )
            {
            sock2_name.sin_family = pHostEnt->h_addrtype;
            sock2_name.sin_addr   = *((struct in_addr *) pHostEnt->h_addr);
            resolved = 1;
            }
        else
            {
            nntp_log_err (errno);
            nntp_log(logw,(stdout,"[Th:Suck]'gethostbyname'error-Try by addr."));
            }
        pthread_unlock_global_np ();

        if ( !resolved )
            {
            /*
             * Fall back to a dotted-decimal address.  gethostbyaddr()
             * takes a binary address, not text, so the string is
             * converted with inet_addr() instead.  The log text is kept
             * unchanged for log compatibility.
             */
            sock2_name.sin_addr.s_addr = inet_addr (host);
            if ( sock2_name.sin_addr.s_addr == INADDR_NONE )
                {
                nntp_log(loge,(stdout,"[Th:Suck]'gethostbyaddr'error-Skip."));
                suck_release (ctx);
                continue;
                }
            sock2_name.sin_family = AF_INET;
            }

        sock2_name.sin_port = htons (atoi(cp1));

        if ( -1 == (sock_1 = socket (AF_INET, SOCK_STREAM, 0)) )
            {
            nntp_log_err (errno);
            nntp_log(logf,(stdout,"[Th:Suck]'socket' error-End."));
            suck_release (ctx);
            pthread_exit ((pthread_addr_t) 0);
            }

        /* From here on the context owns the socket; suck_release() closes it. */
        ctx->chan = sock_1;

        nntp_log(logi,(stdout,"[Th:Suck]Group list <%s>.",cp0));
        nntp_log(logi,(stdout,"[Th:Suck]Suck from <%s(%s):%s>.",
            host,inet_ntoa(sock2_name.sin_addr),cp1));

        if ( 0 > connect(sock_1,(struct sockaddr *) &sock2_name,sizeof (sock2_name)) )
            {
            nntp_log_err (errno);
            nntp_log(loge,(stdout,"[Th:Suck]'connect'error-Skip."));
            suck_release (ctx);
            continue;
            }

        nntp_log(logd,(stdout,"[Th:Suck,Ch:%d]Connection established-Ok.",sock_1));

        ctx->type = T_Suck;
        ctx->indx = 0;              /* suck workers own no Wctxp0[] slot */
        ctx->time = nntp_conf.SuckTimeOut;

        nntp_suck (ctx,cp0,inet_ntoa(sock2_name.sin_addr));

        suck_release (ctx);
        }   /* end while (1) */
}

/*
 * Block the calling thread for `sec` seconds.
 *
 * Implemented as a timed wait on a private condition variable that nobody
 * signals.  The mutex is held around the wait, and a zero return from the
 * wait (woken without timing out) simply waits again for the same
 * absolute expiration time.
 *
 * Returns the non-zero status of the step that ended the call: an
 * init/expiration failure, or the status of the final timed wait.
 */
int pthread_sleep (int sec)
{
    long    status;
    pthread_mutex_t mtx;
    pthread_cond_t  cnd;
    struct timespec delta,until;

    delta.tv_sec  = sec;
    delta.tv_nsec = 0;

    if ( 0 != (status = pthread_mutex_init (&mtx,pthread_mutexattr_default)) )
        return (int) status;

    if ( 0 != (status = pthread_cond_init (&cnd,pthread_condattr_default)) )
        {
        pthread_mutex_destroy (&mtx);
        return (int) status;
        }

    if ( 0 == (status = pthread_get_expiration_np (&delta,&until)) )
        {
        pthread_mutex_lock (&mtx);
        do
            status = pthread_cond_timedwait (&cnd,&mtx,&until);
        while ( 0 == status );
        pthread_mutex_unlock (&mtx);
        }

    pthread_cond_destroy (&cnd);
    pthread_mutex_destroy (&mtx);

    return (int) status;
}