/* * CHANNELS - Handles common operations on channels; also contains the code * to process incoming data from them. For outgoing interface * messages, we just use the generic meta_avtomsg() in metaops.c. * * Author: * Emile van Bergen, emile@evbergen.xs4all.nl * * Permission to redistribute an original or modified version of this program * in source, intermediate or object code form is hereby granted exclusively * under the terms of the GNU General Public License, version 2. Please see the * file COPYING for details, or refer to http://www.gnu.org/copyleft/gpl.html. * * History: * 2001/10/29 - EvB - Created * 2002/01/07 - EvB - Moved most of chan_*msgtoav's code to metaops.c, * to make it independent of channels and rings. * 2005/06/20 - EvB - Hefty cleanup, triggered by creating have_*msg_inring * from proc_binmsgavailable for use by subserver input */ char channels_id[] = "CHANNELS - Copyright (C) 2001,2005 Emile van Bergen."; /* * INCLUDES & DEFINES */ #include /* For malloc() / free() */ #include /* For memset() */ #include /* Also includes srvtypes.h */ #include /* For job_toiface(), job_tochan(), job_fromchan() */ #include /* For VM. Also includes metaops for META_AV. */ #define DEBUGLEVEL 1 /* Include D1 statements */ #include /* * FUNCTIONS */ /* To be called when select says a channel's subprocess' pipe has data */ void chan_handle_read(CHAN *ch, time_t t) { static U_INT32_T binmsg[(C_MAX_MSGSIZE + 16) >> 2]; char *ascmsg = (char *)binmsg; IFACE *i; JOB *j; ssize_t len; /* Transfer as many bytes to the ring as possible, handling EOF and overruns by restarting the subprocess */ proc_handle_read(ch->proc, t); len = 0; if (ch->iface->flags & C_DV_FLAGS_ASCII) { /* ASCII interface: loop while we have jobs on the recv. queue * and message line in the ring */ while(!job_ring_empty(ch) && (len = have_ascline_inring(ch->proc->r)) >= 0) { if (len) { /* Take the indicated amount from the ring - we need to copy for getitembyspec and getvalbyspec anyway. If those worked on rings, we could do everything straight on the ring, but now it doesn't seem worth it to use prttoa, atoip and atoord directly on the ring, because we still need to copy the other elements. Note that 'len' is already bound by proc_new and proc_msgavailable. */ ring_get(ch->proc->r, ascmsg, len); msg(F_PROC, L_DEBUG, "chan_handle_read: got line [%s]\n", dbg_cvtstr(ascmsg, len)); if (meta_ascmsgtoav(ch->iface->c->m, ascmsg,len, &ch->reph, &ch->rept, ch->iface->flags, ch->iface->recvacl) == -1) msg(F_PROC, L_NOTICE, "chan_ascmsgtoav: ERROR: Could not add AV pair received from '%s' (pid %d)!\n", ch->proc->argv[0], ch->proc->pid); ring_discard(ch->proc->r, 1); continue; } /* Empty line: end of message */ ring_discard(ch->proc->r, 1); /* We have a full answer. Take the job from the * channel's receive queue and run it until it ends or * makes another interface call. */ msg(F_PROC, L_NOTICE, "chan_handle_read: complete ASCII message from %s, pid %d - continuing job.\n", ch->iface->name, ch->proc->pid); j = job_fromchan(ch, t); do i = job_run(ch->iface->c, j); while (i && job_toiface(i, j, t)); } } else { /* Binary interface: loop while we have jobs on the recv. queue * and messages in the ring */ while(!job_ring_empty(ch) && (len = have_binmsg_inring(ch->proc->r, C_BINMSG_REPMAGIC)) >= 0) { /* Note that 'len' is already bound by proc_new and * proc_msgavailable */ ring_get(ch->proc->r, (char *)binmsg, len); msg(F_PROC, L_NOTICE, "chan_handle_read: received binary message from %s, pid %d - continuing job.\n", ch->iface->name, ch->proc->pid); if (msg_thresh[F_PROC] >= L_DEBUG) hexdump((char *)binmsg, len); /* Decode message, copying strings; unknown pairs * are non-fatal */ if (meta_binmsgtoav(ch->iface->c->m, binmsg, len, &ch->reph, &ch->rept, 0, ch->iface->recvacl) == -1) { msg(F_PROC, L_NOTICE, "chan_binmsgtoav: WARNING: could not add all AV pair(s) received from %s, pid %d!\n", ch->proc->argv[0], ch->proc->pid); } /* Continue job */ j = job_fromchan(ch, t); do i = job_run(ch->iface->c, j); while (i && job_toiface(i, j, t)); } } /* Error recovery; if we saw header or framing errors on the channel, * restart the subprocess */ if (len < -1) { msg(F_PROC, L_ERR, "chan_handle_read: Invalid message received " "from '%s', pid %d for %s - restarting " "module!\n", ch->proc->argv[0], ch->proc->pid, ch->iface->name); proc_stop(ch->proc, t); } /* If there are no more jobs on recv. queue, clear receiving state */ if (job_ring_empty(ch)) { ch->proc->state &= ~PRS_RECEIVING; if (ch->proc->state == PRS_IDLE) ch->proc->timer = 0; msg(F_PROC, L_DEBUG, "- no more jobs waiting for pid %d - " "cleared receiving state\n", ch->proc->pid); } else { ch->proc->timer = ch->jobring[ch->job_r]->expiry; msg(F_PROC, L_DEBUG, "- done receiving for now - next oldest " "job is %d which expires in %d seconds\n", ch->job_r, ch->proc->timer - t); } /* No message yet or anymore */ return; } /* To be called when select says a channel's subprocess' pipe has more room */ void chan_handle_write(CHAN *ch, time_t t) { JOB *j; for(;;) { /* Write as many bytes as we can from the tx ring, set receiving state if we actually transfered anything, clear sending state if there's nothing more in the tx ring */ proc_handle_write(ch->proc, t); /* See if the tx'er is idle and the iface has more */ if ((ch->proc->state == PRS_IDLE || ch->proc->state == PRS_RECEIVING) && (j=ch->iface->sendqt)) { /* It has. Take the job from the shared send queue */ ch->iface->sendqt = j->next; if (ch->iface->sendqt) ch->iface->sendqt->prev = 0; else ch->iface->sendqh = 0; ch->iface->sendqlen--; msg(F_PROC, L_DEBUG, "- tx idle, took another job from " "shared send queue (%d left)\n", ch->iface->sendqlen); /* Give it to this channel and loop */ job_tochan(ch, j, t); continue; } return; } } void chan_flush(CHAN *ch) { int n; for(n = ch->job_r; n != ch->job_w; n = (n + 1) % C_CHAN_JOBRINGSIZE) { if (!ch->jobring[n]) continue; job_del(ch->jobring[n]); ch->jobring[n] = 0; msg(F_PROC, L_DEBUG, "chan_flush: removing job %d from receive queue\n", n); } ch->job_r = ch->job_w; if (ch->reph) { meta_freeavlist(ch->reph); ch->reph = 0; } ch->rept = 0; }