/* Copyright (c) 1996-2006, Ruslan R. Laishev (@RRL) */
#include "nntp.h"

/*
** Table of live worker contexts, indexed by wctx$b_indx.  A NULL entry marks
** a free slot.  NNTP_WorkerInit and NNTP_WorkerKill update it under Wctxm.
*/
WCTX *WctxPtr [ NNTPMAXWORKER ];
pthread_mutex_t Wctxm = PTHREAD_MUTEX_INITIALIZER;

extern pthread_attr_t tattr;
extern int exit_flag;

int pthread_sleep (int);

/*
*--------------------------------------------------------------------------------
* Release the memory of a worker context with lib$free_vm and reset the
* caller's pointer to NULL so that it cannot be reused by accident.
* A failing lib$free_vm is reported through lib$signal.
*/
static void wctx_free_vm (WCTX **Wctxpp)
{
int status, sz = sizeof(WCTX);

    if ( !(1 & (status = lib$free_vm (&sz,Wctxpp,0))) )
        lib$signal(status);

    *Wctxpp = NULL;
}

/*
*--------------------------------------------------------------------------------
* Thread body for one client connection.
*
* Sends the "200" greeting line on the context's channel, runs nntp_cmd_exec
* on the context, logs how the session ended, releases the context with
* NNTP_WorkerKill and terminates the calling thread.
*
*   Wctxp   worker context created by NNTP_WorkerInit; it is released here
*           and must not be used by the creator once the thread is started.
*/
void NNTP_WorkerClient (WCTX *Wctxp)
{
int status,sz;

    sz = sprintf(Wctxp->wctx$t_buf,"200 %s is ready, (Posting Ok)",ID$IDver);

    /* Greeting could not be delivered: drop the worker right away */
    if ( !(1 & (status = net_send_line (Wctxp->wctx$a_chan,Wctxp->wctx$t_buf,sz))) )
        {
        NNTP_WorkerKill (Wctxp);
        pthread_exit (NULL);
        }

    if ( !(1 & (status = nntp_cmd_exec (Wctxp))) )
        NNTP_LOGT(Wctxp,LOG$K_ERR,"End worker (status = %d).",status);
    else
        NNTP_LOGT(Wctxp,LOG$K_WAR,"End worker.");

    NNTP_WorkerKill(Wctxp);
    pthread_exit(NULL);
}

/*
*--------------------------------------------------------------------------------
* Tear down a worker context.
*
* Under Wctxm: closes the message and group DB streams, closes the feed/suck
* DB stream for FEED and SUCK workers, closes the network channel when it is
* non-zero, clears the context's slot in WctxPtr and frees the context memory.
*
*   Wctxp   context to destroy; the pointer is invalid after the call.
*/
void NNTP_WorkerKill (WCTX *Wctxp)
{
    pthread_mutex_lock(&Wctxm);

    DBclose_stream (&Wctxp->wctx$r_mrab);
    DBclose_stream (&Wctxp->wctx$r_grab);

    if ( (Wctxp->wctx$b_type == WCTX_TP$K_FEED) || (Wctxp->wctx$b_type == WCTX_TP$K_SUCK) )
        DBclose_stream (&Wctxp->wctx$r_fsrab);

    if ( Wctxp->wctx$a_chan )
        net_close(Wctxp->wctx$a_chan);

    WctxPtr [ Wctxp->wctx$b_indx ] = NULL;

    wctx_free_vm (&Wctxp);

    pthread_mutex_unlock(&Wctxm);
}

