// -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-
// vim:set sts=4 ts=8:
// Copyright (c) 2001-2007 International Computer Science Institute
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software")
// to deal in the Software without restriction, subject to the conditions
// listed in the XORP LICENSE file. These conditions include: you must
// preserve this copyright notice, and you cannot mention the copyright
// holders in advertising related to the Software without their permission.
// The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
// notice is a summary of the XORP LICENSE file; the license in that file is
// legally binding.
#ident "$XORP: xorp/libxorp/asyncio.cc,v 1.31 2007/02/16 22:46:15 pavlin Exp $"
#include "libxorp_module.h"
#include "libxorp/xorp.h"
#include "libxorp/debug.h"
#include "libxorp/xlog.h"
#include "libxorp/eventloop.hh"
#include <signal.h>
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#include "xorpfd.hh"
#include "asyncio.hh"
#ifdef HOST_OS_WINDOWS
# define EDGE_TRIGGERED_READ_LATENCY // IOT_READ may be delayed.
# define EDGE_TRIGGERED_WRITES // IOT_WRITE is edge triggered.
# include "win_dispatcher.hh"
# include "win_io.h"
#endif
// ----------------------------------------------------------------------------
// Utility
//
// Classify an I/O error code as transient ("pseudo") or genuine.
//
// @param name label of the caller; only used in the warning that is logged.
// @param fd descriptor the operation was attempted on; only used in the
// warning that is logged.
// @param error_num platform error code to classify.
//
// @return true, after logging a warning, when error_num is one of the
// codes tested below for the current platform; false otherwise.
//
bool
is_pseudo_error(const char* name, XorpFd fd, int error_num)
{
#ifdef HOST_OS_WINDOWS
    if (error_num == ERROR_IO_PENDING) {
        XLOG_WARNING("%s (fd = %p) got ERROR_IO_PENDING, continuing.", name,
                     (void *)fd);
        return true;
    }
    if (error_num == WSAEINTR) {
        XLOG_WARNING("%s (fd = %p) got WSAEINTR, continuing.", name,
                     (void *)fd);
        return true;
    }
    if (error_num == WSAEWOULDBLOCK) {
        XLOG_WARNING("%s (fd = %p) got WSAEWOULDBLOCK, continuing.", name,
                     (void *)fd);
        return true;
    }
    if (error_num == WSAEINPROGRESS) {
        XLOG_WARNING("%s (fd = %p) got WSAEINPROGRESS, continuing.", name,
                     (void *)fd);
        return true;
    }
#else // ! HOST_OS_WINDOWS
    if (error_num == EINTR) {
        XLOG_WARNING("%s (fd = %d) got EINTR, continuing.", name,
                     XORP_INT_CAST(fd));
        return true;
    }
    if (error_num == EWOULDBLOCK) {
        XLOG_WARNING("%s (fd = %d) got EWOULDBLOCK, continuing.", name,
                     XORP_INT_CAST(fd));
        return true;
    }
#endif // ! HOST_OS_WINDOWS

    // Anything else is treated as a real error by the callers.
    return false;
}
// ----------------------------------------------------------------------------
// Out-of-line definition of the AsyncFileOperator destructor; the body
// performs no explicit work.
AsyncFileOperator::~AsyncFileOperator()
{
}
// ----------------------------------------------------------------------------
// AsyncFileReader read method and entry hook
void
AsyncFileReader::add_buffer(uint8_t* b, size_t b_bytes, const Callback& cb)
{
assert(b_bytes != 0);
_buffers.push_back(BufferInfo(b, b_bytes, cb));
}
void
AsyncFileReader::add_buffer_with_offset(uint8_t* b,
size_t b_bytes,
size_t off,
const Callback& cb)
{
assert(off < b_bytes);
_buffers.push_back(BufferInfo(b, b_bytes, off, cb));
}
#ifdef HOST_OS_WINDOWS
//
// I/O event hook for IOT_DISCONNECT on the reader's descriptor.
//
// Checks that the event is for this reader's valid socket and then
// dispatches END_OF_FILE to the callback of the buffer at the head of the
// queue.
//
// @param fd descriptor the event was raised for; must equal _fd.
// @param type event type; must be IOT_DISCONNECT.
//
void
AsyncFileReader::disconnect(XorpFd fd, IoEventType type)
{
    assert(type == IOT_DISCONNECT);
    assert(fd == _fd);
    assert(fd.is_valid());
    assert(fd.is_socket());

    debug_msg("IOT_DISCONNECT close detected (reader side)\n");

    _buffers.front().dispatch_callback(END_OF_FILE);
}
#endif // HOST_OS_WINDOWS
//
// I/O event hook registered by start() for IOT_READ.
//
// Performs a single read into the unfilled part of the buffer at the head
// of the queue and passes the outcome to complete_transfer(), unless the
// failure is one that is_pseudo_error() classifies as transient.
//
// @param fd descriptor the event was raised for; must equal _fd.
// @param type event type; must be IOT_READ.
//
void
AsyncFileReader::read(XorpFd fd, IoEventType type)
{
#ifdef EDGE_TRIGGERED_READ_LATENCY
// Nothing to do if the reader is not running.
if (_running == false)
return;
#endif
assert(type == IOT_READ);
assert(fd == _fd);
assert(_buffers.empty() == false);
#if 0
// XXX: comm_sock_is_connected() is cross-reference to libcomm
if (_fd.is_socket() && (XORP_OK != comm_sock_is_connected(_fd))) {
debug_msg("Warning: socket %p may have been closed\n", (HANDLE)_fd);
}
#endif
debug_msg("Buffer count %u\n", XORP_UINT_CAST(_buffers.size()));
BufferInfo& head = _buffers.front();
ssize_t done = 0;
#ifdef HOST_OS_WINDOWS
// Each kind of descriptor is read with a different call.
BOOL result = FALSE;
switch (fd.type()) {
case XorpFd::FDTYPE_SOCKET:
done = recv((SOCKET)_fd, (char *)(head._buffer + head._offset),
head._buffer_bytes - head._offset, 0);
if (done == SOCKET_ERROR) {
_last_error = WSAGetLastError();
done = -1;
} else if (done == 0) {
// Graceful close; make sure complete_transfer() gets this.
// It is reported as a WSAENOTCONN failure (done = -1), which
// complete_transfer() turns into OS_ERROR.
debug_msg("graceful close detected\n");
_last_error = WSAENOTCONN;
done = -1;
}
break;
case XorpFd::FDTYPE_PIPE:
// XXX: Return values need review.
done = win_pipe_read(_fd, head._buffer + head._offset,
head._buffer_bytes - head._offset);
_last_error = GetLastError();
break;
case XorpFd::FDTYPE_CONSOLE:
// XXX: Return values need review.
done = win_con_read(_fd, head._buffer + head._offset,
head._buffer_bytes - head._offset);
_last_error = GetLastError();
break;
case XorpFd::FDTYPE_FILE:
// NOTE(review): the byte count is stored through an LPDWORD cast of
// &done, while done is an ssize_t - confirm the widths agree on the
// target platform.
result = ReadFile(_fd, (LPVOID)(head._buffer + head._offset),
(DWORD)(head._buffer_bytes - head._offset),
(LPDWORD)&done, NULL);
if (result == FALSE) {
_last_error = GetLastError();
SetLastError(ERROR_SUCCESS);
}
break;
default:
XLOG_FATAL("Invalid descriptor type.");
break;
}
#else // ! HOST_OS_WINDOWS
// errno is cleared before and after the call; _last_error keeps the
// errno value only when ::read() fails.
errno = 0;
_last_error = 0;
done = ::read(_fd, head._buffer + head._offset,
head._buffer_bytes - head._offset);
if (done < 0)
_last_error = errno;
errno = 0;
#endif // ! HOST_OS_WINDOWS
debug_msg("Read %d bytes\n", XORP_INT_CAST(done));
// A transient failure leaves the buffer queue untouched and the reader
// running; no callback is dispatched.
if (done < 0 && is_pseudo_error("AsyncFileReader", _fd, _last_error)) {
return;
}
complete_transfer(_last_error, done);
#ifdef EDGE_TRIGGERED_READ_LATENCY
//
// If there's still data which we can read without blocking,
// and we didn't fill our buffers, then try to read again
// without waiting for an IOT_READ dispatch, as it may not come,
// or be delayed due to latency between the primary thread
// and the Winsock thread.
//
// NOTE(review): complete_transfer() warns that "this" may be deleted by a
// callback, yet _fd and _eventloop are used below - confirm this is safe.
//
if (_fd.is_socket()) {
u_long remaining = 0;
int result = ioctlsocket(_fd, FIONREAD, &remaining);
if (result != SOCKET_ERROR && remaining > 0) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileReader::read, _fd, IOT_READ));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
}
#endif // EDGE_TRIGGERED_READ_LATENCY
}
// transfer_complete() invokes callbacks if necessary and updates buffer
// variables and buffer list.
void
AsyncFileReader::complete_transfer(int err, ssize_t done)
{
// XXX careful after callback is invoked: "this" maybe deleted, so do
// not reference any object state after callback.
if (done > 0) {
BufferInfo& head = _buffers.front();
head._offset += done;
if (head._offset == head._buffer_bytes) {
BufferInfo copy = head; // copy head
_buffers.erase(_buffers.begin()); // remove head
if (_buffers.empty()) {
stop();
}
copy.dispatch_callback(DATA);
} else {
head.dispatch_callback(DATA);
}
return;
}
BufferInfo& head = _buffers.front();
if (err != 0 || done < 0) {
stop();
head.dispatch_callback(OS_ERROR);
} else {
head.dispatch_callback(END_OF_FILE);
}
}
//
// Start servicing the queued read buffers.
//
// Registers read() as the IOT_READ callback for _fd and, on Windows
// sockets, disconnect() as the IOT_DISCONNECT callback.
//
// @return true if the reader is running on return (including the case
// where it was already running); false if there are no buffers queued or
// a callback could not be registered.
//
bool
AsyncFileReader::start()
{
    if (_running)
        return true;

    if (_buffers.empty()) {
        XLOG_WARNING("Could not start reader - no buffers available");
        return false;
    }

#if 0
    // XXX: comm_sock_is_connected() is cross-reference to libcomm
    if (_fd.is_socket() && (XORP_OK != comm_sock_is_connected(_fd))) {
        debug_msg("Warning: socket %p may have been closed\n", (HANDLE)_fd);
    }
#endif // 0

    const bool read_cb_added = _eventloop.add_ioevent_cb(
        _fd, IOT_READ, callback(this, &AsyncFileReader::read), _priority);
    if (!read_cb_added) {
        XLOG_ERROR("AsyncFileReader: Failed to add ioevent callback.");
        return false;
    }

#ifdef HOST_OS_WINDOWS
    // Windows notifies us of disconnections using a separate flag.
    // The file descriptor may no longer be valid when we stop, so
    // remember in a boolean whether the IOT_DISCONNECT callback was added.
    _disconnect_added = false;
    if (_fd.is_socket()) {
        _disconnect_added = _eventloop.add_ioevent_cb(
            _fd, IOT_DISCONNECT,
            callback(this, &AsyncFileReader::disconnect), _priority);
        if (!_disconnect_added) {
            XLOG_ERROR("AsyncFileReader: Failed to add ioevent callback.");
            // Undo the IOT_READ registration made above.
            _eventloop.remove_ioevent_cb(_fd, IOT_READ);
            return false;
        }
    }
#endif // HOST_OS_WINDOWS

    debug_msg("%p start\n", this);
    _running = true;
    return true;
}
//
// Stop servicing read buffers.
//
// Removes the IOT_READ callback (and, on Windows, the IOT_DISCONNECT
// callback if start() added it), cancels any deferred re-read task and
// marks the reader as not running. Queued buffers are left in place.
//
void
AsyncFileReader::stop()
{
    debug_msg("%p stop\n", this);

    _eventloop.remove_ioevent_cb(_fd, IOT_READ);

#ifdef EDGE_TRIGGERED_READ_LATENCY
    // The deferred re-read task is scheduled by read() under
    // EDGE_TRIGGERED_READ_LATENCY, so it is cancelled under the same
    // guard rather than under the writer's EDGE_TRIGGERED_WRITES.
    _deferred_io_task.unschedule();
#endif

#ifdef HOST_OS_WINDOWS
    if (_disconnect_added == true) {
        _eventloop.remove_ioevent_cb(_fd, IOT_DISCONNECT);
        _disconnect_added = false;
    }
#endif

    _running = false;
}
void
AsyncFileReader::flush_buffers()
{
stop();
while (_buffers.empty() == false) {
// Copy out buffer so flush buffers can be called re-entrantly (even
// if we happen to think this is bad coding style :-).
BufferInfo head = _buffers.front();
_buffers.erase(_buffers.begin());
head.dispatch_callback(FLUSHING);
}
}
// ----------------------------------------------------------------------------
// AsyncFileWriter write method and entry hook
#ifndef MAX_IOVEC
#define MAX_IOVEC 16
#endif

