/* * libosengine - A synchronization engine for the opensync framework * Copyright (C) 2004-2005 Armin Bauer * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include "opensync.h" #include "opensync_internals.h" #include #include typedef struct OSyncPendingMessage { long long int id1; int id2; /** Where should the reply be received? */ OSyncMessageHandler callback; /** The user data */ gpointer user_data; } OSyncPendingMessage; /** * @ingroup OSEngineQueue * @brief A Queue used for asynchronous communication between thread * */ /*@{*/ static gboolean _incoming_prepare(GSource *source, gint *timeout_) { *timeout_ = 1; return FALSE; } static gboolean _incoming_check(GSource *source) { OSyncQueue *queue = *((OSyncQueue **)(source + 1)); if (g_async_queue_length(queue->incoming) > 0) return TRUE; return FALSE; } /* This function is called from the master thread. The function dispatched incoming data from * the remote end */ static gboolean _incoming_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { osync_trace(TRACE_ENTRY, "%s(%p)", __func__, user_data); OSyncQueue *queue = user_data; OSyncMessage *message = NULL; while ((message = g_async_queue_try_pop(queue->incoming))) { /* We check of the message is a reply to something */ if (message->cmd == OSYNC_MESSAGE_REPLY || message->cmd == OSYNC_MESSAGE_ERRORREPLY) { /* Search for the pending reply. We have to lock the * list since another thread might be duing the updates */ g_mutex_lock(queue->pendingLock); OSyncPendingMessage *found = NULL; GList *p = NULL; for (p = queue->pendingReplies; p; p = p->next) { OSyncPendingMessage *pending = p->data; if (pending->id1 == message->id1 && pending->id2 == message->id2) { /* Get the pending message from the queue */ queue->pendingReplies = g_list_remove(queue->pendingReplies, pending); found = pending; break; } } g_mutex_unlock(queue->pendingLock); if (found) { /* Call the callback of the pending message and free the message */ osync_assert(found->callback); found->callback(message, found->user_data); g_free(found); } else osync_trace(TRACE_INTERNAL, "%s: No pending message for %lld:%d", __func__, message->id1, message->id2); } else queue->message_handler(message, queue->user_data); osync_message_unref(message); } osync_trace(TRACE_EXIT, "%s: Done dispatching", __func__); return TRUE; } static void _osync_queue_stop_incoming(OSyncQueue *queue) { if (queue->incoming_source) { g_source_destroy(queue->incoming_source); queue->incoming_source = NULL; } if (queue->incomingContext) { g_main_context_unref(queue->incomingContext); queue->incomingContext = NULL; } if (queue->incoming_functions) { g_free(queue->incoming_functions); queue->incoming_functions = NULL; } } static gboolean _queue_prepare(GSource *source, gint *timeout_) { *timeout_ = 1; return FALSE; } static gboolean _queue_check(GSource *source) { OSyncQueue *queue = *((OSyncQueue **)(source + 1)); if (g_async_queue_length(queue->outgoing) > 0) return TRUE; return FALSE; } int _osync_queue_write_data(OSyncQueue *queue, const void *vptr, size_t n, OSyncError **error) { ssize_t nwritten = 0; while (n > 0) { if ((nwritten = write(queue->fd, vptr, n)) <= 0) { if (errno == EINTR) nwritten = 0; /* and call write() again */ else { osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to write IPC data: %i: %s", errno, strerror(errno)); return (-1); /* error */ } } n -= nwritten; vptr += nwritten; } return (nwritten); } osync_bool _osync_queue_write_long_long_int(OSyncQueue *queue, const long long int message, OSyncError **error) { if (_osync_queue_write_data(queue, &message, sizeof(long long int), error) < 0) return FALSE; return TRUE; } osync_bool _osync_queue_write_int(OSyncQueue *queue, const int message, OSyncError **error) { if (_osync_queue_write_data(queue, &message, sizeof(int), error) < 0) return FALSE; return TRUE; } /* This function sends the data to the remote side. If there is an error, it sends an error * message to the incoming queue */ static gboolean _queue_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { OSyncQueue *queue = user_data; OSyncError *error = NULL; OSyncMessage *message = NULL; while ((message = g_async_queue_try_pop(queue->outgoing))) { /* Check if the queue is connected */ if (!queue->connected) { osync_error_set(&error, OSYNC_ERROR_GENERIC, "Trying to send to a queue thats not connected"); goto error; } /*FIXME: review usage of osync_marshal_get_size_message() */ if (!_osync_queue_write_int(queue, message->buffer->len + osync_marshal_get_size_message(message), &error)) goto error; if (!_osync_queue_write_int(queue, message->cmd, &error)) goto error; if (!_osync_queue_write_long_long_int(queue, message->id1, &error)) goto error; if (!_osync_queue_write_int(queue, message->id2, &error)) goto error; if (message->buffer->len) { int sent = 0; do { int written = _osync_queue_write_data(queue, message->buffer->data + sent, message->buffer->len - sent, &error); if (written < 0) goto error; sent += written; } while (sent < message->buffer->len); } osync_message_unref(message); } return TRUE; error: if (message) osync_message_unref(message); if (error) { message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error); if (message) { osync_marshal_error(message, error); g_async_queue_push(queue->incoming, message); } osync_error_free(&error); } return FALSE; } static gboolean _source_prepare(GSource *source, gint *timeout_) { *timeout_ = 1; return FALSE; } static int _osync_queue_read_data(OSyncQueue *queue, void *vptr, size_t n, OSyncError **error) { size_t nleft; ssize_t nread = 0; nleft = n; while (n > 0) { if ((nread = read(queue->fd, vptr, nleft)) < 0) { if (errno == EINTR) nread = 0; /* and call read() again */ else { osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read IPC data: %i: %s", errno, strerror(errno)); return (-1); } } else if (nread == 0) break; /* EOF */ nleft -= nread; vptr += nread; } return (n - nleft); /* return >= 0 */ } static osync_bool _osync_queue_read_int(OSyncQueue *queue, int *message, OSyncError **error) { int read = _osync_queue_read_data(queue, message, sizeof(int), error); if (read < 0) return FALSE; if (read != sizeof(int)) { osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF"); return FALSE; } return TRUE; } static osync_bool _osync_queue_read_long_long_int(OSyncQueue *queue, long long int *message, OSyncError **error) { int read = _osync_queue_read_data(queue, message, sizeof(long long int), error); if (read < 0) return FALSE; if (read != sizeof(long long int)) { osync_error_set(error, OSYNC_ERROR_IO_ERROR, "Unable to read int. EOF"); return FALSE; } return TRUE; } static gboolean _source_check(GSource *source) { OSyncQueue *queue = *((OSyncQueue **)(source + 1)); OSyncMessage *message = NULL; OSyncError *error = NULL; if (queue->connected == FALSE) { /* Ok. so we arent connected. lets check if there are pending replies. We cannot * receive any data on the pipe, therefore, any pending replies will never * be answered. So we return error messages for all of them. */ if (queue->pendingReplies) { g_mutex_lock(queue->pendingLock); osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Broken Pipe"); GList *p = NULL; for (p = queue->pendingReplies; p; p = p->next) { OSyncPendingMessage *pending = p->data; message = osync_message_new(OSYNC_MESSAGE_ERRORREPLY, 0, NULL); if (message) { osync_marshal_error(message, error); message->id1 = pending->id1; message->id2 = pending->id2; g_async_queue_push(queue->incoming, message); } } osync_error_free(&error); g_mutex_unlock(queue->pendingLock); } return FALSE; } switch (osync_queue_poll(queue)) { case OSYNC_QUEUE_EVENT_NONE: return FALSE; case OSYNC_QUEUE_EVENT_READ: return TRUE; case OSYNC_QUEUE_EVENT_HUP: case OSYNC_QUEUE_EVENT_ERROR: queue->connected = FALSE; /* Now we can send the hup message, and wake up the consumer thread so * it can pickup the messages in the incoming queue */ message = osync_message_new(OSYNC_MESSAGE_QUEUE_HUP, 0, &error); if (!message) goto error; g_async_queue_push(queue->incoming, message); if (queue->incomingContext) g_main_context_wakeup(queue->incomingContext); return FALSE; } return FALSE; error: message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error); if (message) { osync_marshal_error(message, error); g_async_queue_push(queue->incoming, message); } osync_error_free(&error); return FALSE; } /* This function reads from the file descriptor and inserts incoming data into the * incoming queue */ static gboolean _source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) { OSyncQueue *queue = user_data; OSyncMessage *message = NULL; OSyncError *error = NULL; do { int size = 0; int cmd = 0; long long int id1 = 0; int id2 = 0; if (!_osync_queue_read_int(queue, &size, &error)) goto error; if (!_osync_queue_read_int(queue, &cmd, &error)) goto error; if (!_osync_queue_read_long_long_int(queue, &id1, &error)) goto error; if (!_osync_queue_read_int(queue, &id2, &error)) goto error; message = osync_message_new(cmd, size, &error); if (!message) goto error; message->id1 = id1; message->id2 = id2; if (size) { int read = 0; do { int inc = _osync_queue_read_data(queue, message->buffer->data + read, size - read, &error); if (inc < 0) goto error_free_message; if (inc == 0) { osync_error_set(&error, OSYNC_ERROR_IO_ERROR, "Encountered EOF while data was missing"); goto error_free_message; } read += inc; } while (read < size); } g_async_queue_push(queue->incoming, message); if (queue->incomingContext) g_main_context_wakeup(queue->incomingContext); } while (_source_check(queue->read_source)); return TRUE; error_free_message: osync_message_unref(message); error: if (error) { message = osync_message_new(OSYNC_MESSAGE_QUEUE_ERROR, 0, &error); if (message) { osync_marshal_error(message, error); g_async_queue_push(queue->incoming, message); } osync_error_free(&error); } return FALSE; } /*! @brief Creates a new asynchronous queue * * This function return the pointer to a newly created OSyncQueue * */ OSyncQueue *osync_queue_new(const char *name, OSyncError **error) { osync_trace(TRACE_ENTRY, "%s(%s, %p)", __func__, name, error); OSyncQueue *queue = osync_try_malloc0(sizeof(OSyncQueue), error); if (!queue) goto error; if (name) queue->name = g_strdup(name); queue->fd = -1; if (!g_thread_supported ()) g_thread_init (NULL); queue->pendingLock = g_mutex_new(); queue->context = g_main_context_new(); queue->outgoing = g_async_queue_new(); queue->incoming = g_async_queue_new(); osync_trace(TRACE_EXIT, "%s: %p", __func__, queue); return queue; error: osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return NULL; } /* Creates anonymous pipes which dont have to be created and are automatically connected. * * Lets assume parent wants to send, child wants to receive * * osync_queue_new_pipes() * fork() * * Parent: * connect(write_queue) * disconnect(read_queue) * * Child: * connect(read_queue) * close(write_queue) * * * */ osync_bool osync_queue_new_pipes(OSyncQueue **read_queue, OSyncQueue **write_queue, OSyncError **error) { osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, read_queue, write_queue, error); *read_queue = osync_queue_new(NULL, error); if (!*read_queue) goto error; *write_queue = osync_queue_new(NULL, error); if (!*write_queue) goto error_free_read_queue; int filedes[2]; if (pipe(filedes) < 0) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create pipes"); goto error_free_write_queue; } (*read_queue)->fd = filedes[0]; (*write_queue)->fd = filedes[1]; osync_trace(TRACE_EXIT, "%s", __func__); return TRUE; error_free_write_queue: osync_queue_free(*write_queue); error_free_read_queue: osync_queue_free(*read_queue); error: osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return FALSE; } void osync_queue_free(OSyncQueue *queue) { osync_trace(TRACE_ENTRY, "%s(%p)", __func__, queue); OSyncMessage *message = NULL; OSyncPendingMessage *pending = NULL; g_mutex_free(queue->pendingLock); g_main_context_unref(queue->context); _osync_queue_stop_incoming(queue); while ((message = g_async_queue_try_pop(queue->incoming))) { osync_message_unref(message); } g_async_queue_unref(queue->incoming); while ((message = g_async_queue_try_pop(queue->outgoing))) { osync_message_unref(message); } g_async_queue_unref(queue->outgoing); while (queue->pendingReplies) { pending = queue->pendingReplies->data; g_free(pending); queue->pendingReplies = g_list_remove(queue->pendingReplies, pending); } if (queue->name) g_free(queue->name); g_free(queue); osync_trace(TRACE_EXIT, "%s", __func__); } osync_bool osync_queue_exists(OSyncQueue *queue) { return g_file_test(queue->name, G_FILE_TEST_EXISTS) ? TRUE : FALSE; } osync_bool osync_queue_create(OSyncQueue *queue, OSyncError **error) { if (mkfifo(queue->name, 0600) != 0) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to create fifo"); return FALSE; } return TRUE; } osync_bool osync_queue_remove(OSyncQueue *queue, OSyncError **error) { osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error); if (unlink(queue->name) != 0) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to remove queue"); osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return FALSE; } osync_trace(TRACE_EXIT, "%s", __func__); return TRUE; } static osync_bool __osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, osync_bool nonblocking, OSyncError **error) { osync_assert(queue); osync_assert(queue->connected == FALSE); OSyncQueue **queueptr = NULL; queue->type = type; if (queue->fd == -1) { /* First, open the queue with the flags provided by the user */ int fd = open(queue->name, (type == OSYNC_QUEUE_SENDER ? O_WRONLY : O_RDONLY) | (nonblocking ? O_NONBLOCK : 0)); if (fd == -1) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to open fifo"); goto error; } queue->fd = fd; int oldflags = fcntl(queue->fd, F_GETFD); if (oldflags == -1) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to get fifo flags"); goto error_close; } if (fcntl(queue->fd, F_SETFD, oldflags|FD_CLOEXEC) == -1) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to set fifo flags"); goto error_close; } } queue->connected = TRUE; signal(SIGPIPE, SIG_IGN); /* now we start a thread which handles reading/writing of the queue */ queue->thread = osync_thread_new(queue->context, error); if (!queue->thread) goto error; queue->write_functions = g_malloc0(sizeof(GSourceFuncs)); queue->write_functions->prepare = _queue_prepare; queue->write_functions->check = _queue_check; queue->write_functions->dispatch = _queue_dispatch; queue->write_functions->finalize = NULL; queue->write_source = g_source_new(queue->write_functions, sizeof(GSource) + sizeof(OSyncQueue *)); queueptr = (OSyncQueue **)(queue->write_source + 1); *queueptr = queue; g_source_set_callback(queue->write_source, NULL, queue, NULL); g_source_attach(queue->write_source, queue->context); g_main_context_ref(queue->context); queue->read_functions = g_malloc0(sizeof(GSourceFuncs)); queue->read_functions->prepare = _source_prepare; queue->read_functions->check = _source_check; queue->read_functions->dispatch = _source_dispatch; queue->read_functions->finalize = NULL; queue->read_source = g_source_new(queue->read_functions, sizeof(GSource) + sizeof(OSyncQueue *)); queueptr = (OSyncQueue **)(queue->read_source + 1); *queueptr = queue; g_source_set_callback(queue->read_source, NULL, queue, NULL); g_source_attach(queue->read_source, queue->context); g_main_context_ref(queue->context); osync_thread_start(queue->thread); return TRUE; error_close: close(queue->fd); error: return FALSE; } osync_bool osync_queue_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error) { return __osync_queue_connect(queue, type, FALSE, error); } osync_bool osync_queue_try_connect(OSyncQueue *queue, OSyncQueueType type, OSyncError **error) { return __osync_queue_connect(queue, type, TRUE, error); } osync_bool osync_queue_disconnect(OSyncQueue *queue, OSyncError **error) { osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, error); osync_assert(queue); if (queue->thread) { osync_thread_stop(queue->thread); osync_thread_free(queue->thread); queue->thread = NULL; } //g_source_unref(queue->write_source); if (queue->write_functions) g_free(queue->write_functions); //g_source_unref(queue->read_source); _osync_queue_stop_incoming(queue); /* We have to empty the incoming queue if we disconnect the queue. Otherwise, the * consumer threads might try to pick up messages even after we are done. */ OSyncMessage *message = NULL; while ((message = g_async_queue_try_pop(queue->incoming))) { osync_message_unref(message); } if (close(queue->fd) != 0) { osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to close queue"); osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return FALSE; } queue->fd = -1; queue->connected = FALSE; osync_trace(TRACE_EXIT, "%s", __func__); return TRUE; } osync_bool osync_queue_is_connected(OSyncQueue *queue) { osync_assert(queue); return queue->connected; } /*! @brief Sets the message handler for a queue * * Sets the function that will receive all messages, except the methodcall replies * * @param queue The queue to set the handler on * @param handler The message handler function * @param user_data The userdata that the message handler should receive * */ void osync_queue_set_message_handler(OSyncQueue *queue, OSyncMessageHandler handler, gpointer user_data) { osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, handler, user_data); queue->message_handler = handler; queue->user_data = user_data; osync_trace(TRACE_EXIT, "%s", __func__); } /*! @brief Sets the queue to use the gmainloop with the given context * * This function will attach the OSyncQueue as a source to the given context. * The queue will then be check for new messages and the messages will be * handled. * * @param queue The queue to set up * @param context The context to use. NULL for default loop * */ void osync_queue_setup_with_gmainloop(OSyncQueue *queue, GMainContext *context) { osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, queue, context); queue->incoming_functions = g_malloc0(sizeof(GSourceFuncs)); queue->incoming_functions->prepare = _incoming_prepare; queue->incoming_functions->check = _incoming_check; queue->incoming_functions->dispatch = _incoming_dispatch; queue->incoming_functions->finalize = NULL; queue->incoming_source = g_source_new(queue->incoming_functions, sizeof(GSource) + sizeof(OSyncQueue *)); OSyncQueue **queueptr = (OSyncQueue **)(queue->incoming_source + 1); *queueptr = queue; g_source_set_callback(queue->incoming_source, NULL, queue, NULL); g_source_attach(queue->incoming_source, context); queue->incomingContext = context; // For the source g_main_context_ref(context); //To unref it later g_main_context_ref(context); osync_trace(TRACE_EXIT, "%s", __func__); } osync_bool osync_queue_dispatch(OSyncQueue *queue, OSyncError **error) { _incoming_dispatch(NULL, NULL, queue); return TRUE; } OSyncQueueEvent osync_queue_poll(OSyncQueue *queue) { struct pollfd pfd; pfd.fd = queue->fd; pfd.events = POLLIN; /* Here we poll on the queue. If we read on the queue, we either receive a * POLLIN or POLLHUP. Since we cannot write to the queue, we can block pretty long here. * * If we are sending, we can only receive a POLLERR which means that the remote side has * disconnected. Since we mainly dispatch the write IO, we dont want to block here. */ int ret = poll(&pfd, 1, queue->type == OSYNC_QUEUE_SENDER ? 0 : 100); if (ret < 0 && errno == EINTR) return OSYNC_QUEUE_EVENT_NONE; if (ret == 0) return OSYNC_QUEUE_EVENT_NONE; if (pfd.revents & POLLERR) return OSYNC_QUEUE_EVENT_ERROR; else if (pfd.revents & POLLHUP) return OSYNC_QUEUE_EVENT_HUP; else if (pfd.revents & POLLIN) return OSYNC_QUEUE_EVENT_READ; return OSYNC_QUEUE_EVENT_ERROR; } /** note that this function is blocking */ OSyncMessage *osync_queue_get_message(OSyncQueue *queue) { return g_async_queue_pop(queue->incoming); } void gen_id(long long int *part1, int *part2) { struct timeval tv; struct timezone tz; gettimeofday(&tv, &tz); long long int now = tv.tv_sec * 1000000 + tv.tv_usec; int rnd = (int)random(); rnd = rnd << 16 | getpid(); *part1 = now; *part2 = rnd; } osync_bool osync_queue_send_message(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, OSyncError **error) { osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, queue, replyqueue, message, error); if (message->callback) { osync_assert(replyqueue); OSyncPendingMessage *pending = osync_try_malloc0(sizeof(OSyncPendingMessage), error); if (!pending) goto error; gen_id(&(message->id1), &(message->id2)); pending->id1 = message->id1; pending->id2 = message->id2; pending->callback = message->callback; pending->user_data = message->user_data; g_mutex_lock(replyqueue->pendingLock); replyqueue->pendingReplies = g_list_append(replyqueue->pendingReplies, pending); g_mutex_unlock(replyqueue->pendingLock); } osync_message_ref(message); g_async_queue_push(queue->outgoing, message); g_main_context_wakeup(queue->context); osync_trace(TRACE_EXIT, "%s", __func__); return TRUE; error: osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return FALSE; } osync_bool osync_queue_send_message_with_timeout(OSyncQueue *queue, OSyncQueue *replyqueue, OSyncMessage *message, int timeout, OSyncError **error) { osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, queue, message, error); /*TODO: add timeout handling */ osync_bool ret = osync_queue_send_message(queue, replyqueue, message, error); osync_trace(ret ? TRACE_EXIT : TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); return ret; } osync_bool osync_queue_is_alive(OSyncQueue *queue) { if (!osync_queue_try_connect(queue, OSYNC_QUEUE_SENDER, NULL)) { return FALSE; } OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NOOP, 0, NULL); if (!message) { return FALSE; } if (!osync_queue_send_message(queue, NULL, message, NULL)) { return FALSE; } osync_queue_disconnect(queue, NULL); return TRUE; }