diff options
Diffstat (limited to 'test/src/test-curl-multi-api.cpp')
-rw-r--r-- | test/src/test-curl-multi-api.cpp | 347 |
1 files changed, 347 insertions, 0 deletions
diff --git a/test/src/test-curl-multi-api.cpp b/test/src/test-curl-multi-api.cpp new file mode 100644 index 0000000..fac06dc --- /dev/null +++ b/test/src/test-curl-multi-api.cpp @@ -0,0 +1,347 @@ +#include <curl/curl.h> +#include <glog/logging.h> +#include <string> +#include <iostream> +#include <sstream> +#include <map> +#include <set> +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> + +#include "uscxml/concurrency/tinythread.h" + +// use arabica URL parser +#include <io/uri.hpp> + +class URL; + +class URLMonitor { +public: + virtual void downloadStarted(const URL& url) {}; + virtual void downloadCompleted(const URL& url) {}; + virtual void downloadFailed(const URL& url, int errorCode) {}; + virtual void headerChunkReceived(const URL& url, const std::string& headerChunk) {}; + virtual void contentChunkReceived(const URL& url, const std::string& contentChunk) {}; +}; + +class URLImpl : public boost::enable_shared_from_this<URLImpl> { +public: + URLImpl(const std::string& url) : _handle(NULL), _uri(url), _isDownloaded(false) { + _handle = curl_easy_init(); + if (_handle != NULL) { + CURLcode curlError; + curlError = curl_easy_setopt(_handle, CURLOPT_URL, _uri.as_string().c_str()); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot set url to " << _uri.as_string() << ": " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_WRITEDATA, this); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot register this as write userdata: " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_WRITEFUNCTION, URLImpl::writeHandler); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot set write callback: " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_HEADERFUNCTION, URLImpl::headerHandler); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot request header from curl: " << curl_easy_strerror(curlError); + + curlError = curl_easy_setopt(_handle, CURLOPT_HEADERDATA, this); + if (curlError != CURLE_OK) + LOG(ERROR) << "Cannot register this as header userdata: " << curl_easy_strerror(curlError); + } else { + LOG(ERROR) << "curl_easy_init returned NULL, this is bad!"; + } + } + + ~URLImpl() { + if (_handle != NULL) + curl_easy_cleanup(_handle); + } + + static size_t writeHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { + URLImpl* url = (URLImpl*)userdata; + url->_content.write((char*)ptr, size * nmemb); + return size * nmemb; + } + + static size_t headerHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { + URLImpl* url = (URLImpl*)userdata; + url->_header.write((char*)ptr, size * nmemb); + return size * nmemb; + } + + void addMonitor(URLMonitor* monitor) { _monitors.insert(monitor); } + void removeMonitor(URLMonitor* monitor) { _monitors.erase(monitor); } + + const bool isAbsolute() const { return _uri.is_absolute(); } + const std::string scheme() const { return _uri.scheme(); } + const std::string host() const { return _uri.host(); } + const std::string port() const { return _uri.port(); } + const std::string path() const { return _uri.path(); } + const std::string asString() const { return _uri.as_string(); } + + void downloadStarted() { + std::cout << "Starting download of " << asString() << std::endl; + _content.str(""); + _content.clear(); + _header.str(""); + _header.clear(); + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { +// (*monIter)->downloadStarted(URL(shared_from_this())); + monIter++; + } + } + + void downloadCompleted() { + std::cout << "Finished loading " << asString() << " with " << _content.str().size() << " bytes" << std::endl; + _isDownloaded = true; + } + + void downloadFailed(int errorCode) { + std::cout << "FAILED!" << strerror(errorCode) << std::endl; + } + + std::string getHeader(bool forceReload = false) { + return _header.str(); + } + + std::string getContent(bool forceReload = false) { + return _content.str(); + } + + std::stringstream _content; + std::stringstream _header; + CURL* _handle; + Arabica::io::URI _uri; + bool _isDownloaded; + + std::set<URLMonitor*> _monitors; + typedef std::set<URLMonitor*>::iterator monIter_t; +}; + +class URL { +public: + URL() : _impl() {} + URL(const std::string url) : _impl(new URLImpl(url)) {} + URL(boost::shared_ptr<URLImpl> const impl) : _impl(impl) { } + URL(const URL& other) : _impl(other._impl) { } + virtual ~URL() {}; + + operator bool() const { return _impl; } + bool operator< (const URL& other) const { return _impl < other._impl; } + bool operator==(const URL& other) const { return _impl == other._impl; } + bool operator!=(const URL& other) const { + return _impl != other._impl; + } + URL& operator= (const URL& other) { + _impl = other._impl; + return *this; + } + + std::string getHeader() { return _impl->getHeader(); } + std::string getContent() { return _impl->getContent(); } + + const bool toAbsoluteCwd() { return _impl->toAbsoluteCwd(); } + const bool toAbsolute(const std::string& baseUrl) { return _impl->toAbsolute(baseUrl); } + const bool toAbsolute(const URL& baseUrl) { return _impl->toAbsolute(baseUrl.asString()); } + const std::string asLocalFile(const std::string& suffix, bool reload = false) { return _impl->asLocalFile(suffix, reload); } + + void addMonitor(URLMonitor* monitor) { _impl->addMonitor(monitor); } + void removeMonitor(URLMonitor* monitor) { _impl->removeMonitor(monitor); } + + const bool isAbsolute() const { return _impl->isAbsolute(); } + const std::string scheme() const { return _impl->scheme(); } + const std::string host() const { return _impl->host(); } + const std::string port() const { return _impl->port(); } + const std::string path() const { return _impl->path(); } + const std::string asString() const { return _impl->asString(); } + + friend class URLFetcher; + friend std::ostream & operator<<(std::ostream &stream, const URL& p); + +protected: + void downloadStarted() { return _impl->downloadStarted(); } + void downloadCompleted() { return _impl->downloadCompleted(); } + void downloadFailed(int errorCode) { return _impl->downloadFailed(errorCode); } + + boost::shared_ptr<URLImpl> _impl; +}; + +class URLFetcher { +public: + URLFetcher() { + _multiHandle = curl_multi_init(); + start(); + } + + ~URLFetcher() { + curl_multi_cleanup(_multiHandle); + stop(); + } + + void fetchURL(URL& url) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + url.downloadStarted(); + _handlesToURLs[url._impl->_handle] = url; + curl_multi_add_handle(_multiHandle, url._impl->_handle); + _condVar.notify_all(); + } + + void breakURL(URL& url) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_handlesToURLs.find(url._impl->_handle) != _handlesToURLs.end()) { + url.downloadFailed(0); + curl_multi_remove_handle(_multiHandle, url._impl->_handle); + _handlesToURLs.erase(url._impl->_handle); + } + } + + void start() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (!_isStarted) { + _isStarted = true; + _thread = new tthread::thread(URLFetcher::run, this); + } + } + + void stop() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_isStarted) { + _isStarted = false; + _thread->join(); + delete _thread; + } + } + + static void run(void* instance) { + URLFetcher* THIS = (URLFetcher*)instance; + THIS->_mutex.lock(); + while(THIS->_isStarted) { + if(THIS->_handlesToURLs.size() > 0) { + THIS->_mutex.unlock(); + THIS->perform(); + THIS->_mutex.lock(); + } + THIS->_condVar.wait(THIS->_mutex); + } + THIS->_mutex.unlock(); + } + + void perform() { + + CURLMsg *msg; /* for picking up messages with the transfer status */ + int msgsLeft; /* how many messages are left */ + int stillRunning; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_perform(_multiHandle, &stillRunning); + } + + do { + struct timeval timeout; + int rc; /* select() return code */ + + fd_set fdread, fdwrite, fdexcep; + FD_ZERO(&fdread); FD_ZERO(&fdwrite); FD_ZERO(&fdexcep); + + int maxfd = -1; + long curlTimeOut = -1; + + /* set a suitable timeout to play around with */ + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_timeout(_multiHandle, &curlTimeOut); + } + + if(curlTimeOut >= 0) { + timeout.tv_sec = curlTimeOut / 1000; + if(timeout.tv_sec > 1) + timeout.tv_sec = 1; + else + timeout.tv_usec = (curlTimeOut % 1000) * 1000; + } + + /* get file descriptors from the transfers */ + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_fdset(_multiHandle, &fdread, &fdwrite, &fdexcep, &maxfd); + } + + rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); + + switch(rc) { + case -1: + /* select error */ + break; + case 0: /* timeout */ + default: /* action */ + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_perform(_multiHandle, &stillRunning); + } + break; + } + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + while ((msg = curl_multi_info_read(_multiHandle, &msgsLeft))) { + if (msg->msg == CURLMSG_DONE) { + _handlesToURLs[msg->easy_handle].downloadCompleted(); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + } else { + switch (msg->data.result) { + case CURLM_OK: + break; + case CURLM_BAD_HANDLE: + case CURLM_BAD_EASY_HANDLE: + case CURLM_OUT_OF_MEMORY: + case CURLM_INTERNAL_ERROR: + case CURLM_BAD_SOCKET: + case CURLM_UNKNOWN_OPTION: + case CURLM_LAST: + _handlesToURLs[msg->easy_handle].downloadFailed(msg->data.result); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + default: + break; + } + } + } + } + } while(stillRunning && _isStarted); + + } + + tthread::condition_variable _condVar; + tthread::thread* _thread; + tthread::recursive_mutex _mutex; + bool _isStarted; + + std::map<CURL*, URL> _handlesToURLs; + CURLM* _multiHandle; +}; + + +int main(int argc, char** argv) { + URLFetcher fetcher; + URL heise("http://www.heise.de"); + URL localFile("file:///Users/sradomski/Desktop/scxml.xsd"); + URL slashdot("http://slashdot.org"); + URL asdf("daf://localhost:234"); + URL bahn("http://www.bahn.de"); + + fetcher.fetchURL(heise); + fetcher.fetchURL(localFile); + fetcher.fetchURL(asdf); + fetcher.fetchURL(slashdot); + fetcher.fetchURL(bahn); + + while(1) {} +}
\ No newline at end of file |