From 16d23fdced8577e9ad015fd9283373761b8464ef Mon Sep 17 00:00:00 2001 From: Joerg Bornemann Date: Wed, 1 Jul 2009 11:06:13 +0200 Subject: fast Windows version of QLocalSocket This commit removes the 100 ms polling timer from QLocalSocket and replaces it with proper overlapped IO handling. Reviewed-by: ossi --- src/network/socket/qlocalsocket.h | 1 + src/network/socket/qlocalsocket_p.h | 16 ++- src/network/socket/qlocalsocket_win.cpp | 241 +++++++++++++++++--------------- 3 files changed, 138 insertions(+), 120 deletions(-) diff --git a/src/network/socket/qlocalsocket.h b/src/network/socket/qlocalsocket.h index 417671a..4bff62e 100644 --- a/src/network/socket/qlocalsocket.h +++ b/src/network/socket/qlocalsocket.h @@ -134,6 +134,7 @@ private: Q_PRIVATE_SLOT(d_func(), void _q_notified()) Q_PRIVATE_SLOT(d_func(), void _q_canWrite()) Q_PRIVATE_SLOT(d_func(), void _q_pipeClosed()) + Q_PRIVATE_SLOT(d_func(), void _q_emitReadyRead()) #else Q_PRIVATE_SLOT(d_func(), void _q_stateChanged(QAbstractSocket::SocketState)) Q_PRIVATE_SLOT(d_func(), void _q_error(QAbstractSocket::SocketError)) diff --git a/src/network/socket/qlocalsocket_p.h b/src/network/socket/qlocalsocket_p.h index bdbba42..2dae7d9 100644 --- a/src/network/socket/qlocalsocket_p.h +++ b/src/network/socket/qlocalsocket_p.h @@ -65,6 +65,7 @@ #elif defined(Q_OS_WIN) # include "private/qwindowspipewriter_p.h" # include "private/qringbuffer_p.h" +# include #else # include "private/qnativesocketengine_p.h" # include @@ -135,18 +136,23 @@ public: void _q_notified(); void _q_canWrite(); void _q_pipeClosed(); - qint64 readData(char *data, qint64 maxSize); - qint64 bytesAvailable(); - bool readFromSocket(); + void _q_emitReadyRead(); + DWORD bytesAvailable(); + void startAsyncRead(); + void completeAsyncRead(); + void checkReadyRead(); HANDLE handle; OVERLAPPED overlapped; QWindowsPipeWriter *pipeWriter; qint64 readBufferMaxSize; QRingBuffer readBuffer; - QTimer dataNotifier; + int actualReadBufferSize; + QWinEventNotifier *dataReadNotifier; QLocalSocket::LocalSocketError error; - bool readyReadEmitted; + bool readSequenceStarted; + bool pendingReadyRead; bool pipeClosed; + static const qint64 initialReadBufferSize = 4096; #else QLocalUnixSocket unixSocket; QString generateErrorString(QLocalSocket::LocalSocketError, const QString &function) const; diff --git a/src/network/socket/qlocalsocket_win.cpp b/src/network/socket/qlocalsocket_win.cpp index b1b69fc..1a971f0 100644 --- a/src/network/socket/qlocalsocket_win.cpp +++ b/src/network/socket/qlocalsocket_win.cpp @@ -48,13 +48,13 @@ QT_BEGIN_NAMESPACE -#define NOTIFYTIMEOUT 100 - void QLocalSocketPrivate::init() { Q_Q(QLocalSocket); - QObject::connect(&dataNotifier, SIGNAL(timeout()), q, SLOT(_q_notified())); + memset(&overlapped, 0, sizeof(overlapped)); overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + dataReadNotifier = new QWinEventNotifier(overlapped.hEvent, q); + q->connect(dataReadNotifier, SIGNAL(activated(HANDLE)), q, SLOT(_q_notified())); } void QLocalSocketPrivate::setErrorString(const QString &function) @@ -101,8 +101,10 @@ QLocalSocketPrivate::QLocalSocketPrivate() : QIODevicePrivate(), handle(INVALID_HANDLE_VALUE), pipeWriter(0), readBufferMaxSize(0), + actualReadBufferSize(0), error(QLocalSocket::UnknownSocketError), - readyReadEmitted(false), + readSequenceStarted(false), + pendingReadyRead(false), pipeClosed(false), state(QLocalSocket::UnconnectedState) { @@ -176,82 +178,103 @@ void QLocalSocket::connectToServer(const QString &name, OpenMode openMode) qint64 QLocalSocket::readData(char *data, qint64 maxSize) { Q_D(QLocalSocket); - if (d->readBuffer.isEmpty()) { - if (!d->readFromSocket()) { - if (d->pipeClosed) - return -1; - return 0; - } - } - - if (!d->dataNotifier.isActive() && d->threadData->eventDispatcher) - d->dataNotifier.start(NOTIFYTIMEOUT); - - if (d->readBuffer.isEmpty()) - return qint64(0); - // If readFromSocket() read data, copy it to its destination. - if (maxSize == 1) { + qint64 readSoFar; + // If startAsyncRead() read data, copy it to its destination. + if (maxSize == 1 && d->actualReadBufferSize > 0) { *data = d->readBuffer.getChar(); - return 1; + d->actualReadBufferSize--; + readSoFar = 1; + } else { + qint64 bytesToRead = qMin(qint64(d->actualReadBufferSize), maxSize); + readSoFar = 0; + while (readSoFar < bytesToRead) { + const char *ptr = d->readBuffer.readPointer(); + int bytesToReadFromThisBlock = qMin(bytesToRead - readSoFar, + qint64(d->readBuffer.nextDataBlockSize())); + memcpy(data + readSoFar, ptr, bytesToReadFromThisBlock); + readSoFar += bytesToReadFromThisBlock; + d->readBuffer.free(bytesToReadFromThisBlock); + d->actualReadBufferSize -= bytesToReadFromThisBlock; + } } - qint64 bytesToRead = qMin(qint64(d->readBuffer.size()), maxSize); - qint64 readSoFar = 0; - while (readSoFar < bytesToRead) { - const char *ptr = d->readBuffer.readPointer(); - int bytesToReadFromThisBlock = qMin(int(bytesToRead - readSoFar), - d->readBuffer.nextDataBlockSize()); - memcpy(data + readSoFar, ptr, bytesToReadFromThisBlock); - readSoFar += bytesToReadFromThisBlock; - d->readBuffer.free(bytesToReadFromThisBlock); - } + if (!d->readSequenceStarted) + d->startAsyncRead(); + d->checkReadyRead(); + return readSoFar; } /*! \internal - read from the socket + Schedules or cancels a readyRead() emission depending on actual data availability */ -qint64 QLocalSocketPrivate::readData(char *data, qint64 maxSize) +void QLocalSocketPrivate::checkReadyRead() { - DWORD bytesRead = 0; - overlapped.Offset = 0; - overlapped.OffsetHigh = 0; - bool success = ReadFile(handle, data, maxSize, &bytesRead, &overlapped); - if (!success && GetLastError() == ERROR_IO_PENDING) - if (GetOverlappedResult(handle, &overlapped, &bytesRead, TRUE)) - success = true; - if (!success) { - setErrorString(QLatin1String("QLocalSocket::readData")); - return 0; + if (actualReadBufferSize > 0) { + if (!pendingReadyRead) { + Q_Q(QLocalSocket); + QTimer::singleShot(0, q, SLOT(_q_emitReadyRead())); + pendingReadyRead = true; + } + } else { + pendingReadyRead = false; } - return bytesRead; } /*! \internal Reads data from the socket into the readbuffer */ -bool QLocalSocketPrivate::readFromSocket() +void QLocalSocketPrivate::startAsyncRead() { - qint64 bytesToRead = bytesAvailable(); - if (bytesToRead == 0) - return false; + do { + DWORD bytesToRead = bytesAvailable(); + if (bytesToRead == 0) { + // There are no bytes in the pipe but we need to + // start the overlapped read with some buffer size. + bytesToRead = initialReadBufferSize; + } - if (readBufferMaxSize && bytesToRead - > (readBufferMaxSize - readBuffer.size())) - bytesToRead = readBufferMaxSize - readBuffer.size(); + if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) { + bytesToRead = readBufferMaxSize - readBuffer.size(); + if (bytesToRead == 0) { + // Buffer is full. User must read data from the buffer + // before we can read more from the pipe. + return; + } + } - char *ptr = readBuffer.reserve(bytesToRead); - qint64 readBytes = readData(ptr, bytesToRead); - if (readBytes == 0) { - readBuffer.chop(bytesToRead); - return false; + char *ptr = readBuffer.reserve(bytesToRead); + + readSequenceStarted = true; + if (ReadFile(handle, ptr, bytesToRead, NULL, &overlapped)) { + completeAsyncRead(); + } else if (GetLastError() != ERROR_IO_PENDING) { + setErrorString(QLatin1String("QLocalSocketPrivate::startAsyncRead")); + return; + } + } while (!readSequenceStarted); +} + +/*! + \internal + Sets the correct size of the read buffer after a read operation. + */ +void QLocalSocketPrivate::completeAsyncRead() +{ + ResetEvent(overlapped.hEvent); + readSequenceStarted = false; + + DWORD bytesRead; + if (!GetOverlappedResult(handle, &overlapped, &bytesRead, TRUE)) { + setErrorString(QLatin1String("QLocalSocketPrivate::completeAsyncRead")); + return; } - readyReadEmitted = false; - readBuffer.chop(int(bytesToRead - (readBytes < 0 ? qint64(0) : readBytes))); - return true; + + actualReadBufferSize += bytesRead; + readBuffer.truncate(actualReadBufferSize); } qint64 QLocalSocket::writeData(const char *data, qint64 maxSize) @@ -273,7 +296,7 @@ void QLocalSocket::abort() /*! The number of bytes available from the pipe */ -qint64 QLocalSocketPrivate::bytesAvailable() +DWORD QLocalSocketPrivate::bytesAvailable() { Q_Q(QLocalSocket); if (q->state() != QLocalSocket::ConnectedState) @@ -300,7 +323,7 @@ qint64 QLocalSocket::bytesAvailable() const { Q_D(const QLocalSocket); qint64 available = QIODevice::bytesAvailable(); - available += (qint64) d->readBuffer.size(); + available += (qint64) d->actualReadBufferSize; return available; } @@ -327,7 +350,6 @@ void QLocalSocket::close() QIODevice::close(); d->state = ClosingState; emit stateChanged(d->state); - d->readyReadEmitted = false; emit readChannelFinished(); d->serverName = QString(); d->fullServerName = QString(); @@ -336,10 +358,13 @@ void QLocalSocket::close() disconnectFromServer(); return; } + d->readSequenceStarted = false; + d->pendingReadyRead = false; d->pipeClosed = false; DisconnectNamedPipe(d->handle); CloseHandle(d->handle); d->handle = INVALID_HANDLE_VALUE; + ResetEvent(d->overlapped.hEvent); d->state = UnconnectedState; emit stateChanged(d->state); emit disconnected(); @@ -347,7 +372,6 @@ void QLocalSocket::close() delete d->pipeWriter; d->pipeWriter = 0; } - d->dataNotifier.stop(); } bool QLocalSocket::flush() @@ -381,12 +405,15 @@ bool QLocalSocket::setSocketDescriptor(quintptr socketDescriptor, { Q_D(QLocalSocket); d->readBuffer.clear(); + d->actualReadBufferSize = 0; QIODevice::open(openMode); d->handle = (int*)socketDescriptor; d->state = socketState; emit stateChanged(d->state); - if (d->threadData->eventDispatcher) - d->dataNotifier.start(NOTIFYTIMEOUT); + if (d->state == ConnectedState) { + d->startAsyncRead(); + d->checkReadyRead(); + } return true; } @@ -400,20 +427,18 @@ void QLocalSocketPrivate::_q_canWrite() void QLocalSocketPrivate::_q_notified() { Q_Q(QLocalSocket); - if (0 != bytesAvailable()) { - if (readBufferMaxSize == 0 || readBuffer.size() < readBufferMaxSize) { - if (!readFromSocket()) { - return; - } - // wait until buffer is cleared before starting again - if (readBufferMaxSize && readBuffer.size() == readBufferMaxSize) { - dataNotifier.stop(); - } - } - if (!readyReadEmitted) { - readyReadEmitted = true; - q->emit readyRead(); - } + completeAsyncRead(); + startAsyncRead(); + pendingReadyRead = false; + emit q->readyRead(); +} + +void QLocalSocketPrivate::_q_emitReadyRead() +{ + if (pendingReadyRead) { + Q_Q(QLocalSocket); + pendingReadyRead = false; + emit q->readyRead(); } } @@ -448,9 +473,9 @@ bool QLocalSocket::waitForDisconnected(int msecs) return false; QIncrementalSleepTimer timer(msecs); forever { - d->_q_notified(); - if (d->pipeClosed) - close(); + d->bytesAvailable(); // to check if PeekNamedPipe fails + if (d->pipeClosed) + close(); if (state() == UnconnectedState) return true; Sleep(timer.nextSleepTime()); @@ -470,22 +495,24 @@ bool QLocalSocket::isValid() const bool QLocalSocket::waitForReadyRead(int msecs) { Q_D(QLocalSocket); - QIncrementalSleepTimer timer(msecs); - forever { - d->_q_notified(); - if (bytesAvailable() > 0) { - if (!d->readyReadEmitted) { - d->readyReadEmitted = true; - emit readyRead(); - } - return true; - } - Sleep(timer.nextSleepTime()); - if (timer.hasTimedOut()) - break; + if (bytesAvailable() > 0) + return true; + + if (d->state != QLocalSocket::ConnectedState) + return false; + + Q_ASSERT(d->readSequenceStarted); + DWORD result = WaitForSingleObject(d->overlapped.hEvent, msecs == -1 ? INFINITE : msecs); + switch (result) { + case WAIT_OBJECT_0: + d->_q_notified(); + return true; + case WAIT_TIMEOUT: + return false; } + qWarning("QLocalSocket::waitForReadyRead WaitForSingleObject failed with error code %d.", GetLastError()); return false; } @@ -495,27 +522,11 @@ bool QLocalSocket::waitForBytesWritten(int msecs) if (!d->pipeWriter) return false; - QIncrementalSleepTimer timer(msecs); - forever { - if (d->pipeWriter->hadWritten()) - return true; - - if (d->pipeWriter->bytesToWrite() == 0) - return false; - - // Wait for the pipe writer to acknowledge that it has - // written. This will succeed if either the pipe writer has - // already written the data, or if it manages to write data - // within the given timeout. - if (d->pipeWriter->waitForWrite(0)) - return true; - - Sleep(timer.nextSleepTime()); - if (timer.hasTimedOut()) - break; - } - - return false; + // Wait for the pipe writer to acknowledge that it has + // written. This will succeed if either the pipe writer has + // already written the data, or if it manages to write data + // within the given timeout. + return d->pipeWriter->waitForWrite(msecs); } QT_END_NAMESPACE -- cgit v0.12