diff options
author | Alexis Menard <alexis.menard@nokia.com> | 2009-04-17 14:06:06 (GMT) |
---|---|---|
committer | Alexis Menard <alexis.menard@nokia.com> | 2009-04-17 14:06:06 (GMT) |
commit | f15b8a83e2e51955776a3f07cb85ebfc342dd8ef (patch) | |
tree | c5dc684986051654898db11ce73e03b9fec8db99 /examples/network/torrent/torrentclient.cpp | |
download | Qt-f15b8a83e2e51955776a3f07cb85ebfc342dd8ef.zip Qt-f15b8a83e2e51955776a3f07cb85ebfc342dd8ef.tar.gz Qt-f15b8a83e2e51955776a3f07cb85ebfc342dd8ef.tar.bz2 |
Initial import of statemachine branch from the old kinetic repository
Diffstat (limited to 'examples/network/torrent/torrentclient.cpp')
-rw-r--r-- | examples/network/torrent/torrentclient.cpp | 1529 |
1 files changed, 1529 insertions, 0 deletions
diff --git a/examples/network/torrent/torrentclient.cpp b/examples/network/torrent/torrentclient.cpp new file mode 100644 index 0000000..e8473bb --- /dev/null +++ b/examples/network/torrent/torrentclient.cpp @@ -0,0 +1,1529 @@ +/**************************************************************************** +** +** Copyright (C) 2009 Nokia Corporation and/or its subsidiary(-ies). +** Contact: Qt Software Information (qt-info@nokia.com) +** +** This file is part of the examples of the Qt Toolkit. +** +** $QT_BEGIN_LICENSE:LGPL$ +** No Commercial Usage +** This file contains pre-release code and may not be distributed. +** You may use this file in accordance with the terms and conditions +** contained in the either Technology Preview License Agreement or the +** Beta Release License Agreement. +** +** GNU Lesser General Public License Usage +** Alternatively, this file may be used under the terms of the GNU Lesser +** General Public License version 2.1 as published by the Free Software +** Foundation and appearing in the file LICENSE.LGPL included in the +** packaging of this file. Please review the following information to +** ensure the GNU Lesser General Public License version 2.1 requirements +** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html. +** +** In addition, as a special exception, Nokia gives you certain +** additional rights. These rights are described in the Nokia Qt LGPL +** Exception version 1.0, included in the file LGPL_EXCEPTION.txt in this +** package. +** +** GNU General Public License Usage +** Alternatively, this file may be used under the terms of the GNU +** General Public License version 3.0 as published by the Free Software +** Foundation and appearing in the file LICENSE.GPL included in the +** packaging of this file. Please review the following information to +** ensure the GNU General Public License version 3.0 requirements will be +** met: http://www.gnu.org/copyleft/gpl.html. +** +** If you are unsure which license is appropriate for your use, please +** contact the sales department at qt-sales@nokia.com. +** $QT_END_LICENSE$ +** +****************************************************************************/ + +#include "connectionmanager.h" +#include "filemanager.h" +#include "metainfo.h" +#include "torrentclient.h" +#include "torrentserver.h" +#include "trackerclient.h" +#include "peerwireclient.h" +#include "ratecontroller.h" + +#include <QtCore> +#include <QNetworkInterface> + +// These constants could also be configurable by the user. +static const int ServerMinPort = 6881; +static const int ServerMaxPort = /* 6889 */ 7000; +static const int BlockSize = 16384; +static const int MaxBlocksInProgress = 5; +static const int MaxBlocksInMultiMode = 2; +static const int MaxConnectionPerPeer = 1; +static const int RateControlWindowLength = 10; +static const int RateControlTimerDelay = 1000; +static const int MinimumTimeBeforeRevisit = 30; +static const int MaxUploads = 4; +static const int UploadScheduleInterval = 10000; +static const int EndGamePieces = 5; + +class TorrentPiece { +public: + int index; + int length; + QBitArray completedBlocks; + QBitArray requestedBlocks; + bool inProgress; +}; + +class TorrentClientPrivate +{ +public: + TorrentClientPrivate(TorrentClient *qq); + + // State / error + void setError(TorrentClient::Error error); + void setState(TorrentClient::State state); + TorrentClient::Error error; + TorrentClient::State state; + QString errorString; + QString stateString; + + // Where to save data + QString destinationFolder; + MetaInfo metaInfo; + + // Announce tracker and file manager + QByteArray peerId; + QByteArray infoHash; + TrackerClient trackerClient; + FileManager fileManager; + + // Connections + QList<PeerWireClient *> connections; + QList<TorrentPeer *> peers; + bool schedulerCalled; + void callScheduler(); + bool connectingToClients; + void callPeerConnector(); + int uploadScheduleTimer; + + // Pieces + QMap<int, PeerWireClient *> readIds; + QMultiMap<PeerWireClient *, TorrentPiece *> payloads; + QMap<int, TorrentPiece *> pendingPieces; + QBitArray completedPieces; + QBitArray incompletePieces; + int pieceCount; + + // Progress + int lastProgressValue; + qint64 downloadedBytes; + qint64 uploadedBytes; + int downloadRate[RateControlWindowLength]; + int uploadRate[RateControlWindowLength]; + int transferRateTimer; + + TorrentClient *q; +}; + +TorrentClientPrivate::TorrentClientPrivate(TorrentClient *qq) + : trackerClient(qq), q(qq) +{ + error = TorrentClient::UnknownError; + state = TorrentClient::Idle; + errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unknown error"); + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Idle"); + schedulerCalled = false; + connectingToClients = false; + uploadScheduleTimer = 0; + lastProgressValue = -1; + pieceCount = 0; + downloadedBytes = 0; + uploadedBytes = 0; + memset(downloadRate, 0, sizeof(downloadRate)); + memset(uploadRate, 0, sizeof(uploadRate)); + transferRateTimer = 0; +} + +void TorrentClientPrivate::setError(TorrentClient::Error errorCode) +{ + this->error = errorCode; + switch (error) { + case TorrentClient::UnknownError: + errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unknown error"); + break; + case TorrentClient::TorrentParseError: + errorString = QT_TRANSLATE_NOOP(TorrentClient, "Invalid torrent data"); + break; + case TorrentClient::InvalidTrackerError: + errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unable to connect to tracker"); + break; + case TorrentClient::FileError: + errorString = QT_TRANSLATE_NOOP(TorrentClient, "File error"); + break; + case TorrentClient::ServerError: + errorString = QT_TRANSLATE_NOOP(TorrentClient, "Unable to initialize server"); + break; + } + emit q->error(errorCode); +} + +void TorrentClientPrivate::setState(TorrentClient::State state) +{ + this->state = state; + switch (state) { + case TorrentClient::Idle: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Idle"); + break; + case TorrentClient::Paused: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Paused"); + break; + case TorrentClient::Stopping: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Stopping"); + break; + case TorrentClient::Preparing: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Preparing"); + break; + case TorrentClient::Searching: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Searching"); + break; + case TorrentClient::Connecting: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Connecting"); + break; + case TorrentClient::WarmingUp: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Warming up"); + break; + case TorrentClient::Downloading: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Downloading"); + break; + case TorrentClient::Endgame: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Finishing"); + break; + case TorrentClient::Seeding: + stateString = QT_TRANSLATE_NOOP(TorrentClient, "Seeding"); + break; + } + emit q->stateChanged(state); +} + +void TorrentClientPrivate::callScheduler() +{ + if (!schedulerCalled) { + schedulerCalled = true; + QMetaObject::invokeMethod(q, "scheduleDownloads", Qt::QueuedConnection); + } +} + +void TorrentClientPrivate::callPeerConnector() +{ + if (!connectingToClients) { + connectingToClients = true; + QTimer::singleShot(10000, q, SLOT(connectToPeers())); + } +} + +TorrentClient::TorrentClient(QObject *parent) + : QObject(parent), d(new TorrentClientPrivate(this)) +{ + // Connect the file manager + connect(&d->fileManager, SIGNAL(dataRead(int, int, int, const QByteArray &)), + this, SLOT(sendToPeer(int, int, int, const QByteArray &))); + connect(&d->fileManager, SIGNAL(verificationProgress(int)), + this, SLOT(updateProgress(int))); + connect(&d->fileManager, SIGNAL(verificationDone()), + this, SLOT(fullVerificationDone())); + connect(&d->fileManager, SIGNAL(pieceVerified(int, bool)), + this, SLOT(pieceVerified(int, bool))); + connect(&d->fileManager, SIGNAL(error()), + this, SLOT(handleFileError())); + + // Connect the tracker client + connect(&d->trackerClient, SIGNAL(peerListUpdated(const QList<TorrentPeer> &)), + this, SLOT(addToPeerList(const QList<TorrentPeer> &))); + connect(&d->trackerClient, SIGNAL(stopped()), + this, SIGNAL(stopped())); +} + +TorrentClient::~TorrentClient() +{ + qDeleteAll(d->peers); + qDeleteAll(d->pendingPieces); + delete d; +} + +bool TorrentClient::setTorrent(const QString &fileName) +{ + QFile file(fileName); + if (!file.open(QIODevice::ReadOnly) || !setTorrent(file.readAll())) { + d->setError(TorrentParseError); + return false; + } + return true; +} + +bool TorrentClient::setTorrent(const QByteArray &torrentData) +{ + if (!d->metaInfo.parse(torrentData)) { + d->setError(TorrentParseError); + return false; + } + + // Calculate SHA1 hash of the "info" section in the torrent + QByteArray infoValue = d->metaInfo.infoValue(); + d->infoHash = QCryptographicHash::hash(infoValue, QCryptographicHash::Sha1); + + return true; +} + +MetaInfo TorrentClient::metaInfo() const +{ + return d->metaInfo; +} + +void TorrentClient::setDestinationFolder(const QString &directory) +{ + d->destinationFolder = directory; +} + +QString TorrentClient::destinationFolder() const +{ + return d->destinationFolder; +} + +void TorrentClient::setDumpedState(const QByteArray &dumpedState) +{ + // Recover partially completed pieces + QDataStream stream(dumpedState); + + quint16 version = 0; + stream >> version; + if (version != 2) + return; + + stream >> d->completedPieces; + + while (!stream.atEnd()) { + int index; + int length; + QBitArray completed; + stream >> index >> length >> completed; + if (stream.status() != QDataStream::Ok) { + d->completedPieces.clear(); + break; + } + + TorrentPiece *piece = new TorrentPiece; + piece->index = index; + piece->length = length; + piece->completedBlocks = completed; + piece->requestedBlocks.resize(completed.size()); + piece->inProgress = false; + d->pendingPieces[index] = piece; + } +} + +QByteArray TorrentClient::dumpedState() const +{ + QByteArray partials; + QDataStream stream(&partials, QIODevice::WriteOnly); + + stream << quint16(2); + stream << d->completedPieces; + + // Save the state of all partially downloaded pieces into a format + // suitable for storing in settings. + QMap<int, TorrentPiece *>::ConstIterator it = d->pendingPieces.constBegin(); + while (it != d->pendingPieces.constEnd()) { + TorrentPiece *piece = it.value(); + if (blocksLeftForPiece(piece) > 0 && blocksLeftForPiece(piece) < piece->completedBlocks.size()) { + stream << piece->index; + stream << piece->length; + stream << piece->completedBlocks; + } + ++it; + } + + return partials; +} + +qint64 TorrentClient::progress() const +{ + return d->lastProgressValue; +} + +void TorrentClient::setDownloadedBytes(qint64 bytes) +{ + d->downloadedBytes = bytes; +} + +qint64 TorrentClient::downloadedBytes() const +{ + return d->downloadedBytes; +} + +void TorrentClient::setUploadedBytes(qint64 bytes) +{ + d->uploadedBytes = bytes; +} + +qint64 TorrentClient::uploadedBytes() const +{ + return d->uploadedBytes; +} + +int TorrentClient::connectedPeerCount() const +{ + int tmp = 0; + foreach (PeerWireClient *client, d->connections) { + if (client->state() == QAbstractSocket::ConnectedState) + ++tmp; + } + return tmp; +} + +int TorrentClient::seedCount() const +{ + int tmp = 0; + foreach (PeerWireClient *client, d->connections) { + if (client->availablePieces().count(true) == d->pieceCount) + ++tmp; + } + return tmp; +} + +TorrentClient::State TorrentClient::state() const +{ + return d->state; +} + +QString TorrentClient::stateString() const +{ + return d->stateString; +} + +TorrentClient::Error TorrentClient::error() const +{ + return d->error; +} + +QString TorrentClient::errorString() const +{ + return d->errorString; +} + +QByteArray TorrentClient::peerId() const +{ + return d->peerId; +} + +QByteArray TorrentClient::infoHash() const +{ + return d->infoHash; +} + +void TorrentClient::start() +{ + if (d->state != Idle) + return; + + TorrentServer::instance()->addClient(this); + + // Initialize the file manager + d->setState(Preparing); + d->fileManager.setMetaInfo(d->metaInfo); + d->fileManager.setDestinationFolder(d->destinationFolder); + d->fileManager.setCompletedPieces(d->completedPieces); + d->fileManager.start(QThread::LowestPriority); + d->fileManager.startDataVerification(); +} + +void TorrentClient::stop() +{ + if (d->state == Stopping) + return; + + TorrentServer::instance()->removeClient(this); + + // Update the state + State oldState = d->state; + d->setState(Stopping); + + // Stop the timer + if (d->transferRateTimer) { + killTimer(d->transferRateTimer); + d->transferRateTimer = 0; + } + + // Abort all existing connections + foreach (PeerWireClient *client, d->connections) { + RateController::instance()->removeSocket(client); + ConnectionManager::instance()->removeConnection(client); + client->abort(); + } + d->connections.clear(); + + // Perhaps stop the tracker + if (oldState > Preparing) { + d->trackerClient.stop(); + } else { + d->setState(Idle); + emit stopped(); + } +} + +void TorrentClient::setPaused(bool paused) +{ + if (paused) { + // Abort all connections, and set the max number of + // connections to 0. Keep the list of peers, so we can quickly + // resume later. + d->setState(Paused); + foreach (PeerWireClient *client, d->connections) + client->abort(); + d->connections.clear(); + TorrentServer::instance()->removeClient(this); + } else { + // Restore the max number of connections, and start the peer + // connector. We should also quickly start receiving incoming + // connections. + d->setState(d->completedPieces.count(true) == d->fileManager.pieceCount() + ? Seeding : Searching); + connectToPeers(); + TorrentServer::instance()->addClient(this); + } +} + +void TorrentClient::timerEvent(QTimerEvent *event) +{ + if (event->timerId() == d->uploadScheduleTimer) { + // Update the state of who's choked and who's not + scheduleUploads(); + return; + } + + if (event->timerId() != d->transferRateTimer) { + QObject::timerEvent(event); + return; + } + + // Calculate average upload/download rate + qint64 uploadBytesPerSecond = 0; + qint64 downloadBytesPerSecond = 0; + for (int i = 0; i < RateControlWindowLength; ++i) { + uploadBytesPerSecond += d->uploadRate[i]; + downloadBytesPerSecond += d->downloadRate[i]; + } + uploadBytesPerSecond /= qint64(RateControlWindowLength); + downloadBytesPerSecond /= qint64(RateControlWindowLength); + for (int i = RateControlWindowLength - 2; i >= 0; --i) { + d->uploadRate[i + 1] = d->uploadRate[i]; + d->downloadRate[i + 1] = d->downloadRate[i]; + } + d->uploadRate[0] = 0; + d->downloadRate[0] = 0; + emit uploadRateUpdated(int(uploadBytesPerSecond)); + emit downloadRateUpdated(int(downloadBytesPerSecond)); + + // Stop the timer if there is no activity. + if (downloadBytesPerSecond == 0 && uploadBytesPerSecond == 0) { + killTimer(d->transferRateTimer); + d->transferRateTimer = 0; + } +} + +void TorrentClient::sendToPeer(int readId, int pieceIndex, int begin, const QByteArray &data) +{ + // Send the requested block to the peer if the client connection + // still exists; otherwise do nothing. This slot is called by the + // file manager after it has read a block of data. + PeerWireClient *client = d->readIds.value(readId); + if (client) { + if ((client->peerWireState() & PeerWireClient::ChokingPeer) == 0) + client->sendBlock(pieceIndex, begin, data); + } + d->readIds.remove(readId); +} + +void TorrentClient::fullVerificationDone() +{ + // Update our list of completed and incomplete pieces. + d->completedPieces = d->fileManager.completedPieces(); + d->incompletePieces.resize(d->completedPieces.size()); + d->pieceCount = d->completedPieces.size(); + for (int i = 0; i < d->fileManager.pieceCount(); ++i) { + if (!d->completedPieces.testBit(i)) + d->incompletePieces.setBit(i); + } + + updateProgress(); + + // If the checksums show that what the dumped state thought was + // partial was in fact complete, then we trust the checksums. + QMap<int, TorrentPiece *>::Iterator it = d->pendingPieces.begin(); + while (it != d->pendingPieces.end()) { + if (d->completedPieces.testBit(it.key())) + it = d->pendingPieces.erase(it); + else + ++it; + } + + d->uploadScheduleTimer = startTimer(UploadScheduleInterval); + + // Start the server + TorrentServer *server = TorrentServer::instance(); + if (!server->isListening()) { + // Set up the peer wire server + for (int i = ServerMinPort; i <= ServerMaxPort; ++i) { + if (server->listen(QHostAddress::Any, i)) + break; + } + if (!server->isListening()) { + d->setError(ServerError); + return; + } + } + + d->setState(d->completedPieces.count(true) == d->pieceCount ? Seeding : Searching); + + // Start the tracker client + d->trackerClient.start(d->metaInfo); +} + +void TorrentClient::pieceVerified(int pieceIndex, bool ok) +{ + TorrentPiece *piece = d->pendingPieces.value(pieceIndex); + + // Remove this piece from all payloads + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.begin(); + while (it != d->payloads.end()) { + if (it.value()->index == pieceIndex) + it = d->payloads.erase(it); + else + ++it; + } + + if (!ok) { + // If a piece did not pass the SHA1 check, we'll simply clear + // its state, and the scheduler will re-request it + piece->inProgress = false; + piece->completedBlocks.fill(false); + piece->requestedBlocks.fill(false); + d->callScheduler(); + return; + } + + // Update the peer list so we know who's still interesting. + foreach (TorrentPeer *peer, d->peers) { + if (!peer->interesting) + continue; + bool interesting = false; + for (int i = 0; i < d->pieceCount; ++i) { + if (peer->pieces.testBit(i) && d->incompletePieces.testBit(i)) { + interesting = true; + break; + } + } + peer->interesting = interesting; + } + + // Delete the piece and update our structures. + delete piece; + d->pendingPieces.remove(pieceIndex); + d->completedPieces.setBit(pieceIndex); + d->incompletePieces.clearBit(pieceIndex); + + // Notify connected peers. + foreach (PeerWireClient *client, d->connections) { + if (client->state() == QAbstractSocket::ConnectedState + && !client->availablePieces().testBit(pieceIndex)) { + client->sendPieceNotification(pieceIndex); + } + } + + // Notify the tracker if we've entered Seeding status; otherwise + // call the scheduler. + int completed = d->completedPieces.count(true); + if (completed == d->pieceCount) { + if (d->state != Seeding) { + d->setState(Seeding); + d->trackerClient.startSeeding(); + } + } else { + if (completed == 1) + d->setState(Downloading); + else if (d->incompletePieces.count(true) < 5 && d->pendingPieces.size() > d->incompletePieces.count(true)) + d->setState(Endgame); + d->callScheduler(); + } + + updateProgress(); +} + +void TorrentClient::handleFileError() +{ + if (d->state == Paused) + return; + setPaused(true); + emit error(FileError); +} + +void TorrentClient::connectToPeers() +{ + d->connectingToClients = false; + + if (d->state == Stopping || d->state == Idle || d->state == Paused) + return; + + if (d->state == Searching) + d->setState(Connecting); + + // Find the list of peers we are not currently connected to, where + // the more interesting peers are listed more than once. + QList<TorrentPeer *> weighedPeers = weighedFreePeers(); + + // Start as many connections as we can + while (!weighedPeers.isEmpty() && ConnectionManager::instance()->canAddConnection() + && (qrand() % (ConnectionManager::instance()->maxConnections() / 2))) { + PeerWireClient *client = new PeerWireClient(ConnectionManager::instance()->clientId(), this); + RateController::instance()->addSocket(client); + ConnectionManager::instance()->addConnection(client); + + initializeConnection(client); + d->connections << client; + + // Pick a random peer from the list of weighed peers. + TorrentPeer *peer = weighedPeers.takeAt(qrand() % weighedPeers.size()); + weighedPeers.removeAll(peer); + peer->connectStart = QDateTime::currentDateTime().toTime_t(); + peer->lastVisited = peer->connectStart; + + // Connect to the peer. + client->setPeer(peer); + client->connectToHost(peer->address, peer->port); + } +} + +QList<TorrentPeer *> TorrentClient::weighedFreePeers() const +{ + QList<TorrentPeer *> weighedPeers; + + // Generate a list of peers that we want to connect to. + uint now = QDateTime::currentDateTime().toTime_t(); + QList<TorrentPeer *> freePeers; + QMap<QString, int> connectionsPerPeer; + foreach (TorrentPeer *peer, d->peers) { + bool busy = false; + foreach (PeerWireClient *client, d->connections) { + if (client->state() == PeerWireClient::ConnectedState + && client->peerAddress() == peer->address + && client->peerPort() == peer->port) { + if (++connectionsPerPeer[peer->address.toString()] >= MaxConnectionPerPeer) { + busy = true; + break; + } + } + } + if (!busy && (now - peer->lastVisited) > uint(MinimumTimeBeforeRevisit)) + freePeers << peer; + } + + // Nothing to connect to + if (freePeers.isEmpty()) + return weighedPeers; + + // Assign points based on connection speed and pieces available. + QList<QPair<int, TorrentPeer *> > points; + foreach (TorrentPeer *peer, freePeers) { + int tmp = 0; + if (peer->interesting) { + tmp += peer->numCompletedPieces; + if (d->state == Seeding) + tmp = d->pieceCount - tmp; + if (!peer->connectStart) // An unknown peer is as interesting as a seed + tmp += d->pieceCount; + + // 1/5 of the total score for each second below 5 it takes to + // connect. + if (peer->connectTime < 5) + tmp += (d->pieceCount / 10) * (5 - peer->connectTime); + } + points << QPair<int, TorrentPeer *>(tmp, peer); + } + qSort(points); + + // Minimize the list so the point difference is never more than 1. + typedef QPair<int,TorrentPeer*> PointPair; + QMultiMap<int, TorrentPeer *> pointMap; + int lowestScore = 0; + int lastIndex = 0; + foreach (PointPair point, points) { + if (point.first > lowestScore) { + lowestScore = point.first; + ++lastIndex; + } + pointMap.insert(lastIndex, point.second); + } + + // Now make up a list of peers where the ones with more points are + // listed many times. + QMultiMap<int, TorrentPeer *>::ConstIterator it = pointMap.constBegin(); + while (it != pointMap.constEnd()) { + for (int i = 0; i < it.key() + 1; ++i) + weighedPeers << it.value(); + ++it; + } + + return weighedPeers; +} + +void TorrentClient::setupIncomingConnection(PeerWireClient *client) +{ + // Connect signals + initializeConnection(client); + + // Initialize this client + RateController::instance()->addSocket(client); + d->connections << client; + + client->initialize(d->infoHash, d->pieceCount); + client->sendPieceList(d->completedPieces); + + emit peerInfoUpdated(); + + if (d->state == Searching || d->state == Connecting) { + int completed = d->completedPieces.count(true); + if (completed == 0) + d->setState(WarmingUp); + else if (d->incompletePieces.count(true) < 5 && d->pendingPieces.size() > d->incompletePieces.count(true)) + d->setState(Endgame); + } + + if (d->connections.isEmpty()) + scheduleUploads(); +} + +void TorrentClient::setupOutgoingConnection() +{ + PeerWireClient *client = qobject_cast<PeerWireClient *>(sender()); + + // Update connection statistics. + foreach (TorrentPeer *peer, d->peers) { + if (peer->port == client->peerPort() && peer->address == client->peerAddress()) { + peer->connectTime = peer->lastVisited - peer->connectStart; + break; + } + } + + // Send handshake and piece list + client->initialize(d->infoHash, d->pieceCount); + client->sendPieceList(d->completedPieces); + + emit peerInfoUpdated(); + + if (d->state == Searching || d->state == Connecting) { + int completed = d->completedPieces.count(true); + if (completed == 0) + d->setState(WarmingUp); + else if (d->incompletePieces.count(true) < 5 && d->pendingPieces.size() > d->incompletePieces.count(true)) + d->setState(Endgame); + } +} + +void TorrentClient::initializeConnection(PeerWireClient *client) +{ + connect(client, SIGNAL(connected()), + this, SLOT(setupOutgoingConnection())); + connect(client, SIGNAL(disconnected()), + this, SLOT(removeClient())); + connect(client, SIGNAL(error(QAbstractSocket::SocketError)), + this, SLOT(removeClient())); + connect(client, SIGNAL(piecesAvailable(const QBitArray &)), + this, SLOT(peerPiecesAvailable(const QBitArray &))); + connect(client, SIGNAL(blockRequested(int, int, int)), + this, SLOT(peerRequestsBlock(int, int, int))); + connect(client, SIGNAL(blockReceived(int, int, const QByteArray &)), + this, SLOT(blockReceived(int, int, const QByteArray &))); + connect(client, SIGNAL(choked()), + this, SLOT(peerChoked())); + connect(client, SIGNAL(unchoked()), + this, SLOT(peerUnchoked())); + connect(client, SIGNAL(bytesWritten(qint64)), + this, SLOT(peerWireBytesWritten(qint64))); + connect(client, SIGNAL(bytesReceived(qint64)), + this, SLOT(peerWireBytesReceived(qint64))); +} + +void TorrentClient::removeClient() +{ + PeerWireClient *client = static_cast<PeerWireClient *>(sender()); + + // Remove the host from our list of known peers if the connection + // failed. + if (client->peer() && client->error() == QAbstractSocket::ConnectionRefusedError) + d->peers.removeAll(client->peer()); + + // Remove the client from RateController and all structures. + RateController::instance()->removeSocket(client); + d->connections.removeAll(client); + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client); + while (it != d->payloads.end() && it.key() == client) { + TorrentPiece *piece = it.value(); + piece->inProgress = false; + piece->requestedBlocks.fill(false); + it = d->payloads.erase(it); + } + + // Remove pending read requests. + QMapIterator<int, PeerWireClient *> it2(d->readIds); + while (it2.findNext(client)) + d->readIds.remove(it2.key()); + + // Delete the client later. + disconnect(client, SIGNAL(disconnected()), this, SLOT(removeClient())); + client->deleteLater(); + ConnectionManager::instance()->removeConnection(client); + + emit peerInfoUpdated(); + d->callPeerConnector(); +} + +void TorrentClient::peerPiecesAvailable(const QBitArray &pieces) +{ + PeerWireClient *client = qobject_cast<PeerWireClient *>(sender()); + + // Find the peer in our list of announced peers. If it's there, + // then we can use the piece list into to gather statistics that + // help us decide what peers to connect to. + TorrentPeer *peer = 0; + QList<TorrentPeer *>::Iterator it = d->peers.begin(); + while (it != d->peers.end()) { + if ((*it)->address == client->peerAddress() && (*it)->port == client->peerPort()) { + peer = *it; + break; + } + ++it; + } + + // If the peer is a seed, and we are in seeding mode, then the + // peer is uninteresting. + if (pieces.count(true) == d->pieceCount) { + if (peer) + peer->seed = true; + emit peerInfoUpdated(); + if (d->state == Seeding) { + client->abort(); + return; + } else { + if (peer) + peer->interesting = true; + if ((client->peerWireState() & PeerWireClient::InterestedInPeer) == 0) + client->sendInterested(); + d->callScheduler(); + return; + } + } + + // Update our list of available pieces. + if (peer) { + peer->pieces = pieces; + peer->numCompletedPieces = pieces.count(true); + } + + // Check for interesting pieces, and tell the peer whether we are + // interested or not. + bool interested = false; + int piecesSize = pieces.size(); + for (int pieceIndex = 0; pieceIndex < piecesSize; ++pieceIndex) { + if (!pieces.testBit(pieceIndex)) + continue; + if (!d->completedPieces.testBit(pieceIndex)) { + interested = true; + if ((client->peerWireState() & PeerWireClient::InterestedInPeer) == 0) { + if (peer) + peer->interesting = true; + client->sendInterested(); + } + + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client); + int inProgress = 0; + while (it != d->payloads.end() && it.key() == client) { + if (it.value()->inProgress) + inProgress += it.value()->requestedBlocks.count(true); + ++it; + } + if (!inProgress) + d->callScheduler(); + break; + } + } + if (!interested && (client->peerWireState() & PeerWireClient::InterestedInPeer)) { + if (peer) + peer->interesting = false; + client->sendNotInterested(); + } +} + +void TorrentClient::peerRequestsBlock(int pieceIndex, int begin, int length) +{ + PeerWireClient *client = qobject_cast<PeerWireClient *>(sender()); + + // Silently ignore requests from choked peers + if (client->peerWireState() & PeerWireClient::ChokingPeer) + return; + + // Silently ignore requests for pieces we don't have. + if (!d->completedPieces.testBit(pieceIndex)) + return; + + // Request the block from the file manager + d->readIds.insert(d->fileManager.read(pieceIndex, begin, length), + qobject_cast<PeerWireClient *>(sender())); +} + +void TorrentClient::blockReceived(int pieceIndex, int begin, const QByteArray &data) +{ + PeerWireClient *client = qobject_cast<PeerWireClient *>(sender()); + if (data.size() == 0) { + client->abort(); + return; + } + + // Ignore it if we already have this block. + int blockBit = begin / BlockSize; + TorrentPiece *piece = d->pendingPieces.value(pieceIndex); + if (!piece || piece->completedBlocks.testBit(blockBit)) { + // Discard blocks that we already have, and fill up the pipeline. + requestMore(client); + return; + } + + // If we are in warmup or endgame mode, cancel all duplicate + // requests for this block. + if (d->state == WarmingUp || d->state == Endgame) { + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.begin(); + while (it != d->payloads.end()) { + PeerWireClient *otherClient = it.key(); + if (otherClient != client && it.value()->index == pieceIndex) { + if (otherClient->incomingBlocks().contains(TorrentBlock(pieceIndex, begin, data.size()))) + it.key()->cancelRequest(pieceIndex, begin, data.size()); + } + ++it; + } + } + + if (d->state != Downloading && d->state != Endgame && d->completedPieces.count(true) > 0) + d->setState(Downloading); + + // Store this block + d->fileManager.write(pieceIndex, begin, data); + piece->completedBlocks.setBit(blockBit); + piece->requestedBlocks.clearBit(blockBit); + + if (blocksLeftForPiece(piece) == 0) { + // Ask the file manager to verify the newly downloaded piece + d->fileManager.verifyPiece(piece->index); + + // Remove this piece from all payloads + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.begin(); + while (it != d->payloads.end()) { + if (!it.value() || it.value()->index == piece->index) + it = d->payloads.erase(it); + else + ++it; + } + } + + // Fill up the pipeline. + requestMore(client); +} + +void TorrentClient::peerWireBytesWritten(qint64 size) +{ + if (!d->transferRateTimer) + d->transferRateTimer = startTimer(RateControlTimerDelay); + + d->uploadRate[0] += size; + d->uploadedBytes += size; + emit dataSent(size); +} + +void TorrentClient::peerWireBytesReceived(qint64 size) +{ + if (!d->transferRateTimer) + d->transferRateTimer = startTimer(RateControlTimerDelay); + + d->downloadRate[0] += size; + d->downloadedBytes += size; + emit dataSent(size); +} + +int TorrentClient::blocksLeftForPiece(const TorrentPiece *piece) const +{ + int blocksLeft = 0; + int completedBlocksSize = piece->completedBlocks.size(); + for (int i = 0; i < completedBlocksSize; ++i) { + if (!piece->completedBlocks.testBit(i)) + ++blocksLeft; + } + return blocksLeft; +} + +void TorrentClient::scheduleUploads() +{ + // Generate a list of clients sorted by their transfer + // speeds. When leeching, we sort by download speed, and when + // seeding, we sort by upload speed. Seeds are left out; there's + // no use in unchoking them. + QList<PeerWireClient *> allClients = d->connections; + QMultiMap<int, PeerWireClient *> transferSpeeds; + foreach (PeerWireClient *client, allClients) { + if (client->state() == QAbstractSocket::ConnectedState + && client->availablePieces().count(true) != d->pieceCount) { + if (d->state == Seeding) { + transferSpeeds.insert(client->uploadSpeed(), client); + } else { + transferSpeeds.insert(client->downloadSpeed(), client); + } + } + } + + // Unchoke the top 'MaxUploads' downloaders (peers that we are + // uploading to) and choke all others. + int maxUploaders = MaxUploads; + QMapIterator<int, PeerWireClient *> it(transferSpeeds); + it.toBack(); + while (it.hasPrevious()) { + PeerWireClient *client = it.previous().value(); + bool interested = (client->peerWireState() & PeerWireClient::PeerIsInterested); + + if (maxUploaders) { + allClients.removeAll(client); + if (client->peerWireState() & PeerWireClient::ChokingPeer) + client->unchokePeer(); + --maxUploaders; + continue; + } + + if ((client->peerWireState() & PeerWireClient::ChokingPeer) == 0) { + if ((qrand() % 10) == 0) + client->abort(); + else + client->chokePeer(); + allClients.removeAll(client); + } + if (!interested) + allClients.removeAll(client); + } + + // Only interested peers are left in allClients. Unchoke one + // random peer to allow it to compete for a position among the + // downloaders. (This is known as an "optimistic unchoke".) + if (!allClients.isEmpty()) { + PeerWireClient *client = allClients[qrand() % allClients.size()]; + if (client->peerWireState() & PeerWireClient::ChokingPeer) + client->unchokePeer(); + } +} + +void TorrentClient::scheduleDownloads() +{ + d->schedulerCalled = false; + + if (d->state == Stopping || d->state == Paused || d->state == Idle) + return; + + // Check what each client is doing, and assign payloads to those + // who are either idle or done. + foreach (PeerWireClient *client, d->connections) + schedulePieceForClient(client); +} + +void TorrentClient::schedulePieceForClient(PeerWireClient *client) +{ + // Only schedule connected clients. + if (client->state() != QTcpSocket::ConnectedState) + return; + + // The peer has choked us; try again later. + if (client->peerWireState() & PeerWireClient::ChokedByPeer) + return; + + // Make a list of all the client's pending pieces, and count how + // many blocks have been requested. + QList<int> currentPieces; + bool somePiecesAreNotInProgress = false; + TorrentPiece *lastPendingPiece = 0; + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client); + while (it != d->payloads.end() && it.key() == client) { + lastPendingPiece = it.value(); + if (lastPendingPiece->inProgress) { + currentPieces << lastPendingPiece->index; + } else { + somePiecesAreNotInProgress = true; + } + ++it; + } + + // Skip clients that already have too many blocks in progress. + if (client->incomingBlocks().size() >= ((d->state == Endgame || d->state == WarmingUp) + ? MaxBlocksInMultiMode : MaxBlocksInProgress)) + return; + + // If all pieces are in progress, but we haven't filled up our + // block requesting quota, then we need to schedule another piece. + if (!somePiecesAreNotInProgress || client->incomingBlocks().size() > 0) + lastPendingPiece = 0; + TorrentPiece *piece = lastPendingPiece; + + // In warmup state, all clients request blocks from the same pieces. + if (d->state == WarmingUp && d->pendingPieces.size() >= 4) { + piece = d->payloads.value(client); + if (!piece) { + QList<TorrentPiece *> values = d->pendingPieces.values(); + piece = values.value(qrand() % values.size()); + piece->inProgress = true; + d->payloads.insert(client, piece); + } + if (piece->completedBlocks.count(false) == client->incomingBlocks().size()) + return; + } + + // If no pieces are currently in progress, schedule a new one. + if (!piece) { + // Build up a list of what pieces that we have not completed + // are available to this client. + QBitArray incompletePiecesAvailableToClient = d->incompletePieces; + + // Remove all pieces that are marked as being in progress + // already (i.e., pieces that this or other clients are + // already waiting for). A special rule applies to warmup and + // endgame mode; there, we allow several clients to request + // the same piece. In endgame mode, this only applies to + // clients that are currently uploading (more than 1.0KB/s). + if ((d->state == Endgame && client->uploadSpeed() < 1024) || d->state != WarmingUp) { + QMap<int, TorrentPiece *>::ConstIterator it = d->pendingPieces.constBegin(); + while (it != d->pendingPieces.constEnd()) { + if (it.value()->inProgress) + incompletePiecesAvailableToClient.clearBit(it.key()); + ++it; + } + } + + // Remove all pieces that the client cannot download. + incompletePiecesAvailableToClient &= client->availablePieces(); + + // Remove all pieces that this client has already requested. + foreach (int i, currentPieces) + incompletePiecesAvailableToClient.clearBit(i); + + // Only continue if more pieces can be scheduled. If no pieces + // are available and no blocks are in progress, just leave + // the connection idle; it might become interesting later. + if (incompletePiecesAvailableToClient.count(true) == 0) + return; + + // Check if any of the partially completed pieces can be + // recovered, and if so, pick a random one of them. + QList<TorrentPiece *> partialPieces; + QMap<int, TorrentPiece *>::ConstIterator it = d->pendingPieces.constBegin(); + while (it != d->pendingPieces.constEnd()) { + TorrentPiece *tmp = it.value(); + if (incompletePiecesAvailableToClient.testBit(it.key())) { + if (!tmp->inProgress || d->state == WarmingUp || d->state == Endgame) { + partialPieces << tmp; + break; + } + } + ++it; + } + if (!partialPieces.isEmpty()) + piece = partialPieces.value(qrand() % partialPieces.size()); + + if (!piece) { + // Pick a random piece 3 out of 4 times; otherwise, pick either + // one of the most common or the least common pieces available, + // depending on the state we're in. + int pieceIndex = 0; + if (d->state == WarmingUp || (qrand() & 4) == 0) { + int *occurrances = new int[d->pieceCount]; + memset(occurrances, 0, d->pieceCount * sizeof(int)); + + // Count how many of each piece are available. + foreach (PeerWireClient *peer, d->connections) { + QBitArray peerPieces = peer->availablePieces(); + int peerPiecesSize = peerPieces.size(); + for (int i = 0; i < peerPiecesSize; ++i) { + if (peerPieces.testBit(i)) + ++occurrances[i]; + } + } + + // Find the rarest or most common pieces. + int numOccurrances = d->state == WarmingUp ? 0 : 99999; + QList<int> piecesReadyForDownload; + for (int i = 0; i < d->pieceCount; ++i) { + if (d->state == WarmingUp) { + // Add common pieces + if (occurrances[i] >= numOccurrances + && incompletePiecesAvailableToClient.testBit(i)) { + if (occurrances[i] > numOccurrances) + piecesReadyForDownload.clear(); + piecesReadyForDownload.append(i); + numOccurrances = occurrances[i]; + } + } else { + // Add rare pieces + if (occurrances[i] <= numOccurrances + && incompletePiecesAvailableToClient.testBit(i)) { + if (occurrances[i] < numOccurrances) + piecesReadyForDownload.clear(); + piecesReadyForDownload.append(i); + numOccurrances = occurrances[i]; + } + } + } + + // Select one piece randomly + pieceIndex = piecesReadyForDownload.at(qrand() % piecesReadyForDownload.size()); + delete [] occurrances; + } else { + // Make up a list of available piece indices, and pick + // a random one. + QList<int> values; + int incompletePiecesAvailableToClientSize = incompletePiecesAvailableToClient.size(); + for (int i = 0; i < incompletePiecesAvailableToClientSize; ++i) { + if (incompletePiecesAvailableToClient.testBit(i)) + values << i; + } + pieceIndex = values.at(qrand() % values.size()); + } + + // Create a new TorrentPiece and fill in all initial + // properties. + piece = new TorrentPiece; + piece->index = pieceIndex; + piece->length = d->fileManager.pieceLengthAt(pieceIndex); + int numBlocks = piece->length / BlockSize; + if (piece->length % BlockSize) + ++numBlocks; + piece->completedBlocks.resize(numBlocks); + piece->requestedBlocks.resize(numBlocks); + d->pendingPieces.insert(pieceIndex, piece); + } + + piece->inProgress = true; + d->payloads.insert(client, piece); + } + + // Request more blocks from all pending pieces. + requestMore(client); +} + +void TorrentClient::requestMore(PeerWireClient *client) +{ + // Make a list of all pieces this client is currently waiting for, + // and count the number of blocks in progress. + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client); + int numBlocksInProgress = client->incomingBlocks().size(); + QList<TorrentPiece *> piecesInProgress; + while (it != d->payloads.end() && it.key() == client) { + TorrentPiece *piece = it.value(); + if (piece->inProgress || (d->state == WarmingUp || d->state == Endgame)) + piecesInProgress << piece; + ++it; + } + + // If no pieces are in progress, call the scheduler. + if (piecesInProgress.isEmpty() && d->incompletePieces.count(true)) { + d->callScheduler(); + return; + } + + // If too many pieces are in progress, there's nothing to do. + int maxInProgress = ((d->state == Endgame || d->state == WarmingUp) + ? MaxBlocksInMultiMode : MaxBlocksInProgress); + if (numBlocksInProgress == maxInProgress) + return; + + // Starting with the first piece that we're waiting for, request + // blocks until the quota is filled up. + foreach (TorrentPiece *piece, piecesInProgress) { + numBlocksInProgress += requestBlocks(client, piece, maxInProgress - numBlocksInProgress); + if (numBlocksInProgress == maxInProgress) + break; + } + + // If we still didn't fill up the quota, we need to schedule more + // pieces. + if (numBlocksInProgress < maxInProgress && d->state != WarmingUp) + d->callScheduler(); +} + +int TorrentClient::requestBlocks(PeerWireClient *client, TorrentPiece *piece, int maxBlocks) +{ + // Generate the list of incomplete blocks for this piece. + QVector<int> bits; + int completedBlocksSize = piece->completedBlocks.size(); + for (int i = 0; i < completedBlocksSize; ++i) { + if (!piece->completedBlocks.testBit(i) && !piece->requestedBlocks.testBit(i)) + bits << i; + } + + // Nothing more to request. + if (bits.size() == 0) { + if (d->state != WarmingUp && d->state != Endgame) + return 0; + bits.clear(); + for (int i = 0; i < completedBlocksSize; ++i) { + if (!piece->completedBlocks.testBit(i)) + bits << i; + } + } + + if (d->state == WarmingUp || d->state == Endgame) { + // By randomizing the list of blocks to request, we + // significantly speed up the warmup and endgame modes, where + // the same blocks are requested from multiple peers. The + // speedup comes from an increased chance of receiving + // different blocks from the different peers. + for (int i = 0; i < bits.size(); ++i) { + int a = qrand() % bits.size(); + int b = qrand() % bits.size(); + int tmp = bits[a]; + bits[a] = bits[b]; + bits[b] = tmp; + } + } + + // Request no more blocks than we've been asked to. + int blocksToRequest = qMin(maxBlocks, bits.size()); + + // Calculate the offset and size of each block, and send requests. + for (int i = 0; i < blocksToRequest; ++i) { + int blockSize = BlockSize; + if ((piece->length % BlockSize) && bits.at(i) == completedBlocksSize - 1) + blockSize = piece->length % BlockSize; + client->requestBlock(piece->index, bits.at(i) * BlockSize, blockSize); + piece->requestedBlocks.setBit(bits.at(i)); + } + + return blocksToRequest; +} + +void TorrentClient::peerChoked() +{ + PeerWireClient *client = qobject_cast<PeerWireClient *>(sender()); + if (!client) + return; + + // When the peer chokes us, we immediately forget about all blocks + // we've requested from it. We also remove the piece from out + // payload, making it available to other clients. + QMultiMap<PeerWireClient *, TorrentPiece *>::Iterator it = d->payloads.find(client); + while (it != d->payloads.end() && it.key() == client) { + it.value()->inProgress = false; + it.value()->requestedBlocks.fill(false); + it = d->payloads.erase(it); + } +} + +void TorrentClient::peerUnchoked() +{ + PeerWireClient *client = qobject_cast<PeerWireClient *>(sender()); + if (!client) + return; + + // We got unchoked, which means we can request more blocks. + if (d->state != Seeding) + d->callScheduler(); +} + +void TorrentClient::addToPeerList(const QList<TorrentPeer> &peerList) +{ + // Add peers we don't already know of to our list of peers. + QList<QHostAddress> addresses = QNetworkInterface::allAddresses(); + foreach (TorrentPeer peer, peerList) { + if (addresses.contains(peer.address) + && peer.port == TorrentServer::instance()->serverPort()) { + // Skip our own server. + continue; + } + + bool known = false; + foreach (TorrentPeer *knownPeer, d->peers) { + if (knownPeer->port == peer.port + && knownPeer->address == peer.address) { + known = true; + break; + } + } + if (!known) { + TorrentPeer *newPeer = new TorrentPeer; + *newPeer = peer; + newPeer->interesting = true; + newPeer->seed = false; + newPeer->lastVisited = 0; + newPeer->connectStart = 0; + newPeer->connectTime = 999999; + newPeer->pieces.resize(d->pieceCount); + newPeer->numCompletedPieces = 0; + d->peers << newPeer; + } + } + + // If we've got more peers than we can connect to, we remove some + // of the peers that have no (or low) activity. + int maxPeers = ConnectionManager::instance()->maxConnections() * 3; + if (d->peers.size() > maxPeers) { + // Find what peers are currently connected & active + QSet<TorrentPeer *> activePeers; + foreach (TorrentPeer *peer, d->peers) { + foreach (PeerWireClient *client, d->connections) { + if (client->peer() == peer && (client->downloadSpeed() + client->uploadSpeed()) > 1024) + activePeers << peer; + } + } + + // Remove inactive peers from the peer list until we're below + // the max connections count. + QList<int> toRemove; + for (int i = 0; i < d->peers.size() && (d->peers.size() - toRemove.size()) > maxPeers; ++i) { + if (!activePeers.contains(d->peers.at(i))) + toRemove << i; + } + QListIterator<int> toRemoveIterator(toRemove); + toRemoveIterator.toBack(); + while (toRemoveIterator.hasPrevious()) + d->peers.removeAt(toRemoveIterator.previous()); + + // If we still have too many peers, remove the oldest ones. + while (d->peers.size() > maxPeers) + d->peers.takeFirst(); + } + + if (d->state != Paused && d->state != Stopping && d->state != Idle) { + if (d->state == Searching || d->state == WarmingUp) + connectToPeers(); + else + d->callPeerConnector(); + } +} + +void TorrentClient::trackerStopped() +{ + d->setState(Idle); + emit stopped(); +} + +void TorrentClient::updateProgress(int progress) +{ + if (progress == -1 && d->pieceCount > 0) { + int newProgress = (d->completedPieces.count(true) * 100) / d->pieceCount; + if (d->lastProgressValue != newProgress) { + d->lastProgressValue = newProgress; + emit progressUpdated(newProgress); + } + } else if (d->lastProgressValue != progress) { + d->lastProgressValue = progress; + emit progressUpdated(progress); + } +} |