// -*- c++ -*- // Generated by assa-genesis //------------------------------------------------------------------------------ // charinbuffer_test.cpp //------------------------------------------------------------------------------ // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version // 2 of the License, or (at your option) any later version. // // Date : Fri Aug 9 11:06:50 EDT 2002 //------------------------------------------------------------------------------ static const char help_msg[]= " \n" " NAME: \n" " charinbuffer_test - test program for CharInBuffer class \n" " \n" " DESCRIPTION: \n" " \n" " Test functionality of CharInBuffer class. \n" " \n" " :listener \n" " Test Loopback[1] Loopback[2] Test1 Test2 \n" " --------- -------- -------- ----- ----- \n" " | | | | | \n" " | | | connect | | \n" " |<---------------------------------------o | \n" " | <> | | | | \n" " o--------------->| | | | \n" " | | | | connect | \n" " |<-------------------------------------------------o \n" " | <> | | | | \n" " o--------------------------->| | | \n" " | | | write(EVIAN\\n\\r) | \n" " | |<----------------------o | \n" " | | | | | \n" " | | write(OK\\n) | | \n" " | o---------------------->| | \n" " | | | | | \n" " | | |<--------------------o write('E') \n" " | | |<--------------------o write('V') \n" " | | |<--------------------o write('I') \n" " | | |<--------------------o write('A') \n" " | | |<--------------------o write('N') \n" " | | |<--------------------o write(\\n) \n" " | | |<--------------------o write(\\r) \n" " | | | | | \n" " | | | write(OK\\n) | \n" " | | o-------------------->| \n" " | | | | | \n" " \n" " USAGE: \n" " \n" " shell> charinbuffer_test [OPTIONS] \n" " \n" " OPTIONS: \n" " \n" " -d, --log-stdout - write debug to standard output \n" " -D, --debug-file NAME - write debug to NAME file \n" " -h, --help - print this messag \n" " -l, --pidfile PATH - the process ID is written to the lockfile PATH \n" " instead of default ~/.{procname}.pid \n" " -L, --no-pidfile - do not create PID lockfile \n" " -m, --mask MASK - mask (default: DBG_ALL = 0x7fffffff) \n" " -p, --port NAME - tcp/ip port NAME (default - procname) \n" " -v, --version - print version number \n" " -z, --log-size NUM - maximum size debug file can reach (dfl: is 10Mb)\n"; //------------------------------------------------------------------------------ #if !defined (WIN32) #include #include using std::string; #include "assa/Assure.h" #include "assa/GenServer.h" #include "assa/Singleton.h" #include "assa/TimeVal.h" #include "assa/IPv4Socket.h" #include "assa/Acceptor.h" #include "assa/Connector.h" #include "assa/UNIXAddress.h" #include "assa/CharInBuffer.h" #include "assa/MemDump.h" using namespace ASSA; // Forward declarations class Task1; class Task2; class Loopback; static const string REQUEST ("Evian\n\r"); static const string REQUEST_CLEAR ("Evian"); static const string REP_OK ("OK\n"); static const string REP_ERR ("ERROR\n"); /******************************************************************************/ // Class Test /******************************************************************************/ class Test : public GenServer, public Singleton { public: Test (); ~Test () { ::unlink ("/tmp/charinbuffer.unix"); } virtual void init_service (); virtual void process_events (); virtual int handle_timeout (TimerId tid_); int exit_value (int v_) { set_exit_value (v_); } private: Acceptor* m_acceptor; Connector m_connector_t1; Connector m_connector_t2; UNIXAddress m_address; Task1* m_task1; Task2* m_task2; }; /******************************************************************************/ // Class Task1 [Client side] // // Task1 sends Evian\\n message to the Loopback as one chunk. // It expects OK\\n or ERROR\\n from Loopback in return. /******************************************************************************/ class Task1 : public ServiceHandler { public: Task1 (); ~Task1 (); virtual int open (); virtual int handle_timeout (TimerId tid_); virtual int handle_read (int fd_); virtual int handle_close (int fd_); /** Report completion status. * 0 - task is still running * -1 - task failed * 1 - task completed succesfully */ bool completed () const { return m_completion_status; } enum state_t { start, inprogress, finish }; private: void state (state_t new_state_); state_t state () const { return m_state; } const char* state_name (state_t state_) const; private: int m_completion_status; state_t m_state; CharInBuffer m_reply; // { OK\n, ERROR\n } }; /******************************************************************************/ // Class Task2 [Client side] // // Task2 sends Evian\\n message to the Loopback 1 byte at a time with // delays in 1 second. // It expects OK\\n or ERROR\\n from Loopback in return. /******************************************************************************/ class Task2 : public ServiceHandler { public: enum state_t { start, inprogress, finish }; Task2 (); ~Task2 (); virtual int open (); virtual int handle_timeout (TimerId tid_); virtual int handle_read (int fd_); virtual int handle_close (int fd_); /** Report completion status. * 0 - task is still running * -1 - task failed * 1 - task completed succesfully */ int completed () const { return m_completion_status; } private: void state (state_t new_state_); state_t state () const { return m_state; } const char* state_name (state_t state_) const; private: int m_completion_status; state_t m_state; CharInBuffer m_reply; // { OK\n, ERROR\n } size_t m_idx; // index to the next character to send }; /******************************************************************************/ // Class Loopback [Server side] /******************************************************************************/ class Loopback : public ServiceHandler { public: Loopback (IPv4Socket*); ~Loopback (); virtual int open (); virtual void close (); virtual int handle_read (int fd_); virtual int handle_close (int fd_); private: CharInBuffer m_msg; }; /******************************************************************************/ // Useful defines /******************************************************************************/ #define TEST Test::get_instance() #define REACTOR Test::get_instance()->get_reactor() /******************************************************************************/ // Static declarations mandated by Singleton class /******************************************************************************/ ASSA_DECL_SINGLETON(Test); /******************************************************************************/ // Class Test member functions /******************************************************************************/ Test:: Test () : m_address ("/tmp/charinbuffer.unix"), m_task1 (NULL), m_task2 (NULL) { // ---Configuration-- rm_opt ('f', "config-file" ); rm_opt ('n', "instance" ); rm_opt ('s', "set-name" ); rm_opt ('t', "comm-timeout" ); // ---Process bookkeeping-- rm_opt ('b', "daemon" ); /*--- * Disable all debugging *---*/ // m_mask = 0; m_log_file = "charinbuf.log"; set_id ("Test"); } void Test::init_service () { trace_with_mask("Test::init_service",TRACE); #ifndef __CYGWIN__ TimeVal tv (5.0); m_address.dump (); if (m_address.bad ()) { DL((APP,"Bad UNIX address!\n")); goto on_error; } /** * Create and open Acceptor */ m_acceptor = new Acceptor (REACTOR); Assure_exit (m_acceptor->open (m_address) == 0); /** * Open Connectors for ASYNC connection */ if (m_connector_t1.open (tv, ASSA::async, REACTOR) < 0) { DL((APP,"Connector failed on open(task1, async)\n")); goto on_error; } if (m_connector_t2.open (tv, ASSA::async, REACTOR) < 0) { DL((APP,"Connector failed on open(task2, async)\n")); goto on_error; } /* Instantiate all Tasks */ m_task1 = new Task1; m_task2 = new Task2; /** * Connect Task 1 with Loopback asynchronously. * I cannot reuse the same connector because connection happens * asynchronously. */ if (m_connector_t1.connect (m_task1, m_address, AF_UNIX) < 0) { DL((APP,"Connector failed on connect(task1)\n")); goto on_error; } /** * Connect Task 2 with Loopback asynchronously */ if (m_connector_t2.connect (m_task2, m_address, AF_UNIX) < 0) { DL((APP,"Connector failed on connect(task2)\n")); goto on_error; } /** * Schedule Test(server-side) task again */ REACTOR->registerTimerHandler (this, 0.1, "Test-tick"); // one tick #endif // !__CYGWIN__ DL((APP,"Initialized\n")); std::cout << "= Running charinbuffer_test Test =\n"; return; on_error: TEST->stop_service (); } int Test::handle_timeout (TimerId tid_) { trace_with_mask("Test::handle_timeout",TRACE); int status; /* Ask client application if it has finished running. If it has, kill it. If not, schedule this task to run again. */ if (m_task1) { if ((status = m_task1->completed ())) { if (status < 0) { set_exit_value (1); std::cout << "Task 1 failed\n"; } else { std::cout << "Task 1 ok\n"; } delete m_task1; m_task1 = NULL; } } if (m_task2) { if ((status = m_task2->completed ())) { if (status < 0) { set_exit_value (1); std::cout << "Task 2 failed\n"; } else { std::cout << "Task 2 ok\n"; } delete m_task2; m_task2 = NULL; } } /* If there is a task still running, reschedule ourselves */ if (m_task1 || m_task2) { return 1; } else { if (get_exit_value () == 0) { std::cout << "Test completed ok\n"; } else { std::cout << "Test failed\n"; } DL((APP,"Test completed - stopping service\n")); TEST->stop_service (); } return 0; } void Test::process_events () { trace_with_mask("Test::process_events",TRACE); #ifdef __CYGWIN__ std::cout << "Task 1 ok\n"; std::cout << "Task 2 ok\n"; std::cout << "Test completed ok\n"; set_exit_value (0); #else while (service_is_active ()) { m_reactor.waitForEvents (); } #endif m_reactor.stopReactor (); DL((APP,"Done\n")); } /******************************************************************************/ // Class Task1 member functions /******************************************************************************/ Task1:: Task1 () : m_completion_status (0), m_state (Task1::start), m_reply (30, "\n") { trace_with_mask("Task1::Task1",TRACE); set_id ("Task1"); DL((APP,"Initial state: %s\n", state_name (state ()) )); } Task1:: ~Task1 () { trace_with_mask("Task1::~Task1",TRACE); } int Task1::open () { trace_with_mask("Task1::open",TRACE); Assure_exit (REACTOR->registerTimerHandler (this, 0.1, "Task1-tick")); Assure_exit (REACTOR->registerIOHandler (this, get_stream ().getHandler (), READ_EVENT)); DL((APP,"Task1 starting\n")); return 0; } int Task1::handle_close (int fd_) { trace_with_mask("Task1::handle_close",TRACE); get_stream ().close (); m_completion_status = true; // Tell the world that we have finished return 0; } int Task1::handle_read (int fd_) { trace_with_mask("Task1::handle_read",TRACE); if (state () != Task1::inprogress) { DL((APP,"unexpected state: %s\n", state () )); return -1; } IPv4Socket& s = *this; s >> m_reply; if (m_reply) // complete { MemDump::dump_to_log (APP, "Message received:", m_reply.c_str (), m_reply.length ()); if (m_reply.c_str () == string ("OK")) { m_completion_status = 1; } else if (m_reply.c_str () == string ("ERROR")) { m_completion_status = -1; } else { DL((APP,"Unexpected reply received!\n")); } state (finish); m_reply.reset (); // Very important return -1; } else { MemDump::dump_to_log (APP, "Incomplete message:", m_reply.c_str (), m_reply.length ()); } return BYTES_LEFT_IN_SOCKBUF(s); } int Task1::handle_timeout (TimerId tid_) { trace_with_mask("Task1::handle_timeout",TRACE); int ret; if (state () == Task1::start) { IPv4Socket& s = *this; ret = s.write (REQUEST.c_str (), REQUEST.size ()); Assure_exit (ret == REQUEST.size ()); s << flush; state (Task1::inprogress); DL((APP,"<== Sent \"Evian\\n\\r\" to Loopback\n")); } else { DL((APP,"Task1 in an unexpected state: %s\n", state_name (state ()) )); } return 0; } const char* Task1::state_name (state_t state_) const { static const char* state_name [] = { "start", "inprogress", "finish" }; return state_name [state_]; } void Task1::state (state_t new_state_) { trace_with_mask("Task1::state",TRACE); switch (new_state_) { case Task1::start: DL((APP, "State change: [%s] -> [start]\n", state_name (state ()) )); break; case Task1::inprogress: DL((APP, "State change: [%s] -> [inprogress]\n", state_name (state ()) )); break; case Task1::finish: DL((APP, "State change: [%s] -> [finish]\n", state_name (state ()) )); break; } m_state = new_state_; } /******************************************************************************/ // Class Task2 member functions /******************************************************************************/ Task2:: Task2 () : m_completion_status (0), m_state (Task2::start), m_reply (20, "\n"), m_idx (0) { trace_with_mask("Task2::Task2",TRACE); set_id ("Task2"); DL((APP,"Initial state: %s\n", state_name (state ()) )); } Task2:: ~Task2 () { trace_with_mask("Task2::~Task2",TRACE); } int Task2::open () { trace_with_mask("Task2::open",TRACE); Assure_exit (REACTOR->registerTimerHandler (this, 0.1, "Test2-tick")); Assure_exit (REACTOR->registerIOHandler (this, get_stream ().getHandler (), READ_EVENT)); DL((APP,"Task2 starting\n")); return 0; } int Task2::handle_close (int fd_) { trace_with_mask("Task2::handle_close",TRACE); get_stream ().close (); m_completion_status = true; // Tell the world that we have finished return 0; } int Task2::handle_timeout (TimerId tid_) { trace_with_mask("Task2::handle_timeout",TRACE); IPv4Socket& s = *this; if (state () == Task2::start) { Assure_exit (s.write (&REQUEST [m_idx], 1) == 1); s << flush; DL((APP,"<== Sent \"%c\" to Loopback\n", REQUEST [m_idx] )); if (++m_idx == REQUEST.size ()) { state (Task2::inprogress); DL((APP,"<== Last byte sent to Loopback\n")); } else { // 1 char/sec. return 1; } } else { DL((APP,"Found Task2 in an unexpected state: %s\n", state_name (state ()) )); } return 0; } int Task2::handle_read (int fd_) { trace_with_mask("Task2::handle_read",TRACE); if (state () != Task2::inprogress) { DL((APP,"unexpected state: %d\n", state () )); return -1; } IPv4Socket& s = *this; s >> m_reply; if (m_reply) // complete { MemDump::dump_to_log (APP, "Message received:", m_reply.c_str (), m_reply.length ()); if (m_reply.c_str () == string ("OK")) { m_completion_status = 1; } else if (m_reply.c_str () == string ("ERROR")) { m_completion_status = -1; } else { DL((APP,"Unexpected reply received!\n")); } state (finish); m_reply.reset (); // Very important return -1; } else { MemDump::dump_to_log (APP, "Incomplete message:", m_reply.c_str (), m_reply.length ()); } return BYTES_LEFT_IN_SOCKBUF(s); } const char* Task2::state_name (state_t state_) const { static const char* state_name [] = { "start", "inprogress", "finish" }; return state_name [state_]; } void Task2::state (state_t new_state_) { trace("Task2::state"); switch (new_state_) { case Task2::start: DL((APP, "State change: [%s] -> [start]\n", state_name (state ()) )); break; case Task2::inprogress: DL((APP, "State change: [%s] -> [inprogress]\n", state_name (state ()) )); break; case Task2::finish: DL((APP, "State change: [%s] -> [finish]\n", state_name (state ()) )); break; } m_state = new_state_; } /******************************************************************************/ // Class Loopback member functions /******************************************************************************/ Loopback:: Loopback (IPv4Socket* stream_) : ServiceHandler (stream_), m_msg (20, "\n\r") { trace_with_mask("Loopback::Loopback",TRACE); set_id ("Loopback"); DL((APP,"Loopback object is instantiated\n")); } Loopback:: ~Loopback () { trace_with_mask("Loopback::~Loopback",TRACE); } int Loopback::open () { trace_with_mask("Loopback::open",TRACE); Assure_exit (REACTOR->registerIOHandler (this, get_stream ().getHandler (), READ_EVENT)); DL((APP,"Loopback is opened for service\n")); return 0; } void Loopback::close () { trace_with_mask("Loopback::close",TRACE); delete this; } int Loopback::handle_close (int fd_) { trace_with_mask("Loopback::handle_close",TRACE); delete this; return 0; } int Loopback::handle_read (int fd_) { trace_with_mask("Loopback::handle_read",TRACE); IPv4Socket& s = *this; int ret; s >> m_msg; if (m_msg) { MemDump::dump_to_log (TRACE, "==> Message received:", m_msg.c_str (), m_msg.length ()); if (m_msg.c_str () == REQUEST_CLEAR) { DL((APP,"==> Received expected message\n")); DL((APP,"<== Replying with \"OK\\n\"\n")); ret = s.write (REP_OK.c_str (), REP_OK.size ()); Assure_exit (ret == REP_OK.size ()); TEST->exit_value (0); } else { DL((APP,"==> Received unexpected message\n")); DL((APP,"<== Replying with \"ERROR\\n\"\n")); ret = s.write (REP_ERR.c_str (), REP_ERR.size ()); Assure_exit (ret == REP_ERR.size ()); TEST->exit_value (1); } s << flush; m_msg.reset (); } else { DL((APP,"Incomplete message so far: \n")); } return BYTES_LEFT_IN_SOCKBUF(s); } #endif /* !defined WIN32 */ /******************************************************************************/ // MAIN /******************************************************************************/ int main (int argc, char* argv[]) { #if !defined (WIN32) static const char release[] = "1.0"; int patch_level = 0; TEST->set_version (release, patch_level); TEST->set_author ("Vladislav Grinchenko"); TEST->set_flags (GenServer::RMLOG); TEST->init (&argc, argv, help_msg); TEST->init_service (); TEST->process_events (); return TEST->get_exit_value (); #endif /* !defined WIN32 */ return 0; }