/*
*--------------------------------------------------------------------------------
* Allocate and register a worker context.
*
* Under Wctxm: finds a free slot in WctxPtr, allocates a zero-filled WCTX,
* converts the per-type timeout (minutes field of the time vector) into
* wctx$q_tmo, opens the message and group DB streams (plus the feed/suck
* stream for FEED and SUCK workers), stores the peer address for CLIENT
* workers and publishes the context in WctxPtr.
*
*   WorkerType  one of the WCTX_TP$K_* codes
*   chan        network channel stored in the context (may be 0)
*   ipadr       peer address text, used only for CLIENT workers
*   ipadrlen    length of ipadr; clamped to the size of wctx$t_ipadr and to
*               255, because the length is kept in an unsigned char
*   ipname      not used by this routine
*   ipnamelen   not used by this routine
*
* Returns the new context, or NULL when no slot is free or any step failed.
* On failure everything acquired so far is released.
*/
void *NNTP_WorkerInit (
        int     WorkerType,
        void    *chan,
        char    *ipadr,
        int     ipadrlen,
        char    *ipname,
        int     ipnamelen
            )
{
/*
** status starts as 0 (low bit clear == failure) so that the "no free slot"
** exit is reported as a failure instead of reading an undefined value.
*/
int status = 0,indx,sz = sizeof(WCTX);
int mrab_open = 0,grab_open = 0;
WCTX *Wctxp = NULL;
ushort num_tm[7];

    memset(num_tm,0,sizeof(num_tm));
    num_tm[4] = nntp_conf._w_clnt_tmo;

    pthread_mutex_lock(&Wctxm);

    do  {
        /* Look for a free slot */
        for (indx = 0;indx < NNTPMAXWORKER;indx++)
            if ( NULL == WctxPtr [indx] )
                break;

        if ( indx >= NNTPMAXWORKER )
            break;

        if ( WorkerType == WCTX_TP$K_SUCK )
            num_tm[4] = nntp_conf._w_suck_tmo;

        if ( WorkerType == WCTX_TP$K_FEED )
            num_tm[4] = nntp_conf._w_feed_tmo;

        if ( !(1 & (status = lib$get_vm (&sz,&Wctxp,0))) )
            {
            Wctxp = NULL;
            break;
            }

        /* Start from a known state: no field is left holding stale heap data */
        memset(Wctxp,0,sizeof(WCTX));

        if ( !(1 & (status = lib$cvt_vectim(num_tm,&Wctxp->wctx$q_tmo))) )
            break;

        if ( !(1 & (status = MsgDBopen_stream (&Wctxp->wctx$r_mrab))) )
            {
            NNTP_LOG(LOG$K_ERR,"[WorkerInit]MsgDBopen_stream.");
            break;
            }
        mrab_open = 1;

        if ( !(1 & (status = GrpDBopen_stream (&Wctxp->wctx$r_grab))) )
            {
            NNTP_LOG(LOG$K_ERR,"[WorkerInit]GrpDBopen_stream.");
            break;
            }
        grab_open = 1;

        if ( WorkerType == WCTX_TP$K_CLIENT )
            {
            int len = ipadrlen;

            /* Never copy more than the context can hold or the length byte can express */
            if ( len < 0 )
                len = 0;
            if ( len > (int) sizeof(Wctxp->wctx$t_ipadr) )
                len = (int) sizeof(Wctxp->wctx$t_ipadr);
            if ( len > 255 )
                len = 255;

            if ( ipadr && len )
                memcpy(Wctxp->wctx$t_ipadr,ipadr,len);

            Wctxp->wctx$b_ipadrlen = (unsigned char) len;
            }

        if ( (WorkerType == WCTX_TP$K_FEED) || (WorkerType == WCTX_TP$K_SUCK) )
            {
            if ( !(1 & (status = FeedSuckDBopen_stream (&Wctxp->wctx$r_fsrab))) )
                {
                NNTP_LOG(LOG$K_ERR,"[WorkerInit]FeedSuckDBopen_stream.");
                break;
                }
            }

        Wctxp->wctx$b_indx = indx;
        Wctxp->wctx$a_chan = chan;
        Wctxp->wctx$b_type = WorkerType;

        WctxPtr[indx] = Wctxp;
        } while (0);

    if ( !$VMS_STATUS_SUCCESS(status) )
        {
        /*
        ** Undo partial construction.  The release status is kept apart from
        ** 'status' so that a successful free cannot mask the original failure.
        */
        if ( Wctxp )
            {
            if ( mrab_open )
                DBclose_stream(&Wctxp->wctx$r_mrab);
            if ( grab_open )
                DBclose_stream(&Wctxp->wctx$r_grab);

            wctx_free_vm (&Wctxp);
            }

        Wctxp = NULL;
        NNTP_LOG(LOG$K_FAT,"[WorkerInit],(status = %d).",status);
        }
    else
        NNTP_LOG(LOG$K_WAR,"[WorkerInit][Th:%03u]-Ok.",indx);

    pthread_mutex_unlock(&Wctxm);

    return (void *) Wctxp;
}

