#include "csink.h" #include "csinksocket.h" /* debugging */ #include #include #include #include static void csink_socket_connect_action (CSink *sink); static void csink_socket_free (CSinkSocket *sink); gint csink_socket_write (CSinkSocket * sink, CBuf * data); void csink_socket_init (CSinkSocket *sink, int address_len, int address_family) { csink_init(CSINK(sink)); CSINK(sink)->csink_type = CSINK_SOCKET_TYPE; sink->address_len = address_len; sink->address_family = address_family; sink->fd = -1; if (sink->remote_address) free (sink->remote_address); sink->remote_address = (struct sockaddr *)malloc(address_len); if (sink->local_address) free (sink->local_address); sink->local_address = (struct sockaddr *)malloc(address_len); CSINK(sink)->open = (CSinkOpenFunc)csink_socket_open; CSINK(sink)->write = (CSinkWriteFunc)csink_socket_write; CSINK(sink)->close = (CSinkCloseFunc)csink_socket_close; CSINK(sink)->free = (CSinkFreeFunc)csink_socket_free; sink->listen = (CSinkSocketListenFunc)csink_socket_default_listen; sink->open_action = csink_socket_connect_action; sink->can_read_action = (CSinkSocketCanReadFunc)csink_socket_can_read_action; sink->can_write_action = (CSinkSocketCanReadFunc)csink_socket_can_write_action; } void csink_socket_release (CSinkSocket *sink) { if (sink->local_address) { free (sink->local_address); sink->local_address = NULL; } if(sink->remote_address) { free (sink->remote_address); sink->remote_address = NULL; } CDEBUG (("csinksocket", "in csink_socket_release")); if (sink->read_watch_tag) { CDEBUG (("csinksocket", "clearing read tag")); csink_remove_fd(sink->read_watch_tag, "Socket sink being freed"); sink->read_watch_tag = NULL; } if (sink->write_watch_tag) { CDEBUG (("csinksocket", "clearing write tag")); csink_remove_fd(sink->write_watch_tag, "Socket Sink being freed"); sink->write_watch_tag = NULL; } csink_release (CSINK(sink)); } void csink_socket_free (CSinkSocket *sink) { /* this is not expected to be user-accesssible, as csinksocket is abstract. * it's just here for completeness. */ csink_socket_release (sink); free (sink); } void csink_socket_close (CSinkSocket *sink) { close (sink->fd); if (sink->read_watch_tag) { csink_remove_fd (sink->read_watch_tag, "Socket sink closing"); sink->read_watch_tag = NULL; } if (sink->write_watch_tag) { csink_remove_fd (sink->write_watch_tag, "Socket sink closing"); sink->write_watch_tag = NULL; } } void csink_socket_set_remote_address (CSinkSocket *sink, struct sockaddr *address) { /* memset(sink->remote_address, 0, sink->address_len); */ memcpy(sink->remote_address, address, sink->address_len); } void csink_socket_set_local_address (CSinkSocket *sink, struct sockaddr *address) { /* memset(sink->local_address, 0, sink->address_len); */ memcpy(sink->local_address, address, sink->address_len); } void csink_socket_set_accept_func (CSinkSocket *sink, CSinkCallbackFunc func) { sink->on_accept = func; } void csink_socket_on_accept (CSinkSocket *sink, CSink * newconn) { if (sink->on_accept) sink->on_accept (newconn); else { csink_close(newconn); csink_free(newconn); } } int csink_socket_listen (CSinkSocket *sink) { CDEBUG (("csinksocket", "in csink_socket_listen")); if(!sink->listen) { CDEBUG(("csinksocket", "Attempt to call null listen function")); /* set the error object here */ return -1; } /* Ensure we are in the proper state. */ if (sink->status & (SOCKET_CONNECTED | SOCKET_CONNECT_INPROGRESS)) { CDEBUG (("csinksocket", "attempt to listen on an already connected sink.")); /* set the error object here */ return -1; } return sink->listen(CSINK(sink)); } void csink_socket_open (CSinkSocket *sink) { int res; struct sockaddr_in addr; int len = sizeof(addr); CDEBUG (("csinksocket", "open, called.")); sink->fd = socket (AF_INET, SOCK_STREAM, 0); /* sink->fd = socket (sink->address_family, SOCK_STREAM, 0); */ if (sink->fd < 0) { /* error */ CDEBUG (("csinksocket", "open, couldn't get an fd. returning.")); csink_on_error (CSINK (sink), "SOCKET_CREATE_FAILED"); return; } /* IO in csink is ALWAYS nonblocking. */ fcntl (sink->fd, F_SETFL, O_NONBLOCK); res = connect (sink->fd, (struct sockaddr *) sink->remote_address, len); CDEBUG (("csinksocket", "open, result of connect() call was %d.", res)); CDEBUG (("csinksocket", "open, fd is %d.", sink->fd)); if (res < 0) { /* Error? */ switch (errno) { case EINPROGRESS: case EALREADY: CDEBUG (("csinksocket", "open, connection is happening nonblockingly")); sink->status |= SOCKET_CONNECT_INPROGRESS; /* Use the read watch because we aren't into a user defined state. */ sink->read_watch_tag = csink_add_fd (sink->fd, EIO_READ | EIO_WRITE | EIO_ERROR, sink->open_action, CSINK (sink), "Nonblocking socket connect wait tag"); break; default: CDEBUG (("csinksocket", "open, error in connect, aborting.")); sink->status &= ~(SOCKET_CONNECT_INPROGRESS | SOCKET_CONNECTED); csink_close (CSINK (sink)); csink_on_error (CSINK (sink), "CONNECT_FAILURE"); } } else { CDEBUG (("csinksocket", "open, connection established.")); if (sink->open_action) sink->open_action (CSINK(sink)); } CDEBUG (("csinksocket", "open, returning.")); } /* Need to setup the write handler code. */ gint csink_socket_write (CSinkSocket * sink, CBuf * message) { if (FALSE == csink_write_sanity (CSINK(sink), message) ) { return FALSE; } /* If we're not connected, leave. */ if (! (sink->status & SOCKET_CONNECTED)) { csink_on_error (CSINK (sink), "DISCONN_WRITE"); return FALSE; } /* Need to queue up the data. */ csink_default_write (CSINK(sink), message); if (NULL == sink->write_watch_tag) { sink->write_watch_tag = csink_add_fd (sink->fd, EIO_WRITE, sink->can_write_action, CSINK (sink), "Socket write tag"); } return TRUE; } int csink_socket_do_connect (CSinkSocket *sink) { int ret; int optval; socklen_t option_len = sizeof(optval); /* Remove our watch. */ if (sink->read_watch_tag) { csink_remove_fd (sink->read_watch_tag, "About to try connect, clearing read tag."); sink->read_watch_tag = NULL; } /* Need to see if we got connected. */ errno = 0; optval = 0; ret = getsockopt (sink->fd, SOL_SOCKET, SO_ERROR, &optval, &option_len); /* Solaris needs this; see stevens, Unix Network Pgm'ng, 15.4 */ if (ret < 0) optval = errno; CDEBUG (("csinksocket", "Getsockopt: ret = %i, optval = %i", ret, optval)); /* See if there was a bad error, or if we didn't get connected... */ if (0 != optval ) { CSINK_SOCKET (sink)->status &= ~(SOCKET_CONNECT_INPROGRESS | SOCKET_CONNECTED); CDEBUG (("csinksocket", "failure for unknown reason: %s(%d)", strerror(optval), optval)); csink_on_error (CSINK(sink), "CONNECT_FAILURE"); csink_close (CSINK(sink)); return FALSE; } sink->status |= SOCKET_CONNECTED; sink->status &= ~SOCKET_CONNECT_INPROGRESS; CDEBUG (("csinksocket", "Connect callback...\n")); /* Always need to read from the socket. */ sink->read_watch_tag = csink_add_fd (sink->fd, EIO_READ | EIO_ERROR, sink->can_read_action, CSINK (sink), "Read tag for socket sink"); return TRUE; } static void csink_socket_connect_action (CSink *sink_) { CSinkSocket *sink = CSINK_SOCKET(sink_); CDEBUG (("csinksocket", "connect_action, called")); if (!csink_socket_do_connect (sink)) return; CDEBUG (("csinksocket", "connect_action, calling connect callback")); csink_on_connect (CSINK (sink)); CDEBUG (("csinksocket", "connect_action, back from callback")); CDEBUG (("csinksocket", "connect_action, returning")); } void csink_socket_can_read_action (CSinkSocket *sink) { char buf[256] = ""; CBuf *newmsg; int res; /* Read a small block. */ res = recv (sink->fd, buf, 255, 0); if(res > 0) CDEBUG (("csinksocket", "read off %i bytes.\n", res)); /* end-of-file */ if (res == 0) { CDEBUG (("csinksocket", "Connection closed on other end. closing link.\n")); csink_close (CSINK (sink)); return; } /* error */ if (res < 0) { switch (errno) { case ENOTSOCK: case EBADF: /* Not a socket. */ csink_close (CSINK(sink)); csink_on_error (CSINK(sink), "READ_EBADF"); break; case ENOTCONN: /* Not connected. */ csink_close (CSINK(sink)); csink_on_error (CSINK(sink), "IMPLEMENTATION_BUG"); break; case EINTR: /* Just try again later. */ case EAGAIN: break; default: /* Lord only knows. */ csink_close(CSINK(sink)); csink_on_error (CSINK(sink), "UNKNOWN"); } return; } /* Create a CBuf from the buf. * +1, because we need the \0 for good luck. :) */ newmsg = cbuf_new_with_data (buf, res+1); g_ptr_array_add (CSINK (sink)->inQueue, (gpointer) newmsg); csink_on_new_data (CSINK(sink)); } void csink_socket_can_write_action (CSinkSocket * sink) { CBuf *partial; gint res; CBuf *cur; CDEBUG (("csinksocket", "in csink_socket_can_write_action")); if (! (sink->status & SOCKET_CONNECTED) ) { /* Need to clean up stuff, there is user error going on * if this is the case. We need to call the cleanup code. * We need to disable the select(2) on this fd. */ /* FIXME. MW */ return; } /* Send loop for sending the queue. */ while (0 != CSINK (sink)->outQueue->len) { int written = 0; CDEBUG (("csinksocket", "SEND LOOP--CONNECTED (1)\n")); cur = (CBuf *) g_ptr_array_index (CSINK (sink)->outQueue, 0); g_ptr_array_remove_index (CSINK (sink)->outQueue, 0); while (written < cur->len) { res = send (sink->fd, /* No SIGPIPE. */ cur->str + written, cur->len - written, 0); if (res < 0) { /* Error. */ switch (errno) { case EPIPE: /* Connection broken. */ csink_on_error (CSINK (sink), "SEND_PIPE"); csink_close (CSINK (sink)); goto fail; case ENOTSOCK: case EBADF: /* Not a valid socket file descriptor. */ csink_close (CSINK (sink)); csink_on_error (CSINK (sink), "SEND_ENOTSOCK"); goto fail; case EMSGSIZE: csink_close (CSINK (sink)); csink_on_error (CSINK (sink), "SEND_EMSGSIZE"); goto fail; case ENOTCONN: csink_close (CSINK (sink)); csink_on_error (CSINK (sink), "IMPLEMENTATION_BUG"); goto fail; case ENOBUFS: case EWOULDBLOCK: /* Wait for a while and try again. */ /* Push what's left of the message back into the queue. */ partial = cbuf_new_with_data (cur->str+written, cur->len-written); g_ptr_array_add_at_start (CSINK(sink)->outQueue, (gpointer) partial); cbuf_free (cur); return; case EINTR: /* interrupted system call; just try again */ break; default: csink_close (CSINK (sink)); csink_on_error (CSINK (sink), "UNKNOWN"); goto fail; } } else { /* Success. */ written += res; } } /* END Current hunck code. */ cbuf_free (cur); } /* Disconnect write tag now that everthing is done being sent. */ csink_remove_fd (sink->write_watch_tag, "Socket Write Queue is empty"); sink->write_watch_tag = NULL; return; fail: cbuf_free (cur); } int csink_socket_default_listen (CSinkSocket * sink) { int res; int true = 1; CDEBUG (("csinksocket", "in csink_socket_default_listen")); sink->status &= ~(SOCKET_CONNECT_INPROGRESS); /* Do all of the fun stuff necessary to initialize a listening socket. */ sink->fd = socket (sink->address_family, SOCK_STREAM, 0); if (sink->fd < 0) { csink_on_error (CSINK (sink), "SOCKET_CREATE_FAILED"); /* set error object */ return -1; } /* Set the SO_REUSEADDR option on the socket; this gets around the nasty * stupid fact that sockets aren't closed properly when a process exits. */ res = setsockopt (sink->fd, SOL_SOCKET, SO_REUSEADDR, &true, sizeof (int)); if (res < 0) { switch (errno) { case EBADF: /* bad socket descriptor */ csink_on_error (CSINK (sink), "EBADF"); break; case ENOTSOCK: /* fd was not a socket */ csink_on_error (CSINK (sink), "ENOTSOCK"); break; case ENOPROTOOPT: /* that option doesn't make any sense at that * level */ csink_on_error (CSINK (sink), "ENOPROTOOPT"); break; default: csink_on_error (CSINK (sink), "UNKNOWN"); } return -1; } /* Socket stuff. */ fcntl (sink->fd, F_SETFL, O_NONBLOCK); res = bind (sink->fd, sink->local_address, sink->address_len); if (res < 0) { switch (errno) { case EBADF: /* bad socket descriptor */ csink_on_error (CSINK (sink), "EBADF"); break; case ENOTSOCK: /* fd was not a socket */ csink_on_error (CSINK (sink), "ENOTSOCK"); break; case EADDRNOTAVAIL: /* local address/port not available on this * machine */ csink_on_error (CSINK (sink), "EADDRNOTAVAIL"); break; case EADDRINUSE: /* that address/port is in use. */ csink_on_error (CSINK (sink), "EADDRINUSE"); break; case EINVAL: /* socket already has an address */ csink_on_error (CSINK (sink), "EINVAL"); break; /* is this fatal?? I don't know. */ case EACCES: csink_on_error (CSINK (sink), "EACCES"); break; default: csink_on_error (CSINK (sink), "UNKNOWN"); break; } return -1; } res = listen (sink->fd, 2); if (res < 0) { switch (errno) { case EBADF: /* bad socket descriptor */ csink_on_error (CSINK (sink), "EBADF"); break; case ENOTSOCK: /* fd was not a socket */ csink_on_error (CSINK (sink), "ENOTSOCK"); break; case EOPNOTSUPP: /* this is not a listenable socket */ csink_on_error (CSINK (sink), "EOPNOTSUPP"); break; } return -1; } sink->status |= SOCKET_CONNECTED; sink->read_watch_tag = csink_add_fd (sink->fd, EIO_READ | EIO_WRITE | EIO_ERROR, sink->accept_action, CSINK (sink), "Socket listen tag"); return 0; }