#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <glib.h>
#include <opensync/opensync.h>

#include "opensync/opensync_internals.h"

/**
 * State of one plugin process: the member it serves and the two message
 * queues used to talk to the sync engine.
 */
typedef struct PluginProcess {
	/** Environment; released by process_free() when it is non-NULL */
	OSyncEnv *env;
	/** The group member this process was started for */
	OSyncMember *member;
	/** Queue on which messages are received; main() connects it as OSYNC_QUEUE_RECEIVER */
	OSyncQueue *incoming;
	/** Queue used for replies; created when OSYNC_MESSAGE_INITIALIZE is handled, NULL before that */
	OSyncQueue *outgoing;

	/** TRUE once osync_member_initialize() has succeeded; checked before the member is finalized */
	osync_bool is_initialized;
} PluginProcess;

/** Per-request state handed as user data to the asynchronous member calls. */
typedef struct context {
	/** The plugin process that received the request */
	PluginProcess *pp;
	/** The request being answered; message_handler() takes a reference on it */
	OSyncMessage *message;

	/** The change being committed (commit_change) or fetched (get_changedata); NULL for other requests */
	OSyncChange *change;

	/** Optional hook used to add method-specific data to the reply,
	 *  such as the UID in the commit_change reply. May be NULL.
	 */
	osync_bool (*add_reply_data)(OSyncMessage*, struct context*, OSyncError**);
} context;


static osync_bool add_commit_change_reply_data(OSyncMessage *reply, context *ctx, OSyncError **error);
static osync_bool add_connect_reply_data(OSyncMessage *reply, context *ctx, OSyncError **error);
static osync_bool add_get_changedata_reply_data(OSyncMessage *reply, context *ctx, OSyncError **error);

void message_handler(OSyncMessage*, void*);
void message_callback(OSyncMember*, context*, OSyncError**);

/**
 * Tear down a plugin process object and release it.
 *
 * The incoming queue is disconnected, removed and freed; the outgoing queue
 * is only disconnected and freed (osync_queue_remove() is not called on it).
 * The environment is freed when one is attached.
 *
 * @param pp  Process state to release; may be NULL. Because the structure
 *            itself is released with g_free(), it must have been allocated
 *            with a GLib allocator.
 */
void process_free(PluginProcess *pp)
{
	if (!pp)
		return;

	if (pp->incoming) {
		osync_queue_disconnect(pp->incoming, NULL);
		osync_queue_remove(pp->incoming, NULL);
		osync_queue_free(pp->incoming);
	}

	if (pp->outgoing) {
		/* Disconnect the outgoing queue itself: the incoming queue has
		 * already been freed above and must not be touched again. */
		osync_queue_disconnect(pp->outgoing, NULL);
		osync_queue_free(pp->outgoing);
	}

	if (pp->env)
		osync_env_free(pp->env);

	g_free(pp);
}

/**
 * Report a fatal error to the engine and terminate the process.
 *
 * An OSYNC_MESSAGE_ERROR message carrying *error is sent on the outgoing
 * queue. This function never returns:
 *  - exit status 1 when the error message was handed to the queue,
 *  - exit status 2 when the message could not be created or sent.
 *
 * @param pp     Process state; released with process_free() on the status-1 path
 * @param error  The error to report; *error is marshalled into the message
 */
void process_error_shutdown(PluginProcess *pp, OSyncError **error)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, pp, error);

	OSyncMessage *errmsg = osync_message_new(OSYNC_MESSAGE_ERROR, 0, NULL);
	if (errmsg) {
		osync_marshal_error(errmsg, *error);

		if (osync_queue_send_message(pp->outgoing, NULL, errmsg, NULL)) {
			/* NOTE(review): presumably gives the queue time to deliver
			 * the message before the process goes away - confirm */
			sleep(1);

			process_free(pp);
			osync_trace(TRACE_EXIT, "%s", __func__);
			exit(1);
		}

		/* Sending failed: drop the message and fall through to the
		 * failure exit below */
		osync_message_unref(errmsg);
	}

	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
	exit(2);
}

