diff options
Diffstat (limited to 'src/uscxml/URL.cpp')
-rw-r--r-- | src/uscxml/URL.cpp | 785 |
1 files changed, 393 insertions, 392 deletions
diff --git a/src/uscxml/URL.cpp b/src/uscxml/URL.cpp index 8656cd2..024bb0c 100644 --- a/src/uscxml/URL.cpp +++ b/src/uscxml/URL.cpp @@ -26,485 +26,486 @@ namespace uscxml { URLImpl::URLImpl(const std::string& url) : _handle(NULL), _uri(url), _isDownloaded(false), _hasFailed(false) { - _handle = curl_easy_init(); - if (_handle == NULL) { - LOG(ERROR) << "curl_easy_init returned NULL, this is bad!"; - } + _handle = curl_easy_init(); + if (_handle == NULL) { + LOG(ERROR) << "curl_easy_init returned NULL, this is bad!"; + } } - + URLImpl::~URLImpl() { - if (_handle != NULL) - curl_easy_cleanup(_handle); + if (_handle != NULL) + curl_easy_cleanup(_handle); } - + size_t URLImpl::writeHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { - URLImpl* url = (URLImpl*)userdata; - url->_inContent.write((char*)ptr, size * nmemb); + URLImpl* url = (URLImpl*)userdata; + url->_inContent.write((char*)ptr, size * nmemb); - monIter_t monIter = url->_monitors.begin(); - while(monIter != url->_monitors.end()) { - (*monIter)->contentChunkReceived(URL(url->shared_from_this()), std::string((char*)ptr, size * nmemb)); - monIter++; - } + monIter_t monIter = url->_monitors.begin(); + while(monIter != url->_monitors.end()) { + (*monIter)->contentChunkReceived(URL(url->shared_from_this()), std::string((char*)ptr, size * nmemb)); + monIter++; + } - return size * nmemb; + return size * nmemb; } size_t URLImpl::headerHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { - URLImpl* url = (URLImpl*)userdata; - url->_inHeader.write((char*)ptr, size * nmemb); + URLImpl* url = (URLImpl*)userdata; + url->_inHeader.write((char*)ptr, size * nmemb); - monIter_t monIter = url->_monitors.begin(); - while(monIter != url->_monitors.end()) { - (*monIter)->headerChunkReceived(URL(url->shared_from_this()), std::string((char*)ptr, size * nmemb)); - monIter++; - } + monIter_t monIter = url->_monitors.begin(); + while(monIter != url->_monitors.end()) { + (*monIter)->headerChunkReceived(URL(url->shared_from_this()), std::string((char*)ptr, size * nmemb)); + monIter++; + } - return size * nmemb; + return size * nmemb; } void URLImpl::downloadStarted() { - LOG(INFO) << "Starting download of " << asString() << std::endl; - _inContent.str(""); - _inContent.clear(); - _inHeader.str(""); - _inHeader.clear(); - - monIter_t monIter = _monitors.begin(); - while(monIter != _monitors.end()) { - (*monIter)->downloadStarted(URL(shared_from_this())); - monIter++; - } + LOG(INFO) << "Starting download of " << asString() << std::endl; + _inContent.str(""); + _inContent.clear(); + _inHeader.str(""); + _inHeader.clear(); + + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { + (*monIter)->downloadStarted(URL(shared_from_this())); + monIter++; + } } void URLImpl::downloadCompleted() { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - LOG(INFO) << "Finished downloading " << asString() << " with " << _inContent.str().size() << " bytes"; + LOG(INFO) << "Finished downloading " << asString() << " with " << _inContent.str().size() << " bytes"; - _hasFailed = false; - _isDownloaded = true; - _condVar.notify_all(); + _hasFailed = false; + _isDownloaded = true; + _condVar.notify_all(); - monIter_t monIter = _monitors.begin(); - while(monIter != _monitors.end()) { - (*monIter)->downloadCompleted(URL(shared_from_this())); - monIter++; - } + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { + (*monIter)->downloadCompleted(URL(shared_from_this())); + monIter++; + } } void URLImpl::downloadFailed(int errorCode) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - LOG(ERROR) << "Downloading " << asString() << " failed: " << strerror(errorCode); - - _hasFailed = true; - _isDownloaded = false; - _condVar.notify_all(); - - monIter_t monIter = _monitors.begin(); - while(monIter != _monitors.end()) { - (*monIter)->downloadFailed(URL(shared_from_this()), errorCode); - monIter++; - } + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + LOG(ERROR) << "Downloading " << asString() << " failed: " << strerror(errorCode); + + _hasFailed = true; + _isDownloaded = false; + _condVar.notify_all(); + + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { + (*monIter)->downloadFailed(URL(shared_from_this()), errorCode); + monIter++; + } } - + const std::map<std::string, std::string> URLImpl::getInHeaderFields() { - if (!_isDownloaded) { - download(true); - } - - std::map<std::string, std::string> headerFields; - std::string line; - while (std::getline(_inHeader, line)) { - size_t colon = line.find_first_of(":"); - size_t newline = line.find_first_of("\r\n"); - if (newline == std::string::npos) - newline = line.size(); - - if (colon == std::string::npos) { - if (headerFields.size() == 0) { - // put http status in a key that can never occur otherwise - headerFields["status:"] = line.substr(0, newline); - } else { - headerFields[line.substr(0, newline)] = line.substr(0, newline); // this should never happen - } - } else { - std::string key = line.substr(0, colon); - size_t firstChar = line.find_first_not_of(": ", colon, 2); - if (firstChar == std::string::npos) { - // nothing but spaces? - headerFields[line.substr(0, newline)] = ""; - } else { - std::string value = line.substr(firstChar, newline - firstChar); - headerFields[key] = value; - } - } - } - - return headerFields; + if (!_isDownloaded) { + download(true); + } + + std::map<std::string, std::string> headerFields; + std::string line; + while (std::getline(_inHeader, line)) { + size_t colon = line.find_first_of(":"); + size_t newline = line.find_first_of("\r\n"); + if (newline == std::string::npos) + newline = line.size(); + + if (colon == std::string::npos) { + if (headerFields.size() == 0) { + // put http status in a key that can never occur otherwise + headerFields["status:"] = line.substr(0, newline); + } else { + headerFields[line.substr(0, newline)] = line.substr(0, newline); // this should never happen + } + } else { + std::string key = line.substr(0, colon); + size_t firstChar = line.find_first_not_of(": ", colon, 2); + if (firstChar == std::string::npos) { + // nothing but spaces? + headerFields[line.substr(0, newline)] = ""; + } else { + std::string value = line.substr(firstChar, newline - firstChar); + headerFields[key] = value; + } + } + } + + return headerFields; } void URLImpl::setRequestType(const std::string& requestType) { - _requestType = requestType; + _requestType = requestType; } void URLImpl::setOutContent(const std::string& content) { - _outContent = content; + _outContent = content; } const std::string URLImpl::getInContent(bool forceReload) { - if (!_isDownloaded) { - download(true); - } - return _inContent.str(); + if (!_isDownloaded) { + download(true); + } + return _inContent.str(); } const void URLImpl::download(bool blocking) { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - - if (_isDownloaded) - return; - - URL url(shared_from_this()); - URLFetcher::fetchURL(url); - - if (blocking) { - while(!_isDownloaded && !_hasFailed) { - _condVar.wait(_mutex); // wait for notification - } - } + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + if (_isDownloaded) + return; + + URL url(shared_from_this()); + URLFetcher::fetchURL(url); + + if (blocking) { + while(!_isDownloaded && !_hasFailed) { + _condVar.wait(_mutex); // wait for notification + } + } } const bool URLImpl::toAbsoluteCwd() { - char currPath[FILENAME_MAX]; - if (!getcwd(currPath, sizeof(currPath))) { - return false; - } - currPath[sizeof(currPath) - 1] = '\0'; /* not really required */ - return toAbsolute(std::string("file://" + std::string(currPath) + "/")); + char currPath[FILENAME_MAX]; + if (!getcwd(currPath, sizeof(currPath))) { + return false; + } + currPath[sizeof(currPath) - 1] = '\0'; /* not really required */ + return toAbsolute(std::string("file://" + std::string(currPath) + "/")); } std::string URLImpl::getLocalFilename(const std::string& suffix) { - if (_localFile.length() > 0) - return _localFile; - - if (_uri.scheme().compare("file") == 0) - return _uri.path(); - - // try hard to find a temporary directory - const char* tmpDir = NULL; - if (tmpDir == NULL) - tmpDir = getenv("TMPDIR"); - if (tmpDir == NULL) - tmpDir = getenv("TMP"); - if (tmpDir == NULL) - tmpDir = getenv("TEMP"); - if (tmpDir == NULL) - tmpDir = getenv("USERPROFILE"); - if (tmpDir == NULL) - tmpDir = "/tmp/"; - - char* tmpl = (char*)malloc(strlen(tmpDir) + 11 + suffix.length()); - char* writePtr = tmpl; - memcpy(writePtr, tmpDir, strlen(tmpDir)); - writePtr += strlen(tmpDir); - memcpy(writePtr, "scxmlXXXXXX", 11); - writePtr += 11; - memcpy(writePtr, suffix.c_str(), suffix.length()); - writePtr += suffix.length(); - tmpl[writePtr - tmpl] = 0; - + if (_localFile.length() > 0) + return _localFile; + + if (_uri.scheme().compare("file") == 0) + return _uri.path(); + + // try hard to find a temporary directory + const char* tmpDir = NULL; + if (tmpDir == NULL) + tmpDir = getenv("TMPDIR"); + if (tmpDir == NULL) + tmpDir = getenv("TMP"); + if (tmpDir == NULL) + tmpDir = getenv("TEMP"); + if (tmpDir == NULL) + tmpDir = getenv("USERPROFILE"); + if (tmpDir == NULL) + tmpDir = "/tmp/"; + + char* tmpl = (char*)malloc(strlen(tmpDir) + 11 + suffix.length()); + char* writePtr = tmpl; + memcpy(writePtr, tmpDir, strlen(tmpDir)); + writePtr += strlen(tmpDir); + memcpy(writePtr, "scxmlXXXXXX", 11); + writePtr += 11; + memcpy(writePtr, suffix.c_str(), suffix.length()); + writePtr += suffix.length(); + tmpl[writePtr - tmpl] = 0; + #ifdef _WIN32 - _mktemp_s(tmpl, strlen(tmpl) + 1); - int fd = _open(tmpl, _O_CREAT, _S_IREAD | _S_IWRITE); + _mktemp_s(tmpl, strlen(tmpl) + 1); + int fd = _open(tmpl, _O_CREAT, _S_IREAD | _S_IWRITE); #else - int fd = mkstemps(tmpl, suffix.length()); + int fd = mkstemps(tmpl, suffix.length()); #endif - if (fd < 0) { - LOG(ERROR) << "mkstemp " << tmpl << ": " << strerror(errno) << std::endl; - return ""; - } + if (fd < 0) { + LOG(ERROR) << "mkstemp " << tmpl << ": " << strerror(errno) << std::endl; + return ""; + } #ifdef WIN32 - _close(fd); + _close(fd); #else - close(fd); + close(fd); #endif - return std::string(tmpl); + return std::string(tmpl); } const bool URLImpl::toAbsolute(const std::string& baseUrl) { - if (_uri.is_absolute()) - return true; - _uri = Arabica::io::URI(baseUrl, _uri.as_string()); - if (!_uri.is_absolute()) - return false; - return true; + if (_uri.is_absolute()) + return true; + _uri = Arabica::io::URI(baseUrl, _uri.as_string()); + if (!_uri.is_absolute()) + return false; + return true; } boost::shared_ptr<URLImpl> URLImpl::toLocalFile(const std::string& content, const std::string& suffix) { - boost::shared_ptr<URLImpl> urlImpl = boost::shared_ptr<URLImpl>(new URLImpl()); - urlImpl->_localFile = urlImpl->getLocalFilename(suffix); - urlImpl->_uri = std::string("file://") + urlImpl->_localFile; - - std::ofstream file(urlImpl->_localFile.c_str(), std::ios_base::out); - if(file.is_open()) { - file << content; - file.close(); - } else { - return boost::shared_ptr<URLImpl>(); - } - - return urlImpl; + boost::shared_ptr<URLImpl> urlImpl = boost::shared_ptr<URLImpl>(new URLImpl()); + urlImpl->_localFile = urlImpl->getLocalFilename(suffix); + urlImpl->_uri = std::string("file://") + urlImpl->_localFile; + + std::ofstream file(urlImpl->_localFile.c_str(), std::ios_base::out); + if(file.is_open()) { + file << content; + file.close(); + } else { + return boost::shared_ptr<URLImpl>(); + } + + return urlImpl; } - + const std::string URLImpl::asLocalFile(const std::string& suffix, bool reload) { - // this is already a local file - if (_uri.scheme().compare("file") == 0) - return _uri.path(); - - if (_localFile.length() > 0 && !reload) - return _localFile; - - if (_localFile.length() > 0) - remove(_localFile.c_str()); - - _localFile = getLocalFilename(suffix); - - std::ofstream file(_localFile.c_str(), std::ios_base::out); - if(file.is_open()) { - file << URL(this->shared_from_this()); - file.close(); - } else { - _localFile = ""; - } - - return _localFile; + // this is already a local file + if (_uri.scheme().compare("file") == 0) + return _uri.path(); + + if (_localFile.length() > 0 && !reload) + return _localFile; + + if (_localFile.length() > 0) + remove(_localFile.c_str()); + + _localFile = getLocalFilename(suffix); + + std::ofstream file(_localFile.c_str(), std::ios_base::out); + if(file.is_open()) { + file << URL(this->shared_from_this()); + file.close(); + } else { + _localFile = ""; + } + + return _localFile; } std::ostream & operator<<(std::ostream & stream, const URL& url) { - URL nonConstUrl = url; // this is a hack - stream << nonConstUrl.getInContent(); - return stream; + URL nonConstUrl = url; // this is a hack + stream << nonConstUrl.getInContent(); + return stream; } URLFetcher::URLFetcher() { - _isStarted = false; - _multiHandle = curl_multi_init(); - start(); + _isStarted = false; + _multiHandle = curl_multi_init(); + start(); } - + URLFetcher::~URLFetcher() { - stop(); - curl_multi_cleanup(_multiHandle); + stop(); + curl_multi_cleanup(_multiHandle); } - + void URLFetcher::fetchURL(URL& url) { - URLFetcher* instance = getInstance(); - tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - - assert(url._impl->_handle != NULL); - if (url._impl->_handle == NULL) - return; - - if (instance->_handlesToURLs.find(url._impl->_handle) == instance->_handlesToURLs.end()) { - CURLcode curlError; - CURL* handle = url._impl->_handle; - - (curlError = curl_easy_setopt(handle, CURLOPT_URL, url.asString().c_str())) == CURLE_OK || - LOG(ERROR) << "Cannot set url to " << url.asString() << ": " << curl_easy_strerror(curlError); - - (curlError = curl_easy_setopt(handle, CURLOPT_WRITEDATA, url._impl.get())) == CURLE_OK || - LOG(ERROR) << "Cannot register this as write userdata: " << curl_easy_strerror(curlError); - - (curlError = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, URLImpl::writeHandler)) == CURLE_OK || - LOG(ERROR) << "Cannot set write callback: " << curl_easy_strerror(curlError); - - (curlError = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, URLImpl::headerHandler)) == CURLE_OK || - LOG(ERROR) << "Cannot request header from curl: " << curl_easy_strerror(curlError); - - (curlError = curl_easy_setopt(handle, CURLOPT_HEADERDATA, url._impl.get())) == CURLE_OK || - LOG(ERROR) << "Cannot register this as header userdata: " << curl_easy_strerror(curlError); - - - if (boost::iequals(url._impl->_requestType, "post")) { - - (curlError = curl_easy_setopt(handle, CURLOPT_POST, 1)) == CURLE_OK || - LOG(ERROR) << "Cannot set request type to post for " << url.asString() << ": " << curl_easy_strerror(curlError); - - (curlError = curl_easy_setopt(handle, CURLOPT_COPYPOSTFIELDS, url._impl->_outContent.c_str())) == CURLE_OK || - LOG(ERROR) << "Cannot set post data " << url.asString() << ": " << curl_easy_strerror(curlError); - - struct curl_slist* headers = NULL; - std::map<std::string, std::string>::iterator paramIter = url._impl->_outHeader.begin(); - while(paramIter != url._impl->_outHeader.end()) { - char* key = curl_easy_escape(handle, paramIter->first.c_str(), paramIter->first.length()); - char* value = curl_easy_escape(handle, paramIter->second.c_str(), paramIter->second.length()); - - char* header = (char*)malloc(paramIter->first.size() + strlen(value) + 3); - sprintf(header,"%s: %s", paramIter->first.c_str(), value); - headers = curl_slist_append(headers, header); - - curl_free(key); - curl_free(value); - paramIter++; - } - (curlError = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers)) == CURLE_OK || - LOG(ERROR) << "Cannot headers for " << url.asString() << ": " << curl_easy_strerror(curlError); - - //curl_slist_free_all(headers); - - - } else if (boost::iequals(url._impl->_requestType, "get")) { - (curlError = curl_easy_setopt(handle, CURLOPT_HTTPGET, 1)) == CURLE_OK || - LOG(ERROR) << "Cannot set request type to get for " << url.asString() << ": " << curl_easy_strerror(curlError); - } - - url.downloadStarted(); - instance->_handlesToURLs[handle] = url; - assert(instance->_handlesToURLs.size() > 0); - - curl_multi_add_handle(instance->_multiHandle, handle); - instance->_condVar.notify_all(); - } + URLFetcher* instance = getInstance(); + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + assert(url._impl->_handle != NULL); + if (url._impl->_handle == NULL) + return; + + if (instance->_handlesToURLs.find(url._impl->_handle) == instance->_handlesToURLs.end()) { + CURLcode curlError; + CURL* handle = url._impl->_handle; + + (curlError = curl_easy_setopt(handle, CURLOPT_URL, url.asString().c_str())) == CURLE_OK || + LOG(ERROR) << "Cannot set url to " << url.asString() << ": " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_WRITEDATA, url._impl.get())) == CURLE_OK || + LOG(ERROR) << "Cannot register this as write userdata: " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, URLImpl::writeHandler)) == CURLE_OK || + LOG(ERROR) << "Cannot set write callback: " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, URLImpl::headerHandler)) == CURLE_OK || + LOG(ERROR) << "Cannot request header from curl: " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_HEADERDATA, url._impl.get())) == CURLE_OK || + LOG(ERROR) << "Cannot register this as header userdata: " << curl_easy_strerror(curlError); + + + if (boost::iequals(url._impl->_requestType, "post")) { + + (curlError = curl_easy_setopt(handle, CURLOPT_POST, 1)) == CURLE_OK || + LOG(ERROR) << "Cannot set request type to post for " << url.asString() << ": " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_COPYPOSTFIELDS, url._impl->_outContent.c_str())) == CURLE_OK || + LOG(ERROR) << "Cannot set post data " << url.asString() << ": " << curl_easy_strerror(curlError); + + struct curl_slist* headers = NULL; + std::map<std::string, std::string>::iterator paramIter = url._impl->_outHeader.begin(); + while(paramIter != url._impl->_outHeader.end()) { + char* key = curl_easy_escape(handle, paramIter->first.c_str(), paramIter->first.length()); + char* value = curl_easy_escape(handle, paramIter->second.c_str(), paramIter->second.length()); + + char* header = (char*)malloc(paramIter->first.size() + strlen(value) + 3); + sprintf(header,"%s: %s", paramIter->first.c_str(), value); + headers = curl_slist_append(headers, header); + + curl_free(key); + curl_free(value); + paramIter++; + } + (curlError = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers)) == CURLE_OK || + LOG(ERROR) << "Cannot headers for " << url.asString() << ": " << curl_easy_strerror(curlError); + + //curl_slist_free_all(headers); + + + } else if (boost::iequals(url._impl->_requestType, "get")) { + (curlError = curl_easy_setopt(handle, CURLOPT_HTTPGET, 1)) == CURLE_OK || + LOG(ERROR) << "Cannot set request type to get for " << url.asString() << ": " << curl_easy_strerror(curlError); + } + + url.downloadStarted(); + instance->_handlesToURLs[handle] = url; + assert(instance->_handlesToURLs.size() > 0); + + curl_multi_add_handle(instance->_multiHandle, handle); + instance->_condVar.notify_all(); + } } void URLFetcher::breakURL(URL& url) { - URLFetcher* instance = getInstance(); - CURL* handle = url._impl->_handle; - - tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - if (instance->_handlesToURLs.find(handle) != instance->_handlesToURLs.end()) { - url.downloadFailed(0); - curl_multi_remove_handle(instance->_multiHandle, handle); - instance->_handlesToURLs.erase(handle); - } + URLFetcher* instance = getInstance(); + CURL* handle = url._impl->_handle; + + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + if (instance->_handlesToURLs.find(handle) != instance->_handlesToURLs.end()) { + url.downloadFailed(0); + curl_multi_remove_handle(instance->_multiHandle, handle); + instance->_handlesToURLs.erase(handle); + } } void URLFetcher::start() { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - if (!_isStarted) { - _isStarted = true; - _thread = new tthread::thread(URLFetcher::run, this); - } + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (!_isStarted) { + _isStarted = true; + _thread = new tthread::thread(URLFetcher::run, this); + } } - + void URLFetcher::stop() { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - if (_isStarted) { - _isStarted = false; - _thread->join(); - delete _thread; - } + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_isStarted) { + _isStarted = false; + _thread->join(); + delete _thread; + } } void URLFetcher::run(void* instance) { - URLFetcher* fetcher = (URLFetcher*)instance; - while(fetcher->_isStarted) { - fetcher->perform(); - } - LOG(ERROR) << "URLFetcher thread stopped!"; + URLFetcher* fetcher = (URLFetcher*)instance; + while(fetcher->_isStarted) { + fetcher->perform(); + } + LOG(ERROR) << "URLFetcher thread stopped!"; } - + void URLFetcher::perform() { - CURLMsg *msg; /* for picking up messages with the transfer status */ - int msgsLeft; /* how many messages are left */ - int stillRunning; - - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - if (_handlesToURLs.empty()) { - std::cout << "Waiting for work" << std::endl; - _condVar.wait(_mutex); - } - curl_multi_perform(_multiHandle, &stillRunning); - } - - do { - struct timeval timeout; - int rc; /* select() return code */ - - fd_set fdread, fdwrite, fdexcep; - FD_ZERO(&fdread); FD_ZERO(&fdwrite); FD_ZERO(&fdexcep); - - int maxfd = -1; - long curlTimeOut = -1; - - /* set a suitable timeout to play around with */ - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_timeout(_multiHandle, &curlTimeOut); - } - - if(curlTimeOut >= 0) { - timeout.tv_sec = curlTimeOut / 1000; - if(timeout.tv_sec > 1) - timeout.tv_sec = 1; - else - timeout.tv_usec = (curlTimeOut % 1000) * 1000; - } - - /* get file descriptors from the transfers */ - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_fdset(_multiHandle, &fdread, &fdwrite, &fdexcep, &maxfd); - } - - rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); - - switch(rc) { - case -1: - /* select error */ - break; - case 0: /* timeout */ - default: /* action */ - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - curl_multi_perform(_multiHandle, &stillRunning); - } - break; - } - - { - tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - while ((msg = curl_multi_info_read(_multiHandle, &msgsLeft))) { - if (msg->msg == CURLMSG_DONE) { - switch (msg->data.result) { - case CURLM_OK: - _handlesToURLs[msg->easy_handle].downloadCompleted(); - curl_multi_remove_handle(_multiHandle, msg->easy_handle); - _handlesToURLs.erase(msg->easy_handle); - break; - case CURLM_BAD_HANDLE: - case CURLM_BAD_EASY_HANDLE: - case CURLM_OUT_OF_MEMORY: - case CURLM_INTERNAL_ERROR: - case CURLM_BAD_SOCKET: - case CURLM_UNKNOWN_OPTION: - case CURLM_LAST: - _handlesToURLs[msg->easy_handle].downloadFailed(msg->data.result); - curl_multi_remove_handle(_multiHandle, msg->easy_handle); - _handlesToURLs.erase(msg->easy_handle); - default: - break; - } - } else { - LOG(ERROR) << "Curl reports info on unfinished download?!"; - } - } - } - } while(stillRunning && _isStarted); + CURLMsg *msg; /* for picking up messages with the transfer status */ + int msgsLeft; /* how many messages are left */ + int stillRunning; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_handlesToURLs.empty()) { + std::cout << "Waiting for work" << std::endl; + _condVar.wait(_mutex); + } + curl_multi_perform(_multiHandle, &stillRunning); + } + + do { + struct timeval timeout; + int rc; /* select() return code */ + + fd_set fdread, fdwrite, fdexcep; + FD_ZERO(&fdread); + FD_ZERO(&fdwrite); + FD_ZERO(&fdexcep); + + int maxfd = -1; + long curlTimeOut = -1; + + /* set a suitable timeout to play around with */ + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_timeout(_multiHandle, &curlTimeOut); + } + + if(curlTimeOut >= 0) { + timeout.tv_sec = curlTimeOut / 1000; + if(timeout.tv_sec > 1) + timeout.tv_sec = 1; + else + timeout.tv_usec = (curlTimeOut % 1000) * 1000; + } + + /* get file descriptors from the transfers */ + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_fdset(_multiHandle, &fdread, &fdwrite, &fdexcep, &maxfd); + } + + rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); + + switch(rc) { + case -1: + /* select error */ + break; + case 0: /* timeout */ + default: { /* action */ + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_perform(_multiHandle, &stillRunning); + } + break; + } + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + while ((msg = curl_multi_info_read(_multiHandle, &msgsLeft))) { + if (msg->msg == CURLMSG_DONE) { + switch (msg->data.result) { + case CURLM_OK: + _handlesToURLs[msg->easy_handle].downloadCompleted(); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + break; + case CURLM_BAD_HANDLE: + case CURLM_BAD_EASY_HANDLE: + case CURLM_OUT_OF_MEMORY: + case CURLM_INTERNAL_ERROR: + case CURLM_BAD_SOCKET: + case CURLM_UNKNOWN_OPTION: + case CURLM_LAST: + _handlesToURLs[msg->easy_handle].downloadFailed(msg->data.result); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + default: + break; + } + } else { + LOG(ERROR) << "Curl reports info on unfinished download?!"; + } + } + } + } while(stillRunning && _isStarted); } URLFetcher* URLFetcher::_instance = NULL; URLFetcher* URLFetcher::getInstance() { - if (_instance == NULL) { + if (_instance == NULL) { _instance = new URLFetcher(); } return _instance; |