#include "csink.h"
#include "csinksocket.h"
/* debugging */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
/* Forward declarations for handlers that csink_socket_init() installs
 * before their definitions appear further down in this file. */
/* Stored as the sink's open_action by csink_socket_init(). */
static void csink_socket_connect_action (CSink *sink);
/* Stored as the base sink's free hook by csink_socket_init(). */
static void csink_socket_free (CSinkSocket *sink);
/* Stored as the base sink's write hook by csink_socket_init(). */
gint
csink_socket_write (CSinkSocket * sink, CBuf * data);
/*
 * Initialise the socket layer of a sink.
 *
 * sink           - sink to set up; it is run through csink_init() first.
 * address_len    - size in bytes of the socket address structures; both
 *                  address buffers are reallocated to this size.
 * address_family - address family recorded on the sink.
 *
 * The descriptor is marked unopened (-1), any address buffers already
 * attached to the sink are released and replaced, and the default
 * open/write/close/free hooks plus the socket-level listen, open and
 * can-read/can-write actions are installed.  The new buffers are not
 * cleared and the allocations are not checked.
 *
 * NOTE(review): the old address pointers are read before this function
 * assigns them, so the sink must arrive with those fields NULL or valid --
 * confirm that csink_init() or the allocator guarantees this.
 */
void
csink_socket_init (CSinkSocket *sink, int address_len, int address_family)
{
    /* Base-class setup and type tag. */
    csink_init(CSINK(sink));
    CSINK(sink)->csink_type = CSINK_SOCKET_TYPE;

    /* Socket bookkeeping; -1 means "no descriptor yet". */
    sink->address_len = address_len;
    sink->address_family = address_family;
    sink->fd = -1;

    /* Swap in fresh address buffers (free(NULL) is a no-op). */
    free (sink->remote_address);
    sink->remote_address = malloc(address_len);
    free (sink->local_address);
    sink->local_address = malloc(address_len);

    /* Hooks on the base sink. */
    CSINK(sink)->open = (CSinkOpenFunc)csink_socket_open;
    CSINK(sink)->write = (CSinkWriteFunc)csink_socket_write;
    CSINK(sink)->close = (CSinkCloseFunc)csink_socket_close;
    CSINK(sink)->free = (CSinkFreeFunc)csink_socket_free;

    /* Hooks specific to the socket layer. */
    sink->listen = (CSinkSocketListenFunc)csink_socket_default_listen;
    sink->open_action = csink_socket_connect_action;
    sink->can_read_action =
        (CSinkSocketCanReadFunc)csink_socket_can_read_action;
    sink->can_write_action =
        (CSinkSocketCanReadFunc)csink_socket_can_write_action;
}
/*
 * Release everything the socket layer holds on the sink without freeing
 * the sink structure itself: both address buffers are freed and their
 * pointers cleared, any registered read/write watches are removed, and
 * finally csink_release() is called on the base sink.
 */
void
csink_socket_release (CSinkSocket *sink)
{
    /* free(NULL) is a no-op, so the buffers need no guard. */
    free (sink->local_address);
    sink->local_address = NULL;
    free (sink->remote_address);
    sink->remote_address = NULL;

    CDEBUG (("csinksocket", "in csink_socket_release"));

    if (NULL != sink->read_watch_tag) {
        CDEBUG (("csinksocket", "clearing read tag"));
        csink_remove_fd(sink->read_watch_tag, "Socket sink being freed");
        sink->read_watch_tag = NULL;
    }

    if (NULL != sink->write_watch_tag) {
        CDEBUG (("csinksocket", "clearing write tag"));
        csink_remove_fd(sink->write_watch_tag, "Socket Sink being freed");
        sink->write_watch_tag = NULL;
    }

    csink_release (CSINK(sink));
}
/*
 * Free hook installed by csink_socket_init(): releases the socket-layer
 * resources via csink_socket_release() and then frees the sink structure.
 *
 * CSinkSocket is abstract, so users are not expected to reach this
 * directly; it exists for completeness.
 */