/**
 * Member callback: forward a sync alert to the engine.
 *
 * Sends an OSYNC_MESSAGE_SYNC_ALERT message on the outgoing queue of the
 * plugin process attached to the member. If the message cannot be created
 * or sent, process_error_shutdown() is called, which terminates the process.
 *
 * @param member  Member raising the alert; its data pointer must be the
 *                PluginProcess
 */
void osync_client_sync_alert_sink(OSyncMember *member)
{
	osync_trace(TRACE_ENTRY, "%s(%p)", __func__, member);
	PluginProcess *pp = (PluginProcess*)osync_member_get_data(member);

	OSyncError *error = NULL;

	OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNC_ALERT, 0, &error);
	if (!message)
		process_error_shutdown(pp, &error);

	if (!osync_queue_send_message(pp->outgoing, NULL, message, &error))
		process_error_shutdown(pp, &error);

	/* Drop our own reference, as every other sender in this file does
	 * after handing a message to the queue; otherwise each alert leaks
	 * one message. */
	osync_message_unref(message);

	osync_trace(TRACE_EXIT, "%s", __func__);
}

/**
 * Member callback: forward a change reported by the plugin to the engine.
 *
 * If the originating request has already been answered, the change is freed
 * and nothing is sent. Otherwise the change and the member id are marshalled
 * into an OSYNC_MESSAGE_NEW_CHANGE message and sent on the outgoing queue.
 * If the message cannot be created or sent, process_error_shutdown() is
 * called, which terminates the process.
 *
 * @param member     Member reporting the change
 * @param change     The reported change
 * @param user_data  The request context (context *) of the originating request
 */
void osync_client_changes_sink(OSyncMember *member, OSyncChange *change, void *user_data)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, member, change, user_data);
	context *ctx = (context *)user_data;
	PluginProcess *pp = ctx->pp;
	OSyncMessage *orig = ctx->message;

	OSyncError *error = NULL;

	/* Nobody is waiting for this change any more */
	if (osync_message_is_answered(orig)) {
		osync_change_free(change);
		osync_trace(TRACE_EXIT, "%s", __func__);
		return;
	}

	OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_NEW_CHANGE, 0, &error);
	if (!message)
		process_error_shutdown(pp, &error);

	osync_marshal_change(message, change);

	osync_message_write_long_long_int(message, osync_member_get_id(member));

	if (!osync_queue_send_message(pp->outgoing, NULL, message, &error))
		process_error_shutdown(pp, &error);

	/* Drop our own reference, as every other sender in this file does
	 * after handing a message to the queue; otherwise each reported
	 * change leaks one message. */
	osync_message_unref(message);

	osync_trace(TRACE_EXIT, "%s", __func__);
}

/**
 * Print the command line help to stderr and terminate with exit status 1.
 *
 * @param name  Program name shown in the usage text (normally argv[0])
 */
static void usage (char *name)
{
	fprintf(stderr,
		"\nUsage: %s <group path> <memberid>\n\n"
		"<group path> is the path to the directory\n"
		"\tof the group to synchronize\n"
		"<memberid> is the id of the member to debug\n\n"
		"Example: %s /home/joe/.opensync/group1 1\n",
		name, name);
	exit(1);
}

