diff options
author | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2014-08-13 11:48:26 (GMT) |
---|---|---|
committer | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2014-08-13 11:48:26 (GMT) |
commit | 02f52d15e7df2500c0c6e96660a43a985add16e8 (patch) | |
tree | d4ccfdd7d1070201f39a9589b35e02a39ec179c8 /src/uscxml/server | |
parent | c30b602cdb5ede809b960e35fc7e702b7f1f76e2 (diff) | |
download | uscxml-02f52d15e7df2500c0c6e96660a43a985add16e8.zip uscxml-02f52d15e7df2500c0c6e96660a43a985add16e8.tar.gz uscxml-02f52d15e7df2500c0c6e96660a43a985add16e8.tar.bz2 |
Builds for Raspberry and
started VoiceXML HTTP integration
Diffstat (limited to 'src/uscxml/server')
-rw-r--r-- | src/uscxml/server/Socket.cpp | 109 | ||||
-rw-r--r-- | src/uscxml/server/Socket.h | 22 |
2 files changed, 119 insertions, 12 deletions
diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp index 9c844e5..2d474ea 100644 --- a/src/uscxml/server/Socket.cpp +++ b/src/uscxml/server/Socket.cpp @@ -67,10 +67,16 @@ Socket::~Socket() { } void Socket::setupSockAddr(const std::string& address, int port) { + if (address == "*") { _sin.sin_addr.s_addr = 0; } else { - _sin.sin_addr.s_addr = inet_addr(address.c_str()); + struct hostent *he = NULL; + if ( (he = gethostbyname(address.c_str()) ) != NULL ) { + memcpy(&_sin.sin_addr, he->h_addr_list[0], he->h_length); + } else { + _sin.sin_addr.s_addr = inet_addr(address.c_str()); + } if (_sin.sin_addr.s_addr == INADDR_NONE) throw std::runtime_error(std::string("inet_addr: ") + strerror(errno)); } @@ -83,6 +89,34 @@ void Socket::setBlockSizeRead(size_t size) { _blockSizeRead = size; } +void Socket::parseAddress(const std::string& address, std::string& protocol, std::string& hostName, uint16_t& port) { + // tcp://hostname:port + size_t protEnd = address.find("://"); + if (protEnd != std::string::npos) { + protocol = address.substr(0, protEnd); + protEnd += 3; + } else { + protocol = "tcp"; + protEnd = 0; + } + + size_t hostEnd = address.find(":", protEnd); + if (hostEnd != std::string::npos) { + hostName = address.substr(protEnd, hostEnd - protEnd); + hostEnd += 1; + } else { + hostName = "127.0.0.1"; + hostEnd = protEnd; + } + + if (hostEnd < address.size()) { + port = strTo<uint16_t>(address.substr(hostEnd)); + } else { + port = 0; + } +} + + ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) { } @@ -95,7 +129,7 @@ ClientSocket::~ClientSocket() { } void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { - ClientSocket* instance = (ClientSocket*)ctx; +// ClientSocket* instance = (ClientSocket*)ctx; // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); if (error & BEV_EVENT_READING) { @@ -115,6 +149,14 @@ void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx // bufferevent_free(bev); } +void ClientSocket::connect(const std::string& address) { + std::string _prot; + std::string _address; + uint16_t _port; + parseAddress(address, _prot, _address, _port); + connect(_address, _port); +} + void ClientSocket::connect(const std::string& address, int port) { // tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); @@ -123,11 +165,15 @@ void ClientSocket::connect(const std::string& address, int port) { throw std::runtime_error(std::string("connect: ") + strerror(errno)); } - _clientEvent = bufferevent_socket_new(_base->base, _socketFD, 0); //BEV_OPT_THREADSAFE); + _clientEvent = bufferevent_socket_new(_base->base, _socketFD, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE); bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this); bufferevent_enable(_clientEvent, EV_READ|EV_WRITE); } +int ClientSocket::write(const std::string& data) { + return write(data.data(), data.size()); +} + int ClientSocket::write(const char* data, size_t size) { // tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); bufferevent_write(_clientEvent, data, size); @@ -138,14 +184,15 @@ void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) { ClientSocket* instance = (ClientSocket*)ctx; // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - size_t n; + int n; struct evbuffer* input; char* data = (char*)malloc(instance->_blockSizeRead); input = bufferevent_get_input(bev); - n = evbuffer_remove(input, data, instance->_blockSizeRead); - - instance->readCallback(data, n); + + while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) { + instance->readCallback(data, n); + } free(data); } @@ -201,7 +248,6 @@ void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx #else close(conn->second.fd); #endif - instance->_connections.erase(conn); } } else if (error & BEV_EVENT_EOF) { @@ -229,9 +275,9 @@ void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) { char* data = (char*)malloc(instance->_blockSizeRead); input = bufferevent_get_input(bev); - n = evbuffer_remove(input, data, instance->_blockSizeRead); - - instance->readCallback(data, n, instance->_connections[bev]); + while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) { + instance->readCallback(data, n, instance->_connections[bev]); + } free(data); } @@ -241,11 +287,30 @@ void ServerSocket::bind() { } } +void ServerSocket::listen(const std::string& address) { + std::string _prot; + std::string _address; + uint16_t _port; + parseAddress(address, _prot, _address, _port); + listen(_address, _port); +} + void ServerSocket::listen(const std::string& address, int port) { // tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); setupSockAddr(address, port); bind(); + int reuseaddr_on = 1; + setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); + + int flags = fcntl(_socketFD, F_GETFL); + if (flags >= 0) { + flags |= O_NONBLOCK; + if (fcntl(_socketFD, F_SETFL, flags) < 0) { + // could not set to non-blocj + } + } + _listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this); /*XXX check it */ event_add(_listenerEvent, NULL); @@ -275,7 +340,7 @@ void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *c } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); - bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); + bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx); bufferevent_enable(bev, EV_READ|EV_WRITE); @@ -288,4 +353,24 @@ void ServerSocket::Connection::reply(const char* data, size_t size) { bufferevent_write(bufferEvent, data, size); } +void PacketServerSocket::readCallback(const char* data, size_t size, Connection& conn) { + std::stringstream& fragment = _fragments[conn]; + fragment << std::string(data, size); + + size_t startPos = 0; + size_t endPos; + const std::string& buffer = fragment.str(); + while((endPos = buffer.find(_sep, startPos)) != std::string::npos) { +// std::cout << ">" << buffer.substr(startPos, endPos - startPos) << "<" << std::endl; + readCallback(buffer.substr(startPos, endPos - startPos), conn); + startPos = endPos + _sep.size(); + } + if (startPos != 0 && startPos < buffer.size() + 1) { + std::string rest = buffer.substr(startPos); + fragment.str(std::string()); + fragment.clear(); + fragment << rest; + } +} + } diff --git a/src/uscxml/server/Socket.h b/src/uscxml/server/Socket.h index 5854a46..9330c4b 100644 --- a/src/uscxml/server/Socket.h +++ b/src/uscxml/server/Socket.h @@ -23,6 +23,7 @@ #include "uscxml/Common.h" // for USCXML_API #include "uscxml/concurrency/EventBase.h" #include <string> +#include <sstream> #include <map> #include <set> @@ -49,6 +50,7 @@ public: virtual ~Socket(); void setBlockSizeRead(size_t size); + static void parseAddress(const std::string& address, std::string& protocol, std::string& hostName, uint16_t& port); protected: @@ -67,6 +69,10 @@ class USCXML_API ServerSocket : public Socket { public: class Connection { public: + bool operator<(const Connection& other) const { + return bufferEvent < other.bufferEvent; + } + struct bufferevent* bufferEvent; int fd; @@ -77,6 +83,7 @@ public: virtual ~ServerSocket(); void listen(const std::string& address, int port); + void listen(const std::string& address); virtual void readCallback(const char* data, size_t size, Connection& conn) {}; @@ -93,6 +100,19 @@ protected: }; +class USCXML_API PacketServerSocket : public ServerSocket { +public: + PacketServerSocket(int domain, int type, int protocol, const std::string& sep) : ServerSocket(domain, type, protocol), _sep(sep) {} + virtual ~PacketServerSocket() {} + + void readCallback(const char* data, size_t size, Connection& conn); + virtual void readCallback(const std::string& packet, Connection& conn) = 0; + +protected: + std::string _sep; + std::map<Connection, std::stringstream> _fragments; +}; + class USCXML_API ClientSocket : public Socket { public: ClientSocket(int domain, int type, int protocol); @@ -100,6 +120,8 @@ public: virtual void readCallback(const char* data, size_t size) {}; void connect(const std::string& address, int port); + void connect(const std::string& address); + int write(const std::string& data); int write(const char* data, size_t size); |