/* csink.c
* Description:
* Author(s): Cory Stone
* Created: 05/25/2000
* Last Modified: $Id: csink.c,v 1.38 2001/02/16 04:21:51 fatjim Exp $
*/
#include "csink.h"
#include <sys/time.h>
#include <stdio.h>
/* default method definitions */
/* Forward declarations of the fallback handlers installed by csink_init (). */
static CBuf * csink_default_read (CSink *sink);
static void csink_default_open (CSink *sink);
static void csink_default_close (CSink *sink);
static void csink_default_free (CSink *sink);
/* Maps fd-watcher tags to their WrapData records; created by
 * csink_init_funcs () and consulted by csink_add_fd ()/csink_remove_fd (). */
GHashTable * wrap_data_ht = NULL;
/* Set up the generic part of a sink: install the csink_default_* handlers
 * as its method table and create empty input and output queues.  No other
 * field of the sink is touched here. */
void
csink_init (CSink *sink)
{
    /* Method table: every slot starts out at its default handler. */
    sink->read  = (CSinkReadFunc)  csink_default_read;
    sink->write = (CSinkWriteFunc) csink_default_write;
    sink->open  = (CSinkOpenFunc)  csink_default_open;
    sink->close = (CSinkCloseFunc) csink_default_close;
    sink->free  = (CSinkFreeFunc)  csink_default_free;

    /* Message queues. */
    sink->inQueue  = g_ptr_array_new ();
    sink->outQueue = g_ptr_array_new ();
}
/* Release the two queue arrays created by csink_init ().
 * NOTE(review): only the arrays themselves are freed here; any buffers still
 * queued in them are not released by this function - confirm callers drain
 * the queues first. */
void
csink_release (CSink *sink)
{
    GPtrArray *incoming = sink->inQueue;
    GPtrArray *outgoing = sink->outQueue;

    g_ptr_array_free (incoming, TRUE);
    g_ptr_array_free (outgoing, TRUE);
}
/* Default read handler: produces no data and always returns NULL. */
CBuf *
csink_default_read (CSink *sink)
{
    (void) sink;    /* intentionally unused */
    return NULL;
}
/* Default write handler: append a copy of msg (made with
 * cbuf_new_with_cbuf ()) to the sink's output queue and return the length
 * of that copy. */
gint
csink_default_write (CSink *sink, CBuf *msg)
{
    CBuf *queued = cbuf_new_with_cbuf (msg);

    g_ptr_array_add (sink->outQueue, queued);

    return queued->len;
}
/* Default open handler: a sink type that did not supply its own open ()
 * gets the IMPLEMENTATION_DEFAULTCALL error signalled and is then closed. */
static void
csink_default_open (CSink *sink)
{
    csink_on_error (sink, "IMPLEMENTATION_DEFAULTCALL");

    csink_close (sink);
}
/* Default close handler: only signals IMPLEMENTATION_DEFAULTCALL, since a
 * sink type is expected to install its own close (). */
static void
csink_default_close (CSink *sink)
{
    csink_on_error (sink, "IMPLEMENTATION_DEFAULTCALL");
}
/* Default free handler: logs a warning, releases the queues through
 * csink_release () and then frees the sink structure itself. */
static void
csink_default_free (CSink *sink)
{
    CDEBUG (("csink", "default free; are you sure you should be using this?"));

    csink_release (sink);
    free (sink);
}
/* This is not part of glib but should be.
 *
 * Insert data as the first element of array: the array grows by one slot,
 * every existing element moves up one position, and data goes into slot 0. */
void
g_ptr_array_add_at_start (GPtrArray *array, gpointer data)
{
    gpointer *slots;
    int dst;

    /* Make room for one more element at the end. */
    g_ptr_array_set_size (array, array->len + 1);

    /* Walk from the back so nothing is overwritten before it has moved. */
    slots = array->pdata;
    dst = array->len - 1;
    while (dst > 0) {
        slots[dst] = slots[dst - 1];
        dst--;
    }

    slots[0] = data;
}
/* This should have a string table for the errors...
 *
 * Return the description of the error currently recorded on the sink.
 * NOTE(review): sink->error_record is dereferenced without a check, unlike
 * csink_errname () - callers presumably only use this after an error has
 * been signalled; confirm. */