void
csink_socket_free (CSinkSocket *sink)
{
    csink_socket_release (sink);
    free (sink);
}
/*
 * Close the sink's descriptor and drop its read/write watches.
 *
 * The descriptor is only closed when one is actually open (fd >= 0;
 * csink_socket_init() uses -1 for "none") and is reset to -1 afterwards.
 * Several error paths in this file call csink_close() themselves, so a
 * later close by the user must not hit the same descriptor number again:
 * by then the kernel may have handed it to an unrelated file or socket.
 */
void
csink_socket_close (CSinkSocket *sink)
{
    if (sink->fd >= 0) {
        close (sink->fd);
        sink->fd = -1;
    }
    if (sink->read_watch_tag) {
        csink_remove_fd (sink->read_watch_tag,
                         "Socket sink closing");
        sink->read_watch_tag = NULL;
    }
    if (sink->write_watch_tag) {
        csink_remove_fd (sink->write_watch_tag,
                         "Socket sink closing");
        sink->write_watch_tag = NULL;
    }
}
/*
 * Copy `address` into the sink's remote-address buffer.
 *
 * Exactly sink->address_len bytes are copied, so `address` must point to
 * at least that many readable bytes.  Neither pointer is checked.
 */
void
csink_socket_set_remote_address (CSinkSocket *sink, struct sockaddr *address)
{
    struct sockaddr *dest = sink->remote_address;
    size_t nbytes = sink->address_len;

    memcpy(dest, address, nbytes);
}
/*
 * Copy `address` into the sink's local-address buffer.
 *
 * Exactly sink->address_len bytes are copied, so `address` must point to
 * at least that many readable bytes.  Neither pointer is checked.
 */
void
csink_socket_set_local_address (CSinkSocket *sink, struct sockaddr *address)
{
    struct sockaddr *dest = sink->local_address;
    size_t nbytes = sink->address_len;

    memcpy(dest, address, nbytes);
}
/*
 * Register the callback that csink_socket_on_accept() hands new
 * connections to.  Passing NULL clears it, in which case
 * csink_socket_on_accept() closes and frees new connections instead.
 */
void
csink_socket_set_accept_func (CSinkSocket *sink, CSinkCallbackFunc func)
{
    sink->on_accept = func;
}
/*
 * Dispatch a newly accepted connection.
 *
 * With an accept callback registered on `sink`, the new connection is
 * passed to it.  Without one, the connection is closed and freed right
 * away, since nothing could take it over.
 */
void
csink_socket_on_accept (CSinkSocket *sink, CSink * newconn)
{
    if (!sink->on_accept) {
        /* No callback registered: discard the connection. */
        csink_close(newconn);
        csink_free(newconn);
        return;
    }

    sink->on_accept (newconn);
}
/*
 * Put the sink into listening mode through its `listen` hook.
 *
 * Returns -1 without calling the hook when no hook is installed or when
 * the sink is already connected or has a connect in progress; otherwise
 * returns whatever the hook returns.
 */
int
csink_socket_listen (CSinkSocket *sink)
{
    CDEBUG (("csinksocket", "in csink_socket_listen"));

    if (NULL == sink->listen) {
        CDEBUG(("csinksocket", "Attempt to call null listen function"));
        /* set the error object here */
        return -1;
    }

    /* A sink that is connected (or connecting) cannot also listen. */
    if (0 != (sink->status & (SOCKET_CONNECTED | SOCKET_CONNECT_INPROGRESS))) {
        CDEBUG (("csinksocket",
                 "attempt to listen on an already connected sink."));
        /* set the error object here */
        return -1;
    }

    return sink->listen(CSINK(sink));
}
/*
 * Create the sink's socket and start a non-blocking connect to
 * sink->remote_address.
 *
 * The socket is created with the sink's own address family and the
 * address is handed to connect() with the sink's own address_len, the
 * same values csink_socket_default_listen() uses.  The buffer behind
 * remote_address is address_len bytes long, so passing any other length
 * could make connect() read past it.
 *
 * Outcomes:
 *  - socket() fails: "SOCKET_CREATE_FAILED" is reported and nothing else
 *    happens.
 *  - connect() succeeds immediately: the open_action hook (if any) runs.
 *  - connect() reports EINPROGRESS/EALREADY: the sink is flagged
 *    SOCKET_CONNECT_INPROGRESS and open_action is registered as a watch
 *    on the descriptor.
 *  - any other connect() error: the connection flags are cleared, the
 *    sink is closed and "CONNECT_FAILURE" is reported.
 */
