/* Copyright (c) 1996, Ruslan R. Laishev (@RRL) */ #include "nntp.h" WorkerContext *WctxPtr [ NNTPMAXWORKER ]; pthread_mutex_t Wctxm; int pthread_sleep (int); /* *-------------------------------------------------------------------------------- */ void NNTP_WorkerClient (WorkerContext *Wctxp) { long status; int (*nntp_fun) (WorkerContext *); int sz; NNTP_LOGT(Wctxp,LOGD,"Started."); sz = sprintf(Wctxp->bufp,"200 %s is ready (Posting Ok)\r\n",ID$IDver); if ( 0 > send(Wctxp->chan,Wctxp->bufp,sz, 0) ) { NNTP_WorkerKill (Wctxp); pthread_exit ((pthread_addr_t) 0); } while (1) { if ( 0 > nntp_cmd_get(Wctxp->chan,Wctxp->bufp ,BUFPSZ) ) { NNTP_LOGT(Wctxp,LOGE,"'nntp_cmd_get':%s.",strerror(errno)); NNTP_WorkerKill (Wctxp); pthread_exit ((pthread_addr_t) 0); } if ( nntp_fun = nntp_cmd_parse (Wctxp) ) { if ( 0 > nntp_fun (Wctxp) ) { NNTP_WorkerKill (Wctxp); pthread_exit ((pthread_addr_t) 0); } } else { sz = sprintf(Wctxp->bufp,"500 UnRecognized command\r\n"); send(Wctxp->chan,Wctxp->bufp,sz,0); } } } /* *-------------------------------------------------------------------------------- */ void NNTP_WorkerKill ( WorkerContext *Wctxp ) { pthread_mutex_lock(&Wctxm); NNTP_LOGT(Wctxp,LOGD,"Kill worker."); WctxPtr [ Wctxp->indx ] = NULL; DBclose_stream (&Wctxp->Msgrab); DBclose_stream (&Wctxp->Grprab); if ( Wctxp->type == T_Suck ) DBclose_stream (&Wctxp->Suckrab); if ( Wctxp->type == T_Feed ) DBclose_stream (&Wctxp->Feedrab); nntp_sockshut(Wctxp->chan); NNTP_LOGT(Wctxp,LOGW,"End."); free(Wctxp); pthread_mutex_unlock(&Wctxm); } /* *-------------------------------------------------------------------------------- */ void *NNTP_WorkerInit( int WorkerType, int chan ) { long status; WorkerContext *Wctxp; int indx; pthread_mutex_lock(&Wctxm); while(1) { Wctxp = NULL; for (indx = 0;indx < NNTPMAXWORKER;indx++) if ( NULL == WctxPtr [indx] ) break; if ( indx >= NNTPMAXWORKER ) break; if ( NULL == (Wctxp = calloc(1,sizeof (WorkerContext))) ) { NNTP_LOG(LOGE,"[WorkerInit]'calloc'."); break; } NNTP_LOG(LOGD,"[WorkerInit]'calloc' memory (%d b)-Ok.", sizeof(WorkerContext)); if (-1 == status) { NNTP_LOG(LOGE,"[WorkerInit]Init mutex."); free (Wctxp); Wctxp = NULL; break; } if ( MsgDBopen_stream (&Wctxp->Msgrab) ) { NNTP_LOG(LOGE,"[WorkerInit]open MsgStream."); free (Wctxp); Wctxp = NULL; break; } if ( GrpDBopen_stream (&Wctxp->Grprab) ) { NNTP_LOG(LOGE,"[WorkerInit]GrpDBopen_stream."); DBclose_stream(&Wctxp->Msgrab); free (Wctxp); Wctxp = NULL; break; } if ( WorkerType == T_Suck ) { if ( SuckDBopen_stream (&Wctxp->Suckrab) ) { NNTP_LOG(LOGE,"[WorkerInit]SuckDBopen_stream."); DBclose_stream(&Wctxp->Msgrab); DBclose_stream(&Wctxp->Grprab); free (Wctxp); Wctxp = NULL; break; } } if ( WorkerType == T_Feed ) { if ( FeedDBopen_stream (&Wctxp->Feedrab) ) { NNTP_LOG(LOGE,"[WorkerInit]FeedDBopen_stream."); DBclose_stream(&Wctxp->Msgrab); DBclose_stream(&Wctxp->Grprab); free (Wctxp); Wctxp = NULL; break; } } Wctxp->indx = indx; Wctxp->chan = chan; Wctxp->type = WorkerType; WctxPtr[indx] = Wctxp; break; } pthread_mutex_unlock(&Wctxm); NNTP_LOG(LOGD,"[WorkerInit][Th:%d,Ch:%d]-Ok.",indx,chan); return (void *) Wctxp; } /* *-------------------------------------------------------------------------------- */ int NNTP_InitBosses (void) { memset (WctxPtr,0,sizeof(WctxPtr)); return (pthread_mutex_init (&Wctxm,pthread_mutexattr_default)); } /* *-------------------------------------------------------------------------------- */ void NNTP_ClientBoss (void) { long status; int chan; WorkerContext *Wctxp; int MainSocketHnd, SocketHnd; struct sockaddr_in MainSocket, SockAddr; struct hostent MainHostEnt, HostEnt, *pHostEnt; int len; char SRVBUSY[] = "\r\n500 Sorry, NNTP Server is busy, try later.\r\n"; struct timespec interval; if ( -1 == (MainSocketHnd = socket (AF_INET, SOCK_STREAM, 0)) ) { NNTP_LOG(LOGF,"[ClientBoss]'socket' %s.",strerror(errno)); pthread_exit ((pthread_addr_t) 0); } status = 1; if ( -1 == setsockopt(MainSocketHnd,SOL_SOCKET,SO_REUSEADDR,(char *)&status, sizeof (status)) ) { NNTP_LOG(LOGF,"[ClientBoss]'setsockopt' %s.",strerror(errno)); pthread_exit ((pthread_addr_t) 0); } pthread_lock_global_np (); if ( NULL == (pHostEnt = gethostbyname (nntp_conf.LocalHost)) ) { NNTP_LOG(LOGF,"[ClientBoss]'gethostbyname' %s.",strerror(errno)); pthread_unlock_global_np (); pthread_exit ((pthread_addr_t) 0); } else NNTP_LOG(LOGI,"[ClientBoss]Local Host=%s",pHostEnt->h_name); MainHostEnt = *pHostEnt; pthread_unlock_global_np (); MainSocket.sin_family = AF_INET; MainSocket.sin_port = htons(nntp_conf.LocalPort); MainSocket.sin_addr.s_addr = INADDR_ANY; /* * Bind MainSocket to MainSocketHnd */ if ( -1 == bind (MainSocketHnd,(struct sockaddr *) &MainSocket, sizeof (MainSocket)) ) { NNTP_LOG(LOGF,"[ClientBoss]'bind' %s.",strerror(errno)); pthread_exit ((pthread_addr_t) 0); } if ( -1 == listen (MainSocketHnd, 5) ) { NNTP_LOG(LOGF,"[ClientBoss]'listen' %s.",strerror(errno)); pthread_exit ((pthread_addr_t) 0); } while (1) { if ( 0 > (chan = accept(MainSocketHnd,(struct sockaddr *)&MainSocket,&len)) ) { NNTP_LOG(LOGF,"[ClientBoss]'accept' %s.",strerror(errno)); continue; } NNTP_LOG(LOGD,"[ClientBoss]Incoming connection request,chan=%d",chan); if ( !(Wctxp = NNTP_WorkerInit(T_Clnt,chan)) ) { send(chan,SRVBUSY,sizeof(SRVBUSY) - 1,0); nntp_sockshut(chan); NNTP_LOG(LOGE,"[ClientBoss]No free Wctxp."); continue; } NNTP_LOG(LOGW,"[ClientBoss]Start Worker [Th:%d,Ch:%d].", Wctxp->indx,Wctxp->chan); status = pthread_create(&Wctxp->tid,pthread_attr_default, NNTP_WorkerClient, (pthread_addr_t) Wctxp); if (status == -1) { NNTP_LOG(LOGF,"[ClientBoss]pthread_create-%s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOG(LOGD,"[ClientBoss]Start Worker [Th:%d,Ch:%d]-Ok.", Wctxp->indx,Wctxp->chan); } } /* *-------------------------------------------------------------------------------- */ void NNTP_SuckBoss (void) { WorkerContext *Wctxp; long status; char *cp0,*cp1; int sock_1; struct sockaddr_in sock2_name; struct sockaddr_in sock3_name; struct hostent HostEnt; struct hostent *pHostEnt; char *SuckR = nntp_conf.Suck; if ( !(*nntp_conf.Suck) ) { NNTP_LOG(LOGI,"[Suck]No Suck defined."); pthread_exit ((pthread_addr_t) 0); } while (1) { if ( !(*SuckR) ) { NNTP_LOGT(Wctxp,LOGW,"All hosts is polled."); SuckR = nntp_conf.Suck; pthread_sleep (nntp_conf.SuckInterval); continue; } if ( !(Wctxp = NNTP_WorkerInit(T_Suck,0)) ) { NNTP_LOG(LOGF,"[Suck]No free Wctx."); pthread_exit ((pthread_addr_t) 0); } if ( NULL != (cp0 = strchr (SuckR,FIELDSEP)) ) { strncpy (Wctxp->bufp,SuckR,cp0-SuckR); Wctxp->bufp [cp0-SuckR ] = 0; SuckR = cp0+1; } else { strcpy (Wctxp->bufp,SuckR); SuckR += strlen(SuckR); } cp0 = strchr(Wctxp->bufp,':'); *(cp0) = 0; cp0++; cp1 = strchr(cp0,':'); *(cp1) = 0; cp1++; NNTP_LOGT(Wctxp,LOGD,"Try connect to '%s'.",Wctxp->bufp); pthread_lock_global_np (); if ( NULL == (pHostEnt = gethostbyname (Wctxp->bufp)) ) { NNTP_LOGT(Wctxp,LOGW,"'gethostbyname' %s.",strerror(errno)); if ( NULL == (pHostEnt = gethostbyaddr (Wctxp->bufp, strlen(Wctxp->bufp),AF_INET)) ) { NNTP_LOGT(Wctxp,LOGE,"'gethostbyaddr' %s.",strerror(errno)); pthread_unlock_global_np (); NNTP_WorkerKill(Wctxp); continue; } } HostEnt = *pHostEnt; pthread_unlock_global_np (); sock2_name.sin_port = htons (atoi(cp1)); sock2_name.sin_family = HostEnt.h_addrtype; sock2_name.sin_addr = *((struct in_addr *) HostEnt.h_addr); if ( -1 == (sock_1 = socket (AF_INET, SOCK_STREAM, 0)) ) { NNTP_LOGT(Wctxp,LOGF,"'socket' %s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOGT(Wctxp,LOGI,"Suck from.....<%s(%s):%s>.",Wctxp->bufp, inet_ntoa(sock2_name.sin_addr),cp1); NNTP_LOGT(Wctxp,LOGI,"Group list....<%s>.",cp0); Wctxp->chan = sock_1; if ( 0 > connect(sock_1,(struct sockaddr *) &sock2_name,sizeof (sock2_name)) ) { NNTP_LOGT(Wctxp,LOGE,"'connect' %s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOGT(Wctxp,LOGW,"Connection established-Ok."); nntp_suck (Wctxp,cp0,inet_ntoa(sock2_name.sin_addr)); NNTP_WorkerKill (Wctxp); } /* end while (1) */ } /* *-------------------------------------------------------------------------------- */ void NNTP_FeedBoss (void) { WorkerContext *Wctxp; long status; char *cp0,*cp1,*cp2,*cp3; int sock_1; struct sockaddr_in sock2_name; struct sockaddr_in sock3_name; struct hostent HostEnt; struct hostent *pHostEnt; char *FeedR = nntp_conf.Feed; /* int i; int indx; int chan; */ if ( !(*nntp_conf.Feed) ) { NNTP_LOG(LOGI,"[Feed]No Feed defined."); pthread_exit ((pthread_addr_t) 0); } while (1) { if ( !(*FeedR) ) { NNTP_LOGT(Wctxp,LOGW,"All hosts is polled."); FeedR = nntp_conf.Feed; pthread_sleep (nntp_conf.FeedInterval); continue; } if ( !(Wctxp = NNTP_WorkerInit(T_Feed,0)) ) { NNTP_LOG(LOGF,"[Feed]No free Wctx."); pthread_exit ((pthread_addr_t) 0); } if ( NULL != (cp0 = strchr (FeedR,FIELDSEP)) ) { strncpy (Wctxp->bufp,FeedR,cp0-FeedR); Wctxp->bufp [cp0-FeedR ] = 0; FeedR = cp0+1; } else { strcpy (Wctxp->bufp,FeedR); FeedR += strlen(FeedR); } cp0 = strchr(Wctxp->bufp,':'); *(cp0) = 0; cp0++; cp1 = strchr(cp0,':'); *(cp1) = 0; cp1++; cp2 = strchr(cp1,':'); *(cp2) = 0; cp2++; cp3 = strchr(cp2,':'); *(cp3) = 0; cp3++; NNTP_LOGT(Wctxp,LOGD,"Try connect to '%s',port=%s.",Wctxp->bufp,cp2); pthread_lock_global_np (); if ( NULL == (pHostEnt = gethostbyname (Wctxp->bufp)) ) { NNTP_LOG(LOGW,"[Feed]'gethostbyname' %s.",strerror(errno)); if ( NULL == (pHostEnt = gethostbyaddr (Wctxp->bufp, strlen(Wctxp->bufp),AF_INET)) ) { NNTP_LOG(LOGE,"[Feed]'gethostbyaddr' %s.",strerror(errno)); pthread_unlock_global_np (); NNTP_WorkerKill(Wctxp); continue; } } HostEnt = *pHostEnt; pthread_unlock_global_np (); sock2_name.sin_port = htons (atoi(cp2)); sock2_name.sin_family = HostEnt.h_addrtype; sock2_name.sin_addr = *((struct in_addr *) HostEnt.h_addr); if ( -1 == (sock_1 = socket (AF_INET, SOCK_STREAM, 0)) ) { NNTP_LOG(LOGF,"[Feed]'socket' %s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOGT(Wctxp,LOGI,"Feed to.......<%s(%s):%s>.",Wctxp->bufp, inet_ntoa(sock2_name.sin_addr),cp2); NNTP_LOGT(Wctxp,LOGI,"Group list....<%s>.",cp0); NNTP_LOGT(Wctxp,LOGI,"Posting Type..<%s>.",cp1); NNTP_LOGT(Wctxp,LOGI,"Exclude Paths.<%s>.",cp3); Wctxp->chan = sock_1; if ( 0 > connect(sock_1,(struct sockaddr *) &sock2_name,sizeof (sock2_name)) ) { NNTP_LOGT(Wctxp,LOGE,"'connect' %s.",strerror(errno)); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOGT(Wctxp,LOGW,"Connection established-Ok."); nntp_feed (Wctxp,cp0,inet_ntoa(sock2_name.sin_addr),(*cp1=='p'?1:0),cp3); NNTP_WorkerKill (Wctxp); } /* end while (1) */ } /* *-------------------------------------------------------------------------------- */ int pthread_sleep (int sec) { long status; pthread_mutex_t mtx; pthread_cond_t cnd; struct timespec delta,abs; delta.tv_sec = sec; delta.tv_nsec= 0; if ( status = pthread_mutex_init (&mtx,pthread_mutexattr_default) ) return status; if ( status = pthread_cond_init (&cnd ,pthread_condattr_default) ) return status; if ( status = pthread_get_expiration_np ( &delta , &abs ) ) return status; return ( pthread_cond_timedwait( &cnd , &mtx , &abs ) ); }