diff options
Diffstat (limited to 'src/uscxml/server/Socket.cpp')
-rw-r--r-- | src/uscxml/server/Socket.cpp | 109 |
1 files changed, 97 insertions, 12 deletions
diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp index 9c844e5..2d474ea 100644 --- a/src/uscxml/server/Socket.cpp +++ b/src/uscxml/server/Socket.cpp @@ -67,10 +67,16 @@ Socket::~Socket() { } void Socket::setupSockAddr(const std::string& address, int port) { + if (address == "*") { _sin.sin_addr.s_addr = 0; } else { - _sin.sin_addr.s_addr = inet_addr(address.c_str()); + struct hostent *he = NULL; + if ( (he = gethostbyname(address.c_str()) ) != NULL ) { + memcpy(&_sin.sin_addr, he->h_addr_list[0], he->h_length); + } else { + _sin.sin_addr.s_addr = inet_addr(address.c_str()); + } if (_sin.sin_addr.s_addr == INADDR_NONE) throw std::runtime_error(std::string("inet_addr: ") + strerror(errno)); } @@ -83,6 +89,34 @@ void Socket::setBlockSizeRead(size_t size) { _blockSizeRead = size; } +void Socket::parseAddress(const std::string& address, std::string& protocol, std::string& hostName, uint16_t& port) { + // tcp://hostname:port + size_t protEnd = address.find("://"); + if (protEnd != std::string::npos) { + protocol = address.substr(0, protEnd); + protEnd += 3; + } else { + protocol = "tcp"; + protEnd = 0; + } + + size_t hostEnd = address.find(":", protEnd); + if (hostEnd != std::string::npos) { + hostName = address.substr(protEnd, hostEnd - protEnd); + hostEnd += 1; + } else { + hostName = "127.0.0.1"; + hostEnd = protEnd; + } + + if (hostEnd < address.size()) { + port = strTo<uint16_t>(address.substr(hostEnd)); + } else { + port = 0; + } +} + + ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) { } @@ -95,7 +129,7 @@ ClientSocket::~ClientSocket() { } void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) { - ClientSocket* instance = (ClientSocket*)ctx; +// ClientSocket* instance = (ClientSocket*)ctx; // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); if (error & BEV_EVENT_READING) { @@ -115,6 +149,14 @@ void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx // bufferevent_free(bev); } +void ClientSocket::connect(const std::string& address) { + std::string _prot; + std::string _address; + uint16_t _port; + parseAddress(address, _prot, _address, _port); + connect(_address, _port); +} + void ClientSocket::connect(const std::string& address, int port) { // tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); @@ -123,11 +165,15 @@ void ClientSocket::connect(const std::string& address, int port) { throw std::runtime_error(std::string("connect: ") + strerror(errno)); } - _clientEvent = bufferevent_socket_new(_base->base, _socketFD, 0); //BEV_OPT_THREADSAFE); + _clientEvent = bufferevent_socket_new(_base->base, _socketFD, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE); bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this); bufferevent_enable(_clientEvent, EV_READ|EV_WRITE); } +int ClientSocket::write(const std::string& data) { + return write(data.data(), data.size()); +} + int ClientSocket::write(const char* data, size_t size) { // tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); bufferevent_write(_clientEvent, data, size); @@ -138,14 +184,15 @@ void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) { ClientSocket* instance = (ClientSocket*)ctx; // tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex); - size_t n; + int n; struct evbuffer* input; char* data = (char*)malloc(instance->_blockSizeRead); input = bufferevent_get_input(bev); - n = evbuffer_remove(input, data, instance->_blockSizeRead); - - instance->readCallback(data, n); + + while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) { + instance->readCallback(data, n); + } free(data); } @@ -201,7 +248,6 @@ void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx #else close(conn->second.fd); #endif - instance->_connections.erase(conn); } } else if (error & BEV_EVENT_EOF) { @@ -229,9 +275,9 @@ void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) { char* data = (char*)malloc(instance->_blockSizeRead); input = bufferevent_get_input(bev); - n = evbuffer_remove(input, data, instance->_blockSizeRead); - - instance->readCallback(data, n, instance->_connections[bev]); + while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) { + instance->readCallback(data, n, instance->_connections[bev]); + } free(data); } @@ -241,11 +287,30 @@ void ServerSocket::bind() { } } +void ServerSocket::listen(const std::string& address) { + std::string _prot; + std::string _address; + uint16_t _port; + parseAddress(address, _prot, _address, _port); + listen(_address, _port); +} + void ServerSocket::listen(const std::string& address, int port) { // tthread::lock_guard<tthread::recursive_mutex> lock(_mutex); setupSockAddr(address, port); bind(); + int reuseaddr_on = 1; + setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on)); + + int flags = fcntl(_socketFD, F_GETFL); + if (flags >= 0) { + flags |= O_NONBLOCK; + if (fcntl(_socketFD, F_SETFL, flags) < 0) { + // could not set to non-blocj + } + } + _listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this); /*XXX check it */ event_add(_listenerEvent, NULL); @@ -275,7 +340,7 @@ void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *c } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); - bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); + bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx); bufferevent_enable(bev, EV_READ|EV_WRITE); @@ -288,4 +353,24 @@ void ServerSocket::Connection::reply(const char* data, size_t size) { bufferevent_write(bufferEvent, data, size); } +void PacketServerSocket::readCallback(const char* data, size_t size, Connection& conn) { + std::stringstream& fragment = _fragments[conn]; + fragment << std::string(data, size); + + size_t startPos = 0; + size_t endPos; + const std::string& buffer = fragment.str(); + while((endPos = buffer.find(_sep, startPos)) != std::string::npos) { +// std::cout << ">" << buffer.substr(startPos, endPos - startPos) << "<" << std::endl; + readCallback(buffer.substr(startPos, endPos - startPos), conn); + startPos = endPos + _sep.size(); + } + if (startPos != 0 && startPos < buffer.size() + 1) { + std::string rest = buffer.substr(startPos); + fragment.str(std::string()); + fragment.clear(); + fragment << rest; + } +} + } |