int main( int argc, char **argv )
{
	osync_trace(TRACE_ENTRY, "%s(%i, %p)", __func__, argc, argv);
	GMainLoop *syncloop;
	GMainContext *context;
	OSyncError *error = NULL;
	PluginProcess pp;
	
	if (argc != 3)
		usage(argv[0]);

	memset(&pp, 0, sizeof(pp));

	char *group_path = argv[ 1 ];
	int member_id = atoi( argv[ 2 ] );

	context = g_main_context_new();
	syncloop = g_main_loop_new(context, TRUE);

	/** Create environment **/
	OSyncEnv *env = osync_env_new();
	/* Don't load groups. We will load the group manually using osync_group_load() */
	osync_env_set_option(env, "LOAD_GROUPS", "no");

	/* Don't load plugins automatically if OSYNC_MODULE_LIST is set */
	char *module_list = getenv("OSYNC_MODULE_LIST");

	if (module_list) {
		osync_env_set_option(env, "LOAD_PLUGINS", "no");

		osync_trace(TRACE_INTERNAL, "OSYNC_MODULE_LIST variable: %s", module_list);

		char *str, *saveptr;
		for (str = module_list; ; str = NULL) {
			char *path = strtok_r(str, ":", &saveptr);
			if (!path)
				break;

			osync_trace(TRACE_INTERNAL, "Module to be loaded: %s", path);
			if (!osync_module_load(env, path, &error)) {
				fprintf(stderr, "Unable to load plugin %s: %s\n", path, osync_error_print(&error));
				osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
				return 1;
			}
		}
	}

	if (!osync_env_initialize(env, &error)) {
		fprintf(stderr, "Unable to initialize environment: %s\n", osync_error_print(&error));
		osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
		osync_error_free(&error);
		return 1;
	}

	/** Find group **/
	OSyncGroup *group = osync_group_load(env, group_path, &error);
	if (!group) {
		fprintf(stderr, "Unable to load group from path: %s\n", group_path);
		osync_trace(TRACE_EXIT_ERROR, "%s: Unable to load group from path: %s", __func__, group_path);
		return 2;
	}

	/** Find member **/
	int i;
	for ( i = 0; i < osync_group_num_members(group); ++i ) {
		pp.member = osync_group_nth_member(group, i);
		if (member_id == osync_member_get_id(pp.member))
			break;
		else
			pp.member = NULL;
	}
	if ( !pp.member ) {
		fprintf(stderr, "Unable to find member with id %d\n", member_id);
		osync_trace(TRACE_EXIT_ERROR, "%s: Unable to find member with id %d", __func__, member_id);
		return 3;
	}
	osync_trace(TRACE_INTERNAL, "+++++++++ This is the client #%d (%s plugin) of group %s", member_id, pp.member->pluginname, osync_group_get_name(group));

	/** Create connection pipes **/
	char *pipe_path = g_strdup_printf( "%s/pluginpipe", osync_member_get_configdir( pp.member ) );
	pp.incoming = osync_queue_new( pipe_path, &error );
	pp.outgoing = NULL;
	g_free( pipe_path );

	osync_queue_create( pp.incoming, &error );
	if ( osync_error_is_set( &error ) )
		osync_error_free( &error );

	/** Idle until the syncengine connects to (and reads from) our pipe **/
	if (!osync_queue_connect( pp.incoming, OSYNC_QUEUE_RECEIVER, 0 )) {
		fprintf(stderr, "Unable to connect\n");
		osync_trace(TRACE_EXIT_ERROR, "%s: Unable to connect", __func__);
		exit(1);
	}

	
	osync_member_set_data(pp.member, &pp);

	/** Set callback functions **/
	OSyncMemberFunctions *functions = osync_member_get_memberfunctions(pp.member);
	functions->rf_change = osync_client_changes_sink;
	//functions->rf_message = osync_client_message_sink;
	functions->rf_sync_alert = osync_client_sync_alert_sink;

	/** Start loop **/
	osync_trace(TRACE_INTERNAL, "plugin setting up mainloop");
	osync_queue_set_message_handler(pp.incoming, message_handler, &pp);
	osync_queue_setup_with_gmainloop(pp.incoming, context);
	osync_member_set_loop(pp.member, context);

	osync_trace(TRACE_INTERNAL, "running loop");
	g_main_loop_run(syncloop);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return 0;
}