gchar *
csink_errstr (CSink * sink)
{
    CSink_Error *record = sink->error_record;

    return record->description;
}
/* User callback setting */
/* Register func as the callback run by csink_on_new_data ().  Passing NULL
 * leaves the sink without a new-data callback. */
void
csink_set_new_data_func (CSink * sink, CSinkCallbackFunc func)
{
    sink->on_new_data = func;
}
/* Store func in the sink's on_empty_send_queue callback slot. */
void
csink_set_empty_send_queue_func (CSink * sink, CSinkCallbackFunc func)
{
    sink->on_empty_send_queue = func;
}
/* Register func as the callback invoked by csink_on_error__P () after an
 * error has been recorded on the sink. */
void
csink_set_error_func (CSink * sink, CSinkCallbackFunc func)
{
    sink->on_error = func;
}
/* Register func as the callback invoked by csink_close () once the sink's
 * close handler has run. */
void
csink_set_close_func (CSink * sink, CSinkCallbackFunc func)
{
    sink->on_close = func;
}
/* Register func as the callback run by csink_on_connect (). */
void
csink_set_connect_func (CSink * sink, CSinkCallbackFunc func)
{
    sink->on_connect = func;
}
/* Return the opaque pointer last stored with csink_set_user_data (). */
void *
csink_get_user_data (CSink * sink)
{
    void *stored = sink->user_data;

    return stored;
}
/* Attach an opaque, caller-owned pointer to the sink; it is only stored,
 * never interpreted, by this function. */
void
csink_set_user_data (CSink * sink, void *data)
{
    sink->user_data = data;
}
/* Error handling */
/* Test-and-clear the sink's error flag: returns TRUE (and resets the flag
 * to FALSE) when the flag is exactly TRUE, otherwise returns FALSE and
 * leaves the flag alone. */
gint
csink_error (CSink * sink)
{
    if (sink->err != TRUE)
        return FALSE;

    sink->err = FALSE;
    return TRUE;
}
/* Signal the error called err_name on sink.
 *
 * err_name is looked up in csink_error_table (terminated by an entry whose
 * name is NULL).  The matching entry, or csink_error_internal_error when
 * there is none, becomes the sink's error record; the error flag is raised
 * and the on_error callback, if any, is invoked.  file and line identify
 * the call site and are only used for the debug message. */
void
csink_on_error__P (CSink * sink, char *err_name, char *file, int line)
{
    CSink_Error *entry;
    CSink_Error *match = NULL;

    CDEBUG (("csink", "Asked to signal error: %s on %s:%d", err_name,
             file, line));

    for (entry = csink_error_table; entry->name != NULL; entry++) {
        if (0 == strcmp (entry->name, err_name)) {
            match = entry;
            break;
        }
    }

    sink->err = TRUE;

    if (NULL == match) {
        sink->error_record = &csink_error_internal_error;
        CDEBUG (("csink", "'%s' is not a registered error.", err_name));
    } else {
        sink->error_record = match;
        CDEBUG (("csink", "Got error: %s\n", sink->error_record->name));
        CDEBUG (("csink", "err string: %s\n", csink_errstr (sink)));
    }

    if (sink->on_error)
        sink->on_error (sink);
}
/* Return the name of the error currently flagged on sink, or the literal
 * "None" when sink is NULL, no error is flagged, or no record is set. */
gchar *
csink_errname (CSink *sink)
{
    if (!sink || !sink->err || !sink->error_record)
        return "None";

    return sink->error_record->name;
}
/* Placeholder: intended to map sink->err to a printable message, but it
 * currently only logs a debug line and returns NULL. */
gchar *
csink_erormsg (CSink * sink)
{
    gchar *message = NULL;

    /* Do a lookup of sink->err and print the appropriate message. */
    CDEBUG (("csink", "Sorry, no errors yet!\n"));

    return message;
}
/* Run the sink's new-data callback, if one has been registered. */
void
csink_on_new_data (CSink * sink)
{
    if (!sink->on_new_data)
        return;

    sink->on_new_data (sink);
}
/* Run the sink's connect callback, if one has been registered. */
void
csink_on_connect (CSink *sink)
{
    if (!sink->on_connect)
        return;

    sink->on_connect (sink);
}
/* Call the sink's create method and return its result.  Returns NULL when
 * sink is NULL or has no create method. */
