/* csink.c
 * Description: Base CSink implementation - message queues, user callbacks,
 *              error reporting, fd-watcher glue, and a select()-based poller.
 * Author(s): Cory Stone
 * Created: 05/25/2000
 * Last Modified: $Id: csink.c,v 1.38 2001/02/16 04:21:51 fatjim Exp $
 */

#include "csink.h"

/* NOTE(review): the targets of the two system includes were lost from this
 * copy of the file; they are reconstructed from usage (malloc/free, strcmp).
 * errno, select() and friends are assumed to arrive via csink.h - confirm. */
#include <stdlib.h>
#include <string.h>

/* default method definitions */
static CBuf * csink_default_read (CSink *sink);
static void csink_default_open (CSink *sink);
static void csink_default_close (CSink *sink);
static void csink_default_free (CSink *sink);

/* Maps a watcher tag to the WrapData allocated for it in csink_add_fd (). */
GHashTable * wrap_data_ht = NULL;


/* Create the in/out queues of `sink' and install the default method
 * pointers.  No other field of the sink is touched here. */
void
csink_init (CSink *sink)
{
  sink->inQueue = g_ptr_array_new ();
  sink->outQueue = g_ptr_array_new ();

  sink->read = (CSinkReadFunc) csink_default_read;
  sink->write = (CSinkWriteFunc) csink_default_write;
  sink->open = (CSinkOpenFunc) csink_default_open;
  sink->close = (CSinkCloseFunc) csink_default_close;
  sink->free = (CSinkFreeFunc) csink_default_free;
}

/* Free both queue arrays of `sink'.
 * NOTE(review): only the pointer arrays are released; any CBufs still
 * queued in them are not freed here - confirm callers drain the queues. */
void
csink_release (CSink *sink)
{
  g_ptr_array_free (sink->inQueue, TRUE);
  g_ptr_array_free (sink->outQueue, TRUE);
}

/* Default read method: produces nothing. */
static CBuf *
csink_default_read (CSink *sink)
{
  return NULL;
}

/* Default write method: append a copy of `msg' to the out queue and
 * return the length of that copy. */
gint
csink_default_write (CSink *sink, CBuf *msg)
{
  CBuf *msgcpy = cbuf_new_with_cbuf (msg);

  g_ptr_array_add (sink->outQueue, (gpointer) msgcpy);
  return msgcpy->len;
}

/* Default open method: signal an error and close the sink. */
static void
csink_default_open (CSink *sink)
{
  csink_on_error (sink, "IMPLEMENTATION_DEFAULTCALL");
  csink_close (sink);
}

/* Default close method: signal an error. */
static void
csink_default_close (CSink *sink)
{
  csink_on_error (sink, "IMPLEMENTATION_DEFAULTCALL");
}

/* Default free method: release the queues, then free () the sink itself. */
static void
csink_default_free (CSink *sink)
{
  CDEBUG (("csink", "default free; are you sure you should be using this?"));
  csink_release (sink);
  free (sink);
}


/* This is not part of glib but should be.
 * Insert `data' at index 0 of `array', shifting every existing element up
 * by one slot. */
void
g_ptr_array_add_at_start (GPtrArray *array, gpointer data)
{
  guint i;

  /* Grow array. */
  g_ptr_array_set_size (array, array->len + 1);

  /* Set the last value to the second last value, and so on. */
  for (i = array->len - 1; i > 0; i--)
    g_ptr_array_index (array, i) = g_ptr_array_index (array, i - 1);

  /* Now set the zeroth value to the new data. */
  g_ptr_array_index (array, 0) = data;
}


/* This should have a string table for the errors... */
/* Return the description of the sink's current error record.  The record
 * pointer is dereferenced unconditionally. */
gchar *
csink_errstr (CSink * sink)
{
  return sink->error_record->description;
}


/* User callback setting */

void
csink_set_new_data_func (CSink * sink, CSinkCallbackFunc func)
{
  sink->on_new_data = func;
}

void
csink_set_empty_send_queue_func (CSink * sink, CSinkCallbackFunc func)
{
  sink->on_empty_send_queue = func;
}

void
csink_set_error_func (CSink * sink, CSinkCallbackFunc func)
{
  sink->on_error = func;
}

void
csink_set_close_func (CSink * sink, CSinkCallbackFunc func)
{
  sink->on_close = func;
}

void
csink_set_connect_func (CSink * sink, CSinkCallbackFunc func)
{
  sink->on_connect = func;
}

