summaryrefslogtreecommitdiffstats
path: root/src/uscxml/server/Socket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/uscxml/server/Socket.cpp')
-rw-r--r--src/uscxml/server/Socket.cpp109
1 files changed, 97 insertions, 12 deletions
diff --git a/src/uscxml/server/Socket.cpp b/src/uscxml/server/Socket.cpp
index 9c844e5..2d474ea 100644
--- a/src/uscxml/server/Socket.cpp
+++ b/src/uscxml/server/Socket.cpp
@@ -67,10 +67,16 @@ Socket::~Socket() {
}
void Socket::setupSockAddr(const std::string& address, int port) {
+
if (address == "*") {
_sin.sin_addr.s_addr = 0;
} else {
- _sin.sin_addr.s_addr = inet_addr(address.c_str());
+ struct hostent *he = NULL;
+ if ( (he = gethostbyname(address.c_str()) ) != NULL ) {
+ memcpy(&_sin.sin_addr, he->h_addr_list[0], he->h_length);
+ } else {
+ _sin.sin_addr.s_addr = inet_addr(address.c_str());
+ }
if (_sin.sin_addr.s_addr == INADDR_NONE)
throw std::runtime_error(std::string("inet_addr: ") + strerror(errno));
}
@@ -83,6 +89,34 @@ void Socket::setBlockSizeRead(size_t size) {
_blockSizeRead = size;
}
+void Socket::parseAddress(const std::string& address, std::string& protocol, std::string& hostName, uint16_t& port) {
+ // tcp://hostname:port
+ size_t protEnd = address.find("://");
+ if (protEnd != std::string::npos) {
+ protocol = address.substr(0, protEnd);
+ protEnd += 3;
+ } else {
+ protocol = "tcp";
+ protEnd = 0;
+ }
+
+ size_t hostEnd = address.find(":", protEnd);
+ if (hostEnd != std::string::npos) {
+ hostName = address.substr(protEnd, hostEnd - protEnd);
+ hostEnd += 1;
+ } else {
+ hostName = "127.0.0.1";
+ hostEnd = protEnd;
+ }
+
+ if (hostEnd < address.size()) {
+ port = strTo<uint16_t>(address.substr(hostEnd));
+ } else {
+ port = 0;
+ }
+}
+
+
ClientSocket::ClientSocket(int domain, int type, int protocol) : Socket(domain, type, protocol), _clientEvent(NULL) {
}
@@ -95,7 +129,7 @@ ClientSocket::~ClientSocket() {
}
void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx) {
- ClientSocket* instance = (ClientSocket*)ctx;
+// ClientSocket* instance = (ClientSocket*)ctx;
// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
if (error & BEV_EVENT_READING) {
@@ -115,6 +149,14 @@ void ClientSocket::errorCallback(struct bufferevent *bev, short error, void *ctx
// bufferevent_free(bev);
}
+void ClientSocket::connect(const std::string& address) {
+ std::string _prot;
+ std::string _address;
+ uint16_t _port;
+ parseAddress(address, _prot, _address, _port);
+ connect(_address, _port);
+}
+
void ClientSocket::connect(const std::string& address, int port) {
// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
@@ -123,11 +165,15 @@ void ClientSocket::connect(const std::string& address, int port) {
throw std::runtime_error(std::string("connect: ") + strerror(errno));
}
- _clientEvent = bufferevent_socket_new(_base->base, _socketFD, 0); //BEV_OPT_THREADSAFE);
+ _clientEvent = bufferevent_socket_new(_base->base, _socketFD, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE);
bufferevent_setcb(_clientEvent, ClientSocket::readCallback, NULL, ClientSocket::errorCallback, this);
bufferevent_enable(_clientEvent, EV_READ|EV_WRITE);
}
+int ClientSocket::write(const std::string& data) {
+ return write(data.data(), data.size());
+}
+
int ClientSocket::write(const char* data, size_t size) {
// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
bufferevent_write(_clientEvent, data, size);
@@ -138,14 +184,15 @@ void ClientSocket::readCallback(struct bufferevent *bev, void *ctx) {
ClientSocket* instance = (ClientSocket*)ctx;
// tthread::lock_guard<tthread::recursive_mutex> lock(instance->_mutex);
- size_t n;
+ int n;
struct evbuffer* input;
char* data = (char*)malloc(instance->_blockSizeRead);
input = bufferevent_get_input(bev);
- n = evbuffer_remove(input, data, instance->_blockSizeRead);
-
- instance->readCallback(data, n);
+
+ while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) {
+ instance->readCallback(data, n);
+ }
free(data);
}
@@ -201,7 +248,6 @@ void ServerSocket::errorCallback(struct bufferevent *bev, short error, void *ctx
#else
close(conn->second.fd);
#endif
-
instance->_connections.erase(conn);
}
} else if (error & BEV_EVENT_EOF) {
@@ -229,9 +275,9 @@ void ServerSocket::readCallback(struct bufferevent *bev, void *ctx) {
char* data = (char*)malloc(instance->_blockSizeRead);
input = bufferevent_get_input(bev);
- n = evbuffer_remove(input, data, instance->_blockSizeRead);
-
- instance->readCallback(data, n, instance->_connections[bev]);
+ while((n = evbuffer_remove(input, data, instance->_blockSizeRead)) > 0) {
+ instance->readCallback(data, n, instance->_connections[bev]);
+ }
free(data);
}
@@ -241,11 +287,30 @@ void ServerSocket::bind() {
}
}
+void ServerSocket::listen(const std::string& address) {
+ std::string _prot;
+ std::string _address;
+ uint16_t _port;
+ parseAddress(address, _prot, _address, _port);
+ listen(_address, _port);
+}
+
void ServerSocket::listen(const std::string& address, int port) {
// tthread::lock_guard<tthread::recursive_mutex> lock(_mutex);
setupSockAddr(address, port);
bind();
+ int reuseaddr_on = 1;
+ setsockopt(_socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, sizeof(reuseaddr_on));
+
+ int flags = fcntl(_socketFD, F_GETFL);
+ if (flags >= 0) {
+ flags |= O_NONBLOCK;
+ if (fcntl(_socketFD, F_SETFL, flags) < 0) {
+ // could not set to non-blocj
+ }
+ }
+
_listenerEvent = event_new(_base->base, _socketFD, EV_READ|EV_PERSIST, acceptCallback, (void*)this);
/*XXX check it */
event_add(_listenerEvent, NULL);
@@ -275,7 +340,7 @@ void ServerSocket::acceptCallback(evutil_socket_t listener, short event, void *c
} else {
struct bufferevent *bev;
evutil_make_socket_nonblocking(fd);
- bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE);
+ bev = bufferevent_socket_new(instance->_base->base, fd, BEV_OPT_THREADSAFE); //BEV_OPT_THREADSAFE
bufferevent_setcb(bev, ServerSocket::readCallback, NULL, ServerSocket::errorCallback, ctx);
bufferevent_enable(bev, EV_READ|EV_WRITE);
@@ -288,4 +353,24 @@ void ServerSocket::Connection::reply(const char* data, size_t size) {
bufferevent_write(bufferEvent, data, size);
}
+void PacketServerSocket::readCallback(const char* data, size_t size, Connection& conn) {
+ std::stringstream& fragment = _fragments[conn];
+ fragment << std::string(data, size);
+
+ size_t startPos = 0;
+ size_t endPos;
+ const std::string& buffer = fragment.str();
+ while((endPos = buffer.find(_sep, startPos)) != std::string::npos) {
+// std::cout << ">" << buffer.substr(startPos, endPos - startPos) << "<" << std::endl;
+ readCallback(buffer.substr(startPos, endPos - startPos), conn);
+ startPos = endPos + _sep.size();
+ }
+ if (startPos != 0 && startPos < buffer.size() + 1) {
+ std::string rest = buffer.substr(startPos);
+ fragment.str(std::string());
+ fragment.clear();
+ fragment << rest;
+ }
+}
+
}