// -*- c++ -*- //------------------------------------------------------------------------------ // $Id: bufio_tests.cpp,v 1.8 2006/07/26 00:27:32 vlg Exp $ //------------------------------------------------------------------------------ // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version // 2 of the License, or (at your option) any later version. // // Date: December 4, 1999 //------------------------------------------------------------------------------ const char title [] = " \n"\ " NAME \n"\ " \n"\ " bufio_tests - server-side of Socketbuf test functionality \n"\ " \n"\ " SYNOPSIS \n"\ " \n"\ " % bufio_tests -p port_name [-D log] [-b] [-m mask] \n"\ " \n"\ " OPTIONS \n"\ " --port port_name : Listening port name \n"\ " --log-file NAME : Pathname of the log file \n"\ " --mask MASK : Debug mask for the log file. 0x40010001 will \n"\ " mask off everything but application messages and \n"\ " trace, and errors. \n"\ " DESCRIPTION \n"\ " \n"\ " This is server-side of buffered IO test. Refer to the comments at the \n"\ " top of bufio_testc.C (or bufio_testc -h) for details. \n"\ " \n"\ " EXAMPLES \n"\ " \n"\ " % bufio_tests --port assatest --log-file=bufioS.log \n"\ " --mask=0x40010001 \n"\ " \n"; //------------------------------------------------------------------------------ #if !defined (WIN32) #include using namespace std; #include #include "assa/GenServer.h" #include "assa/Singleton.h" #include "assa/INETAddress.h" #include "assa/IPv4Socket.h" #include "assa/Reactor.h" #include "assa/ServiceHandler.h" #include "assa/Acceptor.h" #include "assa/MemDump.h" #include "assa/CommonUtils.h" using namespace ASSA; //------------------------------------------------------------------------------ // Reflector reflects all that comes from client back to server //------------------------------------------------------------------------------ class Reflector : public ServiceHandler { public: Reflector (IPv4Socket* s_); ~Reflector () { trace("Reflector::~Reflector"); } int open (void); int handle_close (int /* fd */); virtual int handle_read (int fd_); virtual int handle_signal (int sig_); private: enum {start, delay4bytes}; int m_state; }; //------------------------------------------------------------------------------ // BufSvr //------------------------------------------------------------------------------ class BufSvr : public GenServer, public Singleton { public: BufSvr () { trace("BufSvr::BufSvr"); } ~BufSvr () { trace("BufSvr::~BufSvr"); } void init_service (); void process_events () { trace("BufSvr::process_events"); while (service_is_active ()) { m_reactor.waitForEvents (); } } private: /* Listening socket. I don't have to remove Acceptor from Reactor or destroy it. It is done anyway by the Reactor::stopReactor() method. */ Acceptor* m_lsock; }; #define BUFSVR BufSvr::get_instance () ASSA_DECL_SINGLETON(BufSvr); void BufSvr::init_service () { trace("BufSvr::init_service"); m_lsock = new Acceptor (&m_reactor); const char* port = get_port ().c_str(); DL((TRACE,"Listening port = \"%s\"\n", port)); INETAddress laddr (port); // listening port Assure_exit (!laddr.bad ()); Assure_exit (m_lsock->open (laddr) == 0); } //------------------------------------------------------------------------------ // Reflector class methods //------------------------------------------------------------------------------ Reflector::Reflector (IPv4Socket* s_) : ServiceHandler (s_), m_state (Reflector::start) { trace("Reflector::Reflector"); } int Reflector::open (void) { trace("Reflector::open"); /* New connection arrived. */ IPv4Socket& s = *this; BufSvr* bs = BufSvr::get_instance (); /* Catch read events. */ DL((TRACE,"-- Registering to process data --\n")); bs->get_reactor()->registerIOHandler (this, s.getHandler (), READ_EVENT); /* Catch SIGUSR1 signal for mode switching. */ DL((TRACE,"-- Registering to catch USR1 signal --\n")); Assure_exit (bs->get_sig_manager ().install (SIGUSR1, this) != -1); return 0; } int Reflector::handle_read (int fd_) { trace("Reflector::handle_read"); DL((TRACE,"***********************************************\n")); IPv4Socket& s = *this; if (s.getHandler () != fd_) return -1; char buf[1416]; // Max data frame with 60-bytes TCP options. int bsz = sizeof(buf); int avail, rval, len; rval = len = avail = 0; /* Bytes in the socket buffer */ avail = s.getBytesAvail (); if (m_state == start) { do { DL((TRACE,"Bytes awaiting to read: %d\n", avail)); len = avail < bsz ? avail : bsz; DL((TRACE,"Reading: %d\n", len)); rval = s.read (buf, len); DL((TRACE,"Socket::read() = %d\n", rval)); if (rval <= 0) goto done; s.write (buf, rval); s << flush; MemDump hd (buf, rval); DL((TRACE,"Frame reflected: %s\n", hd.getMemDump())); avail -= rval; DL((TRACE,"avail: %d\n", avail)); } while (avail > 0); } else if (m_state == delay4bytes) { DL((TRACE,"=== Delay first 4 bytes state ===\n")); /* Doing non-blocking read */ len = s.read (buf, bsz); DL((TRACE,"Socket::read() = %d\n", len)); /* Broken socket or error on read */ if (len <= 0) { EL((ASSA::ASSAERR,"Socket::read() error\n")); goto done; } MemDump m (buf, len); DL((TRACE,"Frame received: %s\n", m.getMemDump())); /* All I'm expecting is 20 bytes of data. Send back first 4 bytes, sleep for a second and send another 10 bytes, sleep more and send the rest. If less then 4 bytes are received, simply reflect them. */ if (len > 4) { s.write (buf, 4); s << flush; DL((TRACE,"Sent first 4 bytes. Sleeping now.\n")); ASSA::Utils::sleep_for_seconds (1); DL((TRACE,"Woken up! Sending 10 bytes.\n")); s.write (buf+4, 10); s << flush; ASSA::Utils::sleep_for_seconds (1); DL((TRACE,"Woken up! Sending the rest.\n")); s.write (buf+14, 6); s << flush; } else { s.write (buf, len); s << flush; DL((TRACE,"Sent %d bytes.\n",len)); } } done: return BYTES_LEFT_IN_SOCKBUF(s); } int Reflector::handle_signal (int sig_) { trace("Reflector::handle_signal"); Assure_exit (sig_ == SIGUSR1); /* Reception of each USR1 signal switches the state to the next one, wrapped around. */ DL((TRACE,"\n\n\t==> Rcvd SIGUSR1 signal\n\n")); IPv4Socket& s = *this; switch (m_state) { case start: m_state = delay4bytes; s.turnOptionOn (Socket::nonblocking); break; case delay4bytes: m_state = start; s.turnOptionOff (Socket::nonblocking); break; default: EL((ASSA::ASSAERR,"Unknown state: %d\n",m_state)); Assure_exit(false); } return 0; } int Reflector::handle_close (int /* fd */) { trace("Reflector::handle_close"); delete (this); return 0; } #endif /* !defined WIN32 */ //------------------------------------------------------------------------------ // Main //------------------------------------------------------------------------------ int main (int argc, char* argv[]) { #if !defined (WIN32) int revision = 0; string release ("1.0"); BUFSVR->set_version (release, revision); BUFSVR->set_author ("Vladislav Grinchenko"); BUFSVR->set_flags (GenServer::RMLOG); BUFSVR->init (&argc, argv, title); BUFSVR->init_service (); BUFSVR->process_events (); #endif /* !defined WIN32 */ return 0; }