/*
* JOBS - Handles job creation from requests, job juggling on interfaces
* and channels, and job ending after converting them to responses.
*
* Author:
* Emile van Bergen, emile@evbergen.xs4all.nl
*
* Permission to redistribute an original or modified version of this program
* in source, intermediate or object code form is hereby granted exclusively
* under the terms of the GNU General Public License, version 2. Please see the
* file COPYING for details, or refer to http://www.gnu.org/copyleft/gpl.html.
*
* History:
* 2001/08/25 - EvB - Created
* 2001/10/29 - EvB - Moved chan_put/getjob here as job_to/fromchan etc.
* 2003/06/23 - EvB - Fixed SIGSEGV caused by wrong arguments to log message
* when a shared queue overflows
*/
char jobs_id[] = "JOBS - Copyright (C) 2001 Emile van Bergen.";
/*
* INCLUDES & DEFINES
*/
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include <language.h> /* Also includes metaops.h */
#include <config.h> /* For conf_new() etc */
#include <md5.h>
#include <debug.h>
#include <jobs.h>
/*
* FUNCTIONS
*/
static META_AV *meta_addnewav(META_AV **head, META_AV **tail,
META_AV *rel, int before,
META_ITEM *i, char *p, META_ORD l, int flags)
{
META_AV *av;
av = (META_AV *)malloc(sizeof(META_AV)); if (!av) return 0;
memset(av, 0, sizeof(META_AV));
av->i = i;
av->p = p;
av->l = l;
av->flags = flags;
meta_addav(head, tail, rel, before, av);
return av;
}
static JOB *job_new_common(JOB *j, CONF *c,
META_AV *reqh, META_AV *reqt, META_AV *reqcur)
{
META_AV *reph, *rept, *av, *srcav;
/* Add copied pairs from the configuration's static request list */
av = reqcur;
for(srcav = c->inithead[VM_REQ]; srcav; srcav = srcav->next) {
if (srcav->i == c->di_request_nr) srcav->l++;
av = meta_addnewav(&reqh, &reqt, av, 0,
srcav->i, srcav->p, srcav->l, 0);
if (av) continue;
msg(F_RECV, L_ERR, "job_new: ERROR: Could not allocate "
"new AV item(s)!\n");
meta_freeavlist(reqh);
job_del(j);
return 0;
}
/* Initialize reply list with pairs copied from the config reply list */
av = reph = rept = 0;
for(srcav = c->inithead[VM_REP]; srcav; srcav = srcav->next) {
av = meta_addnewav(&reph, &rept, av, 0,
srcav->i, srcav->p, srcav->l, 0);
if (av) continue;
msg(F_RECV, L_ERR, "job_new: ERROR: Could not allocate "
"new AV item(s)!\n");
meta_freeavlist(reqh);
meta_freeavlist(reph);
job_del(j);
return 0;
}
/* Create the VM */
j->vm = vm_new(c->expr, c->exprlen, C_MAX_STKDEPTH,
reqh, reqt, reph, rept);
/* Show request list */
if (msg_thresh[F_RECV] >= L_NOTICE) {
meta_printavlist(c->m, j->vm->head[VM_REQ], 0);
}
return j;
}
JOB *job_new_fromring(CONF *c, RING *r, ssize_t len,
RING *reply, int replyfd, time_t t)
{
JOB *j;
META_AV *reqh, *reqt, *av;
msg(F_RECV, L_NOTICE, "job_new_fromring: Received request:\n");
/* Allocate new job, including packet buffer */
j = (JOB *)malloc(sizeof(JOB)); if (!j) return 0;
memset(j, 0, sizeof(JOB));
j->pkt = (char *)malloc(C_MAX_MSGSIZE); /* must be aligned */
if (!j->pkt) { free(j); return 0; }
/* Remember how to reply to this job */
j->replyfd = replyfd;
j->replytype = RT_RING;
j->replyaddr.ring = reply;
if (len) {
/* Get packet from ring and decode it using module interface;
* strings point into buffer, so we must keep that around.
* This will all be better and involve less copying once we
* backport the fastrings. */
ring_get(r, (char *)j->pkt, len);
j->pktlen = len;
if (msg_thresh[F_RECV] >= L_DEBUG) hexdump(j->pkt, j->pktlen);
reqh = reqt = 0;
if (meta_binmsgtoav(c->m, (U_INT32_T *)j->pkt, j->pktlen,
&reqh, &reqt, 1, 0) == -1) {
msg(F_RECV, L_ERR, "job_new_fromring: ERROR: Could not "
"decode packet!\n");
job_del(j);
return 0;
}
}
/* Add extra information about this request */
av = meta_addnewav(&reqh, &reqt, 0, 1, c->di_timestamp, 0, t, 0);
av = meta_addnewav(&reqh, &reqt, av, 0, c->di_source, "Parent", 6, 0);
av = meta_addnewav(&reqh, &reqt, av, 0, c->di_dest, "Stdin", 5, 0);
if (!av) {
msg(F_RECV,L_ERR, "job_new_fromring: ERROR: Could not allocate "
"new AV item(s)!\n");
meta_freeavlist(reqh);
job_del(j);
return 0;
}
return job_new_common(j, c, reqh, reqt, av);
}
JOB *job_new_fromsock(SOCK *s, time_t t)
{
JOB *j;
META_AV *reqh, *reqt, *av;
int sl;
/* Allocate new job, including packet buffer */
j = (JOB *)malloc(sizeof(JOB)); if (!j) return 0;
memset(j, 0, sizeof(JOB));
j->pkt = (char *)malloc(C_MAX_PKTSIZE);
if (!j->pkt) { free(j); return 0; }
/* Receive packet, setting also the reply fd and sockaddr_in */
sl = sizeof(j->replyaddr.sockaddr);
j->pktlen = recvfrom(s->fd, j->pkt, C_MAX_PKTSIZE, 0,
(struct sockaddr *)&(j->replyaddr.sockaddr), &sl);
if (j->pktlen == -1) {
msg(F_RECV,L_NOTICE, "job_new_fromsock: WARNING: recvfrom() "
"said: %s!\n", strerror(errno));
job_del(j);
return 0;
}
j->replyfd = s->fd;
j->replytype = RT_SOCKET;
if (msg_thresh[F_RECV] >= L_NOTICE) {
msg(F_RECV, L_NOTICE, "job_new_fromsock: Received request:\n");
if (msg_thresh[F_RECV] >= L_DEBUG) hexdump(j->pkt, j->pktlen);
}
/* Decode packet to the request list. Note that the pointers in the AV
items refer to the data in the packet buffer, so don't free that
packet buffer for as long as the job lives. */
reqh = meta_decode(s->c->m, s->c->ds_ground, 0, j->pkt, j->pktlen,
&reqt);
if (!reqh) {
msg(F_RECV, L_ERR, "job_new_fromsock: ERROR: Could not decode "
"packet!\n");
job_del(j);
return 0;
}
/* Add extra information about this request */
av = meta_addnewav(&reqh, &reqt, 0, 1, s->c->di_timestamp, 0, t, 0);
av = meta_addnewav(&reqh, &reqt, av, 0, s->c->di_source, "Network",7,0);
av = meta_addnewav(&reqh, &reqt, av, 0, s->c->di_dest, "Socket", 6, 0);
av = meta_addnewav(&reqh, &reqt, av, 0, s->c->di_ip_source, 0,
getord((char *)&j->replyaddr.sockaddr.sin_addr,
sizeof(j->replyaddr.sockaddr.sin_addr)), 0);
av = meta_addnewav(&reqh, &reqt, av, 0, s->c->di_ip_dest, 0, s->ip, 0);
av = meta_addnewav(&reqh, &reqt, av, 0, s->c->di_udp_source, 0,
getord((char *)&j->replyaddr.sockaddr.sin_port,
sizeof(j->replyaddr.sockaddr.sin_port)), 0);
av = meta_addnewav(&reqh, &reqt, av, 0, s->c->di_udp_dest, 0,
s->port, 0);
if (!av) {
msg(F_RECV,L_ERR, "job_new_fromsock: ERROR: Could not allocate "
"new AV item(s)!\n");
meta_freeavlist(reqh);
job_del(j);
return 0;
}
return job_new_common(j, s->c, reqh, reqt, av);
}
void job_del(JOB *j)
{
if (j) {
if (j->pkt) free(j->pkt);
if (j->vm) {
if (j->vm->head[VM_REQ])
meta_freeavlist(j->vm->head[VM_REQ]);
if (j->vm->head[VM_REP])
meta_freeavlist(j->vm->head[VM_REP]);
vm_del(j->vm);
}
free(j);
}
}
/* Runs the expression until interface trap or done. If done, signs,
responds, deletes the job and returns 0. If error, deletes the job and
returns 0. If interface trap, returns the interface. */
IFACE *job_run(CONF *c, JOB *j)
{
static char dummy[16];
META_AV *av, *avtree, *secretav, *msgauthav;
META_VAL *val;
md5_state_t mds;
IFACE *ret;
char *logline;
int loglinel, radcode, n;
/* Run the expression on this job until something happens */
n = vm_run(c->m, j->vm);
/* Scan reply list for a few pairs we need in multiple cases below */
secretav = msgauthav = 0;
radcode = 0; logline = "No details"; loglinel = 10;
for(av = j->vm->head[VM_REP]; av; av = av->next)
{
if (av->i == c->di_secret) { secretav = av; continue; }
if (av->i == c->di_msg_auth) { msgauthav = av; continue; }
if (av->i == c->di_code) { radcode = av->l; continue; }
if (av->i == c->di_log_line) logline = av->p, loglinel = av->l;
}
/* If Message-Authenticator is present but not exactly 16 characters
long, replace it with a dummy buffer of 16 characters. It will be
set to zero in the encoded packet before signing. */
if (msgauthav && msgauthav->l != 16) {
meta_freeavdata(msgauthav);
msgauthav->p = dummy; msgauthav->l = 16;
}
/* Handle the various halt or trap cases */
switch(n) {
case VM_HALTED: /* Done */
if (j->replytype == RT_SOCKET) {
val = meta_getvalbynr(c->m, c->di_code, radcode);
msg(F_SEND, L_NOTICE, "Sending %s (%d) %s\n",
val ? val->name : "response code", radcode,
dbg_cvtstr(logline, loglinel));
msg(F_SEND, L_DEBUG, "job_run: Expression done, response:\n");
/* Build encapsulation tree and encode packet */
meta_buildtree(j->vm->head[VM_REP], &avtree, c->ds_ground);
if (!avtree) { msg(F_SEND, L_ERR, "job_run: ERROR: Could not "
"build response tree!\n"); break; }
if (msg_thresh[F_SEND] >= L_DEBUG)
meta_printavlist(c->m, avtree, 0);
j->pktlen = meta_encode(c->ds_ground, j->pkt,
C_MAX_PKTSIZE, avtree, 0);
if (j->pktlen == -1) {
msg(F_SEND, L_ERR, "job_run: ERROR: Could not encode "
"response!\n");
meta_freeavtree(avtree);
break;
}
/* Verify that we have a shared secret */
if (!secretav || !secretav->p || !secretav->l || secretav->l > C_MAX_SECRSIZE) { msg(F_SEND, L_ERR, "job_run: ERROR: No (valid) secret to sign response!\n"); break; }
/* If we have an empty Message-Authenticator with the correct
* length on the reply list, and we have a corresponding pair
* in the reply tree, and and we have an offset for its value
* in the encoded packet, sign packet there */
if (msgauthav && msgauthav->l == 16 && msgauthav->treeitem &&
msgauthav->treeitem->encp) {
memset(msgauthav->treeitem->encp, 0, 16);
hmac_md5(msgauthav->treeitem->encp, j->pkt, j->pktlen,
secretav->p, secretav->l);
}
/* Only now free the tree; we needed it to find the encoded
* Message-Authenticator value */
meta_freeavtree(avtree);
/* Sign the packet's response auth. using the shared secret */
md5_init(&mds);
md5_append(&mds, (unsigned char *)j->pkt, j->pktlen);
md5_append(&mds, (unsigned char *)secretav->p, secretav->l);
md5_finish(&mds, (unsigned char *)j->pkt +
c->di_authenticator->val_ofs);
/* Send the reply */
if (msg_thresh[F_SEND] >= L_DEBUG) hexdump(j->pkt, j->pktlen);
n = sendto(j->replyfd, j->pkt, j->pktlen, 0,
(struct sockaddr *)&j->replyaddr.sockaddr,
sizeof(j->replyaddr.sockaddr));
if (n == -1) { msg(F_SEND, L_ERR, "job_run: ERROR: Could not "
"reply: %s!\n", strerror(errno)); }
break;
}
/* Else: reply type not socket */
msg(F_SEND, L_NOTICE, "job_run: Expression done, response:\n");
if (msg_thresh[F_SEND] >= L_NOTICE)
meta_printavlist(c->m, j->vm->head[VM_REP], 0);
j->pktlen = meta_avtomsg(c->m, j->vm->head[VM_REP],
j->pkt, C_MAX_MSGSIZE, 0, 0,
C_BINMSG_REPMAGIC);
if (msg_thresh[F_SEND] >= L_DEBUG) hexdump(j->pkt, j->pktlen);
if (j->replytype == RT_RING) {
ring_put(j->replyaddr.ring, j->pkt, j->pktlen);
break;
}
if (j->replytype == RT_CHAN) {
/* TBD */
}
break;
case VM_ABORTED: /* There was an 'abort' opcode */
msg(F_SEND, L_NOTICE, "ABORT - %s\n",
dbg_cvtstr(logline, loglinel));
msg(F_SEND, L_DEBUG, "job_run: Aborted by code - "
"dropping request.\n");
break;
case VM_IFACETRAP: /* There was an interface call */
ret = vm_getiface(j->vm);
msg(F_LANG, L_DEBUG, "job_run: Interface call: %s\n",ret->name);
return ret;
default: /* The VM choked on the code */
msg(F_SEND, L_NOTICE, "ERROR - %s\n",
dbg_cvtstr(logline, loglinel));
msg(F_LANG, L_ERR, "job_run: VM returned error code %d - "
"dropping request!\n", n);
vm_dumptrace(j->vm, c->m, c->expr);
break;
}
job_del(j);
return 0;
}
/* Returns 0 if the job could be delivered, or -1 if it could not and the
* expression should be continued. */
int job_toiface(IFACE *i, JOB *j, time_t t)
{
CHAN *ch;
META_AV *av;
/* First check if this interface allows a PID to be specified in the
module call (for multi-call transactions that need to go to the
same instance of the module). If so, we try and find the given PID
among our channels. If none, we continue the job with a returned
PID attribute containing -1. */
if (i->pidattr) {
for(av = j->vm->head[VM_REQ];
av && av->i != i->pidattr;
av = av->next);
if (av) {
for(ch = i->chans;
ch && ch->proc->pid != av->l;
ch = ch->next);
if (ch) {
msg(F_PROC, L_NOTICE, "job_toiface: %s called, "
"using PID %ld due to A/V pair REQ:%s\n",
i->name, av->l, av->i->name);
job_tochan(ch, j, t);
return 0;
}
/* PID required but not found running. Add pid A/V pair
containing -1 to REQUEST list and continue job. */
msg(F_PROC, L_NOTICE, "job_toiface: %s called, but "
"PID %ld specified by REQ:%s not found - "
"continuing job\n", i->name, av->l, av->i->name);
meta_addnewav(&j->vm->head[VM_REQ],&j->vm->tail[VM_REQ],
0, 0, i->pidattr, 0, -1, 0);
return -1;
}
}
/* Otherwise, see if this interface has got an idle channel, starting
at the round-robin next one. If we don't, put the job on the shared
send queue. */
for(ch = i->rrch; ; ) {
/* If we found an idle channel or a channel with idle
* transmitter and window > 1, make it take the job. Putting
* it on the shared interface queue in case of an idle
* transmitter is a bad idea, because only the emptying of
* the transmitter will make a channel look for jobs on the
* shared send queue. Ideally the receiver should also be
* able to trigger this, depending on window size etc. Now,
* if a module is slow but the OS pipe to it is big, it will
* starve the shared queue. */
if (ch->proc->state == PRS_IDLE ||
ch->proc->state == PRS_RECEIVING) {
msg(F_PROC, L_NOTICE, "job_toiface: %s called, "
"using PID %ld (in state %d)\n", i->name,
ch->proc->pid, ch->proc->state);
job_tochan(ch, j, t);
i->rrch = ch->next; if (!i->rrch) i->rrch = i->chans;
return 0;
}
/* Go to next one, wrapping around */
ch = ch->next; if (!ch) ch = i->chans;
/* If we're back where we started, nothing was idle */
if (ch == i->rrch) break;
}
msg(F_PROC, L_NOTICE, "job_toiface: %s called, but no process idle - "
"putting job on shared queue (%d)\n", i->name, i->sendqlen);
if (i->sendqlen >= C_MAX_SENDQLEN) {
msg(F_PROC, L_ERR, "job_toiface: Shared queue for interface "
"%s overflowing (%d) - DROPPING job!\n",
i->name, i->sendqlen);
job_del(j);
return 0;
}
/* Put job on this interface's shared send queue */
j->next = 0; /* my next = none */
j->prev = i->sendqh; /* my prev = current head */
if (i->sendqh) i->sendqh->next = j; /* current head's next = me */
else i->sendqt = j; /* if none, then I'm tail too */
i->sendqh = j; /* I am always the new head */
i->sendqlen++;
return 0;
}
void job_tochan(CHAN *ch, JOB *j, time_t t)
{
static char reqmsg[C_MAX_MSGSIZE], jobticketbuf[16];
META_AV av;
ssize_t msglen;
PROC *p;
/* For convenience */
p = ch->proc;
/* See if this channel has indeed room (it should,
otherwise I shouldn't have been called) */
if (job_ring_maxput(ch) < 1) {
msg(F_PROC, L_ERR, "job_tochan: ERROR: No room in channel's job ring! Dropping request.\n");
job_del(j);
return;
}
/* Add A/V pair for ch->job_w if we use job tickets */
if (ch->iface->jobticket) {
memset(&av, 0, sizeof(av));
av.i = ch->iface->jobticket;
av.l = ch->job_w;
if (av.i->val_type == MT_STRING) {
av.l = meta_ordtoa(jobticketbuf, sizeof(jobticketbuf),
0, 10, ch->job_w);
av.p = jobticketbuf;
}
av.prev = j->vm->tail[VM_REQ];
if (j->vm->tail[VM_REQ]) j->vm->tail[VM_REQ]->next = &av;
else j->vm->head[VM_REQ] = &av;
}
/* Create the message from the request list. */
msglen = meta_avtomsg(ch->iface->c->m, j->vm->head[VM_REQ],
reqmsg, C_MAX_MSGSIZE,
ch->iface->flags, ch->iface->sendacl,
C_BINMSG_REQMAGIC);
if (msg_thresh[F_PROC] >= L_DEBUG) {
msg(F_PROC, L_DEBUG, "Message to %s's subprocess %s, pid %d:\n",
ch->iface->name, ch->proc->cmd, p->pid);
if (ch->iface->flags & AVMSG_ASCII) write(2, reqmsg, msglen);
else hexdump(reqmsg, msglen);
}
/* Remove temporary job ticket pair from list tail */
if (ch->iface->jobticket) {
if (av.prev) av.prev->next = 0;
else j->vm->head[VM_REQ] = 0;
j->vm->tail[VM_REQ] = av.prev;
}
/* Put the message in the channel's proc's ring. I'd agree we copy
things around a lot - avtomsg could be done straight to the ring.
The only times meta_avtomsg is used right now is here and in
printav(), which would also benefit from that... */
ring_put(p->w, reqmsg, msglen);
/* Set expiry time in job */
j->expiry = t + ch->iface->xfertimeout;
/* Put the channel's process tx'er in motion; apparently it was idle */
p->state |= PRS_SENDING;
if (p->timer == 0) p->timer = j->expiry;
msg(F_PROC, L_DEBUG, "job_tochan: set sending state on %s's pid %d for job %d; %d bytes now in ring; proc timer expires in %d seconds\n", ch->iface->name, p->pid, ch->job_w, ring_maxget(p->w), p->timer - t);
/* Add job to ring -- we verified that we had room earlier */
ch->jobring[ch->job_w] = j;
ch->job_w = (ch->job_w + 1) % C_CHAN_JOBRINGSIZE;
}
/* Note: this also takes the received AV list from the channel and
adds the pairs on it to the job that will be returned */
JOB *job_fromchan(CHAN *ch, time_t t)
{
JOB *j;
META_AV *av;
ssize_t len;
int nr;
if (ch->iface->window > 1 && ch->iface->jobticket) {
/* Async iface: scan channel's receive list for a job ticket
attribute, which must be present, and take its value. */
for(av = ch->reph;
av && av->i != ch->iface->jobticket;
av = av->next);
if (!av) { msg(F_PROC, L_ERR, "WARNING: Ignoring message received on asynchronous interface: no job ticket present\n"); meta_freeavlist(ch->reph); ch->reph = ch->rept = 0; return 0; }
nr = av->l;
if (av->i->val_type == MT_STRING) {
len = 0;
nr = meta_atoord(av->p, av->l, 0, 0, &len, 10);
if (len != av->l) { msg(F_PROC, L_ERR, "WARNING: Ignoring message containing malformed job ticket '%s'!\n", dbg_cvtstr(av->p, av->l)); meta_freeavlist(ch->reph); ch->reph = ch->rept = 0; return 0; }
}
msg(F_PROC, L_DEBUG, "- got job ticket %s = %d\n",
av->i->name, nr);
if (nr < 0 || nr > C_CHAN_JOBRINGSIZE || (j = ch->jobring[nr]) == 0) { msg(F_PROC, L_ERR, "WARNING: Ignoring message for unknown job %d\n", nr); meta_freeavlist(ch->reph); ch->reph = ch->rept = 0; return 0; }
/* We now have the job in j and its slot number nr in nr.
Clear the slot in the ring and see how far we can advance
the read pointer. */
ch->jobring[nr] = 0;
while(ch->jobring[ch->job_r] == 0 && ch->job_r != ch->job_w)
ch->job_r = (ch->job_r + 1) % C_CHAN_JOBRINGSIZE;
}
else { /* Synchronous interface: just take next job from ring */
if (ch->job_r == ch->job_w || (j = ch->jobring[ch->job_r]) == 0) { msg(F_PROC, L_ERR, "job_fromchan: ERROR: Ignoring unexpected message from subprocess; no job in queue!\n"); if (ch->reph) meta_freeavlist(ch->reph); ch->reph = ch->rept = 0; return 0; }
ch->jobring[ch->job_r] = 0;
ch->job_r = (ch->job_r + 1) % C_CHAN_JOBRINGSIZE;
}
/* Move received list of A/V pairs to job's reply list */
if (ch->reph) ch->reph->prev = j->vm->tail[VM_REP];
if (j->vm->tail[VM_REP]) j->vm->tail[VM_REP]->next = ch->reph;
else j->vm->head[VM_REP] = ch->reph;
if (ch->rept) j->vm->tail[VM_REP] = ch->rept;
ch->reph = ch->rept = 0;
/* If the interface specifies a module PID attribute, add an
A/V pair to the REQUEST list containing that attribute and
this channel's proc's process ID. */
if (ch->iface->pidattr) {
meta_addnewav(&j->vm->head[VM_REQ], &j->vm->tail[VM_REQ], 0, 0,
ch->iface->pidattr, 0, ch->proc->pid, 0);
msg(F_PROC, L_DEBUG, "- added A/V pair %s = %d\n",
ch->iface->pidattr->name, ch->proc->pid);
}
return j;
}
syntax highlighted by Code2HTML, v. 0.9.1