//
// Construct a writer for the given descriptor.
//
// @param e event loop the writer registers its callbacks with.
// @param fd descriptor to write to.
// @param coalesce maximum number of queued buffers gathered into a single
// write. The value is clamped to the range [1, min(MAX_IOVEC, 16)].
// @param priority priority passed on to AsyncFileOperator.
//
AsyncFileWriter::AsyncFileWriter(EventLoop& e, XorpFd fd, uint32_t coalesce,
                                 int priority)
    : AsyncFileOperator(e, fd, priority)
{
    static const uint32_t max_coalesce = 16;

    _coalesce = (coalesce > MAX_IOVEC) ? MAX_IOVEC : coalesce;
    if (_coalesce > max_coalesce) {
        _coalesce = max_coalesce;
    }
    if (_coalesce == 0) {
        // write() always fills _iov[0] before it tests the coalesce
        // limit, so a zero-length array would be overrun. Guarantee at
        // least one slot.
        _coalesce = 1;
    }

    _iov = new iovec[_coalesce];
    _dtoken = new int;
}
// Stops the writer via stop() and then frees the _iov array that the
// constructor allocated with new[].
AsyncFileWriter::~AsyncFileWriter()
{
stop();
delete[] _iov;
}
void
AsyncFileWriter::add_buffer(const uint8_t* b,
size_t b_bytes,
const Callback& cb)
{
assert(b_bytes != 0);
_buffers.push_back(BufferInfo(b, b_bytes, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
}
void
AsyncFileWriter::add_buffer_with_offset(const uint8_t* b,
size_t b_bytes,
size_t off,
const Callback& cb)
{
assert(off < b_bytes);
_buffers.push_back(BufferInfo(b, b_bytes, off, cb));
#ifdef EDGE_TRIGGERED_WRITES
if (_running == true) {
_deferred_io_task = _eventloop.new_oneoff_task(
callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
XLOG_ASSERT(_deferred_io_task.scheduled());
}
#endif // EDGE_TRIGGERED_WRITES
}
//
// The type of iovec's iov_base (and of iov_len) differs between UNIX
// platforms. Deducing both as template parameters lets the same call
// compile everywhere; the general idea of writev doesn't change much
// across platforms.
//
// @param iov_base receives data, reinterpreted as a T*.
// @param iov_len receives data_len, converted to U.
// @param data start of the bytes to describe.
// @param data_len number of bytes at data.
//
template <typename T, typename U>
static void
iov_place(T*& iov_base, U& iov_len, uint8_t* data, size_t data_len)
{
    static_assert(sizeof(T*) == sizeof(uint8_t*));

    T* const retyped = reinterpret_cast<T*>(data);
    iov_len = data_len;
    iov_base = retyped;
}
#ifdef HOST_OS_WINDOWS
//
// I/O event hook for IOT_DISCONNECT on the writer's descriptor.
//
// Apart from the sanity assertions this only emits a debug message; the
// END_OF_FILE dispatch is compiled out with #if 0.
//
// @param fd descriptor the event was raised for; must equal _fd.
// @param type event type; must be IOT_DISCONNECT.
//
void
AsyncFileWriter::disconnect(XorpFd fd, IoEventType type)
{
assert(type == IOT_DISCONNECT);
assert(fd == _fd);
assert(fd.is_valid());
assert(fd.is_socket());
debug_msg("IOT_DISCONNECT close detected (writer side)\n");
#if 0
BufferInfo& head = _buffers.front();
head.dispatch_callback(END_OF_FILE);
#endif // 0
}
#endif // HOST_OS_WINDOWS
//
// I/O event hook registered by start() for IOT_WRITE.
//
// Gathers up to _coalesce queued buffers into _iov, writes them with one
// gathered write (emulated per buffer for non-socket handles on Windows)
// and passes the byte count to complete_transfer(), unless the failure is
// one that is_pseudo_error() classifies as transient.
//
// @param fd descriptor the event was raised for; must equal _fd.
// @param type event type; must be IOT_WRITE.
//
void
AsyncFileWriter::write(XorpFd fd, IoEventType type)
{
#ifdef EDGE_TRIGGERED_WRITES
    // Nothing to do if the writer is not running.
    if (_running == false)
        return;
#endif

#if 0
    // XXX: comm_sock_is_connected() is cross-reference to libcomm
    if (_fd.is_socket() && (XORP_OK != comm_sock_is_connected(_fd))) {
        debug_msg("Warning: socket %p may have been closed\n", (HANDLE)_fd);
    }
#endif // 0

    assert(type == IOT_WRITE);
    assert(fd == _fd);
    assert(_buffers.empty() == false);

    // Coalesce buffers into a group
    uint32_t iov_cnt = 0;
    size_t total_bytes = 0;
    ssize_t done = 0;

    list<BufferInfo>::iterator i = _buffers.begin();
    while (i != _buffers.end()) {
        // Bind by reference: the entry is only read here, so there is no
        // need to copy the BufferInfo (and its callback) for every buffer
        // on every write.
        const BufferInfo& bi = *i;

        uint8_t* u = const_cast<uint8_t*>(bi._buffer + bi._offset);
        size_t u_bytes = bi._buffer_bytes - bi._offset;

        iov_place(_iov[iov_cnt].iov_base, _iov[iov_cnt].iov_len, u, u_bytes);

        total_bytes += u_bytes;
        assert(total_bytes != 0);
        iov_cnt++;
        if (iov_cnt == _coalesce)
            break;
        ++i;
    }

#ifdef HOST_OS_WINDOWS
    if (fd.is_socket()) {
        // Socket handles take non-blocking writes.
        // WSASend() approximates writev().
        int result = WSASend((SOCKET)_fd, (LPWSABUF)_iov, iov_cnt,
                             (LPDWORD)&done, 0, NULL, NULL);
        _last_error = (result == SOCKET_ERROR) ? WSAGetLastError() : 0;
#if 1
        if (_last_error != 0)
            debug_msg("writer: winsock error %d\n", _last_error);
#endif // 1
    } else {
        // Non-socket handles take blocking writes.
        // There is no writev() equivalent, so emulate it.
        // Both locals are initialised so that neither is ever read
        // while indeterminate.
        BOOL result = TRUE;
        DWORD done2 = 0;
        for (uint32_t j = 0; j < iov_cnt; j++) {
            result = WriteFile(_fd, (LPVOID)_iov[j].iov_base,
                               (DWORD)_iov[j].iov_len, (LPDWORD)&done2, NULL);
            done += done2;
            if (result == FALSE)
                break;
        }
        _last_error = (result == FALSE) ? GetLastError() : 0;
    }
#else // ! HOST_OS_WINDOWS
    // SIGPIPE is ignored around writev() and the previous disposition is
    // restored afterwards. errno is cleared before and after the call;
    // _last_error keeps the errno value only when writev() fails.
    sig_t saved_sigpipe = signal(SIGPIPE, SIG_IGN);

    errno = 0;
    _last_error = 0;
    done = ::writev(_fd, _iov, (int)iov_cnt);
    if (done < 0)
        _last_error = errno;
    errno = 0;

    signal(SIGPIPE, saved_sigpipe);
#endif // ! HOST_OS_WINDOWS

    debug_msg("Wrote %d of %u bytes\n",
              XORP_INT_CAST(done), XORP_UINT_CAST(total_bytes));

    // A transient failure leaves the buffer queue untouched and the
    // writer running; no callback is dispatched.
    if (done < 0 && is_pseudo_error("AsyncFileWriter", _fd, _last_error)) {
        XLOG_WARNING("Write error %d\n", _last_error);
        return;
    }

    complete_transfer(done);

#ifdef EDGE_TRIGGERED_WRITES
    // NOTE(review): complete_transfer() allows for "this" being deleted by
    // a callback, yet _buffers is used below - confirm this is safe.
    if (_buffers.empty() == false) {
        _deferred_io_task = _eventloop.new_oneoff_task(
            callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
        XLOG_ASSERT(_deferred_io_task.scheduled());
    }
#endif // EDGE_TRIGGERED_WRITES
}
//
// complete_transfer() invokes callbacks if necessary and updates buffer
// variables and buffer list after a write attempt.
//
// @param sdone result of the write attempt: the number of bytes written,
// or a negative value on failure.
//
// On failure the writer is stopped and OS_ERROR is dispatched to the head
// buffer. Otherwise the written bytes are accounted against the queued
// buffers in order: each completely written buffer is removed from the
// list and gets a DATA callback, while a partially written buffer only
// has its offset advanced and gets no callback.
//
void
AsyncFileWriter::complete_transfer(ssize_t sdone)
{
if (sdone < 0) {
XLOG_ERROR("Write error %d\n", _last_error);
stop();
BufferInfo& head = _buffers.front();
head.dispatch_callback(OS_ERROR);
return;
}
// notified counts the bytes already attributed to completed buffers.
size_t notified = 0;
size_t done = (size_t)sdone;
//
// This is a trick to detect if the instance of the current object is
// deleted mid-callback. If so the method should not touch any part of
// the instance state afterwards and should just return. Okay, so how
// to tell if the current AsyncFileWriter instance is deleted?
//
// The key observation is that _dtoken is a reference counted object
// associated with the current instance. Another reference is made to it
// here bumping the reference count from 1 to 2. If after invoking
// a callback the instance count is no longer 2 then the AsyncFileWriter
// instance was deleted in the callback.
//
ref_ptr<int> stack_token = _dtoken;
while (notified != done) {
assert(notified <= done);
assert(_buffers.empty() == false);
BufferInfo& head = _buffers.front();
assert(head._buffer_bytes >= head._offset);
size_t bytes_needed = head._buffer_bytes - head._offset;
if (done - notified >= bytes_needed) {
//
// All data in this buffer has been written
//
head._offset += bytes_needed;
assert(head._offset == head._buffer_bytes);
// Copy, then detach head buffer and update state
BufferInfo copy = head;
_buffers.pop_front();
if (_buffers.empty()) {
stop();
}
assert(stack_token.is_only() == false);
copy.dispatch_callback(DATA);
if (stack_token.is_only() == true) {
// "this" instance of AsyncFileWriter was deleted by the
// callback, return immediately.
return;
}
notified += bytes_needed;
continue;
} else {
//
// Not enough data has been written
//
head._offset += (done - notified);
assert(head._offset < head._buffer_bytes);
return;
}
}
}
//
// Start servicing the queued write buffers.
//
// Registers write() as the IOT_WRITE callback for _fd. With edge-triggered
// writes a one-off task that calls write() is scheduled as well.
//
// @return true if the writer is running on return (including the case
// where it was already running); false if there are no buffers queued or
// the callback could not be registered.
//
bool
AsyncFileWriter::start()
{
    if (_running)
        return true;

    if (_buffers.empty()) {
        XLOG_WARNING("Could not start writer - no buffers available");
        return false;
    }

#if 0
    // XXX: comm_sock_is_connected() is cross-reference to libcomm
    if (_fd.is_socket() && (XORP_OK != comm_sock_is_connected(_fd))) {
        debug_msg("Warning: socket %p may have been closed\n", (HANDLE)_fd);
    }
#endif // 0

    const bool write_cb_added = _eventloop.add_ioevent_cb(
        _fd, IOT_WRITE, callback(this, &AsyncFileWriter::write), _priority);
    if (!write_cb_added) {
        XLOG_ERROR("AsyncFileWriter: Failed to add I/O event callback.");
        return false;
    }

#ifdef HOST_OS_WINDOWS
    _disconnect_added = false;
    // Disconnection notification on the write end is disabled for now,
    // and a failure to add the notification is deliberately not reported.
    // XXX: If we fail to add a notification it's possible the socket
    // might already have been closed?
#if 0
    if (_fd.is_socket()) {
        _disconnect_added = _eventloop.add_ioevent_cb(
            _fd,
            IOT_DISCONNECT,
            callback(this, &AsyncFileWriter::disconnect),
            _priority);
#if 0
        if (_disconnect_added == false) {
            XLOG_ERROR("AsyncFileWriter: Failed to add I/O event callback.");
            _eventloop.remove_ioevent_cb(_fd, IOT_WRITE);
            return false;
        }
#endif // 0
    }
#endif // 0
#endif // HOST_OS_WINDOWS

#ifdef EDGE_TRIGGERED_WRITES
    _deferred_io_task = _eventloop.new_oneoff_task(
        callback(this, &AsyncFileWriter::write, _fd, IOT_WRITE));
#endif // EDGE_TRIGGERED_WRITES

    _running = true;
    debug_msg("%p start\n", this);
    return true;
}
//
// Stop servicing write buffers.
//
// Cancels any deferred write task, removes the IOT_WRITE callback (and,
// on Windows, the IOT_DISCONNECT callback if one was added) and marks the
// writer as not running. Queued buffers are left in place.
//
void
AsyncFileWriter::stop()
{
    debug_msg("%p stop\n", this);

#ifdef EDGE_TRIGGERED_WRITES
    _deferred_io_task.unschedule();
#endif // EDGE_TRIGGERED_WRITES

    _eventloop.remove_ioevent_cb(_fd, IOT_WRITE);

#ifdef HOST_OS_WINDOWS
    if (_disconnect_added) {
        _eventloop.remove_ioevent_cb(_fd, IOT_DISCONNECT);
        _disconnect_added = false;
    }
#endif // HOST_OS_WINDOWS

    _running = false;
}
void
AsyncFileWriter::flush_buffers()
{
stop();
while (_buffers.empty() == false) {
// Copy out buffer so flush buffers can be called re-entrantly (even
// if we happen to think this is bad coding style :-).
BufferInfo head = _buffers.front();
_buffers.erase(_buffers.begin());
head.dispatch_callback(FLUSHING);
}
}
// syntax highlighted by Code2HTML, v. 0.9.1