CSink * csink_create (CSink * sink)
{
    if (sink != NULL && sink->create != NULL)
        return sink->create (sink);

    return NULL;
}
/* Mark the sink as not closed, then hand over to its open method. */
void
csink_open (CSink * sink)
{
    sink->closed = FALSE;

    sink->open (sink);
}
/* Take the oldest message off the sink's input queue.
 *
 * When the queue is empty the sink's read method is called first so it has
 * a chance to fill it.  Returns NULL if the queue is still empty
 * afterwards.  The returned buffer MUST BE FREED BY THE CALLER. */
CBuf *
csink_read (CSink * sink)
{
    CBuf *head = NULL;

    CDEBUG (("csink", "in read"));

    /* Give the sink a chance to populate an empty queue. */
    if (0 == sink->inQueue->len)
        sink->read (sink);

    /* g_ptr_array_remove_index () hands back the element it removed. */
    if (sink->inQueue->len > 0)
        head = (CBuf *) g_ptr_array_remove_index (sink->inQueue, 0);

    CDEBUG(("csink", "returning from read"));
    return head;
}
/* Pass data to the sink's write method and return whatever it returns. */
gint
csink_write (CSink * sink, CBuf * data)
{
    gint written = sink->write (sink, data);

    return written;
}
/* Close the sink once: run its close method, mark it closed and then call
 * the on_close callback if one is set.  A sink that is already marked
 * closed is left untouched. */
void
csink_close (CSink * sink)
{
    CDEBUG (("csink", "calling sink close function."));

    if (!sink->closed) {
        sink->close (sink);
        sink->closed = 1;

        if (sink->on_close)
            sink->on_close (sink);
        return;
    }

    CDEBUG (("csink", "attempt to close an already closed sink."));
}
/* Dispose of a sink by delegating to its free method. */
void
csink_free (CSink * sink)
{
    CDEBUG (("csink", "freeing a csink."));

    sink->free (sink);
}
/* get the queue sizes in bytes */
/* Return the sum of the len fields of all buffers waiting in the sink's
 * output queue. */
gint
csink_send_queue_size (CSink * sink)
{
    GPtrArray *queue = sink->outQueue;
    int total = 0;
    int idx = 0;

    while (idx < queue->len) {
        total += ((CBuf *) g_ptr_array_index (queue, idx))->len;
        idx++;
    }

    return total;
}
/* Return the sum of the len fields of all buffers waiting in the sink's
 * input queue.
 *
 * The loop is bounded by the input queue's own length; bounding it by the
 * output queue's length would either miss buffers or read past the end of
 * the input queue. */
gint
csink_receive_queue_size (CSink * sink)
{
    GPtrArray *queue = sink->inQueue;
    guint i;
    gint size = 0;

    for (i = 0; i < queue->len; i++)
        size += ((CBuf *) g_ptr_array_index (queue, i))->len;

    return size;
}
/* The simple sanity check to see if we can csink_write ().
 *
 * Returns FALSE when sink is NULL, when message is NULL, or when the sink's
 * flags equal CSSF_CLOSED; returns TRUE otherwise. */
gint csink_write_sanity (CSink * sink, CBuf * message)
{
    gint sane = FALSE;

    if (NULL == sink) {
        CDEBUG (("csink", "csink_write_sanity called with null sink!\n"));
    } else if (NULL == message) {
        CDEBUG (("csink", "Can't write out a null message!"));
    } else if (CSSF_CLOSED == sink->flags) {
        CDEBUG (("csink", "->flags say the connection is not yet ready.\n"));
    } else {
        sane = TRUE;    /* The write looks sane. */
    }

    return sane;
}
/* Some design thoughts:
*
* Do a fast select() to see if anything needs handling right now. hmm. no.
*
* The user gives us the functions to watch the fds.
*
* We should supply a csink_init_with_glib which would set us up with glib's
* fd watching.
*
* Also, we can have a poll-style function which does our fd watching and
* dispatching for us.
*/
/* Backend hook called by csink_add_fd () to start watching an fd; set by
 * csink_init_funcs (). */
