// -*- c++ -*- //------------------------------------------------------------------------------ // Conn.cpp //------------------------------------------------------------------------------ // $Id: Conn.cpp,v 1.5 2006/07/20 02:30:55 vlg Exp $ //------------------------------------------------------------------------------ // Copyright (c) 2003 by Vladislav Grinchenko // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version // 2 of the License, or (at your option) any later version. //------------------------------------------------------------------------------ // Created: Thu Apr 17 23:22:39 EDT 2003 //------------------------------------------------------------------------------ #include "Conn.h" #include "LogServer.h" #include "MonitorConn.h" static const char sep[]="-------------------------------------------------"; int Conn:: open () { trace_with_mask("Conn::open",LSVRTRACE); REACTOR->registerIOHandler (this, get_stream ().getHandler (), ASSA::READ_EVENT); DL((LSVR,"+--------------------------------+\n")); DL((LSVR,"| Accepted new client connection |\n")); DL((LSVR,"+--------------------------------+\n")); return 0; } int Conn:: handle_close (int /* fd */) { trace_with_mask("Conn::handle_close", LSVRTRACE); DL((LSVR,"+---------------------+\n")); DL((LSVR,"| Client disconnected |\n")); DL((LSVR,"+---------------------+\n")); /** Disconnect from observers */ if (m_observers.size ()) { ASSA::Repository::const_iterator cit; cit = m_observers.begin (); while (cit != m_observers.end ()) { (*cit++)->notify (NULL); } m_observers.clear (); } REPO->erase (this); delete (this); return 0; } int Conn:: handle_read (int fd_) { trace_with_mask ("Conn::handle_read", LSVRTRACE); if (get_stream ().getHandler () != fd_) { return (-1); } if (m_wstate == wait_for_header) { int preamble = 0; m_msg_type = m_msg_size = 0; get_stream () >> preamble >> m_msg_type >> m_msg_size; if (preamble != 1234567890) { DL((LSVRERROR,"Message stream is out of sync - Abort!\n")); return -1; } DL((LSVR,"=> Detected Header\n")); DL((LSVR,"rcvd: Preamble = %d\n", preamble)); DL((LSVR,"rcvd: Type = %d\n", m_msg_type)); DL((LSVR,"rcvd: Size = %d\n", m_msg_size)); switch (m_msg_type) { case SIGN_ON: m_wstate = wait_for_signon; break; case SIGN_OFF: m_wstate = wait_for_signoff; break; case LOG_MSG: m_wstate = wait_for_logmsg; break; } } if (m_wstate == wait_for_signon) { m_maxsize = 0; DL((LSVR,"=> Incoming SIGN_ON message\n")); Assure_exit (m_state == closed); get_stream () >> m_maxsize >> m_app_name >> m_logfname; DL((LSVR,"rcvd: MaxLogSize = %d, AppName = \"%s\"\n", m_maxsize, m_app_name.c_str ())); DL((LSVR,"rcvd: LogFileName = \"%s\"\n", m_logfname.c_str ())); std::string path = LOGSERVER->get_log_dir () + "/" + m_logfname; if (LOGSERVER->recycle ()) { ::unlink (path.c_str ()); } m_sink.open (path.c_str (), std::ios::out | std::ios::app); if (!m_sink) { DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", path.c_str ())); return -1; } m_state = opened; m_wstate = wait_for_header; REPO->push_back (this); } else if (m_wstate == wait_for_signoff) { DL((LSVR,"=> Incoming SIGN_OFF message\n")); Assure_exit (m_state == opened); m_sink << std::flush; m_sink.close (); m_state = closed; return -1; } else if (m_wstate == wait_for_logmsg) { DL((LSVR,"=> Incoming LOG_MSG message\n")); Assure_exit (m_state == opened); std::string msg; if (get_stream () >> msg) { if (msg.length () != 0) { DL((LSVR,"rcvs message:\n%s\n%s%s\n", sep, msg.c_str (), sep)); m_bytecount += msg.length (); if (m_bytecount > m_maxsize) { shift_logfile (); } } else { DL((LSVR,"rcvs EMPTY message!\n")); Assure_exit (false); } m_sink << msg << std::flush; if (m_observers.size ()) { ASSA::Repository::const_iterator cit; cit = m_observers.begin (); while (cit != m_observers.end ()) { (*cit++)->notify (msg.c_str ()); } } } else { DL((LSVRERROR,"Peer dropped connection!\n")); m_sink << std::flush; m_sink.close (); DL((LSVR,"m_bytecount = %d\n", m_bytecount)); DL((LSVR,"m_maxsize = %d\n", m_maxsize)); } m_wstate = wait_for_header; } return BYTES_LEFT_IN_SOCKBUF(get_stream ()); } void Conn:: shift_logfile () { trace_with_mask("Conn::shift_logfile", LSVRTRACE); m_sink << std::flush; m_sink.close (); m_bytecount = 0; std::string oldfile = m_logfname + ".0"; ::unlink (oldfile.c_str ()); ::rename (m_logfname.c_str (), oldfile.c_str ()); m_sink.open (m_logfname.c_str (), std::ios::out | std::ios::app); if (!m_sink) { DL((LSVRERROR,"m_sink.open (\"%s\",...) = -1\n", m_logfname.c_str ())); } }