void *
csink_get_user_data (CSink * sink)
{
  return sink->user_data;
}

void
csink_set_user_data (CSink * sink, void *data)
{
  sink->user_data = data;
}


/* Error handling */

/* Test-and-clear the error flag: returns TRUE once per signalled error
 * and resets the flag, FALSE otherwise. */
gint
csink_error (CSink * sink)
{
  if (TRUE == sink->err) {
    sink->err = FALSE;
    return TRUE;
  }
  return FALSE;
}

/* Signal the error called `err_name' on `sink'.
 *
 * The name is looked up in csink_error_table (terminated by an entry whose
 * name is NULL).  An unknown name is recorded as csink_error_internal_error.
 * The error flag is set and the user's on_error callback, if any, is run.
 * `file' and `line' are only used for the debug message. */
void
csink_on_error__P (CSink * sink, char *err_name, char *file, int line)
{
  CSink_Error *e;
  CSink_Error *found = NULL;

  CDEBUG (("csink", "Asked to signal error: %s on %s:%d",
           err_name, file, line));

  for (e = csink_error_table; e->name != NULL; e++) {
    if (strcmp (e->name, err_name) == 0) {
      found = e;
      break;
    }
  }

  sink->err = TRUE;

  if (found) {
    sink->error_record = found;
    CDEBUG (("csink", "Got error: %s\n", sink->error_record->name));
    CDEBUG (("csink", "err string: %s\n", csink_errstr (sink)));
  } else {
    sink->error_record = &csink_error_internal_error;
    CDEBUG (("csink", "'%s' is not a registered error.", err_name));
  }

  if (sink->on_error)
    sink->on_error (sink);
}

/* Return the name of the pending error, or "None" when the sink is NULL,
 * has no error flagged, or has no error record. */
gchar *
csink_errname (CSink *sink)
{
  if (sink && sink->err && sink->error_record)
    return sink->error_record->name;

  return "None";
}

/* Placeholder: logs a message and always returns NULL. */
gchar *
csink_erormsg (CSink * sink)
{
  /* Do a lookup of sink->err and print the appropriate message. */
  CDEBUG (("csink", "Sorry, no errors yet!\n"));
  return NULL;
}


/* Run the user's on_new_data callback, if one is set. */
void
csink_on_new_data (CSink * sink)
{
  if (sink->on_new_data)
    sink->on_new_data (sink);
}

/* Run the user's on_connect callback, if one is set. */
void
csink_on_connect (CSink *sink)
{
  if (sink->on_connect)
    sink->on_connect (sink);
}

/* Dispatch to the sink's create method; NULL if the sink or the method
 * is missing. */
CSink *
csink_create (CSink * sink)
{
  if (NULL == sink || NULL == sink->create)
    return NULL;

  return sink->create (sink);
}

/* Mark the sink as not closed and dispatch to its open method. */
void
csink_open (CSink * sink)
{
  sink->closed = FALSE;
  sink->open (sink);
}

/* Pop and return the oldest message of the in queue, or NULL when none is
 * available.  When the queue is empty the sink's read method is invoked
 * first to give it a chance to fill the queue (its return value is not
 * used).  The returned message MUST be freed by the caller. */
CBuf *
csink_read (CSink * sink)
{
  CBuf *message = NULL;

  CDEBUG (("csink", "in read"));

  /* If queue is empty, try to populate it first. */
  if (sink->inQueue->len == 0)
    sink->read (sink);

  if (sink->inQueue->len > 0) {
    message = (CBuf *) g_ptr_array_index (sink->inQueue, 0);
    g_ptr_array_remove_index (sink->inQueue, 0);
  }

  CDEBUG (("csink", "returning from read"));

  return message;               /* THIS MUST BE FREED BY OUR CALLER. */
}

/* Dispatch to the sink's write method and return its result. */
gint
csink_write (CSink * sink, CBuf * data)
{
  return (sink->write (sink, data));
}

/* Close the sink once: dispatch to its close method, mark it closed and
 * run the user's on_close callback.  Closing an already closed sink only
 * logs a message. */
void
csink_close (CSink * sink)
{
  CDEBUG (("csink", "calling sink close function."));

  if (sink->closed) {
    CDEBUG (("csink", "attempt to close an already closed sink."));
    return;
  }

  sink->close (sink);
  sink->closed = 1;

  if (sink->on_close)
    sink->on_close (sink);
}