CSinkAddFDFunc csink_fd_add_func = NULL;
/* Backend hook called by csink_remove_fd () to stop watching; set by
 * csink_init_funcs (). */
CSinkRemoveFDFunc csink_fd_remove_func = NULL;
/* Maps watcher tags to the info string given to csink_add_fd (); created
 * lazily and used for the debug output in csink_remove_fd (). */
GHashTable * tag_info_ht = NULL;
/* Install the backend functions used to add and remove fd watchers and make
 * sure the tag -> WrapData table exists.
 *
 * The table is created on the first call only.  Creating a fresh table on
 * every call would leak the previous one and orphan the WrapData records of
 * watchers that are still registered, so csink_remove_fd () could no longer
 * free them. */
void
csink_init_funcs (CSinkAddFDFunc fd_add_func, CSinkRemoveFDFunc fd_remove_func)
{
    if (NULL == wrap_data_ht)
        wrap_data_ht = g_hash_table_new (g_direct_hash, g_direct_equal);

    csink_fd_add_func = fd_add_func;
    csink_fd_remove_func = fd_remove_func;
}
/*
* Below is an implementation of the fd watching interface; call
* csink_set_polling_fd_funcs and be sure to call csink_do_poll every once
* in a while (like in your main loop).
*/
/* Pairs a sink callback with the sink it should be invoked on.  One record
 * is allocated per watcher in csink_add_fd () and handed to the backend as
 * the opaque data pointer that wrap_func () receives. */
typedef struct {
CSinkCallbackFunc func; /* callback that wrap_func () runs */
CSink *sink; /* argument passed to func */
} WrapData;
/* Adapter between the backend's fd callback signature and a sink callback:
 * data is the WrapData registered by csink_add_fd (); fd and cond are
 * ignored.  Always returns 1. */
int
wrap_func (int fd, CSinkFDCondition cond, void * data)
{
    WrapData *wrapped = (WrapData *) data;
    CSink *target = wrapped->sink;

    wrapped->func (target);

    return 1; /* i guess? */
}
/* Start watching fd for cond through the registered backend; func (sink) is
 * run, via wrap_func (), whenever the backend reports activity.
 *
 * info is a caller-owned string describing the watcher; only the pointer is
 * stored, for the debug output of csink_remove_fd ().
 *
 * Returns the backend's tag for the watcher, or NULL when
 * csink_init_funcs () has not been called, no add function is registered,
 * or the backend returned a NULL tag.  A NULL tag is treated as a failed
 * registration: the wrapper record is freed instead of leaked and nothing
 * is recorded for the tag. */
void *
csink_add_fd (int fd, CSinkFDCondition cond,
              CSinkCallbackFunc func, CSink * sink, char * info)
{
    WrapData *wrap_data;
    void *tag;

    if (NULL == wrap_data_ht) {
        CDEBUG (("csink", "You need to call csink_init_funcs () "
                 "before you can call csink_add_fd ()."));
        return NULL;
    }
    if (NULL == csink_fd_add_func) {
        CDEBUG (("csink", "There is no add fd watcher () registered!"));
        return NULL;
    }

    wrap_data = g_new0 (WrapData, 1);
    wrap_data->func = func;
    wrap_data->sink = sink;

    tag = csink_fd_add_func (fd, cond, wrap_func, (void *) wrap_data, info);
    if (NULL == tag) {
        /* Without a tag the record could never be found again to free it. */
        CDEBUG (("csink", "adding an fd watcher failed, fd = %i\n", fd));
        g_free (wrap_data);
        return NULL;
    }

    g_hash_table_insert (wrap_data_ht, tag, wrap_data);

    if (NULL == tag_info_ht)
        tag_info_ht = g_hash_table_new (g_direct_hash, g_direct_equal);
    g_hash_table_insert (tag_info_ht, tag, info);

    /* tag is a pointer, so it has to be printed with %p rather than %i. */
    CDEBUG (("mainloop", "adding tag = %p, fd = %i\n", tag, fd));
    return tag;
}
/* Stop watching the fd identified by tag (as returned by csink_add_fd ()).
 *
 * Frees the WrapData recorded for the tag, calls the backend remove
 * function if one is registered, and forgets the info string stored at
 * registration time.  info describes why the watcher is being removed; it
 * is passed to the backend and used in the debug output.  Does nothing if
 * csink_init_funcs () has not been called. */