/*
*--------------------------------------------------------------------------------
* Clear the worker context table.  Returns SS$_NORMAL.
*/
int NNTP_InitBosses (void)
{
    memset (WctxPtr,0,sizeof(WctxPtr));

    return SS$_NORMAL;
}

/*
*--------------------------------------------------------------------------------
* Replies sent to a client that is refused before a worker is started.
*/
char SRVBUSY[] = "501 Sorry, NNTP Server is too busy, try later.";
char NOACCESS[]= "502 Sorry, You are not in my access list.";

/*
* Thread body accepting incoming client connections until exit_flag is set.
*
* For every accepted connection: checks the peer with chk_IP_prot, creates a
* CLIENT worker context and starts a thread running NNTP_WorkerClient on it.
* A refused or unserviceable connection gets a one-line reply and is closed.
*
*   empty   not used
*
* Returns NULL.
*/
void *NNTP_ClientBoss (void *empty)
{
int status;
void *chan;
WCTX *Wctxp;
char buf0[512],buf1[512];
int buf0sz,buf1sz;
unsigned indx;
pthread_t tid;

    while ( !exit_flag )
        {
        buf0sz = sizeof(buf0);
        buf1sz = sizeof(buf1);
        chan = NULL;

        if ( !(1 & (status = net_connect_inc (&chan,nntp_conf._w_localport,buf0,&buf0sz,buf1,&buf1sz))) )
            {
            /* If the signal is continued there is no valid connection to serve */
            lib$signal(status);
            continue;
            }

        if ( !chk_IP_prot (buf0,buf0sz,buf1,buf1sz) )
            {
            net_send_line(chan,ASCIC(NOACCESS));
            net_close (chan);
            NNTP_LOG(LOG$K_SER,"[Client]No access for %.*s,(%.*s).",buf0sz,buf0,buf1sz,buf1);
            continue;
            }

        if ( !(Wctxp = NNTP_WorkerInit(WCTX_TP$K_CLIENT,chan,buf0,buf0sz,buf1,buf1sz)) )
            {
            net_send_line(chan,ASCIC(SRVBUSY));
            net_close (chan);
            NNTP_LOG(LOG$K_FAT,"[Client]No free Wctxp.");
            continue;
            }

        /*
        ** Remember the slot number now: once the worker thread runs it owns
        ** the context and may free it at any moment.
        */
        indx = Wctxp->wctx$b_indx;

        NNTP_LOG(LOG$K_INF,"[Client]Start Worker [Th:%03u] for %.*s,(%.*s).",
            indx,buf0sz,buf0,buf1sz,buf1);

        /* pthread_create reports failure with a non-zero error number */
        if ( 0 != (status = pthread_create(&tid,&tattr,NNTP_WorkerClient,Wctxp)) )
            {
            NNTP_LOG(LOG$K_FAT,"[Client]pthread_create (status = %d).",status);
            NNTP_WorkerKill (Wctxp);
            continue;
            }

        NNTP_LOG(LOG$K_DBG,"[Client]Start Worker [Th:%03u]-Ok.",indx);
        }

    return NULL;
}

/*
*--------------------------------------------------------------------------------
* Split a ':'-separated feed/suck entry into its fields.
*
*   list      entry text
*   listlen   length of list
*   argnum    number of fields to extract
*   fsargp    output array of at least argnum elements; element i receives
*             the address (_a_arg) and the strelem result (_w_len) of field i
*
* Returns SS$_NORMAL.
*/
int FeedSuckArg (
        char    *list,
        ushort  listlen,
        ushort  argnum,
        struct FSArg *fsargp
        )
{
ushort cnt;
struct dsc$descriptor dsc_list;

    INIT_SDESC(dsc_list,listlen,list);

    /*
    ** Extract elements of entry
    */
    for (cnt = 0;cnt < argnum;cnt++,fsargp++)
        fsargp->_w_len = strelem(&dsc_list,':', cnt,&fsargp->_a_arg);

    return SS$_NORMAL;
}