void
csink_socket_open (CSinkSocket *sink)
{
    int res;
    int connect_errno;

    CDEBUG (("csinksocket", "open, called."));
    sink->fd = socket (sink->address_family, SOCK_STREAM, 0);
    if (sink->fd < 0) { /* error */
        CDEBUG (("csinksocket", "open, couldn't get an fd. returning."));
        csink_on_error (CSINK (sink), "SOCKET_CREATE_FAILED");
        return;
    }
    /* IO in csink is ALWAYS nonblocking. */
    fcntl (sink->fd, F_SETFL, O_NONBLOCK);
    res = connect (sink->fd, (struct sockaddr *) sink->remote_address,
                   sink->address_len);
    /* Capture errno now: the debug output below may overwrite it before
     * the switch gets to look at it. */
    connect_errno = errno;
    CDEBUG (("csinksocket", "open, result of connect() call was %d.", res));
    CDEBUG (("csinksocket", "open, fd is %d.", sink->fd));
    if (res < 0) { /* Error? */
        switch (connect_errno) {
        case EINPROGRESS:
        case EALREADY:
            CDEBUG (("csinksocket",
                     "open, connection is happening nonblockingly"));
            sink->status |= SOCKET_CONNECT_INPROGRESS;
            /* Use the read watch because we aren't into a user defined
             * state. */
            sink->read_watch_tag = csink_add_fd (sink->fd,
                                    EIO_READ | EIO_WRITE | EIO_ERROR,
                                    sink->open_action,
                                    CSINK (sink),
                                    "Nonblocking socket connect wait tag");
            break;
        default:
            CDEBUG (("csinksocket", "open, error in connect, aborting."));
            sink->status &= ~(SOCKET_CONNECT_INPROGRESS | SOCKET_CONNECTED);
            csink_close (CSINK (sink));
            csink_on_error (CSINK (sink), "CONNECT_FAILURE");
        }
    } else {
        CDEBUG (("csinksocket", "open, connection established."));
        if (sink->open_action)
            sink->open_action (CSINK(sink));
    }
    CDEBUG (("csinksocket", "open, returning."));
}
/*
 * Write hook for socket sinks: queue `message` for sending.
 *
 * Returns FALSE when csink_write_sanity() rejects the request, or when
 * the sink is not connected (in which case "DISCONN_WRITE" is also
 * reported).  Otherwise the message is passed to csink_default_write(),
 * a write watch using the sink's can_write_action is registered unless
 * one is already active, and TRUE is returned.
 */
gint
csink_socket_write (CSinkSocket * sink, CBuf * message)
{
    if (!csink_write_sanity (CSINK(sink), message))
        return FALSE;

    /* Nothing can be sent on a sink that is not connected. */
    if (0 == (sink->status & SOCKET_CONNECTED)) {
        csink_on_error (CSINK (sink), "DISCONN_WRITE");
        return FALSE;
    }

    /* Queue the data. */
    csink_default_write (CSINK(sink), message);

    /* Make sure a write watch is active for the queue. */
    if (!sink->write_watch_tag) {
        sink->write_watch_tag = csink_add_fd (sink->fd, EIO_WRITE,
                                              sink->can_write_action,
                                              CSINK (sink),
                                              "Socket write tag");
    }

    return TRUE;
}
/*
 * Finish a non-blocking connect once the descriptor has signalled.
 *
 * The connect-wait watch is removed and the pending socket error is read
 * with getsockopt(SO_ERROR).  If getsockopt() itself fails, errno is used
 * as the error value (Solaris needs this; see Stevens, Unix Network
 * Programming, 15.4).
 *
 * Returns TRUE when the connection is up: the sink is flagged
 * SOCKET_CONNECTED and a read watch using can_read_action is registered.
 * Returns FALSE on failure: the connection flags are cleared,
 * "CONNECT_FAILURE" is reported and the sink is closed.
 */