void message_handler(OSyncMessage *message, void *user_data)
{
	osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, user_data);
	PluginProcess *pp = user_data;

	OSyncMessage *reply = NULL;
	OSyncError *error = NULL;
	//OSyncChange *change = 0;
	OSyncMember *member = pp->member;
	char *enginepipe = NULL;
   	context *ctx = NULL;

	osync_trace(TRACE_INTERNAL, "plugin received command %i", osync_message_get_command( message ));

	switch ( osync_message_get_command( message ) ) {
	case OSYNC_MESSAGE_NOOP:
		break;

	case OSYNC_MESSAGE_INITIALIZE:
		osync_trace(TRACE_INTERNAL, "init.");
		osync_message_read_string(message, &enginepipe);

		osync_trace(TRACE_INTERNAL, "enginepipe %s", enginepipe);
		pp->outgoing = osync_queue_new(enginepipe, NULL);
		if (!pp->outgoing) {
			fprintf(stderr, "Unable to make new queue\n");
			osync_trace(TRACE_EXIT_ERROR, "%s: Unable to make new queue", __func__);
			exit(1);
		}
		osync_trace(TRACE_INTERNAL, "connecting to engine");
		if (!osync_queue_connect(pp->outgoing, OSYNC_QUEUE_SENDER, 0 )) {
			fprintf(stderr, "Unable to connect queue\n");
			osync_trace(TRACE_EXIT_ERROR, "%s: Unable to connect queue", __func__);
			exit(1);
		}

		osync_trace(TRACE_INTERNAL, "done connecting to engine");
		/** Instanciate plugin **/
		if (!osync_member_instance_default_plugin(pp->member, &error))
			goto error;

		/** Initialize plugin **/
		if (!osync_member_initialize(pp->member, &error))
			goto error;

		pp->is_initialized = TRUE;

		osync_trace(TRACE_INTERNAL, "sending reply to engine");
		reply = osync_message_new_reply(message, NULL);
		if (!reply) {
			fprintf(stderr, "Unable to make new reply\n");
			osync_trace(TRACE_EXIT_ERROR, "%s: Unable to make new reply", __func__);
			exit(1);
		}

		if (!osync_queue_send_message(pp->outgoing, NULL, reply, NULL)) {
			fprintf(stderr, "Unable to send reply\n");
			osync_trace(TRACE_EXIT_ERROR, "%s: Unable to send reply", __func__);
			exit(1);
		}

		osync_trace(TRACE_INTERNAL, "done sending to engine");
		break;

	case OSYNC_MESSAGE_FINALIZE:
		if (pp->is_initialized)
			osync_member_finalize(pp->member);

		reply = osync_message_new_reply(message, NULL);
		if (!reply) {
			fprintf(stderr, "Unable to make new reply\n");
			osync_trace(TRACE_EXIT_ERROR, "%s: Unable to make new reply", __func__);
			exit(1);
		}

		if (!osync_queue_send_message(pp->outgoing, NULL, reply, NULL)) {
			fprintf(stderr, "Unable to send reply\n");
			osync_trace(TRACE_EXIT_ERROR, "%s: Unable to send reply", __func__);
			exit(1);
		}

		/*FIXME: how to wait for a message to be sent?
		 * We need to wait for the reply to be sent before exiting
		 */

		osync_trace(TRACE_EXIT, "%s", __func__);
		exit(0);
		break;

	case OSYNC_MESSAGE_CONNECT:
		osync_member_read_sink_info_full(member, message);

		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);

		/* connect() needs to tell the engine if it must perform a
		 * slow-sync, use add_reply_data() method for this
		 */
		ctx->add_reply_data = add_connect_reply_data;
		
		osync_member_connect(member, (OSyncEngCallback)message_callback, ctx);
		break;

	case OSYNC_MESSAGE_GET_CHANGES:
		osync_member_read_sink_info_full(member, message);

		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);
  		osync_member_get_changeinfo(member, (OSyncEngCallback)message_callback, ctx);
		break;

	case OSYNC_MESSAGE_COMMIT_CHANGE:
		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);
		OSyncChange *change;
  		osync_demarshal_change(message, member->group->conv_env, &change);
		osync_change_set_member(change, member);

		/* commit_change() needs to return some data back to the engine,
		 * use the add_reply_data() method for this
		 */
		ctx->change = change;
		ctx->add_reply_data = add_commit_change_reply_data;

	  	osync_member_commit_change(member, change, (OSyncEngCallback)message_callback, ctx);
		break;

	case OSYNC_MESSAGE_SYNC_DONE:
		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);
  		osync_member_sync_done(member, (OSyncEngCallback)message_callback, ctx);
		break;

	case OSYNC_MESSAGE_DISCONNECT:
		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);
  		osync_member_disconnect(member, (OSyncEngCallback)message_callback, ctx);
		break;

	case OSYNC_MESSAGE_REPLY:
		break;

  	case OSYNC_MESSAGE_ERRORREPLY:
		break;

	case OSYNC_MESSAGE_GET_CHANGEDATA:
		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);

  		osync_demarshal_change(message, member->group->conv_env, &change);
		osync_change_set_member(change, member);

		/* get_changedata needs to return the data from the change object back */
		ctx->change = change;
		ctx->add_reply_data = add_get_changedata_reply_data;

		osync_member_get_change_data(member, change, (OSyncEngCallback)message_callback, ctx);
		osync_trace(TRACE_EXIT, "message_handler");
		break;

  	case OSYNC_MESSAGE_COMMITTED_ALL:
		ctx = g_malloc0(sizeof(context));
		ctx->pp = pp;
		ctx->message = message;
		osync_message_ref(message);
  		osync_member_committed_all(member, (OSyncEngCallback)message_callback, ctx);
		break;

	/*case OSYNC_MESSAGE_READ_CHANGE:
		osync_demarshal_change( queue, &change, &error );
		osync_member_read_change(client->member, change, (OSyncEngCallback)message_callback, message);
		osync_trace(TRACE_EXIT, "message_handler");
		break;
	*/

	case OSYNC_MESSAGE_CALL_PLUGIN:
		/*
		char *function = itm_message_get_data(message, "function");
		void *data = itm_message_get_data(message, "data");
		OSyncError *error = NULL;
		void *replydata = osync_member_call_plugin(client->member, function, data, &error);

		if (itm_message_get_data(message, "want_reply")) {
			ITMessage *reply = NULL;
			if (!osync_error_is_set(&error)) {
				reply = itm_message_new_methodreply(client, message);
				itm_message_set_data(message, "reply", replydata);
			} else {
				reply = itm_message_new_errorreply(client, message);
				itm_message_set_error(reply, error);
			}

			itm_message_send_reply(reply);
		}
		*/
		break;
	case OSYNC_MESSAGE_QUEUE_HUP:
		osync_trace(TRACE_INTERNAL, "%s: ERROR: Queue hangup", __func__);
		fprintf(stderr, "Pipe closed! Exiting.\n");
		osync_trace(TRACE_EXIT, "%s: Exiting application. Goodbye.", __func__);
		exit(1);
		break;
	default:
		osync_trace(TRACE_INTERNAL, "%s: ERROR: Unknown message", __func__);
		g_assert_not_reached();
		break;
	}

	if (reply)
		osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
	return;

