// -*- c++ -*- //------------------------------------------------------------------------------ // Reactor.cpp //------------------------------------------------------------------------------ // Copyright (C) 1997-2002,2005,2006 Vladislav Grinchenko // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Library General Public // License as published by the Free Software Foundation; either // version 2 of the License, or (at your option) any later version. //----------------------------------------------------------------------------- // Created: 05/25/1999 //----------------------------------------------------------------------------- #include #include #include #include "assa/Reactor.h" #include "assa/Logger.h" using namespace ASSA; Reactor:: Reactor () : m_fd_setsize (1024), m_maxfd_plus1 (0), m_active (true) { trace_with_mask("Reactor::Reactor",REACTTRACE); /** Maximum number of sockets supported (per process) * Win32 defines it to 64 in winsock2.h. */ #if defined(WIN32) m_fd_setsize = FD_SETSIZE; #else // POSIX struct rlimit rlim; rlim.rlim_max = 0; if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) { m_fd_setsize = rlim.rlim_cur; } #endif /** Initialize winsock2 library */ #if defined (WIN32) WSADATA data; WSAStartup (MAKEWORD (2, 2), &data); #endif } Reactor:: ~Reactor() { trace_with_mask("Reactor::~Reactor",REACTTRACE); m_readSet.clear (); m_writeSet.clear (); m_exceptSet.clear (); deactivate (); } TimerId Reactor:: registerTimerHandler (EventHandler* eh_, const TimeVal& timeout_, const std::string& name_) { trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE); Assure_return (eh_); TimeVal now (TimeVal::gettimeofday()); TimeVal t (now + timeout_); DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n", timeout_.sec(),timeout_.msec())); DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() )); DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() )); TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_); DL((REACT,"---Modified Timer Queue----\n")); m_tqueue.dump(); DL((REACT,"---------------------------\n")); return (tid); } bool Reactor:: registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_) { trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE); std::ostringstream msg; Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_)); if (isReadEvent (et_)) { if (!m_waitSet.m_rset.setFd (fd_)) { DL((ASSAERR,"readset: fd %d out of range\n", fd_)); return (false); } m_readSet[fd_] = eh_; msg << "READ_EVENT"; } if (isWriteEvent (et_)) { if (!m_waitSet.m_wset.setFd (fd_)) { DL((ASSAERR,"writeset: fd %d out of range\n", fd_)); return (false); } m_writeSet[fd_] = eh_; msg << " WRITE_EVENT"; } if (isExceptEvent (et_)) { if (!m_waitSet.m_eset.setFd (fd_)) { DL((ASSAERR,"exceptset: fd %d out of range\n", fd_)); return (false); } m_exceptSet[fd_] = eh_; msg << " EXCEPT_EVENT"; } msg << std::ends; DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () )); #if !defined (WIN32) if (m_maxfd_plus1 < fd_+1) { m_maxfd_plus1 = fd_+1; DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); } #endif DL((REACT,"Modified waitSet:\n")); m_waitSet.dump (); return (true); } bool Reactor:: removeTimerHandler (TimerId tid_) { trace_with_mask("Reactor::removeTimer",REACTTRACE); bool ret; if ((ret = m_tqueue.remove (tid_))) { DL((REACT,"---Modified Timer Queue----\n")); m_tqueue.dump(); DL((REACT,"---------------------------\n")); } else { EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ )); } return (ret); } /** * Remove handler from all events that matches event_. */ bool Reactor:: removeHandler (EventHandler* eh_, EventType event_) { trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE); bool ret = false; handler_t fd; handler_t rfdmax; handler_t wfdmax; handler_t efdmax; Fd2Eh_Map_Iter iter; rfdmax = wfdmax = efdmax = 0; if (eh_ == NULL) { return false; } if (isTimeoutEvent (event_)) { ret = m_tqueue.remove (eh_); ret = true; } if (isReadEvent (event_)) { iter = m_readSet.begin (); while (iter != m_readSet.end ()) { if ((*iter).second == eh_) { fd = (*iter).first; m_readSet.erase (iter); m_waitSet.m_rset.clear (fd); ret = true; break; } rfdmax = fd; iter++; } } if (isWriteEvent (event_)) { iter = m_writeSet.begin (); while (iter != m_writeSet.end ()) { if ((*iter).second == eh_) { fd = (*iter).first; m_writeSet.erase (iter); m_waitSet.m_wset.clear (fd); ret = true; break; } wfdmax = fd; iter++; } } if (isExceptEvent (event_)) { iter = m_exceptSet.begin (); while (iter != m_exceptSet.end ()) { if ((*iter).second == eh_) { fd = (*iter).first; m_exceptSet.erase (iter); m_waitSet.m_eset.clear (fd); ret = true; break; } efdmax = fd; iter++; } } if (ret == true) { DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_)); eh_->handle_close (fd); } adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax); DL((REACT,"Modifies waitSet:\n")); m_waitSet.dump (); return (ret); } bool Reactor:: removeIOHandler (handler_t fd_) { trace_with_mask("Reactor::removeIOHandler",REACTTRACE); bool ret = false; EventHandler* ehp = NULL; Fd2Eh_Map_Iter iter; handler_t rfdmax; handler_t wfdmax; handler_t efdmax; rfdmax = wfdmax = efdmax = 0; Assure_return (ASSA::is_valid_handler (fd_)); DL((REACT,"Removing handler for fd=%d\n",fd_)); /** We clear m_readySet mask here as well, because * if we don't, it will be erroneously used by isAnyReady() * before select(). */ if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) { ehp = (*iter).second; m_readSet.erase (iter); m_waitSet.m_rset.clear (fd_); m_readySet.m_rset.clear (fd_); if (m_readSet.size () > 0) { iter = m_readSet.end (); iter--; rfdmax = (*iter).first; } ret = true; } if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) { ehp = (*iter).second; m_writeSet.erase (iter); m_waitSet.m_wset.clear (fd_); m_readySet.m_wset.clear (fd_); if (m_writeSet.size () > 0) { iter = m_writeSet.end (); iter--; wfdmax = (*iter).first; } ret = true; } if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) { ehp = (*iter).second; m_exceptSet.erase (iter); m_waitSet.m_eset.clear (fd_); m_readySet.m_eset.clear (fd_); if (m_exceptSet.size () > 0) { iter = m_exceptSet.end (); iter--; efdmax = (*iter).first; } ret = true; } if (ret == true && ehp != NULL) { DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp)); ehp->handle_close (fd_); } adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax); DL((REACT,"Modifies waitSet:\n")); m_waitSet.dump (); return (ret); } bool Reactor:: checkFDs (void) { trace_with_mask("Reactor::checkFDs",REACTTRACE); bool num_removed = false; FdSet mask; timeval poll = { 0, 0 }; for (handler_t fd = 0; fd < m_fd_setsize; fd++) { if ( m_readSet[fd] != NULL ) { mask.setFd (fd); if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) { removeIOHandler (fd); num_removed = true; DL((REACT,"Detected BAD FD: %d\n", fd )); } mask.clear (fd); } } return (num_removed); } bool Reactor:: handleError (void) { trace_with_mask("Reactor::handleError",REACTTRACE); /** If commanded to stop, do so */ if ( !m_active ) { DL((REACT,"Received cmd to stop Reactor\n")); return (false); } /*--- TODO: If select(2) returns before time expires, with a descriptor ready or with EINTR, timeval is not going to be updated with number of seconds remaining. This is true for all systems except Linux, which will do so. Therefore, to restart correctly in case of EINTR, we ought to take time measurement before and after select, and try to select() for remaining time. For now, we restart with the initial timing value. ---*/ /*--- BSD kernel never restarts select(2). SVR4 will restart if the SA_RESTART flag is specified when the signal handler for the signal delivered is installed. This means taht for portability, we must handle signal interrupts. ---*/ if ( errno == EINTR ) { EL((REACT,"EINTR: interrupted select(2)\n")); /* If I was sitting in select(2) and received SIGTERM, the signal handler would have set m_active to 'false', and this function would have returned 'false' as above. For any other non-critical signals (USR1,...), we retry select. */ return (true); } /* EBADF - bad file number. One of the file descriptors does not reference an open file to open(), close(), ioctl(). This can happen if user closed fd and forgot to remove handler from Reactor. */ if ( errno == EBADF ) { DL((REACT,"EBADF: bad file descriptor\n")); return (checkFDs ()); } /* Any other error from select */ #if defined (WIN32) DL ((REACT,"select(3) error = %d\n", WSAGetLastError())); #else EL((ASSAERR,"select(3) error\n")); #endif return (false); } int Reactor:: isAnyReady (void) { trace_with_mask("Reactor::isAnyReady",REACTTRACE); int n = m_readySet.m_rset.numSet () + m_readySet.m_wset.numSet () + m_readySet.m_eset.numSet (); if ( n > 0 ) { DL((REACT,"m_readySet: %d FDs are ready for processing\n", n)); m_readySet.dump (); } return (n); } void Reactor:: calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_) { trace_with_mask("Reactor::calculateTimeout",REACTTRACE); TimeVal now; TimeVal tv; if (m_tqueue.isEmpty () ) { howlong_ = maxwait_; goto done; } now = TimeVal::gettimeofday (); tv = m_tqueue.top (); if (tv < now) { /*--- It took too long to get here (fraction of a millisecond), and top timer had already expired. In this case, perform non-blocking select in order to drain the timer queue. ---*/ *howlong_ = 0; } else { DL((REACT,"--------- Timer Queue ----------\n")); m_tqueue.dump(); DL((REACT,"--------------------------------\n")); if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) { *howlong_ = tv - now; } else { *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now; } } done: if (howlong_ != NULL) { DL((REACT,"delay (%f)\n", double (*howlong_) )); } else { DL((REACT,"delay (forever)\n")); } } /** * Block forever version */ void Reactor:: waitForEvents (void) { while ( m_active ) { waitForEvents ((TimeVal*) NULL); } } /** ===================================================================== | select() | errno | Events | Behavior | |===================================================================| | < 0 | EINTR | Interrup by signal | Retry | +----------+-------+---------------------+--------------------------+ | < 0 | EBADF | Bad file descriptor | Remove bad fds and retry | | | | | and retry | +----------+-------+---------------------+--------------------------+ | < 0 | others| Some other error | Fall through | +----------+-------+---------------------+--------------------------+ | == 0 | 0 | Timed out | Fall through | +----------+-------+---------------------+--------------------------+ | > 0 | 0 | Got some work to do | Fall through | +-------------------------------------------------------------------+ */ void Reactor:: waitForEvents (TimeVal* tv_) { trace_with_mask("Reactor::waitForEvents",REACTTRACE); TimerCountdown traceTime (tv_); DL((REACT,"======================================\n")); /*--- Expire all stale Timers ---*/ m_tqueue.expire (TimeVal::gettimeofday ()); /* Test to see if Reactor has been deactivated as a result * of processing done by any TimerHandlers. */ if (!m_active) { return; } int nReady; TimeVal delay; TimeVal* dlp = &delay; /*--- In case if not all data have been processed by the EventHandler, and EventHandler stated so in its callback's return value to dispatcher (), it will be called again. This way underlying file/socket stream can efficiently utilize its buffering mechaninsm. ---*/ if ((nReady = isAnyReady ())) { DL((REACT,"isAnyReady returned: %d\n",nReady)); dispatch (nReady); return; } DL((REACT,"=== m_waitSet ===\n")); m_waitSet.dump (); do { m_readySet.reset (); DL ((REACT,"m_readySet after reset():\n")); m_readySet.dump (); m_readySet = m_waitSet; DL ((REACT,"m_readySet after assign:\n")); m_readySet.dump (); calculateTimeout (dlp, tv_); nReady = ::select (m_maxfd_plus1, &m_readySet.m_rset, &m_readySet.m_wset, &m_readySet.m_eset, dlp); DL((REACT,"::select() returned: %d\n",nReady)); m_readySet.sync (); DL ((REACT,"m_readySet after select:\n")); m_readySet.dump (); } while (nReady < 0 && handleError ()); dispatch (nReady); } /** * This spot needs re-thinking. When you have several high data-rate * connections sending data at the same time, the one that had * connected first would get lower FD number and would get data * transfer preference over everybody else who has connected later on. */ void Reactor:: dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_) { trace_with_mask("Reactor::dispatchHandler",REACTTRACE); int ret = 0; handler_t fd; EventHandler* ehp = NULL; std::string eh_id; Fd2Eh_Map_Iter iter = fdSet_.begin (); while (iter != fdSet_.end ()) { fd = (*iter).first; ehp = (*iter).second; if (mask_.isSet (fd) && ehp != NULL) { eh_id = ehp->get_id (); DL((REACT,"Data detected from \"%s\"(fd=%d)\n", eh_id.c_str (), fd)); ret = (ehp->*callback_) (fd); /* Fire up a callback */ if (ret == -1) { removeIOHandler (fd); } else if (ret > 0) { DL((REACT,"%d bytes pending on fd=%d \"%s\"\n", ret, fd, eh_id.c_str ())); //return; <-- would starve other connections } else { DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", eh_id.c_str (), fd)); mask_.clear (fd); } /** WIN32 HACK: * Without having restarted scan from the beginning, * this causes crash due to the fact that firing a callback of * EventHandler might have invalidated the iterator * (happens with Connector's in a sync mode). */ iter = fdSet_.begin (); } else { iter++; } } } /** * Many UNIX systems will count a particular file descriptor in the * ready_ only ONCE, even if it was flagged by ::select(2) in, say, * both read and write masks. */ bool Reactor:: dispatch (int ready_) { trace_with_mask("Reactor::dispatch", REACTTRACE); m_tqueue.expire (TimeVal::gettimeofday ()); if ( ready_ < 0 ) { #if !defined (WIN32) EL((ASSAERR,"::select(3) error\n")); #endif return (false); } if ( ready_ == 0 ) { return (true); } DL((REACT,"Dispatching %d FDs.\n",ready_)); DL((REACT,"m_readySet:\n")); m_readySet.dump (); /*--- Writes first ---*/ dispatchHandler (m_readySet.m_wset, m_writeSet, &EventHandler::handle_write); /*--- Exceptions next ---*/ dispatchHandler (m_readySet.m_eset, m_exceptSet, &EventHandler::handle_except); /*--- Finally, the Reads ---*/ dispatchHandler (m_readySet.m_rset, m_readSet, &EventHandler::handle_read); return (true); } void Reactor:: stopReactor (void) { trace_with_mask("Reactor::stopReactor", REACTTRACE); m_active = false; Fd2Eh_Map_Iter iter; EventHandler* ehp; while (m_readSet.size () > 0) { iter = m_readSet.begin (); ehp = (*iter).second; removeHandler (ehp); } while (m_writeSet.size () > 0) { iter = m_writeSet.begin (); ehp = (*iter).second; removeHandler (ehp); } while (m_exceptSet.size () > 0) { iter = m_exceptSet.begin (); ehp = (*iter).second; removeHandler (ehp); } } /** If the socket descriptor that has just been eliminated * was the maxfd+1, we readjust to the next highest. * Win32 implementation of select() ignores this value altogether. */ void Reactor:: adjust_maxfdp1 (handler_t fd_, handler_t rmax_, handler_t wmax_, handler_t emax_) { #if !defined (WIN32) /* POSIX */ trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE); if (m_maxfd_plus1 == fd_ + 1) { m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_)); DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); } #endif }