int
csink_socket_do_connect (CSinkSocket *sink)
{
    int rc;
    int so_error = 0;
    socklen_t so_error_len = sizeof(so_error);

    /* The connect-wait watch has done its job. */
    if (sink->read_watch_tag) {
        csink_remove_fd (sink->read_watch_tag,
                         "About to try connect, clearing read tag.");
        sink->read_watch_tag = NULL;
    }

    /* Ask the socket whether the connect went through. */
    errno = 0;
    rc = getsockopt (sink->fd, SOL_SOCKET, SO_ERROR,
                     &so_error, &so_error_len);
    if (rc < 0)
        so_error = errno;
    CDEBUG (("csinksocket", "Getsockopt: ret = %i, optval = %i",
             rc, so_error));

    if (0 == so_error) {
        sink->status |= SOCKET_CONNECTED;
        sink->status &= ~SOCKET_CONNECT_INPROGRESS;
        CDEBUG (("csinksocket", "Connect callback...\n"));
        /* Always need to read from the socket. */
        sink->read_watch_tag =
            csink_add_fd (sink->fd, EIO_READ | EIO_ERROR,
                          sink->can_read_action, CSINK (sink),
                          "Read tag for socket sink");
        return TRUE;
    }

    /* The connect failed, or the error could not be queried. */
    CSINK_SOCKET (sink)->status &=
        ~(SOCKET_CONNECT_INPROGRESS | SOCKET_CONNECTED);
    CDEBUG (("csinksocket", "failure for unknown reason: %s(%d)",
             strerror(so_error),
             so_error));
    csink_on_error (CSINK(sink), "CONNECT_FAILURE");
    csink_close (CSINK(sink));
    return FALSE;
}
/*
 * Default open_action: complete the connect through
 * csink_socket_do_connect() and, only if that succeeds, fire the sink's
 * connect callback through csink_on_connect().
 */
static void
csink_socket_connect_action (CSink *sink_)
{
    CSinkSocket *sock = CSINK_SOCKET(sink_);

    CDEBUG (("csinksocket", "connect_action, called"));
    if (csink_socket_do_connect (sock)) {
        CDEBUG (("csinksocket", "connect_action, calling connect callback"));
        csink_on_connect (CSINK (sock));
        CDEBUG (("csinksocket", "connect_action, back from callback"));
        CDEBUG (("csinksocket", "connect_action, returning"));
    }
}
/*
 * Read handler: pull one small block off the socket.
 *
 *  - Data received: it is wrapped in a new CBuf, appended to the sink's
 *    inQueue, and csink_on_new_data() is called.
 *  - recv() returns 0 (peer closed): the sink is closed.
 *  - EINTR / EAGAIN: nothing happens; the read is simply retried the
 *    next time this handler runs.
 *  - Any other error: the sink is closed and an error is reported.
 */
void
csink_socket_can_read_action (CSinkSocket *sink)
{
    /* Zero-filled, and recv() is limited to 255 bytes, so the byte
     * following the received data is always '\0'. */
    char buf[256] = "";
    CBuf *newmsg;
    int res;

    /* Read a small block. */
    res = recv (sink->fd, buf, sizeof buf - 1, 0);

    if (res > 0) {
        CDEBUG (("csinksocket", "read off %i bytes.\n", res));
        /* Create a CBuf from the buf.
         * +1, because we need the \0 for good luck. :) */
        newmsg = cbuf_new_with_data (buf, res+1);
        g_ptr_array_add (CSINK (sink)->inQueue, (gpointer) newmsg);
        csink_on_new_data (CSINK(sink));
        return;
    }

    if (0 == res) {
        /* end-of-file */
        CDEBUG (("csinksocket",
                 "Connection closed on other end. closing link.\n"));
        csink_close (CSINK (sink));
        return;
    }

    /* res < 0: error */
    switch (errno) {
    case ENOTSOCK:
    case EBADF:                 /* Not a socket. */
        csink_close (CSINK(sink));
        csink_on_error (CSINK(sink), "READ_EBADF");
        break;
    case ENOTCONN:              /* Not connected. */
        csink_close (CSINK(sink));
        csink_on_error (CSINK(sink), "IMPLEMENTATION_BUG");
        break;
    case EINTR:                 /* Just try again later. */
    case EAGAIN:
        break;
    default:                    /* Lord only knows. */
        csink_close(CSINK(sink));
        csink_on_error (CSINK(sink), "UNKNOWN");
    }
}
/*
 * Write handler: drain the sink's outQueue onto the socket.
 *
 * Buffers are taken off the front of the queue and sent in full, looping
 * over short writes.  The handler leaves early in three situations:
 *
 *  - The sink is not connected.  That is a usage error; the write watch
 *    is removed so the handler is not invoked again for a socket it can
 *    never write to.
 *  - send() reports ENOBUFS/EWOULDBLOCK.  The unsent tail of the current
 *    buffer is pushed back onto the front of the queue and the write
 *    watch stays armed so sending resumes later.
 *  - send() reports a hard error.  The sink is closed, the error is
 *    reported, and the current buffer is freed.
 *
 * When the queue has been emptied the write watch is removed.
 *
 * send() is called with MSG_NOSIGNAL where the platform defines it, so a
 * broken connection surfaces as EPIPE (handled below) instead of raising
 * SIGPIPE.
 */
void
csink_socket_can_write_action (CSinkSocket * sink)
{
    CBuf *partial;
    gint res;
    CBuf *cur;
#ifdef MSG_NOSIGNAL
    const int send_flags = MSG_NOSIGNAL;    /* No SIGPIPE. */
#else
    const int send_flags = 0;
#endif

    CDEBUG (("csinksocket", "in csink_socket_can_write_action"));
    if (! (sink->status & SOCKET_CONNECTED) ) {
        /* There is user error going on if this is the case.  Stop
         * watching the fd for writability; nothing can be sent. */
        if (sink->write_watch_tag) {
            csink_remove_fd (sink->write_watch_tag,
                             "Socket not connected, dropping write tag");
            sink->write_watch_tag = NULL;
        }
        return;
    }
    /* Send loop for sending the queue. */
    while (0 != CSINK (sink)->outQueue->len) {
        int written = 0;

        CDEBUG (("csinksocket", "SEND LOOP--CONNECTED (1)\n"));
        cur = (CBuf *) g_ptr_array_index (CSINK (sink)->outQueue, 0);
        g_ptr_array_remove_index (CSINK (sink)->outQueue, 0);
        while (written < cur->len) {
            res = send (sink->fd, cur->str + written, cur->len - written,
                        send_flags);
            if (res < 0) { /* Error. */
                switch (errno) {
                case EPIPE: /* Connection broken. */
                    csink_on_error (CSINK (sink), "SEND_PIPE");
                    csink_close (CSINK (sink));
                    goto fail;
                case ENOTSOCK:
                case EBADF: /* Not a valid socket file descriptor. */
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "SEND_ENOTSOCK");
                    goto fail;
                case EMSGSIZE:
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "SEND_EMSGSIZE");
                    goto fail;
                case ENOTCONN:
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "IMPLEMENTATION_BUG");
                    goto fail;
                case ENOBUFS:
                case EWOULDBLOCK: /* Wait for a while and try again. */
                    /* Push what's left of the message back into the
                     * queue. */
                    partial = cbuf_new_with_data (cur->str+written,
                                                  cur->len-written);
                    g_ptr_array_add_at_start (CSINK(sink)->outQueue,
                                              (gpointer) partial);
                    cbuf_free (cur);
                    return;
                case EINTR: /* interrupted system call; just try again */
                    break;
                default:
                    csink_close (CSINK (sink));
                    csink_on_error (CSINK (sink), "UNKNOWN");
                    goto fail;
                }
            } else { /* Success. */
                written += res;
            }
        } /* END current chunk. */
        cbuf_free (cur);
    }
    /* Disconnect write tag now that everything is done being sent.  The
     * tag is checked first, as everywhere else in this file, in case the
     * handler runs with no watch registered. */
    if (sink->write_watch_tag) {
        csink_remove_fd (sink->write_watch_tag,
                         "Socket Write Queue is empty");
        sink->write_watch_tag = NULL;
    }
    return;
fail:
    cbuf_free (cur);
}
/*
 * Default `listen` hook: create, configure, bind and listen on a stream
 * socket, then register the sink's accept_action as a watch on it.
 *
 * The socket is created with sink->address_family, gets SO_REUSEADDR,
 * is made non-blocking, is bound to sink->local_address and listens with
 * a backlog of 2.  On success the sink is flagged SOCKET_CONNECTED and 0
 * is returned.
 *
 * On any failure an error is reported through csink_on_error() and -1 is
 * returned.  If the socket had already been created, it is closed and
 * sink->fd reset to -1 before the error is reported, so no descriptor is
 * leaked and an error callback that closes the sink cannot close the
 * same descriptor a second time.
 *
 * NOTE(review): accept_action is not assigned by csink_socket_init() in
 * this file; presumably a concrete subclass sets it -- confirm.
 */
int
csink_socket_default_listen (CSinkSocket * sink)
{
    int res;
    int err;
    int reuse_addr = 1;

    CDEBUG (("csinksocket", "in csink_socket_default_listen"));
    sink->status &= ~(SOCKET_CONNECT_INPROGRESS);
    /* Do all of the fun stuff necessary to initialize a listening
     * socket. */
    sink->fd = socket (sink->address_family, SOCK_STREAM, 0);
    if (sink->fd < 0) {
        csink_on_error (CSINK (sink), "SOCKET_CREATE_FAILED");
        /* set error object */
        return -1;
    }
    /* Set the SO_REUSEADDR option on the socket; this gets around the
     * nasty stupid fact that sockets aren't closed properly when a
     * process exits. */
    res = setsockopt (sink->fd, SOL_SOCKET, SO_REUSEADDR,
                      &reuse_addr, sizeof reuse_addr);
    if (res < 0) {
        err = errno;            /* close() below may overwrite errno */
        close (sink->fd);
        sink->fd = -1;
        switch (err) {
        case EBADF: /* bad socket descriptor */
            csink_on_error (CSINK (sink), "EBADF");
            break;
        case ENOTSOCK: /* fd was not a socket */
            csink_on_error (CSINK (sink), "ENOTSOCK");
            break;
        case ENOPROTOOPT: /* that option doesn't make any sense at that
                           * level */
            csink_on_error (CSINK (sink), "ENOPROTOOPT");
            break;
        default:
            csink_on_error (CSINK (sink), "UNKNOWN");
        }
        return -1;
    }
    /* Socket stuff. */
    fcntl (sink->fd, F_SETFL, O_NONBLOCK);
    res = bind (sink->fd, sink->local_address, sink->address_len);
    if (res < 0) {
        err = errno;
        close (sink->fd);
        sink->fd = -1;
        switch (err) {
        case EBADF: /* bad socket descriptor */
            csink_on_error (CSINK (sink), "EBADF");
            break;
        case ENOTSOCK: /* fd was not a socket */
            csink_on_error (CSINK (sink), "ENOTSOCK");
            break;
        case EADDRNOTAVAIL: /* local address/port not available on this
                             * machine */
            csink_on_error (CSINK (sink), "EADDRNOTAVAIL");
            break;
        case EADDRINUSE: /* that address/port is in use. */
            csink_on_error (CSINK (sink), "EADDRINUSE");
            break;
        case EINVAL: /* socket already has an address */
            csink_on_error (CSINK (sink), "EINVAL");
            break; /* is this fatal?? I don't know. */
        case EACCES:
            csink_on_error (CSINK (sink), "EACCES");
            break;
        default:
            csink_on_error (CSINK (sink), "UNKNOWN");
            break;
        }
        return -1;
    }
    res = listen (sink->fd, 2);
    if (res < 0) {
        err = errno;
        close (sink->fd);
        sink->fd = -1;
        switch (err) {
        case EBADF: /* bad socket descriptor */
            csink_on_error (CSINK (sink), "EBADF");
            break;
        case ENOTSOCK: /* fd was not a socket */
            csink_on_error (CSINK (sink), "ENOTSOCK");
            break;
        case EOPNOTSUPP: /* this is not a listenable socket */
            csink_on_error (CSINK (sink), "EOPNOTSUPP");
            break;
        case EADDRINUSE: /* another socket already listens there */
            csink_on_error (CSINK (sink), "EADDRINUSE");
            break;
        default: /* never return -1 without reporting something */
            csink_on_error (CSINK (sink), "UNKNOWN");
            break;
        }
        return -1;
    }
    sink->status |= SOCKET_CONNECTED;
    sink->read_watch_tag = csink_add_fd (sink->fd,
                                         EIO_READ | EIO_WRITE | EIO_ERROR,
                                         sink->accept_action, CSINK (sink),
                                         "Socket listen tag");
    return 0;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */