// -*- c++ -*- // Generated by assa-genesis //------------------------------------------------------------------------------ // $Id: sb_testc.cpp,v 1.6 2006/07/20 02:30:56 vlg Exp $ //------------------------------------------------------------------------------ // SBTestClient.cpp //------------------------------------------------------------------------------ // Copyright (c) 2005 by Vladislav Grinchenko // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version // 2 of the License, or (at your option) any later version. //------------------------------------------------------------------------------ // // Date : Sun Jun 26 12:10:09 2005 // //------------------------------------------------------------------------------ static const char help_msg[]= " \n" " NAME: \n" " \n" " sb_testc - SocketBuffer Test client \n" " \n" " DESCRIPTION: \n" " \n" " SocketBuffer Test is designed to validate the fix for suspected \n" " overflow bug. This test thus simulates a situation of a sluggish reader \n" " that recovers after a while. \n" " \n" " The test client program connects to the test server, \n" " the server side connects to the client program's data flow control port, \n" " and signals to the client it is ready to receive data. At that, the \n" " size starts transmition of an input data file by filling up the pipe. \n" " When no data can be written any more to the socket, the client side \n" " emmits 'blocked' message to the server via flow control connection. \n" " Upon reception of the 'blocked' message, the server size reads the data \n" " off the data socket and saves it to the output file. When the pipe is \n" " empty, the server side emmits 'drained' message to the client to \n" " proceed with data transfer. \n" " \n" " This communication pattern continues until the last byte of the file is \n" " transmitted and received. \n" " \n" " \n" " sb_testc 'blocked' sb_tests \n" " +---------+ -----> +-----------+ \n" " | O<----------------| CtrlFlow | \n" " | | <--'drained'| | \n" " | Data | | Data | \n" " | Writer |---------------->O Reader | \n" " +---------+ +-----------+ \n" " \n" " This test is not included into automated suite because it requires \n" " an external data file of a substantial size (>400K). To run the test, \n" " start the server on host 'alpha': \n" " \n" " % sb_tests --data-port=10000 --ctrl-port=10001@beta --build-dir=`pwd` \n" " --output-file=sb-data.out \n" " \n" " On the client host 'beta', start the client side: \n" " \n" " % sb_testc --data-port=10000@beta --ctrl-port=10001 --build-dir=`pwd` \n" " --input-file=sb-data.in \n" " \n" " When the test is over, both client and server exit. \n" " Compare sb-data.in to sb-data.out. \n" " \n" " USAGE: \n" " \n" " shell> sb_testc --data-port=NAME --ctrl-port=NAME \n" " --input-file=NAME [OPTIONS] \n" " \n" " OPTIONS: \n" " \n" " --data-port NAME - Listening data port (PORT[@HOST]) \n" " --ctrl-port NAME - Data flow control port of the client program. \n" " --input-file NAME - File to transfer to the server side. \n" " (default: sbtest-data.in) \n" " \n" " --build-dir PATH - Directory where executables were built. \n" " \n" " -b, --daemon - Run process as true UNIX daemon \n" " -l, --pidfile PATH - The process ID is written to the lockfile PATH \n" " instead of default ~/.{procname}.pid \n" " -L, --ommit-pidfile - Do not create PID lockfile \n" " \n" " -D, --log-file NAME - Write debug to NAME file \n" " -d, --log-stdout - Write debug to standard output \n" " -z, --log-size NUM - Maximum size debug file can reach (dfl: is 10Mb) \n" " \n" " -c, --log-level NUM - Log verbosity \n" " -s, --with-log-server - Redirect log messages to the log server \n" " -S, --log-server NAME - Define assa-logd server address \n" " \n" " -m, --mask MASK - Mask (default: ALL = 0x7fffffff) \n" " -p, --port NAME - The tcp/ip port NAME (default - procname) \n" " -n, --instance NUM - Process instance NUM (default - none) \n" " -f, --config-file NAME - Alternative configuration file NAME \n" " \n" " -h, --help - Print this messag \n" " -v, --version - Print version number \n"; //------------------------------------------------------------------------------ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include using std::string; #include #include #include #include #include #include #include #include #include #include #include using namespace ASSA; enum { SBTC = ASSA::APP, SBDATA = ASSA::USR1 // HEX/ASCII outgoing data packets dump }; class DataWriter; //------------------------------------------------------------------------------ // FlowCtrl class //------------------------------------------------------------------------------ class FlowCtrl : public ServiceHandler { public: FlowCtrl (IPv4Socket* stream_); virtual int open (); virtual int handle_read (int fd_); virtual int handle_close (int fd_); void emmit_blocked_signal (); private: CharInBuffer m_input; std::string m_ready_msg; std::string m_drained_msg; std::string m_blocked_msg; }; //------------------------------------------------------------------------------ // Xmit Byte counter //------------------------------------------------------------------------------ class XmitByteCounter { public: XmitByteCounter () : m_count (0) { } ~XmitByteCounter () { DL ((ASSA::APP,"Total bytes processed %d\n", m_count)); } void add (size_t count_) { m_count += count_; } private: size_t m_count; }; //------------------------------------------------------------------------------ // DataWriter class //------------------------------------------------------------------------------ class DataWriter : public ServiceHandler { public: DataWriter (); ~DataWriter (); virtual int open (); int fill_pipe (); void register_flow_ctrl (FlowCtrl* flowctrl_) { m_flow_ctrl = flowctrl_; } private: /** Both read and write Socketbuf buffer sizes are 1416 - just enough to avoid TCP fragmentation. If you write more, Socketbuf will flush data out before taking more bytes from you. The kernel socket buffer size is most probably a 32K chunk. */ enum { FMAXSIZE = 64000 }; // Max frame size in bytes (2832) FlowCtrl* m_flow_ctrl; std::ifstream m_source; std::ofstream m_tee_sink; size_t m_len; char m_buffer [FMAXSIZE]; size_t m_total_xmit; // total bytes transmitted }; //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ class SBTestClient : public ASSA::GenServer, public ASSA::Singleton { public: SBTestClient (); virtual void init_service (); virtual void process_events (); std::string get_build_dir () const { return m_build_dir; } std::string get_data_port () const { return m_data_port; } std::string get_ctrl_port () const { return m_ctrl_port; } std::string get_input_file () const { return m_input_file; } DataWriter* get_data_writer () { return &m_data_writer; } void abort_test () { stop_service (); } private: std::string m_build_dir; std::string m_data_port; std::string m_ctrl_port; std::string m_input_file; Connector m_connector; Acceptor* m_acceptor; DataWriter m_data_writer; }; /* Useful definitions */ #define SBTESTCLIENT SBTestClient::get_instance() #define REACTOR SBTESTCLIENT->get_reactor() #define DATAWRITER SBTESTCLIENT->get_data_writer() // Static declarations mandated by Singleton class ASSA_DECL_SINGLETON(SBTestClient); //------------------------------------------------------------------------------ // FlowCtrl function members //------------------------------------------------------------------------------ FlowCtrl:: FlowCtrl (IPv4Socket* stream_) : ServiceHandler (stream_), m_input (32, "\n"), m_ready_msg ("ready"), // incoming msg m_drained_msg ("drained"), // incoming msg m_blocked_msg ("blocked\n") // outgoing msg { trace ("FlowCtrl::FlowCtrl"); } int FlowCtrl:: open () { trace ("FlowCtrl::open"); ASSA::IPv4Socket& s = *this; REACTOR->registerIOHandler (this, s.getHandler (), ASSA::READ_EVENT); DATAWRITER->register_flow_ctrl (this); return 0; } /** * The messages we expect from the server are: * + 'ready' * + 'drained' * * We treat them the same */ int FlowCtrl:: handle_read (int fd_) { trace ("FlowCtrl::handle_read"); ASSA::IPv4Socket& s = *this; if (s.getHandler () != fd_) { return (-1); } s >> m_input; if (m_input) { if (m_input.c_str () == m_ready_msg) { DL ((SBTC, "=> got 'ready' signal from server.\n")); } else if (m_input.c_str () == m_drained_msg) { DL ((SBTC, "=> got 'drained' signal from server.\n")); } else { DL ((SBTC, "=> got UNEXPECTED signal (\"%s\") from server.\n", m_input.c_str ())); DL ((SBTC, "Must be the end of stream?\n")); return -1; } if (DATAWRITER->fill_pipe () < 0) { return -1; } m_input.reset (); emmit_blocked_signal (); } return BYTES_LEFT_IN_SOCKBUF(s); } int FlowCtrl:: handle_close (int fd_) { trace ("FlowCtrl::handle_close"); get_stream ().close (); SBTESTCLIENT->abort_test (); return 0; } void FlowCtrl:: emmit_blocked_signal () { trace ("FlowCtrl::emmit_blocked_signal"); get_stream ().write (m_blocked_msg.c_str (), m_blocked_msg.length ()); get_stream () << ASSA::flush; } //------------------------------------------------------------------------------ // DataWriter function members //------------------------------------------------------------------------------ DataWriter:: DataWriter () : m_total_xmit (0) { trace ("DataWriter::DataWriter"); } DataWriter:: ~DataWriter () { trace ("DataWriter::~DataWriter"); if (m_source) { m_source.close (); } if (m_tee_sink) { m_tee_sink << std::flush; m_tee_sink.close (); } DL ((SBTC,"Transmitted grand total %d bytes\n", m_total_xmit)); } int DataWriter:: open () { trace ("DataWriter::open"); DL ((SBTC,"=> Connection opened with the server.\n")); std::string dup_output_stream ("sbtest-data.tee"); m_source.open (SBTESTCLIENT->get_input_file ().c_str ()); // ::unlink (dup_output_stream.c_str ()); // m_tee_sink.open (dup_output_stream.c_str ()); if (!m_source) { EL ((SBTC, "Failed to open input data file.\n")); SBTESTCLIENT->abort_test (); } // if (!m_tee_sink) { // EL ((SBTC, "Failed to open output data file.\n")); // SBTESTCLIENT->abort_test (); // } return 0; } int DataWriter:: fill_pipe () { trace ("DataWriter::fill_pipe"); XmitByteCounter total; int ret = 0; while (m_len < FMAXSIZE && m_source.get (m_buffer [m_len])) { m_len++; } if (m_len == 0) { DL ((SBTC,"Reached the end of input file\n")); get_stream ().close (); return -1; } MemDump::dump_to_log (SBDATA, "<= data message to server", m_buffer, m_len); repeat: ret = get_stream ().write (m_buffer, m_len); if (ret < 0) { DL ((SBTC,"Server-side closed the pipe!\n")); return -1; } // m_tee_sink.write (m_buffer, ret); // Save what we intended to transmit m_total_xmit += ret; total.add (ret); DL ((SBTC,"<= wrote %d bytes (out of %d intended)\n", ret, m_len)); if (ret < m_len) { memmove (m_buffer, m_buffer + ret, m_len - ret); m_len -= ret; m_flow_ctrl->emmit_blocked_signal (); DL ((SBTC,"Leftover %d bytes\n", m_len)); return 0; } m_len = 0; while (m_len < FMAXSIZE && m_source.get (m_buffer [m_len])) { m_len++; } if (m_len == 0) { DL ((SBTC,"Reached the end of input data file stream.\n")); get_stream ().close (); return -1; } MemDump::dump_to_log (SBDATA, "<= data message to server", m_buffer, m_len); goto repeat; return 0; } //------------------------------------------------------------------------------ // SBTestClient function members //------------------------------------------------------------------------------ SBTestClient:: SBTestClient () : m_acceptor (NULL), m_input_file ("sbtest-data.in") { add_opt (0, "build-dir", &m_build_dir); add_opt (0, "data-port", &m_data_port); add_opt (0, "ctrl-port", &m_ctrl_port); add_opt (0, "input-file", &m_input_file); // ---Configuration--- rm_opt ('f', "config-file" ); rm_opt ('n', "instance" ); /*--- * By defauil disable all debugging *---*/ m_mask = SBTC | ASSA::ASSAERR | ASSA::TRACE; m_log_file = "sbtest-client.log"; } void SBTestClient:: init_service () { trace("SBTestClient::init_service"); if (m_build_dir.length () == 0) { DL ((SBTC,"Missing {--build-dir=PATH} option!\n")); abort_test (); return; } if (m_data_port.length () == 0) { DL ((SBTC,"Missing {--data-port=STRING} option!\n")); abort_test (); return; } if (m_ctrl_port.length () == 0) { DL ((SBTC,"Missing {--ctrl-port=STRING} option!\n")); abort_test (); return; } if (m_input_file.length () == 0) { DL ((SBTC,"Missing {--input-file=STRING} option!\n")); abort_test (); return; } /** Initialize Acceptor on FlowCtrl socket. */ m_acceptor = new Acceptor (REACTOR); /** Open listening socket */ INETAddress listen_addr (get_ctrl_port ().c_str ()); Assure_exit (!listen_addr.bad ()); Assure_exit (m_acceptor->open (listen_addr) == 0); DL ((SBTC,"Listening socket opened on %s:%d\n", listen_addr.getHostName ().c_str (), listen_addr.getPort ())); /** Open connection with the server */ INETAddress server_addr (get_data_port ().c_str ()); Assure_exit (!server_addr.bad ()); m_connector.open (); int ret = m_connector.connect (&m_data_writer, server_addr); if (ret < 0) { DL ((SBTC,"Failed to connect to 'sb_tests' (%s:%d)\n", server_addr.getHostName ().c_str (), server_addr.getPort ())); abort_test (); } else { DL ((SBTC,"Connected to DATA port of 'sb_tests'\n")); } DL((SBTC,"Service has been initialized\n")); } void SBTestClient:: process_events () { trace("SBTestClient::process_events"); while (service_is_active ()) { m_reactor.waitForEvents (); } // Shut the service down m_reactor.stopReactor (); DL((SBTC,"Service stopped!\n")); } int main (int argc, char* argv[]) { static const char release[] = "VERSION"; int patch_level = 0; SBTESTCLIENT->set_version (release, patch_level); SBTESTCLIENT->set_author ("Vladislav Grinchenko"); SBTESTCLIENT->set_flags (ASSA::GenServer::RMLOG); SBTESTCLIENT->init (&argc, argv, help_msg); SBTESTCLIENT->init_service (); SBTESTCLIENT->process_events (); return SBTESTCLIENT->get_exit_value (); }