/* Dispatch to the sink's free method. */
void
csink_free (CSink * sink)
{
  CDEBUG (("csink", "freeing a csink."));
  sink->free (sink);
}


/* get the queue sizes in bytes */

/* Sum of the `len' of every CBuf waiting in the out queue. */
gint
csink_send_queue_size (CSink * sink)
{
  guint i;
  gint size = 0;

  for (i = 0; i < sink->outQueue->len; i++)
    size += ((CBuf *) g_ptr_array_index (sink->outQueue, i))->len;

  return size;
}

/* Sum of the `len' of every CBuf waiting in the in queue. */
gint
csink_receive_queue_size (CSink * sink)
{
  guint i;
  gint size = 0;

  /* Bounded by the in queue's own length; bounding by the out queue's
   * length reads past the end of (or stops short in) the in queue. */
  for (i = 0; i < sink->inQueue->len; i++)
    size += ((CBuf *) g_ptr_array_index (sink->inQueue, i))->len;

  return size;
}


/* The simple sanity check to see if we can csink_write ().
 * Returns FALSE for a NULL sink, a NULL message, or a sink whose flags
 * equal CSSF_CLOSED; TRUE otherwise. */
gint
csink_write_sanity (CSink * sink, CBuf * message)
{
  /* Ensure some sanity. */
  if (NULL == sink) {
    CDEBUG (("csink", "csink_write_sanity called with null sink!\n"));
    return FALSE;
  }

  if (NULL == message) {
    CDEBUG (("csink", "Can't write out a null message!"));
    return FALSE;
  }

  if (CSSF_CLOSED == sink->flags) {
    CDEBUG (("csink", "->flags say the connection is not yet ready.\n"));
    return FALSE;
  }

  return TRUE;                  /* The write looks sane. */
}


/* Some design thoughts:
 *
 * Do a fast select() to see if anything needs handling right now. hmm. no.
 *
 * The user gives us the functions to watch the fds.
 *
 * We should supply a csink_init_with_glib which would set us up with glib's
 * fd watching.
 *
 * Also, we can have a poll-style function which does our fd watching and
 * dispatching for us.
 */

CSinkAddFDFunc csink_fd_add_func = NULL;
CSinkRemoveFDFunc csink_fd_remove_func = NULL;

/* Maps a watcher tag to the `info' string it was added with (debugging). */
GHashTable * tag_info_ht = NULL;

/* Register the functions used to add and remove fd watchers.  Must be
 * called before csink_add_fd () / csink_remove_fd (). */
void
csink_init_funcs (CSinkAddFDFunc fd_add_func, CSinkRemoveFDFunc fd_remove_func)
{
  /* Create the tag table only once: replacing it on a later call would
   * orphan the old table together with every WrapData it still tracks. */
  if (NULL == wrap_data_ht)
    wrap_data_ht = g_hash_table_new (g_direct_hash, g_direct_equal);

  csink_fd_add_func = fd_add_func;
  csink_fd_remove_func = fd_remove_func;
}


/*
 * Below is an implementation of the fd watching interface; call
 * csink_set_polling_fd_funcs and be sure to call csink_do_poll every once
 * in a while (like in your main loop).
 */

/* Pairs a sink callback with the sink it should be invoked on. */
typedef struct {
  CSinkCallbackFunc func;
  CSink *sink;
} WrapData;

/* Adapter handed to the registered add-fd function: unpacks the WrapData
 * and calls the sink callback.  `fd' and `cond' are ignored. */
int
wrap_func (int fd, CSinkFDCondition cond, void * data)
{
  WrapData *wrap_data = (WrapData *) data;

  wrap_data->func (wrap_data->sink);

  return 1;                     /* i guess? */
}

/* Watch `fd' for `cond' and call `func (sink)' on activity.
 *
 * Returns the tag produced by the registered add-fd function, or NULL when
 * csink_init_funcs () has not been called, no add function is registered,
 * or the add function itself returned NULL.  `info' is remembered per tag
 * for the debug output of csink_remove_fd (). */