void
csink_remove_fd (void * tag, char * info)
{
    WrapData *wrap_data;
    char *add_info = NULL;

    CDEBUG (("mainloop", "in csink_remove_fd"));

    if (NULL == wrap_data_ht) {
        CDEBUG (("csink", "You need to call csink_init_funcs () "
                 "before you can call csink_remove_fd ()."));
        return;
    }

    wrap_data = g_hash_table_lookup (wrap_data_ht, tag);
    if (NULL != wrap_data) {
        g_hash_table_remove (wrap_data_ht, tag);
        g_free (wrap_data);
    }

    if (csink_fd_remove_func)
        csink_fd_remove_func (tag, info);
    else
        CDEBUG (("csink", "There is no remove fd watcher () registered!"));

    /* A missing table simply means no info was ever recorded; there is no
     * need to allocate one just to look the tag up. */
    if (NULL != tag_info_ht) {
        add_info = (char *) g_hash_table_lookup (tag_info_ht, tag);
        g_hash_table_remove (tag_info_ht, tag);
    }

    /* tag is a pointer (%p), and either string may be NULL, which must not
     * be handed to %s. */
    CDEBUG (("mainloop", "removing tag = %p\n", tag));
    CDEBUG (("mainloop", "removing '%s' because of '%s'\n",
             add_info ? add_info : "(unknown)",
             info ? info : "(none)"));
}
/***** MAINLOOP CODE *****/
/***** Old poll code *****/
/* One entry in the singly linked list of fds watched by csink_do_poll (). */
typedef struct _csink_polled_fd CSinkPolledFD;
struct _csink_polled_fd {
void *data; /* opaque pointer passed back to func */
int fd; /* descriptor handed to select () */
CSinkFDCondition cond; /* EIO_READ / EIO_WRITE / EIO_ERROR bits to watch */
CSinkFDCallbackFunc func; /* called as func (fd, cond, data) on activity */
CSinkPolledFD *next; /* next entry, NULL at the end of the list */
};
/* Head of the watch list; csink_polled_fd_add () pushes new entries at the
 * front. */
CSinkPolledFD *_csink_polled_fd_list;
/* Add fd to the list polled by csink_do_poll ().
 *
 * cond selects which select () sets the fd is placed in; func is stored as
 * a CSinkFDCallbackFunc and later called as func (fd, cond, data).
 *
 * Returns the new list entry, which doubles as the tag to pass to
 * csink_polled_fd_remove (), or NULL if memory could not be allocated. */
void *
csink_polled_fd_add (int fd, CSinkFDCondition cond,
                     CSinkCallbackFunc func, void *data)
{
    CSinkPolledFD *rec = malloc (sizeof *rec);
    CSinkPolledFD *node;
    int count = 0;

    CDEBUG (("csink", "adding an fd watcher (fd = %d.)", fd));

    if (NULL == rec) {
        CDEBUG (("csink", "out of memory adding an fd watcher (fd = %d.)", fd));
        return NULL;
    }

    rec->fd = fd;
    rec->cond = cond;
    rec->data = data;
    rec->func = (CSinkFDCallbackFunc) func;

    /* Push at the head of the list. */
    rec->next = _csink_polled_fd_list;
    _csink_polled_fd_list = rec;

    CDEBUG (("csink", "counting fds watched.."));
    for (node = _csink_polled_fd_list; node; node = node->next)
        count++;
    CDEBUG (("csink", "%d fds being watched.", count));

    return rec;
}
/* Unlink and free the list entry identified by tag (the pointer returned by
 * csink_polled_fd_add ()).  An unknown tag is only reported in the debug
 * log. */
