/* Copyright (c) 1996-2002, Ruslan R. Laishev (@RRL) */ #include "nntp.h" wctx_t *WctxPtr [ NNTPMAXWORKER ]; pthread_mutex_t Wctxm; extern pthread_attr_t tattr; int pthread_sleep (int); /* *-------------------------------------------------------------------------------- */ void NNTP_WorkerClient (wctx_t *Wctxp) { int status,sz; int (*nntp_fun) (wctx_t *); sz = sprintf(Wctxp->_t_buf,"200 %s is ready, (Posting Ok)",ID$IDver); if ( !(1 & (status = net_send_line (Wctxp->_a_chan,Wctxp->_t_buf,sz))) ) { NNTP_WorkerKill (Wctxp); pthread_exit (NULL); } if ( !(1 & (status = nntp_cmd_exec (Wctxp))) ) NNTP_LOGT(Wctxp,LOGE,"End worker (status = %d).",status); else NNTP_LOGT(Wctxp,LOGW,"End worker."); NNTP_WorkerKill(Wctxp); pthread_exit(NULL); } /* *-------------------------------------------------------------------------------- */ void NNTP_WorkerKill (wctx_t *Wctxp) { int status, sz = 0; pthread_mutex_lock(&Wctxm); DBclose_stream (&Wctxp->_s_msgrab); DBclose_stream (&Wctxp->_s_grprab); if ( (Wctxp->_b_type == T_Feed) || (Wctxp->_b_type == T_Suck) ) DBclose_stream (&Wctxp->_s_feedsuckrab); if ( Wctxp->_a_chan ) net_close(Wctxp->_a_chan); WctxPtr [ Wctxp->_b_indx ] = NULL; /* if ( (sz = Wctxp->_w_ipadrlen) && Wctxp->_a_ipadr ) if ( !(1 & (status = lib$free_vm (&sz,&Wctxp->_a_ipadr,0))) ) lib$signal(status); if ( (sz = Wctxp->_w_ipnamelen) && Wctxp->_a_ipname ) if ( !(1 & (status = lib$free_vm (&sz,&Wctxp->_a_ipname,0))) ) lib$signal(status); */ sz = sizeof(wctx_t); if ( !(1 & (status = lib$free_vm (&sz,&Wctxp,0))) ) lib$signal(status); pthread_mutex_unlock(&Wctxm); } /* *-------------------------------------------------------------------------------- */ void *NNTP_WorkerInit ( int WorkerType, void *chan, char *ipadr, int ipadrlen, char *ipname, int ipnamelen ) { int status,indx,sz = sizeof(wctx_t); wctx_t *Wctxp; ushort num_tm[7]; memset(num_tm,0,sizeof(num_tm)); num_tm[4] = nntp_conf._w_clnt_tmo; pthread_mutex_lock(&Wctxm); while(1) { Wctxp = NULL; for (indx = 0;indx < NNTPMAXWORKER;indx++) if ( NULL == WctxPtr [indx] ) break; if ( indx >= NNTPMAXWORKER ) break; if ( WorkerType == T_Suck ) num_tm[4] = nntp_conf._w_suck_tmo; if ( WorkerType == T_Feed ) num_tm[4] = nntp_conf._w_feed_tmo; if ( !(1 & (status = lib$get_vm (&sz,&Wctxp,0))) ) break; if ( !(1 & (status = lib$cvt_vectim(num_tm,Wctxp->_d_tmo))) ) break; if ( !(1 & (status = MsgDBopen_stream (&Wctxp->_s_msgrab))) ) { NNTP_LOG(LOGE,"[WorkerInit]MsgDBopen_stream."); if ( !(1 & (status = lib$free_vm (&sz,&Wctxp,0))) ) lib$signal(status); break; } if ( !(1 & (status = GrpDBopen_stream (&Wctxp->_s_grprab))) ) { NNTP_LOG(LOGE,"[WorkerInit]GrpDBopen_stream."); DBclose_stream(&Wctxp->_s_msgrab); if ( !(1 & (status = lib$free_vm (&sz,&Wctxp,0))) ) lib$signal(status); break; } if ( (WorkerType == T_Clnt) && ipadrlen ) { /* if ( !(1 & (status = lib$get_vm (&ipadrlen,&Wctxp->_a_ipadr,0))) ) { NNTP_LOG(LOGF,"[WorkerInit]'lib$get_vm'."); DBclose_stream(&Wctxp->_s_msgrab); DBclose_stream(&Wctxp->_s_grprab); if ( !(1 & (status = lib$free_vm (&sz,&Wctxp,0))) ) lib$signal(status); break; } memcpy (Wctxp->_a_ipadr,ipadr,ipadrlen); Wctxp->_w_ipadrlen = (short) ipadrlen; */ } if ( (WorkerType == T_Clnt) && ipnamelen ) { /* if ( !(1 & (status = lib$get_vm (&ipnamelen,&Wctxp->_a_ipname,0))) ) { NNTP_LOG(LOGF,"[WorkerInit]'lib$get_vm'."); DBclose_stream(&Wctxp->_s_msgrab); DBclose_stream(&Wctxp->_s_grprab); if ( !(1 & (status = lib$free_vm (&sz,&Wctxp,0))) ) lib$signal(status); if ( !(1 & (status = lib$free_vm (&ipadrlen,&Wctxp->_a_ipadr,0))) ) lib$signal(status); break; } memcpy (Wctxp->_a_ipadr,ipname,ipnamelen); Wctxp->_w_ipnamelen = (short) ipnamelen; */ } if ( (WorkerType == T_Feed) || (WorkerType == T_Suck) ) { if ( !(1 & (status = FeedSuckDBopen_stream (&Wctxp->_s_feedsuckrab))) ) { NNTP_LOG(LOGE,"[WorkerInit]FeedSuckDBopen_stream."); DBclose_stream(&Wctxp->_s_msgrab); DBclose_stream(&Wctxp->_s_grprab); if ( !(1 & (status = lib$free_vm (&sz,&Wctxp,0))) ) lib$signal(status); break; } } Wctxp->_b_indx = indx; Wctxp->_a_chan = chan; Wctxp->_b_type = WorkerType; WctxPtr[indx] = Wctxp; break; } if (!$VMS_STATUS_SUCCESS(status)) { Wctxp = NULL; NNTP_LOG(LOGF,"[WorkerInit],(status = %d).",status); } else { NNTP_LOG(LOGW,"[WorkerInit][Th:%03u]-Ok.",indx); } pthread_mutex_unlock(&Wctxm); return (void *) Wctxp; } /* *-------------------------------------------------------------------------------- */ int NNTP_InitBosses (void) { memset (WctxPtr,0,sizeof(WctxPtr)); return ($PTHREAD_MUTEX_INIT(&Wctxm)); } /* *-------------------------------------------------------------------------------- */ char SRVBUSY[] = "501 Sorry, NNTP Server is too busy, try later."; char NOACCESS[]= "502 Sorry, You are not in my access list."; void *NNTP_ClientBoss (void *empty) { int status; void *chan; wctx_t *Wctxp; char buf0[512],buf1[512]; int buf0sz,buf1sz; while (1) { buf0sz = sizeof(buf0); buf1sz = sizeof(buf1); if ( !(1 & (status = net_connect_inc (&chan,nntp_conf._w_localport,buf0,&buf0sz,buf1,&buf1sz))) ) lib$signal(status); if ( !chk_IP_prot (buf0,buf0sz,buf1,buf1sz) ) { net_send_line(chan,ASCIC(NOACCESS)); net_close (chan); NNTP_LOG(LOGS,"[Client]No access for %.*s,(%.*s).",buf0sz,buf0,buf1sz,buf1); continue; } if ( !(Wctxp = NNTP_WorkerInit(T_Clnt,chan,buf0,buf0sz,buf1,buf1sz)) ) { net_send_line(chan,ASCIC(SRVBUSY)); net_close (chan); NNTP_LOG(LOGF,"[Client]No free Wctxp."); continue; } NNTP_LOG(LOGI,"[Client]Start Worker [Th:%03u] for %.*s,(%.*s).", Wctxp->_b_indx,buf0sz,buf0,buf1sz,buf1); if ( 0 > $PTHREAD_CREATE(&Wctxp->_l_tid,tattr,NNTP_WorkerClient,Wctxp) ) { NNTP_LOG(LOGF,"[Client]pthread_create (errno = %d).",errno); NNTP_WorkerKill (Wctxp); continue; } NNTP_LOG(LOGD,"[Client]Start Worker [Th:%03u]-Ok.",Wctxp->_b_indx); } return; } /* *-------------------------------------------------------------------------------- */ int FeedSuckArg ( char *list, ushort listlen, ushort argnum, struct FSArg *fsargp ) { ushort cnt; struct dsc$descriptor dsc_list; INIT_SDESC(dsc_list,listlen,list); /* ** Extract elements of entry */ for (cnt = 0;cnt < argnum;cnt++,fsargp++) fsargp->_w_len = strelem(&dsc_list,':', cnt,&fsargp->_a_arg); return SS$_NORMAL; } /* *-------------------------------------------------------------------------------- */ void *NNTP_SuckBoss (void *empty) { long status; wctx_t *Wctxp; char *Ent; ushort EntLen; struct FSArg ArgList[SUCK$ARGS]; ushort Idx,port; if ( !nntp_conf._s_suck_list.dsc$w_length ) { NNTP_LOG(LOGI,"[Suck]No Suck defined."); pthread_exit (NULL); } if ( !(Wctxp = NNTP_WorkerInit(T_Suck,0,0,0,0,0)) ) { NNTP_LOG(LOGF,"[Suck]No free Wctx."); pthread_exit (NULL); } Idx = 0; while (1) { /* ** Select next suck entry from nntp_conf._s_suck_list */ if ( !(EntLen = strelem(&nntp_conf._s_suck_list,'|', Idx,&Ent)) ) { Idx = 0; pthread_sleep (nntp_conf._w_suck_delta); continue; } NNTP_LOG(LOGI,"[Suck]Start (%.*s)",EntLen,Ent); Idx++; /* ** Extract parameters from suck's entry */ FeedSuckArg(Ent,EntLen,SUCK$ARGS,ArgList); lib$cvt_dtb(ArgList[SUCK$TCPPORT]._w_len,ArgList[SUCK$TCPPORT]._a_arg,&port); /* ** Try to connect to target host */ if ( !(1 & (status = net_connect_out(&Wctxp->_a_chan, ArgList[SUCK$NAME]._a_arg,ArgList[SUCK$NAME]._w_len,port))) ) { NNTP_LOGT(Wctxp,LOGE,"'net_connect_out',status = %d.",status); net_close (Wctxp->_a_chan); continue; } if ( !(1 & (status = nntp_suck (Wctxp,ArgList))) ) NNTP_LOGT(Wctxp,LOGE,"End of sucking (status = %d).",status); else NNTP_LOGT(Wctxp,LOGI,"End of sucking."); net_close (Wctxp->_a_chan); } NNTP_WorkerKill (Wctxp); return; } /* *-------------------------------------------------------------------------------- */ void *NNTP_FeedBoss (void *empty) { long status; wctx_t *Wctxp; char *Ent; ushort EntLen; struct FSArg ArgList[FEED$ARGS]; ushort Idx,port; if ( !nntp_conf._s_feed_list.dsc$w_length ) { NNTP_LOG(LOGI,"[Feed]No Feed record defined."); pthread_exit (NULL); } if ( !(Wctxp = NNTP_WorkerInit(T_Feed,0,0,0,0,0)) ) { NNTP_LOG(LOGF,"[Feed]No free Wctx."); pthread_exit (NULL); } Idx = 0; while (1) { /* ** Select next feed entry from nntp_conf._s_feed_list */ if ( !(EntLen = strelem(&nntp_conf._s_feed_list,'|', Idx,&Ent)) ) { NNTP_LOG(LOGW,"[Feed]All hosts is polled."); Idx = 0; pthread_sleep (nntp_conf._w_feed_delta); continue; } NNTP_LOG(LOGI,"[Feed]Start (%.*s)",EntLen,Ent); Idx++; /* ** Extract parameters from feed's entry */ FeedSuckArg (Ent,EntLen,FEED$ARGS,ArgList); lib$cvt_dtb(ArgList[FEED$TCPPORT]._w_len,ArgList[FEED$TCPPORT]._a_arg,&port); /* ** Try to connect to target host */ if ( !(1 & (status = net_connect_out (&Wctxp->_a_chan, ArgList[FEED$NAME]._a_arg,ArgList[FEED$NAME]._w_len,port))) ) { NNTP_LOGT(Wctxp,LOGE,"'net_connect_out' (status = %d).",status); net_close(Wctxp->_a_chan); continue; } if ( !(1 & (status = nntp_feed (Wctxp,ArgList))) ) NNTP_LOGT(Wctxp,LOGE,"End of feeding (status = %d).",status); else NNTP_LOGT(Wctxp,LOGI,"End of feeding."); net_close(Wctxp->_a_chan); } NNTP_WorkerKill (Wctxp); return; } /* *-------------------------------------------------------------------------------- */ void NNTP_ExpireBoss (void) { wctx_t *Wctxp; if ( !(Wctxp = NNTP_WorkerInit(T_Expr,0,0,0,0,0)) ) { NNTP_LOG(LOGF,"[Expr]No free Wctx."); return ; } nntp_expire (Wctxp); NNTP_WorkerKill (Wctxp); } /* *-------------------------------------------------------------------------------- */ int pthread_sleep (int min) { struct timespec delta; delta.tv_sec = min*60; delta.tv_nsec= 0; return pthread_delay_np(&delta); }