void *
csink_add_fd (int fd, CSinkFDCondition cond, CSinkCallbackFunc func,
              CSink * sink, char * info)
{
  WrapData * wrap_data;
  void * tag;

  if (NULL == wrap_data_ht) {
    CDEBUG (("csink", "You need to call csink_init_funcs () "
             "before you can call csink_add_fd ()."));
    return NULL;
  }

  if (NULL == csink_fd_add_func) {
    CDEBUG (("csink", "There is no add fd watcher () registered!"));
    return NULL;
  }

  wrap_data = g_new0 (WrapData, 1);
  wrap_data->func = func;
  wrap_data->sink = sink;

  tag = csink_fd_add_func (fd, cond, wrap_func, (void *) wrap_data, info);

  if (NULL == tag) {
    /* NOTE(review): a NULL tag is taken to mean that no watcher was
     * registered (nothing could ever remove it by tag) - confirm for every
     * add function in use.  Release the wrapper instead of leaking it, and
     * do not record `info' under a NULL key. */
    g_free (wrap_data);
    return NULL;
  }

  g_hash_table_insert (wrap_data_ht, tag, wrap_data);

  if (NULL == tag_info_ht)
    tag_info_ht = g_hash_table_new (g_direct_hash, g_direct_equal);
  g_hash_table_insert (tag_info_ht, (void *) tag, info);

  /* The tag is a pointer, so it is printed with %p rather than %i. */
  CDEBUG (("mainloop", "adding tag = %p, fd = %i\n", tag, fd));

  return tag;
}

/* Stop watching the fd registered under `tag'.  Frees the associated
 * WrapData, forwards to the registered remove-fd function (if any) and
 * forgets the tag's info string.  `info' describes why the watcher is
 * removed and is only used for debug output. */
void
csink_remove_fd (void * tag, char * info)
{
  WrapData * wrap_data;
  char * add_info;

  CDEBUG (("mainloop", "in csink_remove_fd"));

  if (NULL == wrap_data_ht) {
    CDEBUG (("csink", "You need to call csink_init_funcs () "
             "before you can call csink_remove_fd ()."));
    return;
  }

  wrap_data = g_hash_table_lookup (wrap_data_ht, tag);
  if (NULL != wrap_data) {
    g_free (wrap_data);
    g_hash_table_remove (wrap_data_ht, tag);
  }

  if (csink_fd_remove_func)
    csink_fd_remove_func (tag, info);
  else
    CDEBUG (("csink", "There is no remove fd watcher () registered!"));

  if (NULL == tag_info_ht)
    tag_info_ht = g_hash_table_new (g_direct_hash, g_direct_equal);
  add_info = (char *) g_hash_table_lookup (tag_info_ht, (void *) tag);
  g_hash_table_remove (tag_info_ht, (void *) tag);

  /* The tag is a pointer, so it is printed with %p rather than %i. */
  CDEBUG (("mainloop", "removing tag = %p\n", tag));
  CDEBUG (("mainloop", "removing '%s' because of '%s'\n", add_info, info));
}


/***** MAINLOOP CODE *****/


/***** Old poll code *****/

typedef struct _csink_polled_fd CSinkPolledFD;

/* One watched descriptor in the singly linked list used by the poller. */
struct _csink_polled_fd {
  void *data;                   /* passed back to func */
  int fd;
  CSinkFDCondition cond;        /* EIO_READ / EIO_WRITE / EIO_ERROR bits */
  CSinkFDCallbackFunc func;
  CSinkPolledFD *next;
};

CSinkPolledFD *_csink_polled_fd_list;

/* Log how many descriptors are currently on the watch list. */
static void
csink_polled_fd_log_count (void)
{
  CSinkPolledFD *rec;
  int count = 0;

  CDEBUG (("csink", "counting fds watched.."));
  for (rec = _csink_polled_fd_list; rec; rec = rec->next)
    count++;
  CDEBUG (("csink", "%d fds being watched.", count));
}

/* Whether `fd' may legally be used with FD_SET / FD_ISSET. */
static int
csink_polled_fd_in_range (int fd)
{
  return fd >= 0 && fd < FD_SETSIZE;
}

/* Invoke the callback of every watched descriptor that is set in `ready'. */
static void
csink_polled_fd_dispatch (fd_set *ready)
{
  CSinkPolledFD *rec = _csink_polled_fd_list;

  while (rec) {
    /* Fetch the successor first: the callback may remove `rec' itself.
     * NOTE(review): a callback that removes the *following* record would
     * still leave `next' dangling - confirm callbacks never do that. */
    CSinkPolledFD *next = rec->next;

    if (csink_polled_fd_in_range (rec->fd) && FD_ISSET (rec->fd, ready))
      rec->func (rec->fd, rec->cond, rec->data);

    rec = next;
  }
}

/* Add `fd' to the poller's watch list; `func' is stored as a
 * CSinkFDCallbackFunc and later called with (fd, cond, data).
 * Returns the new record as an opaque tag, or NULL if out of memory. */