void
csink_polled_fd_remove (void *tag)
{
    CSinkPolledFD **link = &_csink_polled_fd_list;
    CSinkPolledFD *victim = NULL;
    CSinkPolledFD *node;
    int count = 0;

    CDEBUG (("csink", "removing an fd watcher"));

    /* link always points at the pointer that refers to the current entry,
     * so unlinking needs no special case for the list head. */
    for (; *link != NULL; link = &(*link)->next) {
        if (*link == tag) {
            victim = *link;
            *link = victim->next;
            break;
        }
    }

    if (NULL == victim) {
        CDEBUG (("csink", "remove_fd, tag not found."));
    } else {
        CDEBUG (("csink", "remove_fd, removing fd = %d", victim->fd));
        free (victim);
    }

    CDEBUG (("csink", "counting fds watched.."));
    for (node = _csink_polled_fd_list; node; node = node->next)
        count++;
    CDEBUG (("csink", "%d fds being watched.", count));
}
/* Run the callback of every watched fd that is marked ready in `ready'.
 * Entries whose fd is outside the range select () can handle were never put
 * into a set and are skipped. */
static void
csink_poll_dispatch_set (fd_set *ready)
{
    CSinkPolledFD *node = _csink_polled_fd_list;
    CSinkPolledFD *next;

    while (node) {
        /* Fetch the successor first: the callback may remove this entry. */
        next = node->next;
        if (node->fd >= 0 && node->fd < FD_SETSIZE
            && FD_ISSET (node->fd, ready)) {
            node->func (node->fd, node->cond, node->data);
        }
        node = next;
    }
}

/* Poll all fds registered with csink_polled_fd_add () once, without
 * blocking, and dispatch their callbacks.
 *
 * Each fd is placed in the read, write and/or exception set according to
 * its cond bits; select () is called with a zero timeout.  Callbacks are
 * dispatched per set (read, then write, then exception), so an fd watched
 * for several conditions may have its callback run more than once.  When
 * select () fails nothing is dispatched, because the contents of the fd
 * sets cannot be relied on after an error. */
void
csink_do_poll (void)
{
    CSinkPolledFD *node;
    fd_set in_set,
           out_set,
           err_set;
    struct timeval timeout;
    int max_fd = -1;
    int res;

    FD_ZERO (&in_set);
    FD_ZERO (&out_set);
    FD_ZERO (&err_set);

    for (node = _csink_polled_fd_list; node; node = node->next) {
        /* FD_SET is undefined for descriptors outside [0, FD_SETSIZE). */
        if (node->fd < 0 || node->fd >= FD_SETSIZE) {
            CDEBUG (("csink", "poll, fd %d is out of range for select() - skipping.",
                     node->fd));
            continue;
        }
        if (node->cond & EIO_READ)
            FD_SET (node->fd, &in_set);
        if (node->cond & EIO_WRITE)
            FD_SET (node->fd, &out_set);
        if (node->cond & EIO_ERROR)
            FD_SET (node->fd, &err_set);
        if (node->fd > max_fd)
            max_fd = node->fd;
    }

    /* Zero timeout: report what is ready right now and return at once. */
    timeout.tv_sec = timeout.tv_usec = 0;
    res = select (max_fd + 1, &in_set, &out_set, &err_set, &timeout);

    if (res < 0) {
        switch (errno) {
        case EBADF:     /* we've passed in a bad file descriptor - BUG */
            CDEBUG (("csink", "bad fd in list of fds to watch - bailing."));
            break;
        case EINVAL:    /* i've specified an incorrect timeout - BUG */
            CDEBUG (("csink", "invalid timeout in select() call - bailing."));
            break;
        case EINTR:     /* interrupted by a signal */
            CDEBUG (("csink", "EINTR from select() call - not fatal."));
            break;
        default:
            CDEBUG (
                    ("csink",
                     "falling off the end of the world after select()"));
            break;
        }
        /* Never dispatch from the sets after a failed select (). */
        return;
    }

    if (res == 0)
        return;         /* nothing is ready */

    /* Three separate passes because a callback might remove an fd. */
    csink_poll_dispatch_set (&in_set);
    csink_poll_dispatch_set (&out_set);
    csink_poll_dispatch_set (&err_set);
}
void
csink_set_polling_fd_funcs (void)
{
csink_init_funcs ((CSinkAddFDFunc) csink_polled_fd_add,
(CSinkRemoveFDFunc) csink_polled_fd_remove);
_csink_polled_fd_list = NULL;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */