diff options
Diffstat (limited to 'test/src/test-curl-multi-api.cpp')
-rw-r--r-- | test/src/test-curl-multi-api.cpp | 633 |
1 files changed, 345 insertions, 288 deletions
diff --git a/test/src/test-curl-multi-api.cpp b/test/src/test-curl-multi-api.cpp index fac06dc..67735d2 100644 --- a/test/src/test-curl-multi-api.cpp +++ b/test/src/test-curl-multi-api.cpp @@ -17,108 +17,124 @@ class URL; class URLMonitor { public: - virtual void downloadStarted(const URL& url) {}; - virtual void downloadCompleted(const URL& url) {}; - virtual void downloadFailed(const URL& url, int errorCode) {}; - virtual void headerChunkReceived(const URL& url, const std::string& headerChunk) {}; - virtual void contentChunkReceived(const URL& url, const std::string& contentChunk) {}; + virtual void downloadStarted(const URL& url) {}; + virtual void downloadCompleted(const URL& url) {}; + virtual void downloadFailed(const URL& url, int errorCode) {}; + virtual void headerChunkReceived(const URL& url, const std::string& headerChunk) {}; + virtual void contentChunkReceived(const URL& url, const std::string& contentChunk) {}; }; class URLImpl : public boost::enable_shared_from_this<URLImpl> { public: - URLImpl(const std::string& url) : _handle(NULL), _uri(url), _isDownloaded(false) { - _handle = curl_easy_init(); - if (_handle != NULL) { - CURLcode curlError; - curlError = curl_easy_setopt(_handle, CURLOPT_URL, _uri.as_string().c_str()); - if (curlError != CURLE_OK) - LOG(ERROR) << "Cannot set url to " << _uri.as_string() << ": " << curl_easy_strerror(curlError); - - curlError = curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this); - if (curlError != CURLE_OK) - LOG(ERROR) << "Cannot register this as write userdata: " << curl_easy_strerror(curlError); - - curlError = curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, URLImpl::writeHandler); - if (curlError != CURLE_OK) - LOG(ERROR) << "Cannot set write callback: " << curl_easy_strerror(curlError); - - curlError = curl_easy_setopt(_handle, CURLOPT_HEADERFUNCTION, URLImpl::headerHandler); - if (curlError != CURLE_OK) - LOG(ERROR) << "Cannot request header from curl: " << curl_easy_strerror(curlError); - - curlError = curl_easy_setopt(_handle, CURLOPT_HEADERDATA, this); - if (curlError != CURLE_OK) - LOG(ERROR) << "Cannot register this as header userdata: " << curl_easy_strerror(curlError); - } else { - LOG(ERROR) << "curl_easy_init returned NULL, this is bad!"; - } - } - - ~URLImpl() { - if (_handle != NULL) - curl_easy_cleanup(_handle); - } - - static size_t writeHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { - URLImpl* url = (URLImpl*)userdata; - url->_content.write((char*)ptr, size * nmemb); - return size * nmemb; - } - - static size_t headerHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { - URLImpl* url = (URLImpl*)userdata; - url->_header.write((char*)ptr, size * nmemb); - return size * nmemb; - } - - void addMonitor(URLMonitor* monitor) { _monitors.insert(monitor); } - void removeMonitor(URLMonitor* monitor) { _monitors.erase(monitor); } - - const bool isAbsolute() const { return _uri.is_absolute(); } - const std::string scheme() const { return _uri.scheme(); } - const std::string host() const { return _uri.host(); } - const std::string port() const { return _uri.port(); } - const std::string path() const { return _uri.path(); } - const std::string asString() const { return _uri.as_string(); } - - void downloadStarted() { - std::cout << "Starting download of " << asString() << std::endl; - _content.str(""); - _content.clear(); - _header.str(""); - _header.clear(); - monIter_t monIter = _monitors.begin(); - while(monIter != _monitors.end()) { + URLImpl(const std::string& url) : _handle(NULL), _uri(url), _isDownloaded(false) { + _handle = curl_easy_init(); + if (_handle != NULL) { + CURLcode curlError; + curlError = curl_easy_setopt(_handle, CURLOPT_URL, _uri.as_string().c_str()); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot set url to " << _uri.as_string() << ": " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot register this as write userdata: " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, URLImpl::writeHandler); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot set write callback: " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_HEADERFUNCTION, URLImpl::headerHandler); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot request header from curl: " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_HEADERDATA, this); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot register this as header userdata: " << curl_easy_strerror(curlError); + } else { + LOG(ERROR) << "curl_easy_init returned NULL, this is bad!"; + } + } + + ~URLImpl() { + if (_handle != NULL) + curl_easy_cleanup(_handle); + } + + static size_t writeHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { + URLImpl* url = (URLImpl*)userdata; + url->_content.write((char*)ptr, size * nmemb); + return size * nmemb; + } + + static size_t headerHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { + URLImpl* url = (URLImpl*)userdata; + url->_header.write((char*)ptr, size * nmemb); + return size * nmemb; + } + + void addMonitor(URLMonitor* monitor) { + _monitors.insert(monitor); + } + void removeMonitor(URLMonitor* monitor) { + _monitors.erase(monitor); + } + + const bool isAbsolute() const { + return _uri.is_absolute(); + } + const std::string scheme() const { + return _uri.scheme(); + } + const std::string host() const { + return _uri.host(); + } + const std::string port() const { + return _uri.port(); + } + const std::string path() const { + return _uri.path(); + } + const std::string asString() const { + return _uri.as_string(); + } + + void downloadStarted() { + std::cout << "Starting download of " << asString() << std::endl; + _content.str(""); + _content.clear(); + _header.str(""); + _header.clear(); + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { // (*monIter)->downloadStarted(URL(shared_from_this())); - monIter++; - } - } - - void downloadCompleted() { - std::cout << "Finished loading " << asString() << " with " << _content.str().size() << " bytes" << std::endl; - _isDownloaded = true; - } - - void downloadFailed(int errorCode) { - std::cout << "FAILED!" << strerror(errorCode) << std::endl; - } - - std::string getHeader(bool forceReload = false) { - return _header.str(); - } - - std::string getContent(bool forceReload = false) { - return _content.str(); - } - - std::stringstream _content; - std::stringstream _header; - CURL* _handle; - Arabica::io::URI _uri; - bool _isDownloaded; - - std::set<URLMonitor*> _monitors; - typedef std::set<URLMonitor*>::iterator monIter_t; + monIter++; + } + } + + void downloadCompleted() { + std::cout << "Finished loading " << asString() << " with " << _content.str().size() << " bytes" << std::endl; + _isDownloaded = true; + } + + void downloadFailed(int errorCode) { + std::cout << "FAILED!" << strerror(errorCode) << std::endl; + } + + std::string getHeader(bool forceReload = false) { + return _header.str(); + } + + std::string getContent(bool forceReload = false) { + return _content.str(); + } + + std::stringstream _content; + std::stringstream _header; + CURL* _handle; + Arabica::io::URI _uri; + bool _isDownloaded; + + std::set<URLMonitor*> _monitors; + typedef std::set<URLMonitor*>::iterator monIter_t; }; class URL { @@ -128,10 +144,16 @@ public: URL(boost::shared_ptr<URLImpl> const impl) : _impl(impl) { } URL(const URL& other) : _impl(other._impl) { } virtual ~URL() {}; - - operator bool() const { return _impl; } - bool operator< (const URL& other) const { return _impl < other._impl; } - bool operator==(const URL& other) const { return _impl == other._impl; } + + operator bool() const { + return _impl; + } + bool operator< (const URL& other) const { + return _impl < other._impl; + } + bool operator==(const URL& other) const { + return _impl == other._impl; + } bool operator!=(const URL& other) const { return _impl != other._impl; } @@ -140,208 +162,243 @@ public: return *this; } - std::string getHeader() { return _impl->getHeader(); } - std::string getContent() { return _impl->getContent(); } - - const bool toAbsoluteCwd() { return _impl->toAbsoluteCwd(); } - const bool toAbsolute(const std::string& baseUrl) { return _impl->toAbsolute(baseUrl); } - const bool toAbsolute(const URL& baseUrl) { return _impl->toAbsolute(baseUrl.asString()); } - const std::string asLocalFile(const std::string& suffix, bool reload = false) { return _impl->asLocalFile(suffix, reload); } + std::string getHeader() { + return _impl->getHeader(); + } + std::string getContent() { + return _impl->getContent(); + } - void addMonitor(URLMonitor* monitor) { _impl->addMonitor(monitor); } - void removeMonitor(URLMonitor* monitor) { _impl->removeMonitor(monitor); } + const bool toAbsoluteCwd() { + return _impl->toAbsoluteCwd(); + } + const bool toAbsolute(const std::string& baseUrl) { + return _impl->toAbsolute(baseUrl); + } + const bool toAbsolute(const URL& baseUrl) { + return _impl->toAbsolute(baseUrl.asString()); + } + const std::string asLocalFile(const std::string& suffix, bool reload = false) { + return _impl->asLocalFile(suffix, reload); + } - const bool isAbsolute() const { return _impl->isAbsolute(); } - const std::string scheme() const { return _impl->scheme(); } - const std::string host() const { return _impl->host(); } - const std::string port() const { return _impl->port(); } - const std::string path() const { return _impl->path(); } - const std::string asString() const { return _impl->asString(); } + void addMonitor(URLMonitor* monitor) { + _impl->addMonitor(monitor); + } + void removeMonitor(URLMonitor* monitor) { + _impl->removeMonitor(monitor); + } - friend class URLFetcher; + const bool isAbsolute() const { + return _impl->isAbsolute(); + } + const std::string scheme() const { + return _impl->scheme(); + } + const std::string host() const { + return _impl->host(); + } + const std::string port() const { + return _impl->port(); + } + const std::string path() const { + return _impl->path(); + } + const std::string asString() const { + return _impl->asString(); + } + + friend class URLFetcher; friend std::ostream & operator<<(std::ostream &stream, const URL& p); protected: - void downloadStarted() { return _impl->downloadStarted(); } - void downloadCompleted() { return _impl->downloadCompleted(); } - void downloadFailed(int errorCode) { return _impl->downloadFailed(errorCode); } + void downloadStarted() { + return _impl->downloadStarted(); + } + void downloadCompleted() { + return _impl->downloadCompleted(); + } + void downloadFailed(int errorCode) { + return _impl->downloadFailed(errorCode); + } boost::shared_ptr<URLImpl> _impl; }; class URLFetcher { public: - URLFetcher() { - _multiHandle = curl_multi_init(); - start(); - } - - ~URLFetcher() { - curl_multi_cleanup(_multiHandle); - stop(); - } - - void fetchURL(URL& url) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - url.downloadStarted(); - _handlesToURLs[url._impl->_handle] = url; - curl_multi_add_handle(_multiHandle, url._impl->_handle); - _condVar.notify_all(); - } - - void breakURL(URL& url) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - if (_handlesToURLs.find(url._impl->_handle) != _handlesToURLs.end()) { - url.downloadFailed(0); - curl_multi_remove_handle(_multiHandle, url._impl->_handle); - _handlesToURLs.erase(url._impl->_handle); - } - } - - void start() { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - if (!_isStarted) { - _isStarted = true; - _thread = new tthread::thread(URLFetcher::run, this); - } - } - - void stop() { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - if (_isStarted) { - _isStarted = false; - _thread->join(); - delete _thread; - } - } - - static void run(void* instance) { - URLFetcher* THIS = (URLFetcher*)instance; - THIS->_mutex.lock(); - while(THIS->_isStarted) { - if(THIS->_handlesToURLs.size() > 0) { - THIS->_mutex.unlock(); - THIS->perform(); - THIS->_mutex.lock(); - } - THIS->_condVar.wait(THIS->_mutex); - } - THIS->_mutex.unlock(); - } - - void perform() { - - CURLMsg *msg; /* for picking up messages with the transfer status */ - int msgsLeft; /* how many messages are left */ - int stillRunning; - - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_perform(_multiHandle, &stillRunning); - } - - do { - struct timeval timeout; - int rc; /* select() return code */ - - fd_set fdread, fdwrite, fdexcep; - FD_ZERO(&fdread); FD_ZERO(&fdwrite); FD_ZERO(&fdexcep); - - int maxfd = -1; - long curlTimeOut = -1; - - /* set a suitable timeout to play around with */ - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_timeout(_multiHandle, &curlTimeOut); - } - - if(curlTimeOut >= 0) { - timeout.tv_sec = curlTimeOut / 1000; - if(timeout.tv_sec > 1) - timeout.tv_sec = 1; - else - timeout.tv_usec = (curlTimeOut % 1000) * 1000; - } - - /* get file descriptors from the transfers */ - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_fdset(_multiHandle, &fdread, &fdwrite, &fdexcep, &maxfd); - } - - rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); - - switch(rc) { - case -1: - /* select error */ - break; - case 0: /* timeout */ - default: /* action */ - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_perform(_multiHandle, &stillRunning); - } - break; - } - - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - while ((msg = curl_multi_info_read(_multiHandle, &msgsLeft))) { - if (msg->msg == CURLMSG_DONE) { - _handlesToURLs[msg->easy_handle].downloadCompleted(); - curl_multi_remove_handle(_multiHandle, msg->easy_handle); - _handlesToURLs.erase(msg->easy_handle); - } else { - switch (msg->data.result) { - case CURLM_OK: - break; - case CURLM_BAD_HANDLE: - case CURLM_BAD_EASY_HANDLE: - case CURLM_OUT_OF_MEMORY: - case CURLM_INTERNAL_ERROR: - case CURLM_BAD_SOCKET: - case CURLM_UNKNOWN_OPTION: - case CURLM_LAST: - _handlesToURLs[msg->easy_handle].downloadFailed(msg->data.result); - curl_multi_remove_handle(_multiHandle, msg->easy_handle); - _handlesToURLs.erase(msg->easy_handle); - default: - break; - } - } - } - } - } while(stillRunning && _isStarted); - - } - - tthread::condition_variable _condVar; - tthread::thread* _thread; + URLFetcher() { + _multiHandle = curl_multi_init(); + start(); + } + + ~URLFetcher() { + curl_multi_cleanup(_multiHandle); + stop(); + } + + void fetchURL(URL& url) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + url.downloadStarted(); + _handlesToURLs[url._impl->_handle] = url; + curl_multi_add_handle(_multiHandle, url._impl->_handle); + _condVar.notify_all(); + } + + void breakURL(URL& url) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_handlesToURLs.find(url._impl->_handle) != _handlesToURLs.end()) { + url.downloadFailed(0); + curl_multi_remove_handle(_multiHandle, url._impl->_handle); + _handlesToURLs.erase(url._impl->_handle); + } + } + + void start() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (!_isStarted) { + _isStarted = true; + _thread = new tthread::thread(URLFetcher::run, this); + } + } + + void stop() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_isStarted) { + _isStarted = false; + _thread->join(); + delete _thread; + } + } + + static void run(void* instance) { + URLFetcher* THIS = (URLFetcher*)instance; + THIS->_mutex.lock(); + while(THIS->_isStarted) { + if(THIS->_handlesToURLs.size() > 0) { + THIS->_mutex.unlock(); + THIS->perform(); + THIS->_mutex.lock(); + } + THIS->_condVar.wait(THIS->_mutex); + } + THIS->_mutex.unlock(); + } + + void perform() { + + CURLMsg *msg; /* for picking up messages with the transfer status */ + int msgsLeft; /* how many messages are left */ + int stillRunning; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_perform(_multiHandle, &stillRunning); + } + + do { + struct timeval timeout; + int rc; /* select() return code */ + + fd_set fdread, fdwrite, fdexcep; + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); + + int maxfd = -1; + long curlTimeOut = -1; + + /* set a suitable timeout to play around with */ + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_timeout(_multiHandle, &curlTimeOut); + } + + if(curlTimeOut >= 0) { + timeout.tv_sec = curlTimeOut / 1000; + if(timeout.tv_sec > 1) + timeout.tv_sec = 1; + else + timeout.tv_usec = (curlTimeOut % 1000) * 1000; + } + + /* get file descriptors from the transfers */ + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_fdset(_multiHandle, &fdread, &fdwrite, &fdexcep, &maxfd); + } + + rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); + + switch(rc) { + case -1: + /* select error */ + break; + case 0: /* timeout */ + default: { /* action */ + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_perform(_multiHandle, &stillRunning); + } + break; + } + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + while ((msg = curl_multi_info_read(_multiHandle, &msgsLeft))) { + if (msg->msg == CURLMSG_DONE) { + _handlesToURLs[msg->easy_handle].downloadCompleted(); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + } else { + switch (msg->data.result) { + case CURLM_OK: + break; + case CURLM_BAD_HANDLE: + case CURLM_BAD_EASY_HANDLE: + case CURLM_OUT_OF_MEMORY: + case CURLM_INTERNAL_ERROR: + case CURLM_BAD_SOCKET: + case CURLM_UNKNOWN_OPTION: + case CURLM_LAST: + _handlesToURLs[msg->easy_handle].downloadFailed(msg->data.result); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + default: + break; + } + } + } + } + } while(stillRunning && _isStarted); + + } + + tthread::condition_variable _condVar; + tthread::thread* _thread; tthread::recursive_mutex _mutex; - bool _isStarted; - - std::map<CURL*, URL> _handlesToURLs; - CURLM* _multiHandle; + bool _isStarted; + + std::map<CURL*, URL> _handlesToURLs; + CURLM* _multiHandle; }; int main(int argc, char** argv) { - URLFetcher fetcher; - URL heise("http://www.heise.de"); - URL localFile("file:///Users/sradomski/Desktop/scxml.xsd"); - URL slashdot("http://slashdot.org"); - URL asdf("daf://localhost:234"); - URL bahn("http://www.bahn.de"); - - fetcher.fetchURL(heise); - fetcher.fetchURL(localFile); - fetcher.fetchURL(asdf); - fetcher.fetchURL(slashdot); - fetcher.fetchURL(bahn); - - while(1) {} + URLFetcher fetcher; + URL heise("http://www.heise.de"); + URL localFile("file:///Users/sradomski/Desktop/scxml.xsd"); + URL slashdot("http://slashdot.org"); + URL asdf("daf://localhost:234"); + URL bahn("http://www.bahn.de"); + + fetcher.fetchURL(heise); + fetcher.fetchURL(localFile); + fetcher.fetchURL(asdf); + fetcher.fetchURL(slashdot); + fetcher.fetchURL(bahn); + + while(1) {} }
\ No newline at end of file |