error:;

	OSyncMessage *errorreply = osync_message_new_errorreply(message, NULL);
	if (!errorreply) {
		fprintf(stderr, "Unable to make new reply\n");
		osync_trace(TRACE_EXIT_ERROR, "%s", __func__);
		exit(1);
	}

	osync_marshal_error(errorreply, error);

	if (!osync_queue_send_message(pp->outgoing, NULL, errorreply, NULL)) {
		fprintf(stderr, "Unable to send error\n");
		osync_trace(TRACE_EXIT_ERROR, "%s", __func__);
		exit(1);
	}

	osync_message_unref(errorreply);

	osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
	osync_error_free(&error);
}

/**
 * Reply hook for get_changedata: marshals the data of the change stored in
 * the request context into the reply.
 *
 * @param reply  Reply message to append to
 * @param ctx    Request context; ctx->change must be set
 * @param error  Unused
 * @return Always TRUE
 */
static osync_bool add_get_changedata_reply_data(OSyncMessage *reply, context *ctx, OSyncError **error)
{
	OSyncChange *const change = ctx->change;

	assert(change);
	osync_marshal_changedata(reply, change);
	return TRUE;
}

/**
 * Reply hook for commit_change: writes the UID of the committed change into
 * the reply.
 *
 * @param reply  Reply message to append to
 * @param ctx    Request context; ctx->change must be set
 * @param error  Unused
 * @return Always TRUE
 */
static osync_bool add_commit_change_reply_data(OSyncMessage *reply, context *ctx, OSyncError **error)
{
	OSyncChange *const change = ctx->change;

	assert(change);
	osync_message_write_string(reply, osync_change_get_uid(change));
	return TRUE;
}

/**
 * Reply hook for connect: writes the sink information of the member into
 * the reply.
 *
 * @param reply  Reply message to append to
 * @param ctx    Request context; ctx->pp->member must be set
 * @param error  Unused
 * @return Always TRUE
 */