/*
*--------------------------------------------------------------------------------
* Thread body polling the hosts of nntp_conf._s_suck_list until exit_flag is
* set.
*
* Entries are separated by '|'.  Each entry is parsed with FeedSuckArg, the
* host is connected with net_connect_out and nntp_suck is run on the shared
* SUCK context.  When the list is exhausted the thread sleeps for
* nntp_conf._w_suck_delta minutes and starts again from the first entry.
*
*   empty   not used
*
* Returns NULL (or ends through pthread_exit when nothing can be done).
*/
void *NNTP_SuckBoss (void *empty)
{
int status;
WCTX *Wctxp;
char *Ent;
ushort EntLen;
struct FSArg ArgList[SUCK$K_ARGS];
ushort Idx;
int port;       /* lib$cvt_dtb stores a longword, so this must not be a ushort */

    if ( !nntp_conf._s_suck_list.dsc$w_length )
        {
        NNTP_LOG(LOG$K_INF,"[Suck]No Suck defined.");
        pthread_exit (NULL);
        }

    if ( !(Wctxp = NNTP_WorkerInit(WCTX_TP$K_SUCK,0,0,0,0,0)) )
        {
        NNTP_LOG(LOG$K_FAT,"[Suck]No free Wctx.");
        pthread_exit (NULL);
        }

    Idx = 0;

    while ( !exit_flag )
        {
        /*
        ** Select next suck entry from nntp_conf._s_suck_list
        */
        if ( !(EntLen = strelem(&nntp_conf._s_suck_list,'|', Idx,&Ent)) )
            {
            Idx = 0;
            pthread_sleep (nntp_conf._w_suck_delta);
            continue;
            }

        NNTP_LOG(LOG$K_INF,"[Suck]Start (%.*s)",EntLen,Ent);
        Idx++;

        /*
        ** Extract parameters from suck's entry
        */
        FeedSuckArg(Ent,EntLen,SUCK$K_ARGS,ArgList);

        port = 0;
        status = lib$cvt_dtb(ArgList[SUCK$K_TCPPORT]._w_len,ArgList[SUCK$K_TCPPORT]._a_arg,&port);

        if ( !(1 & status) || (port < 0) || (port > 65535) )
            {
            NNTP_LOGT(Wctxp,LOG$K_ERR,"Bad TCP port in (%.*s), status = %d.",EntLen,Ent,status);
            continue;
            }

        /*
        ** Try to connect to target host
        */
        if ( !(1 & (status = net_connect_out(&Wctxp->wctx$a_chan,
                ArgList[SUCK$K_NAME]._a_arg,ArgList[SUCK$K_NAME]._w_len,(ushort) port))) )
            {
            NNTP_LOGT(Wctxp,LOG$K_ERR,"'net_connect_out',status = %d.",status);
            net_close (Wctxp->wctx$a_chan);
            Wctxp->wctx$a_chan = NULL;
            continue;
            }

        if ( !(1 & (status = nntp_suck (Wctxp,ArgList))) )
            NNTP_LOGT(Wctxp,LOG$K_ERR,"End of sucking (status = %d).",status);
        else
            NNTP_LOGT(Wctxp,LOG$K_INF,"End of sucking.");

        /* Forget the closed channel so that NNTP_WorkerKill does not close it again */
        net_close (Wctxp->wctx$a_chan);
        Wctxp->wctx$a_chan = NULL;
        }

    NNTP_WorkerKill (Wctxp);

    return NULL;
}

