diff options
author | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2013-02-20 21:13:02 (GMT) |
---|---|---|
committer | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2013-02-20 21:13:02 (GMT) |
commit | a56f28b0db56ff3e39f0b50e4c55c52b7aeec696 (patch) | |
tree | 41cf67ea5cee9593e86272ab55367653fbd1c2f3 /src/uscxml/URL.cpp | |
parent | 7c779099b3acd1fa969dde718299484ebe0d2775 (diff) | |
download | uscxml-a56f28b0db56ff3e39f0b50e4c55c52b7aeec696.zip uscxml-a56f28b0db56ff3e39f0b50e4c55c52b7aeec696.tar.gz uscxml-a56f28b0db56ff3e39f0b50e4c55c52b7aeec696.tar.bz2 |
See detailled log
- Builds on windows again
- All HTTP requests are no passed into interpreter
- New response element to reply with data
- Moved basichttp URL
- New HTTP servlet invoker to register additional URLs
- More bugfixes than I care to mention
Diffstat (limited to 'src/uscxml/URL.cpp')
-rw-r--r-- | src/uscxml/URL.cpp | 894 |
1 files changed, 444 insertions, 450 deletions
diff --git a/src/uscxml/URL.cpp b/src/uscxml/URL.cpp index 015ed6c..8656cd2 100644 --- a/src/uscxml/URL.cpp +++ b/src/uscxml/URL.cpp @@ -1,23 +1,8 @@ -#include <algorithm> -#include <assert.h> -#include <iostream> -#include <fstream> - #include <glog/logging.h> -#include <boost/algorithm/string.hpp> - -#include <stdio.h> -#include <string.h> -#ifndef WIN32 -#include <sys/time.h> -#endif -#include <stdlib.h> -#include <errno.h> - -#include "uscxml/Common.h" #include "URL.h" #include "uscxml/config.h" +#include <fstream> #include <stdio.h> /* defines FILENAME_MAX */ #ifdef _WIN32 @@ -36,484 +21,493 @@ #include <unistd.h> // mkstemp legacy #endif +#include <boost/algorithm/string.hpp> + namespace uscxml { +URLImpl::URLImpl(const std::string& url) : _handle(NULL), _uri(url), _isDownloaded(false), _hasFailed(false) { + _handle = curl_easy_init(); + if (_handle == NULL) { + LOG(ERROR) << "curl_easy_init returned NULL, this is bad!"; + } +} + URLImpl::~URLImpl() { - if (_localFile.length() > 0) - remove(_localFile.c_str()); + if (_handle != NULL) + curl_easy_cleanup(_handle); } - -const bool URLImpl::toAbsoluteCwd() { - char currPath[FILENAME_MAX]; - if (!getcwd(currPath, sizeof(currPath))) { - return false; - } - currPath[sizeof(currPath) - 1] = '\0'; /* not really required */ - return toAbsolute(std::string("file://" + std::string(currPath) + "/")); + +size_t URLImpl::writeHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { + URLImpl* url = (URLImpl*)userdata; + url->_inContent.write((char*)ptr, size * nmemb); + + monIter_t monIter = url->_monitors.begin(); + while(monIter != url->_monitors.end()) { + (*monIter)->contentChunkReceived(URL(url->shared_from_this()), std::string((char*)ptr, size * nmemb)); + monIter++; + } + + return size * nmemb; } -std::string URLImpl::getLocalFilename(const std::string& suffix) { - if (_localFile.length() > 0) - return _localFile; - - if (_uri.scheme().compare("file") == 0) - return _uri.path(); - - // try hard to find a temporary directory - const char* tmpDir = NULL; - if (tmpDir == NULL) - tmpDir = getenv("TMPDIR"); - if (tmpDir == NULL) - tmpDir = getenv("TMP"); - if (tmpDir == NULL) - tmpDir = getenv("TEMP"); - if (tmpDir == NULL) - tmpDir = getenv("USERPROFILE"); - if (tmpDir == NULL) - tmpDir = "/tmp"; - - char* tmpl = (char*)malloc(strlen(tmpDir) + 11 + suffix.length()); - char* writePtr = tmpl; - memcpy(writePtr, tmpDir, strlen(tmpDir)); - writePtr += strlen(tmpDir); - memcpy(writePtr, "scxmlXXXXXX", 11); - writePtr += 11; - memcpy(writePtr, suffix.c_str(), suffix.length()); - writePtr += suffix.length(); - tmpl[writePtr - tmpl] = 0; +size_t URLImpl::headerHandler(void *ptr, size_t size, size_t nmemb, void *userdata) { + URLImpl* url = (URLImpl*)userdata; + url->_inHeader.write((char*)ptr, size * nmemb); -#ifdef _WIN32 - _mktemp_s(tmpl, strlen(tmpl) + 1); - int fd = _open(tmpl, _O_CREAT, _S_IREAD | _S_IWRITE); -#else - int fd = mkstemps(tmpl, suffix.length()); -#endif - if (fd < 0) { - LOG(ERROR) << "mkstemp: " << strerror(errno) << std::endl; - return ""; - } -#ifdef WIN32 - _close(fd); -#else - close(fd); -#endif - return std::string(tmpl); -} + monIter_t monIter = url->_monitors.begin(); + while(monIter != url->_monitors.end()) { + (*monIter)->headerChunkReceived(URL(url->shared_from_this()), std::string((char*)ptr, size * nmemb)); + monIter++; + } -boost::shared_ptr<URLImpl> URLImpl::toLocalFile(const std::string& content, const std::string& suffix) { - boost::shared_ptr<URLImpl> urlImpl = boost::shared_ptr<URLImpl>(new URLImpl()); - urlImpl->_localFile = urlImpl->getLocalFilename(suffix); - urlImpl->_uri = std::string("file://") + urlImpl->_localFile; - - std::ofstream file(urlImpl->_localFile.c_str(), std::ios_base::out); - if(file.is_open()) { - file << content; - file.close(); - } else { - return boost::shared_ptr<URLImpl>(); - } - - return urlImpl; + return size * nmemb; } -const bool URLImpl::toAbsolute(const std::string& baseUrl) { - if (_uri.is_absolute()) - return true; - _uri = Arabica::io::URI(baseUrl, _uri.as_string()); - if (!_uri.is_absolute()) - return false; - return true; +void URLImpl::downloadStarted() { + LOG(INFO) << "Starting download of " << asString() << std::endl; + _inContent.str(""); + _inContent.clear(); + _inHeader.str(""); + _inHeader.clear(); + + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { + (*monIter)->downloadStarted(URL(shared_from_this())); + monIter++; + } } -const std::string URLImpl::asLocalFile(const std::string& suffix, bool reload) { - // this is already a local file - if (_uri.scheme().compare("file") == 0) - return _uri.path(); - - if (_localFile.length() > 0 && !reload) - return _localFile; - - if (_localFile.length() > 0) - remove(_localFile.c_str()); +void URLImpl::downloadCompleted() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); - _localFile = getLocalFilename(suffix); + LOG(INFO) << "Finished downloading " << asString() << " with " << _inContent.str().size() << " bytes"; - std::ofstream file(_localFile.c_str(), std::ios_base::out); - if(file.is_open()) { - file << URL(this->shared_from_this()); - file.close(); - } else { - _localFile = ""; - } + _hasFailed = false; + _isDownloaded = true; + _condVar.notify_all(); - return _localFile; + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { + (*monIter)->downloadCompleted(URL(shared_from_this())); + monIter++; + } } -std::ostream & operator<<(std::ostream & stream, const URL& url) { - - std::string urlString = url.asString(); - std::string fileURL = "file://"; - - // strip file:// to support relative filenames - if(urlString.substr(0, fileURL.size()) == fileURL) { - urlString = urlString.substr(fileURL.size()); -#ifdef _WIN32 - urlString = urlString.substr(0,1) + ":" + urlString.substr(1); -// std::replace( urlString.begin(), urlString.end(), '/', '\\'); -#endif - } -// LOG(ERROR) << "Trying to open " << urlString; - URL_FILE *handle = url_fopen(urlString.c_str(), "r"); +void URLImpl::downloadFailed(int errorCode) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + LOG(ERROR) << "Downloading " << asString() << " failed: " << strerror(errorCode); + + _hasFailed = true; + _isDownloaded = false; + _condVar.notify_all(); + + monIter_t monIter = _monitors.begin(); + while(monIter != _monitors.end()) { + (*monIter)->downloadFailed(URL(shared_from_this()), errorCode); + monIter++; + } - if(!handle) { - LOG(ERROR) << "Cannot open URL " << url.asString(); - return stream; - } - - int nread; - char buffer[256]; - - do { - nread = url_fread(buffer, 1,sizeof(buffer), handle); - stream.write(buffer, nread); - } while(nread); - - url_fclose(handle); - return stream; } - -/* we use a global one for convenience */ -CURLM *multi_handle; - -/* curl calls this routine to get more data */ -static size_t write_callback(char *buffer, - size_t size, - size_t nitems, - void *userp) { - char *newbuff; - size_t rembuff; - - URL_FILE *url = (URL_FILE *)userp; - size *= nitems; - - rembuff=url->buffer_len - url->buffer_pos; /* remaining space in buffer */ - - if(size > rembuff) { - /* not enough space in buffer */ - newbuff=(char*)realloc(url->buffer,url->buffer_len + (size - rembuff)); - if(newbuff==NULL) { - fprintf(stderr,"callback buffer grow failed\n"); - size=rembuff; - } else { - /* realloc suceeded increase buffer size*/ - url->buffer_len+=size - rembuff; - url->buffer=newbuff; - } - } - - memcpy(&url->buffer[url->buffer_pos], buffer, size); - url->buffer_pos += size; - - return size; + +const std::map<std::string, std::string> URLImpl::getInHeaderFields() { + if (!_isDownloaded) { + download(true); + } + + std::map<std::string, std::string> headerFields; + std::string line; + while (std::getline(_inHeader, line)) { + size_t colon = line.find_first_of(":"); + size_t newline = line.find_first_of("\r\n"); + if (newline == std::string::npos) + newline = line.size(); + + if (colon == std::string::npos) { + if (headerFields.size() == 0) { + // put http status in a key that can never occur otherwise + headerFields["status:"] = line.substr(0, newline); + } else { + headerFields[line.substr(0, newline)] = line.substr(0, newline); // this should never happen + } + } else { + std::string key = line.substr(0, colon); + size_t firstChar = line.find_first_not_of(": ", colon, 2); + if (firstChar == std::string::npos) { + // nothing but spaces? + headerFields[line.substr(0, newline)] = ""; + } else { + std::string value = line.substr(firstChar, newline - firstChar); + headerFields[key] = value; + } + } + } + + return headerFields; } -/* use to attempt to fill the read buffer up to requested number of bytes */ -static int fill_buffer(URL_FILE *file, size_t want) { - fd_set fdread; - fd_set fdwrite; - fd_set fdexcep; - struct timeval timeout; - int rc; - - /* only attempt to fill buffer if transactions still running and buffer - * doesnt exceed required size already - */ - if((!file->still_running) || (file->buffer_pos > want)) - return 0; - - /* attempt to fill buffer */ - do { - int maxfd = -1; - long curl_timeo = -1; - - FD_ZERO(&fdread); - FD_ZERO(&fdwrite); - FD_ZERO(&fdexcep); - - /* set a suitable timeout to fail on */ - timeout.tv_sec = 60; /* 1 minute */ - timeout.tv_usec = 0; - - curl_multi_timeout(multi_handle, &curl_timeo); - if(curl_timeo >= 0) { - timeout.tv_sec = curl_timeo / 1000; - if(timeout.tv_sec > 1) - timeout.tv_sec = 1; - else - timeout.tv_usec = (curl_timeo % 1000) * 1000; - } - - /* get file descriptors from the transfers */ - curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd); - - /* In a real-world program you OF COURSE check the return code of the - function calls. On success, the value of maxfd is guaranteed to be - greater or equal than -1. We call select(maxfd + 1, ...), specially - in case of (maxfd == -1), we call select(0, ...), which is basically - equal to sleep. */ - - rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); - - switch(rc) { - case -1: - /* select error */ - break; - - case 0: - default: - /* timeout or readable/writable sockets */ - curl_multi_perform(multi_handle, &file->still_running); - break; - } - } while(file->still_running && (file->buffer_pos < want)); - return 1; +void URLImpl::setRequestType(const std::string& requestType) { + _requestType = requestType; } -/* use to remove want bytes from the front of a files buffer */ -static int use_buffer(URL_FILE *file,int want) { - /* sort out buffer */ - if((file->buffer_pos - want) <=0) { - /* ditch buffer - write will recreate */ - if(file->buffer) - free(file->buffer); - - file->buffer=NULL; - file->buffer_pos=0; - file->buffer_len=0; - } else { - /* move rest down make it available for later */ - memmove(file->buffer, - &file->buffer[want], - (file->buffer_pos - want)); - - file->buffer_pos -= want; - } - return 0; +void URLImpl::setOutContent(const std::string& content) { + _outContent = content; } -URL_FILE *url_fopen(const char *url,const char *operation) { - /* this code could check for URLs or types in the 'url' and - basicly use the real fopen() for standard files */ - - URL_FILE *file; - (void)operation; - - file = (URL_FILE*)malloc(sizeof(URL_FILE)); - if(!file) - return NULL; - - memset(file, 0, sizeof(URL_FILE)); - - if((file->handle.file=fopen(url,operation))) - file->type = CFTYPE_FILE; /* marked as URL */ - - else { - file->type = CFTYPE_CURL; /* marked as URL */ - file->handle.curl = curl_easy_init(); - - curl_easy_setopt(file->handle.curl, CURLOPT_URL, url); - curl_easy_setopt(file->handle.curl, CURLOPT_WRITEDATA, file); - curl_easy_setopt(file->handle.curl, CURLOPT_VERBOSE, 0L); - curl_easy_setopt(file->handle.curl, CURLOPT_WRITEFUNCTION, write_callback); - - if(!multi_handle) - multi_handle = curl_multi_init(); - - curl_multi_add_handle(multi_handle, file->handle.curl); - - /* lets start the fetch */ - curl_multi_perform(multi_handle, &file->still_running); - - if((file->buffer_pos == 0) && (!file->still_running)) { - /* if still_running is 0 now, we should return NULL */ - - /* make sure the easy handle is not in the multi handle anymore */ - curl_multi_remove_handle(multi_handle, file->handle.curl); - - /* cleanup */ - curl_easy_cleanup(file->handle.curl); - - free(file); - - file = NULL; - } - } - return file; +const std::string URLImpl::getInContent(bool forceReload) { + if (!_isDownloaded) { + download(true); + } + return _inContent.str(); } -int url_fclose(URL_FILE *file) { - int ret=0;/* default is good return */ - - switch(file->type) { - case CFTYPE_FILE: - ret=fclose(file->handle.file); /* passthrough */ - break; - - case CFTYPE_CURL: - /* make sure the easy handle is not in the multi handle anymore */ - curl_multi_remove_handle(multi_handle, file->handle.curl); - - /* cleanup */ - curl_easy_cleanup(file->handle.curl); - break; - - default: /* unknown or supported type - oh dear */ - ret=EOF; - errno=EBADF; - break; - } - - if(file->buffer) - free(file->buffer);/* free any allocated buffer space */ - - free(file); - - return ret; +const void URLImpl::download(bool blocking) { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + + if (_isDownloaded) + return; + + URL url(shared_from_this()); + URLFetcher::fetchURL(url); + + if (blocking) { + while(!_isDownloaded && !_hasFailed) { + _condVar.wait(_mutex); // wait for notification + } + } } -int url_feof(URL_FILE *file) { - int ret=0; - - switch(file->type) { - case CFTYPE_FILE: - ret=feof(file->handle.file); - break; - - case CFTYPE_CURL: - if((file->buffer_pos == 0) && (!file->still_running)) - ret = 1; - break; - - default: /* unknown or supported type - oh dear */ - ret=-1; - errno=EBADF; - break; - } - return ret; +const bool URLImpl::toAbsoluteCwd() { + char currPath[FILENAME_MAX]; + if (!getcwd(currPath, sizeof(currPath))) { + return false; + } + currPath[sizeof(currPath) - 1] = '\0'; /* not really required */ + return toAbsolute(std::string("file://" + std::string(currPath) + "/")); } -size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file) { - size_t want; - - switch(file->type) { - case CFTYPE_FILE: - want=fread(ptr,size,nmemb,file->handle.file); - break; - - case CFTYPE_CURL: - want = nmemb * size; - - fill_buffer(file,want); - - /* check if theres data in the buffer - if not fill_buffer() - * either errored or EOF */ - if(!file->buffer_pos) - return 0; - - /* ensure only available data is considered */ - if(file->buffer_pos < want) - want = file->buffer_pos; - - /* xfer data to caller */ - memcpy(ptr, file->buffer, want); - - use_buffer(file,want); - - want = want / size; /* number of items */ - break; - - default: /* unknown or supported type - oh dear */ - want=0; - errno=EBADF; - break; - - } - return want; +std::string URLImpl::getLocalFilename(const std::string& suffix) { + if (_localFile.length() > 0) + return _localFile; + + if (_uri.scheme().compare("file") == 0) + return _uri.path(); + + // try hard to find a temporary directory + const char* tmpDir = NULL; + if (tmpDir == NULL) + tmpDir = getenv("TMPDIR"); + if (tmpDir == NULL) + tmpDir = getenv("TMP"); + if (tmpDir == NULL) + tmpDir = getenv("TEMP"); + if (tmpDir == NULL) + tmpDir = getenv("USERPROFILE"); + if (tmpDir == NULL) + tmpDir = "/tmp/"; + + char* tmpl = (char*)malloc(strlen(tmpDir) + 11 + suffix.length()); + char* writePtr = tmpl; + memcpy(writePtr, tmpDir, strlen(tmpDir)); + writePtr += strlen(tmpDir); + memcpy(writePtr, "scxmlXXXXXX", 11); + writePtr += 11; + memcpy(writePtr, suffix.c_str(), suffix.length()); + writePtr += suffix.length(); + tmpl[writePtr - tmpl] = 0; + +#ifdef _WIN32 + _mktemp_s(tmpl, strlen(tmpl) + 1); + int fd = _open(tmpl, _O_CREAT, _S_IREAD | _S_IWRITE); +#else + int fd = mkstemps(tmpl, suffix.length()); +#endif + if (fd < 0) { + LOG(ERROR) << "mkstemp " << tmpl << ": " << strerror(errno) << std::endl; + return ""; + } +#ifdef WIN32 + _close(fd); +#else + close(fd); +#endif + return std::string(tmpl); } -char *url_fgets(char *ptr, size_t size, URL_FILE *file) { - size_t want = size - 1;/* always need to leave room for zero termination */ - size_t loop; - - switch(file->type) { - case CFTYPE_FILE: - ptr = fgets(ptr,size,file->handle.file); - break; - - case CFTYPE_CURL: - fill_buffer(file,want); - - /* check if theres data in the buffer - if not fill either errored or - * EOF */ - if(!file->buffer_pos) - return NULL; - - /* ensure only available data is considered */ - if(file->buffer_pos < want) - want = file->buffer_pos; - - /*buffer contains data */ - /* look for newline or eof */ - for(loop=0; loop < want; loop++) { - if(file->buffer[loop] == '\n') { - want=loop+1;/* include newline */ - break; - } - } - - /* xfer data to caller */ - memcpy(ptr, file->buffer, want); - ptr[want]=0;/* allways null terminate */ - - use_buffer(file,want); - - break; +const bool URLImpl::toAbsolute(const std::string& baseUrl) { + if (_uri.is_absolute()) + return true; + _uri = Arabica::io::URI(baseUrl, _uri.as_string()); + if (!_uri.is_absolute()) + return false; + return true; +} - default: /* unknown or supported type - oh dear */ - ptr=NULL; - errno=EBADF; - break; - } +boost::shared_ptr<URLImpl> URLImpl::toLocalFile(const std::string& content, const std::string& suffix) { + boost::shared_ptr<URLImpl> urlImpl = boost::shared_ptr<URLImpl>(new URLImpl()); + urlImpl->_localFile = urlImpl->getLocalFilename(suffix); + urlImpl->_uri = std::string("file://") + urlImpl->_localFile; + + std::ofstream file(urlImpl->_localFile.c_str(), std::ios_base::out); + if(file.is_open()) { + file << content; + file.close(); + } else { + return boost::shared_ptr<URLImpl>(); + } + + return urlImpl; +} + +const std::string URLImpl::asLocalFile(const std::string& suffix, bool reload) { + // this is already a local file + if (_uri.scheme().compare("file") == 0) + return _uri.path(); + + if (_localFile.length() > 0 && !reload) + return _localFile; + + if (_localFile.length() > 0) + remove(_localFile.c_str()); + + _localFile = getLocalFilename(suffix); + + std::ofstream file(_localFile.c_str(), std::ios_base::out); + if(file.is_open()) { + file << URL(this->shared_from_this()); + file.close(); + } else { + _localFile = ""; + } + + return _localFile; +} - return ptr;/*success */ +std::ostream & operator<<(std::ostream & stream, const URL& url) { + URL nonConstUrl = url; // this is a hack + stream << nonConstUrl.getInContent(); + return stream; } -void url_rewind(URL_FILE *file) { - switch(file->type) { - case CFTYPE_FILE: - rewind(file->handle.file); /* passthrough */ - break; +URLFetcher::URLFetcher() { + _isStarted = false; + _multiHandle = curl_multi_init(); + start(); +} + +URLFetcher::~URLFetcher() { + stop(); + curl_multi_cleanup(_multiHandle); +} + +void URLFetcher::fetchURL(URL& url) { + URLFetcher* instance = getInstance(); + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + + assert(url._impl->_handle != NULL); + if (url._impl->_handle == NULL) + return; + + if (instance->_handlesToURLs.find(url._impl->_handle) == instance->_handlesToURLs.end()) { + CURLcode curlError; + CURL* handle = url._impl->_handle; + + (curlError = curl_easy_setopt(handle, CURLOPT_URL, url.asString().c_str())) == CURLE_OK || + LOG(ERROR) << "Cannot set url to " << url.asString() << ": " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_WRITEDATA, url._impl.get())) == CURLE_OK || + LOG(ERROR) << "Cannot register this as write userdata: " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, URLImpl::writeHandler)) == CURLE_OK || + LOG(ERROR) << "Cannot set write callback: " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, URLImpl::headerHandler)) == CURLE_OK || + LOG(ERROR) << "Cannot request header from curl: " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_HEADERDATA, url._impl.get())) == CURLE_OK || + LOG(ERROR) << "Cannot register this as header userdata: " << curl_easy_strerror(curlError); + + + if (boost::iequals(url._impl->_requestType, "post")) { + + (curlError = curl_easy_setopt(handle, CURLOPT_POST, 1)) == CURLE_OK || + LOG(ERROR) << "Cannot set request type to post for " << url.asString() << ": " << curl_easy_strerror(curlError); + + (curlError = curl_easy_setopt(handle, CURLOPT_COPYPOSTFIELDS, url._impl->_outContent.c_str())) == CURLE_OK || + LOG(ERROR) << "Cannot set post data " << url.asString() << ": " << curl_easy_strerror(curlError); + + struct curl_slist* headers = NULL; + std::map<std::string, std::string>::iterator paramIter = url._impl->_outHeader.begin(); + while(paramIter != url._impl->_outHeader.end()) { + char* key = curl_easy_escape(handle, paramIter->first.c_str(), paramIter->first.length()); + char* value = curl_easy_escape(handle, paramIter->second.c_str(), paramIter->second.length()); + + char* header = (char*)malloc(paramIter->first.size() + strlen(value) + 3); + sprintf(header,"%s: %s", paramIter->first.c_str(), value); + headers = curl_slist_append(headers, header); + + curl_free(key); + curl_free(value); + paramIter++; + } + (curlError = curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers)) == CURLE_OK || + LOG(ERROR) << "Cannot headers for " << url.asString() << ": " << curl_easy_strerror(curlError); + + //curl_slist_free_all(headers); + + + } else if (boost::iequals(url._impl->_requestType, "get")) { + (curlError = curl_easy_setopt(handle, CURLOPT_HTTPGET, 1)) == CURLE_OK || + LOG(ERROR) << "Cannot set request type to get for " << url.asString() << ": " << curl_easy_strerror(curlError); + } + + url.downloadStarted(); + instance->_handlesToURLs[handle] = url; + assert(instance->_handlesToURLs.size() > 0); + + curl_multi_add_handle(instance->_multiHandle, handle); + instance->_condVar.notify_all(); + } +} - case CFTYPE_CURL: - /* halt transaction */ - curl_multi_remove_handle(multi_handle, file->handle.curl); +void URLFetcher::breakURL(URL& url) { + URLFetcher* instance = getInstance(); + CURL* handle = url._impl->_handle; - /* restart */ - curl_multi_add_handle(multi_handle, file->handle.curl); + tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); + if (instance->_handlesToURLs.find(handle) != instance->_handlesToURLs.end()) { + url.downloadFailed(0); + curl_multi_remove_handle(instance->_multiHandle, handle); + instance->_handlesToURLs.erase(handle); + } +} - /* ditch buffer - write will recreate - resets stream pos*/ - if(file->buffer) - free(file->buffer); +void URLFetcher::start() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (!_isStarted) { + _isStarted = true; + _thread = new tthread::thread(URLFetcher::run, this); + } +} + +void URLFetcher::stop() { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_isStarted) { + _isStarted = false; + _thread->join(); + delete _thread; + } +} - file->buffer=NULL; - file->buffer_pos=0; - file->buffer_len=0; +void URLFetcher::run(void* instance) { + URLFetcher* fetcher = (URLFetcher*)instance; + while(fetcher->_isStarted) { + fetcher->perform(); + } + LOG(ERROR) << "URLFetcher thread stopped!"; +} + +void URLFetcher::perform() { + + CURLMsg *msg; /* for picking up messages with the transfer status */ + int msgsLeft; /* how many messages are left */ + int stillRunning; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + if (_handlesToURLs.empty()) { + std::cout << "Waiting for work" << std::endl; + _condVar.wait(_mutex); + } + curl_multi_perform(_multiHandle, &stillRunning); + } + + do { + struct timeval timeout; + int rc; /* select() return code */ + + fd_set fdread, fdwrite, fdexcep; + FD_ZERO(&fdread); FD_ZERO(&fdwrite); FD_ZERO(&fdexcep); + + int maxfd = -1; + long curlTimeOut = -1; + + /* set a suitable timeout to play around with */ + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_timeout(_multiHandle, &curlTimeOut); + } + + if(curlTimeOut >= 0) { + timeout.tv_sec = curlTimeOut / 1000; + if(timeout.tv_sec > 1) + timeout.tv_sec = 1; + else + timeout.tv_usec = (curlTimeOut % 1000) * 1000; + } + + /* get file descriptors from the transfers */ + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_fdset(_multiHandle, &fdread, &fdwrite, &fdexcep, &maxfd); + } + + rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); + + switch(rc) { + case -1: + /* select error */ + break; + case 0: /* timeout */ + default: /* action */ + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + curl_multi_perform(_multiHandle, &stillRunning); + } + break; + } + + { + tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); + while ((msg = curl_multi_info_read(_multiHandle, &msgsLeft))) { + if (msg->msg == CURLMSG_DONE) { + switch (msg->data.result) { + case CURLM_OK: + _handlesToURLs[msg->easy_handle].downloadCompleted(); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + break; + case CURLM_BAD_HANDLE: + case CURLM_BAD_EASY_HANDLE: + case CURLM_OUT_OF_MEMORY: + case CURLM_INTERNAL_ERROR: + case CURLM_BAD_SOCKET: + case CURLM_UNKNOWN_OPTION: + case CURLM_LAST: + _handlesToURLs[msg->easy_handle].downloadFailed(msg->data.result); + curl_multi_remove_handle(_multiHandle, msg->easy_handle); + _handlesToURLs.erase(msg->easy_handle); + default: + break; + } + } else { + LOG(ERROR) << "Curl reports info on unfinished download?!"; + } + } + } + } while(stillRunning && _isStarted); +} - break; +URLFetcher* URLFetcher::_instance = NULL; - default: /* unknown or supported type - oh dear */ - break; +URLFetcher* URLFetcher::getInstance() { + if (_instance == NULL) { + _instance = new URLFetcher(); } + return _instance; } }
\ No newline at end of file |