void *
csink_polled_fd_add (int fd, CSinkFDCondition cond, CSinkCallbackFunc func,
                     void *data)
{
  CSinkPolledFD *rec = malloc (sizeof *rec);

  if (NULL == rec) {
    CDEBUG (("csink", "out of memory adding an fd watcher (fd = %d.)", fd));
    return NULL;
  }

  CDEBUG (("csink", "adding an fd watcher (fd = %d.)", fd));

  rec->fd = fd;
  rec->cond = cond;
  rec->data = data;
  rec->func = (CSinkFDCallbackFunc) func;

  /* Push on the front of the list. */
  rec->next = _csink_polled_fd_list;
  _csink_polled_fd_list = rec;

  csink_polled_fd_log_count ();

  return rec;
}

/* Unlink and free the watch record identified by `tag' (as returned by
 * csink_polled_fd_add ()).  An unknown tag is only logged. */
void
csink_polled_fd_remove (void *tag)
{
  CSinkPolledFD *rec = _csink_polled_fd_list;
  CSinkPolledFD *prev = NULL;

  CDEBUG (("csink", "removing an fd watcher"));

  while (rec && rec != tag) {
    prev = rec;
    rec = rec->next;
  }

  if (rec) {
    if (prev)
      prev->next = rec->next;
    else
      _csink_polled_fd_list = rec->next;

    CDEBUG (("csink", "remove_fd, removing fd = %d", rec->fd));
    free (rec);
  } else {
    CDEBUG (("csink", "remove_fd, tag not found."));
  }

  csink_polled_fd_log_count ();
}

/* Poll every watched descriptor once without blocking (zero timeout) and
 * run the callbacks of those showing activity: first all readable ones,
 * then all writable ones, then all with an exceptional condition. */
void
csink_do_poll (void)
{
  CSinkPolledFD *rec;
  fd_set in_set, out_set, err_set;
  struct timeval timeout;
  int res;

  FD_ZERO (&in_set);
  FD_ZERO (&out_set);
  FD_ZERO (&err_set);

  for (rec = _csink_polled_fd_list; rec; rec = rec->next) {
    /* FD_SET on a descriptor outside [0, FD_SETSIZE) is undefined. */
    if (!csink_polled_fd_in_range (rec->fd)) {
      CDEBUG (("csink", "poll, fd %d cannot be watched with select() - "
               "skipping.", rec->fd));
      continue;
    }

    if (rec->cond & EIO_READ)
      FD_SET (rec->fd, &in_set);
    if (rec->cond & EIO_WRITE)
      FD_SET (rec->fd, &out_set);
    if (rec->cond & EIO_ERROR)
      FD_SET (rec->fd, &err_set);
  }

  timeout.tv_sec = 0;
  timeout.tv_usec = 0;

  res = select (FD_SETSIZE, &in_set, &out_set, &err_set, &timeout);

  if (res < 0) {
    switch (errno) {
    case EBADF:
      /* we've passed in a bad file descriptor - BUG */
      CDEBUG (("csink", "bad fd in list of fds to watch - bailing."));
      break;
    case EINVAL:
      /* i've specified an incorrect timeout - BUG */
      CDEBUG (("csink", "invalid timeout in select() call - bailing."));
      break;
    case EINTR:
      /* interrupted by a signal */
      /* we actually shouldn't get this, since the nice GNU macro
       * 'TEMP_FAILURE_RETRY' checks for it .. */
      CDEBUG (("csink", "EINTR from select() call - not fatal."));
      break;
    default:
      CDEBUG (("csink", "falling off the end of the world after select()"));
      break;
    }

    /* After a failed select() the fd sets must not be inspected, so never
     * dispatch on them; the next poll simply tries again. */
    return;
  }

  if (res == 0)
    return;                     /* nothing ready */

  /* this is done in three passes because an fd might be removed each
   * pass. */
  csink_polled_fd_dispatch (&in_set);
  csink_polled_fd_dispatch (&out_set);
  csink_polled_fd_dispatch (&err_set);
}

/* Install the select()-based poller above as the fd watching backend and
 * reset its watch list to empty. */
void
csink_set_polling_fd_funcs (void)
{
  csink_init_funcs ((CSinkAddFDFunc) csink_polled_fd_add,
                    (CSinkRemoveFDFunc) csink_polled_fd_remove);
  _csink_polled_fd_list = NULL;
}