/*
*--------------------------------------------------------------------------------
* Thread body polling the hosts of nntp_conf._s_feed_list until exit_flag is
* set.
*
* Entries are separated by '|'.  Each entry is parsed with FeedSuckArg, the
* host is connected with net_connect_out and nntp_feed is run on the shared
* FEED context.  When the list is exhausted the thread sleeps for
* nntp_conf._w_feed_delta minutes and starts again from the first entry.
*
*   empty   not used
*
* Returns NULL (or ends through pthread_exit when nothing can be done).
*/
void *NNTP_FeedBoss (void *empty)
{
int status;
WCTX *Wctxp;
char *Ent;
ushort EntLen;
struct FSArg ArgList[FEED$K_ARGS];
ushort Idx;
int port;       /* lib$cvt_dtb stores a longword, so this must not be a ushort */

    if ( !nntp_conf._s_feed_list.dsc$w_length )
        {
        NNTP_LOG(LOG$K_INF,"[Feed]No Feed record defined.");
        pthread_exit (NULL);
        }

    if ( !(Wctxp = NNTP_WorkerInit(WCTX_TP$K_FEED,0,0,0,0,0)) )
        {
        NNTP_LOG(LOG$K_FAT,"[Feed]No free Wctx.");
        pthread_exit (NULL);
        }

    Idx = 0;

    while ( !exit_flag )
        {
        /*
        ** Select next feed entry from nntp_conf._s_feed_list
        */
        if ( !(EntLen = strelem(&nntp_conf._s_feed_list,'|', Idx,&Ent)) )
            {
            NNTP_LOG(LOG$K_WAR,"[Feed]All hosts is polled.");
            Idx = 0;
            pthread_sleep (nntp_conf._w_feed_delta);
            continue;
            }

        NNTP_LOG(LOG$K_INF,"[Feed]Start (%.*s)",EntLen,Ent);
        Idx++;

        /*
        ** Extract parameters from feed's entry
        */
        FeedSuckArg (Ent,EntLen,FEED$K_ARGS,ArgList);

        port = 0;
        status = lib$cvt_dtb(ArgList[FEED$K_TCPPORT]._w_len,ArgList[FEED$K_TCPPORT]._a_arg,&port);

        if ( !(1 & status) || (port < 0) || (port > 65535) )
            {
            NNTP_LOGT(Wctxp,LOG$K_ERR,"Bad TCP port in (%.*s), status = %d.",EntLen,Ent,status);
            continue;
            }

        /*
        ** Try to connect to target host
        */
        if ( !(1 & (status = net_connect_out (&Wctxp->wctx$a_chan,
                ArgList[FEED$K_NAME]._a_arg,ArgList[FEED$K_NAME]._w_len,(ushort) port))) )
            {
            NNTP_LOGT(Wctxp,LOG$K_ERR,"'net_connect_out' (status = %d).",status);
            net_close(Wctxp->wctx$a_chan);
            Wctxp->wctx$a_chan = NULL;
            continue;
            }

        if ( !(1 & (status = nntp_feed (Wctxp,ArgList))) )
            NNTP_LOGT(Wctxp,LOG$K_ERR,"End of feeding (status = %d).",status);
        else
            NNTP_LOGT(Wctxp,LOG$K_INF,"End of feeding.");

        /* Forget the closed channel so that NNTP_WorkerKill does not close it again */
        net_close(Wctxp->wctx$a_chan);
        Wctxp->wctx$a_chan = NULL;
        }

    NNTP_WorkerKill (Wctxp);

    return NULL;
}

/*
*--------------------------------------------------------------------------------
* Run one expiration pass: creates a PURGE worker context, calls nntp_expire
* on it and releases the context.  Logs and returns when no context can be
* created.
*/
void NNTP_ExpireBoss (void)
{
WCTX *Wctxp;

    if ( !(Wctxp = NNTP_WorkerInit(WCTX_TP$K_PURGE,0,0,0,0,0)) )
        {
        NNTP_LOG(LOG$K_FAT,"[Expr]No free Wctx.");
        return ;
        }

    nntp_expire (Wctxp);

    NNTP_WorkerKill (Wctxp);
}

/*
*--------------------------------------------------------------------------------
* Suspend the calling thread for 'min' minutes.
* Returns the result of pthread_delay_np.
*/
int pthread_sleep (int min)
{
struct timespec delta;

    delta.tv_sec = min*60;
    delta.tv_nsec= 0;

    return pthread_delay_np(&delta);
}