static osync_bool add_connect_reply_data(OSyncMessage *reply, context *ctx, OSyncError **error)
{
	OSyncMember *const member = ctx->pp->member;

	assert(member);
	osync_member_write_sink_info(member, reply);
	return TRUE;
}

/**
 * Completion callback of the asynchronous member calls started by
 * message_handler(): sends the reply for the request stored in the context.
 *
 * If the request has already been answered, only the reference held on the
 * request message is dropped. Otherwise a reply is built (a normal reply,
 * extended by ctx->add_reply_data() when that hook is set, or an error reply
 * carrying *error), the context is freed, the reply is sent and the request
 * is marked as answered.
 *
 * A reply that cannot be created, filled or sent is fatal:
 * process_error_shutdown() is called, which terminates the process, as the
 * other senders in this file do.
 *
 * @param member  Member the call was made on
 * @param ctx     Request context; freed here once the reply has been built
 * @param error   Result of the member call; a set error produces an error reply
 */
void message_callback(OSyncMember *member, context *ctx, OSyncError **error)
{
	OSyncError *myerror = NULL;
	osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, member, ctx, error);

	OSyncMessage *message = ctx->message;
	PluginProcess *pp = ctx->pp;

	OSyncMessage *reply = NULL;

	if (osync_message_is_answered(message) == TRUE) {
		/* NOTE(review): ctx is intentionally left alone here; this path
		 * may be reached with a context that was already freed by an
		 * earlier invocation - confirm before releasing it */
		osync_message_unref(message);
		osync_trace(TRACE_EXIT, "%s", __func__);
		return;
	}

	if (!osync_error_is_set(error)) {
		reply = osync_message_new_reply(message, &myerror);
		if (!reply)
			process_error_shutdown(pp, &myerror);

		osync_debug("CLI", 4, "Member is replying with message %p to message %p:\"%lli-%i\" with no error", reply, message, message->id1, message->id2);
		/* Set method-specific data, if needed */
		if (ctx->add_reply_data && !ctx->add_reply_data(reply, ctx, &myerror))
			process_error_shutdown(pp, &myerror);
	} else {
		reply = osync_message_new_errorreply(message, &myerror);
		if (!reply)
			process_error_shutdown(pp, &myerror);

		osync_marshal_error(reply, *error);
		osync_debug("CLI", 1, "Member is replying with message %p to message %p:\"%lli-%i\" with error %i: %s", reply, message, message->id1, message->id2, osync_error_get_type(error), osync_error_print(error));
	}

	/* NOTE(review): ctx->change is not released here; confirm who owns
	 * the change created for commit_change / get_changedata requests */
	g_free(ctx);

	if (!osync_queue_send_message(pp->outgoing, NULL, reply, &myerror))
		process_error_shutdown(pp, &myerror);
	osync_message_set_answered(message);

	osync_message_unref(message);
	osync_message_unref(reply);

	osync_trace(TRACE_EXIT, "%s", __func__);
}

/**
 * Placeholder for the plugin-message sink. All arguments are currently
 * ignored and NULL is returned; main() does not register this function
 * (the rf_message assignment there is commented out).
 */
void *osync_client_message_sink(OSyncMember *member, const char *name, void *data, osync_bool synchronous)
{
	/*TODO: Implement support for PLUGIN_MESSAGE */
	/* Disabled sketch of the intended behaviour, kept for reference.
	 * NOTE(review): it relies on OSyncClient and itm_* names that are not
	 * used anywhere else in this file - presumably an older message API.
	 */
/*
	OSyncClient *client = osync_member_get_data(member);
	OSyncEngine *engine = client->engine;
	if (!synchronous) {

		ITMessage *message = itm_message_new_signal(client, "PLUGIN_MESSAGE");
		osync_debug("CLI", 3, "Sending message %p PLUGIN_MESSAGE for message %s", message, name);
		itm_message_set_data(message, "data", data);
		itm_message_set_data(message, "name", g_strdup(name));
		itm_queue_send(engine->incoming, message);

		return NULL;
	} else {
		return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata);
	}
*/
  return NULL;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */