#include #include #include #include #include #include #include #include #include #include "entity.h" #ifndef BUFSIZ #define BUFSIZ 1024 #endif /* * The way an io tag can look in the xml. * */ typedef struct _WriteBuf { char *buffer; char *obuf; int size; } WriteBuf; /* Releases all memory and watch tags involved with doing reading */ static void rendio_release_read_watch (ENode * node) { void *rtag; EBuf *buffer; EDEBUG (("io-renderer", "releasing input stream information")); rtag = enode_get_kv (node, "rendio-io-rtag"); enode_set_kv (node, "rendio-io-rtag", NULL); if (rtag) entity_mainloop->io_remove (rtag); EDEBUG (("io-renderer", "releasing input stream tag %d", rtag)); buffer = enode_get_kv (node, "rendio-io-read-buffer"); if (buffer) ebuf_free (buffer); enode_set_kv (node, "rendio-io-read-buffer", NULL); } /* Releases watch tags involved with doing writes */ static void rendio_release_write_watch (ENode * node) { void *wtag; EDEBUG (("io-renderer", "releasing write stream information")); wtag = enode_get_kv (node, "rendio-io-wtag"); enode_set_kv (node, "rendio-io-wtag", NULL); if (wtag) entity_mainloop->io_remove (wtag); EDEBUG (("io-renderer", "releasing input stream tag %d", wtag)); enode_set_kv (node, "rendio-io-read-buffer", NULL); } /* chunk mode is real easy.. read chunk, pass it on */ static int rendio_read_chunk_mode_callback (gint source, EIOCond cond, ENode * node) { EBuf *function; gint nread; gint fd; gchar buffer[BUFSIZ]; function = enode_attrib (node, "onnewdata", NULL); /* Get fd */ fd = GPOINTER_TO_INT (enode_get_kv (node, "rendio-io-fd")); if (fd < 0) return (FALSE); EDEBUG ( ("io-renderer", "in chunk_mode read callback, gonna read from fd %d", fd)); nread = read (fd, buffer, BUFSIZ); EDEBUG (("io-renderer", "binary, nread = %i", nread)); /* If we read -1, assume there's an error and quit watching for reads */ if (nread <= 0) rendio_release_read_watch (node); /* we pass nread straight so they can use it to see status too */ if (ebuf_not_empty (function)) enode_call_ignore_return (node, function->str, "bi", buffer, nread, nread); return (TRUE); } /* When new data arrives in line mode, we call this with the new data.. this * will sort out the data and call a function if there's a new line to pass * on */ static void rendio_renderer_newdata (ENode * node, gchar * inbuf, gint has_newline) { EBuf *buffer; gint nread; gchar *function; nread = strlen (inbuf); function = enode_attrib_str (node, "onnewdata", NULL); buffer = enode_get_kv (node, "rendio-io-read-buffer"); EDEBUG (("io-renderer", "newdata function is %s", function)); /* if we find an eol.. */ if (has_newline) { EDEBUG (("io-renderer", "EOL found checking..")); /* check to see if there's already something in the buffer from * before */ if (!buffer || buffer->len == 0) { EDEBUG (("io-renderer", "No previous buffer, sending directly.")); /* If not, we can pass this straight to the application */ if (function) enode_call_ignore_return (node, function, "bi", inbuf, nread, nread); return; } else { EDEBUG (("io-renderer", "EOL found, appending '%s' to previous buffer '%s' and sending.", inbuf, buffer->str)); /* else we have to get the previous buffer first, and then pass * it on */ ebuf_append_str (buffer, inbuf); if (function) enode_call_ignore_return (node, function, "bi", buffer->str, buffer->len, buffer->len); /* truncate buffer */ ebuf_truncate (buffer, 0); return; } } else { /* If we make it here, it means there was no end of line found, and * we have to store it for the next pass */ /* init buffer if we don't already have one */ if (!buffer) { buffer = ebuf_new (); enode_set_kv (node, "rendio-io-read-buffer", buffer); } EDEBUG (("io-renderer", "no EOL, appending '%s' to buffer.", inbuf)); /* append data to buffer */ ebuf_append_str (buffer, inbuf); } } /* Line mode is a bit more tricky, must write to a queue until we've * determined that we have a full line to pass to the application */ static gint rendio_read_line_mode_callback (gint source, EIOCond cond, ENode * node) { gchar buffer[BUFSIZ + 1]; gchar *p; gchar s; gint nread; gint fd; gint i; EDEBUG (("io-renderer", "in rendio_read_callback, cond = %i", cond)); fd = GPOINTER_TO_INT (enode_get_kv (node, "rendio-io-fd")); if (fd < 0) return (FALSE); nread = read (fd, buffer, BUFSIZ); /* insure \0 */ buffer[nread] = '\0'; p = buffer; /* Walk through string and break it into chunks on newlines */ for (i = 0; i <= nread; i++) { if (buffer[i] == '\n') { /* Save old value right after \n */ s = buffer[i + 1]; buffer[i + 1] = '\0'; rendio_renderer_newdata (node, p, TRUE); /* restore value after \n */ buffer[i + 1] = s; /* update p */ p = &buffer[i + 1]; } } rendio_renderer_newdata (node, p, FALSE); if (nread <= 0) rendio_release_read_watch (node); return (TRUE); } static GSList * free_write_buffer (GSList * list, WriteBuf * buffer) { GSList *tmp; if (list && list->data == buffer) { tmp = list; list = list->next; tmp->next = NULL; g_slist_free (tmp); } g_free (buffer->obuf); g_free (buffer); return (list); } static int rendio_sendq_callback (gint source, EIOCond cond, ENode * node) { char *function; WriteBuf *buffer; GSList *write_list; gint ret; gint fd; EDEBUG (("io-renderer", "rendio_sendq_callback")); /* Get the fd */ fd = GPOINTER_TO_INT (enode_get_kv (node, "rendio-io-fd")); write_list = enode_get_kv (node, "rendio-io-write-list"); EDEBUG (("io-renderer", "write_list = %i", write_list)); if (!write_list || !write_list->data) { /* Nothing to write. */ rendio_release_write_watch (node); return (FALSE); } buffer = (WriteBuf *) write_list->data; ret = write (fd, buffer->buffer, buffer->size); EDEBUG (("io-renderer", "write ret = %i", ret)); if (ret <= 0) { /* Error on descriptor, effectivly and eof, clean up time */ g_warning ("Error writing to file: %s", g_strerror (errno)); rendio_release_write_watch (node); return (FALSE); } else if (ret < buffer->size) { buffer->buffer += ret; buffer->size -= ret; } else if (ret == buffer->size) { write_list = free_write_buffer (write_list, buffer); enode_set_kv (node, "rendio-io-write-list", write_list); if (!write_list) { /* We have written everything. */ EDEBUG (("io-renderer", "done writing")); rendio_release_write_watch (node); } } function = enode_attrib_str (node, "onwrite", NULL); enode_call_ignore_return (node, function, "i", ret); return (TRUE); } gint rendio_sendq_attr_set (ENode * node, EBuf * attr, EBuf * value) { WriteBuf *buffer; void *wtag; gint fd; GSList *write_list = NULL; GSList *write_list_tail = NULL; gint send_size = value->len; gchar *data = value->str; EDEBUG (("io-renderer", "sendq buffer set, buffer size = %i", send_size)); if (send_size <= 0) /* nothing to send. */ return (FALSE); buffer = g_new0 (WriteBuf, 1); buffer->size = send_size; buffer->obuf = g_new (char, send_size); memcpy (buffer->obuf, data, send_size); buffer->buffer = buffer->obuf; enode_attrib_quiet (node, attr->str, ebuf_new_with_str ("")); write_list = enode_get_kv (node, "rendio-io-write-list"); write_list_tail = enode_get_kv (node, "rendio-io-write-list-tail"); write_list = g_slist_append_tail (write_list, buffer, &write_list_tail); enode_set_kv (node, "rendio-io-write-list", write_list); enode_set_kv (node, "rendio-io-write-list-tail", write_list_tail); /* Get fd */ fd = GPOINTER_TO_INT (enode_get_kv (node, "rendio-io-fd")); wtag = enode_get_kv (node, "rendio-io-wtag"); if (fd && !wtag) { wtag = entity_mainloop->io_add (fd, EIO_WRITE, (void *) rendio_sendq_callback, node); EDEBUG (("io-renderer", "added new write tag, wtag = %i", wtag)); enode_set_kv (node, "rendio-io-wtag", wtag); } return (TRUE); } static int rendio_error_callback (gint source, EIOCond cond, ENode * node) { gchar *function; EDEBUG (("io-renderer", "rendio_error_callback")); function = enode_attrib_str (node, "onerror", NULL); enode_call_ignore_return (node, function, ""); return (TRUE); } static void rendio_release_watch (ENode * node) { void *old_etag; /* Delete old io watchers if they exist ... */ /* This looks after all the read watcher stuff */ rendio_release_read_watch (node); /* and the error tag... */ rendio_release_write_watch (node); old_etag = enode_get_kv (node, "rendio-io-etag"); /* need to delete the old tag and callback connection. */ EDEBUG (("io-renderer", "going to delete old tags")); if (old_etag) entity_mainloop->io_remove (old_etag); enode_set_kv (node, "rendio-io-etag", NULL); } /* Called with the fd is set, so we have to setup new handlers for * everything. */ static int rendio_fd_attr_set (ENode * node, EBuf * attr, EBuf * value) { gint source; void *rtag; void *etag; EBuf *line_mode; EDEBUG (("io-renderer", "fd attribute set")); /* Release old fd watchers */ rendio_release_watch (node); /* clear list */ enode_set_kv (node, "rendio-io-line", NULL); enode_set_kv (node, "rendio-io-write-list", NULL); /* if source is < 0 then they are telling us to stop watching, or they * have a bad fd or something.. */ source = erend_get_integer (value); if (source < 0) return (TRUE); EDEBUG (("io-renderer", "fd set to %d", source)); /* No blocking io... this could cause odd interactions, we'll have to see */ fcntl (source, F_SETFL, O_NONBLOCK); EDEBUG (("io-renderer", "source = %i", source)); line_mode = enode_attrib (node, "mode", NULL); if (ebuf_not_empty (line_mode) && ebuf_equal_str (line_mode, "line")) { rtag = entity_mainloop->io_add (source, EIO_READ, (void *) rendio_read_line_mode_callback, node); } else { rtag = entity_mainloop->io_add (source, EIO_READ, (void *) rendio_read_chunk_mode_callback, node); } enode_set_kv (node, "rendio-io-rtag", rtag); EDEBUG (("io-renderer", "rtag = %i", rtag)); etag = entity_mainloop->io_add (source, EIO_ERROR, (void *) rendio_error_callback, node); enode_set_kv (node, "rendio-io-etag", etag); EDEBUG (("io-renderer", "etag = %i", etag)); enode_set_kv (node, "rendio-io-fd", GINT_TO_POINTER (source)); return (TRUE); } static void rendio_render (ENode * node) { EDEBUG (("io-renderer", "rendering")); enode_attribs_sync (node); } static void rendio_parent (ENode * parent_node, ENode * child_node) { erend_short_curcuit_parent (parent_node, child_node); } static void rendio_destroy (ENode * node) { /* Release old fd watchers */ rendio_release_watch (node); } void io_renderer_register (void) { Element *element; ElementAttr *e_attr; element = g_new0 (Element, 1); element->render_func = rendio_render; element->parent_func = rendio_parent; element->destroy_func = rendio_destroy; element->tag = "io"; element->description = "Buffered event driven IO. Set up a file descriptor, and use this to do input or output on it."; element_register (element); e_attr = g_new0 (ElementAttr, 1); e_attr->attribute = "fd"; e_attr->description = "Specify the file descripter to watch."; e_attr->value_desc = "integer"; e_attr->possible_values = "-1,*"; e_attr->set_attr_func = rendio_fd_attr_set; element_register_attrib (element, e_attr); e_attr = g_new0 (ElementAttr, 1); e_attr->attribute = "_sendq"; e_attr->description = "Stuff data into the outbound send queue."; e_attr->value_desc = "string"; e_attr->set_attr_func = rendio_sendq_attr_set; element_register_attrib (element, e_attr); e_attr = g_new0 (ElementAttr, 1); e_attr->attribute = "mode"; e_attr->description = "Specify whether to pass lines or binary chunks."; e_attr->value_desc = "string"; e_attr->possible_values = "chunk,line"; e_attr->set_attr_func = NULL; element_register_attrib (element, e_attr); e_attr = g_new0 (ElementAttr, 1); e_attr->attribute = "onnewdata"; e_attr->description = "Function to call with data read from the fd."; e_attr->value_desc = "function"; e_attr->set_attr_func = NULL; element_register_attrib (element, e_attr); e_attr = g_new0 (ElementAttr, 1); e_attr->attribute = "onwrite"; e_attr->description = "Function to call with the number of bytes written to fd."; e_attr->value_desc = "function"; e_attr->set_attr_func = NULL; element_register_attrib (element, e_attr); e_attr = g_new0 (ElementAttr, 1); e_attr->attribute = "onerror"; e_attr->description = "Function to call when an exception occurs on the fd."; e_attr->value_desc = "function"; e_attr->set_attr_func = NULL; element_register_attrib (element, e_attr); }