diff options
author | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2013-03-17 12:11:03 (GMT) |
---|---|---|
committer | Stefan Radomski <radomski@tk.informatik.tu-darmstadt.de> | 2013-03-17 12:11:03 (GMT) |
commit | ccbf595c52fd705ec70abc774a29b153a7281334 (patch) | |
tree | 81b698cf9b2290a6905d12c71ef62e88a8b1c1cc /src | |
parent | 81079295b8be14128b7e532d504b32280360532e (diff) | |
download | uscxml-ccbf595c52fd705ec70abc774a29b153a7281334.zip uscxml-ccbf595c52fd705ec70abc774a29b153a7281334.tar.gz uscxml-ccbf595c52fd705ec70abc774a29b153a7281334.tar.bz2 |
Fixed http responses and added miles invoker
Diffstat (limited to 'src')
-rw-r--r-- | src/uscxml/Factory.cpp | 29 | ||||
-rw-r--r-- | src/uscxml/Interpreter.cpp | 5 | ||||
-rw-r--r-- | src/uscxml/Interpreter.h | 8 | ||||
-rw-r--r-- | src/uscxml/plugins/datamodel/ecmascript/v8/V8DataModel.cpp | 1 | ||||
-rw-r--r-- | src/uscxml/plugins/element/postpone/PostponeElement.cpp | 4 | ||||
-rw-r--r-- | src/uscxml/plugins/element/response/ResponseElement.cpp | 4 | ||||
-rw-r--r-- | src/uscxml/plugins/invoker/filesystem/dirmon/DirMonInvoker.cpp | 5 | ||||
-rw-r--r-- | src/uscxml/plugins/invoker/graphics/openscenegraph/OSGConverter.cpp | 14 | ||||
-rw-r--r-- | src/uscxml/plugins/invoker/http/HTTPServletInvoker.h | 2 | ||||
-rw-r--r-- | src/uscxml/plugins/invoker/miles/MilesSessionInvoker.cpp | 407 | ||||
-rw-r--r-- | src/uscxml/plugins/invoker/miles/MilesSessionInvoker.h | 100 | ||||
-rw-r--r-- | src/uscxml/server/HTTPServer.cpp | 78 | ||||
-rw-r--r-- | src/uscxml/server/HTTPServer.h | 1 |
13 files changed, 613 insertions, 45 deletions
diff --git a/src/uscxml/Factory.cpp b/src/uscxml/Factory.cpp index 94049ea..4011446 100644 --- a/src/uscxml/Factory.cpp +++ b/src/uscxml/Factory.cpp @@ -3,6 +3,7 @@ #include "uscxml/Factory.h" #include "uscxml/Message.h" +#include "uscxml/Interpreter.h" #include <glog/logging.h> #ifdef BUILD_AS_PLUGINS @@ -25,7 +26,7 @@ # endif # ifdef MILES_FOUND -# include "uscxml/plugins/invoker/modality/miles/SpatialAudio.h" +# include "uscxml/plugins/invoker/miles/MilesSessionInvoker.h" # endif # ifdef FFMPEG_FOUND @@ -60,29 +61,29 @@ Factory::Factory() { pluginPath = (getenv("USCXML_PLUGIN_PATH") != NULL ? getenv("USCXML_PLUGIN_PATH") : ""); } if (pluginPath.length() > 0) { - pluma.acceptProviderType<InvokerProvider>(); - pluma.acceptProviderType<IOProcessorProvider>(); - pluma.acceptProviderType<DataModelProvider>(); + pluma.acceptProviderType<InvokerImplProvider>(); + pluma.acceptProviderType<IOProcessorImplProvider>(); + pluma.acceptProviderType<DataModelImplProvider>(); pluma.loadFromFolder(pluginPath); - std::vector<InvokerProvider*> invokerProviders; + std::vector<InvokerImplProvider*> invokerProviders; pluma.getProviders(invokerProviders); - for (std::vector<InvokerProvider*>::iterator it = invokerProviders.begin() ; it != invokerProviders.end() ; ++it) { - Invoker* invoker = (*it)->create(); + for (std::vector<InvokerImplProvider*>::iterator it = invokerProviders.begin() ; it != invokerProviders.end() ; ++it) { + InvokerImpl* invoker = (*it)->create(); registerInvoker(invoker); } - std::vector<IOProcessorProvider*> ioProcessorProviders; + std::vector<IOProcessorImplProvider*> ioProcessorProviders; pluma.getProviders(ioProcessorProviders); - for (std::vector<IOProcessorProvider*>::iterator it = ioProcessorProviders.begin() ; it != ioProcessorProviders.end() ; ++it) { - IOProcessor* ioProcessor = (*it)->create(); + for (std::vector<IOProcessorImplProvider*>::iterator it = ioProcessorProviders.begin() ; it != ioProcessorProviders.end() ; ++it) { + IOProcessorImpl* ioProcessor = (*it)->create(); registerIOProcessor(ioProcessor); } - std::vector<DataModelProvider*> dataModelProviders; + std::vector<DataModelImplProvider*> dataModelProviders; pluma.getProviders(dataModelProviders); - for (std::vector<DataModelProvider*>::iterator it = dataModelProviders.begin() ; it != dataModelProviders.end() ; ++it) { - DataModel* dataModel = (*it)->create(); + for (std::vector<DataModelImplProvider*>::iterator it = dataModelProviders.begin() ; it != dataModelProviders.end() ; ++it) { + DataModelImpl* dataModel = (*it)->create(); registerDataModel(dataModel); } } @@ -96,7 +97,7 @@ Factory::Factory() { #ifdef MILES_FOUND { - SpatialAudio* invoker = new SpatialAudio(); + MilesSessionInvoker* invoker = new MilesSessionInvoker(); registerInvoker(invoker); } #endif diff --git a/src/uscxml/Interpreter.cpp b/src/uscxml/Interpreter.cpp index 5324b9e..95f4658 100644 --- a/src/uscxml/Interpreter.cpp +++ b/src/uscxml/Interpreter.cpp @@ -37,9 +37,9 @@ Interpreter::Interpreter() : Arabica::SAX2DOM::Parser<std::string>() { _thread = NULL; _sendQueue = NULL; _parentQueue = NULL; - _httpServlet = NULL; _running = false; _done = false; + _httpServlet = NULL; #ifdef _WIN32 WSADATA wsaData; @@ -277,7 +277,8 @@ void Interpreter::interpret() { if (_dataModel) { _dataModel.assign("_x.args", _cmdLineOptions); - _dataModel.assign("_ioprocessors['http']", _httpServlet->getDataModelVariables()); + if (_httpServlet) + _dataModel.assign("_ioprocessors['http']", _httpServlet->getDataModelVariables()); } setupIOProcessors(); diff --git a/src/uscxml/Interpreter.h b/src/uscxml/Interpreter.h index 318af95..1a44f80 100644 --- a/src/uscxml/Interpreter.h +++ b/src/uscxml/Interpreter.h @@ -114,6 +114,10 @@ public: return _cmdLineOptions; } + HTTPServletInvoker* getHTTPServlet() { + return _httpServlet; + } + DataModel getDataModel() { return _dataModel; } @@ -166,10 +170,6 @@ public: return _sessionId; } - HTTPServletInvoker* getHTTPServlet() { - return _httpServlet; - } - bool runOnMainThread(int fps, bool blocking = true); static bool isMember(const Arabica::DOM::Node<std::string>& node, const Arabica::XPath::NodeSet<std::string>& set); diff --git a/src/uscxml/plugins/datamodel/ecmascript/v8/V8DataModel.cpp b/src/uscxml/plugins/datamodel/ecmascript/v8/V8DataModel.cpp index d00ad43..407988e 100644 --- a/src/uscxml/plugins/datamodel/ecmascript/v8/V8DataModel.cpp +++ b/src/uscxml/plugins/datamodel/ecmascript/v8/V8DataModel.cpp @@ -178,6 +178,7 @@ Data V8DataModel::getValueAsData(const v8::Handle<v8::Value>& value) { } else if(value->IsString()) { v8::String::AsciiValue property(v8::Handle<v8::String>::Cast(value)); data.atom = *property; + data.type = Data::VERBATIM; } else if(value->IsStringObject()) { LOG(ERROR) << "IsStringObject is unimplemented" << std::endl; } else if(value->IsTrue()) { diff --git a/src/uscxml/plugins/element/postpone/PostponeElement.cpp b/src/uscxml/plugins/element/postpone/PostponeElement.cpp index b50b5c2..96cda6f 100644 --- a/src/uscxml/plugins/element/postpone/PostponeElement.cpp +++ b/src/uscxml/plugins/element/postpone/PostponeElement.cpp @@ -58,7 +58,7 @@ void PostponeElement::enterElement(const Arabica::DOM::Node<std::string>& node) return; } - LOG(INFO) << until; +// LOG(INFO) << until; #if 0 std::string timeoutStr = "0s"; @@ -124,6 +124,8 @@ void PostponeElement::Resubmitter::onStableConfiguration(Interpreter* interprete } eventIter++; } +// LOG(ERROR) << _postponedEvents.size() << " Postponess remaining"; + } void PostponeElement::Resubmitter::afterCompletion(Interpreter* interpreter) { diff --git a/src/uscxml/plugins/element/response/ResponseElement.cpp b/src/uscxml/plugins/element/response/ResponseElement.cpp index 2e25b27..2da0d07 100644 --- a/src/uscxml/plugins/element/response/ResponseElement.cpp +++ b/src/uscxml/plugins/element/response/ResponseElement.cpp @@ -42,8 +42,12 @@ void ResponseElement::enterElement(const Arabica::DOM::Node<std::string>& node) LOG(ERROR) << "No matching HTTP request for response element"; return; } + + assert(servlet->getRequests().find(requestId) != servlet->getRequests().end()); HTTPServer::Request httpReq = servlet->getRequests()[requestId]; + assert(httpReq.curlReq != NULL); HTTPServer::Reply httpReply(httpReq); + servlet->getRequests().erase(requestId); // get the status or default to 200 std::string statusStr = (HAS_ATTR(node, "status") ? ATTR(node, "status") : "200"); diff --git a/src/uscxml/plugins/invoker/filesystem/dirmon/DirMonInvoker.cpp b/src/uscxml/plugins/invoker/filesystem/dirmon/DirMonInvoker.cpp index 675135a..65ea531 100644 --- a/src/uscxml/plugins/invoker/filesystem/dirmon/DirMonInvoker.cpp +++ b/src/uscxml/plugins/invoker/filesystem/dirmon/DirMonInvoker.cpp @@ -358,9 +358,10 @@ void DirectoryWatch::updateEntries(bool reportAsExisting) { monIter++; } } - _knownEntries.erase(fileIter->first); + _knownEntries.erase(fileIter++); + } else { + fileIter++; } - fileIter++; } // remember when we last checked the directory for modifications #ifndef WIN32 diff --git a/src/uscxml/plugins/invoker/graphics/openscenegraph/OSGConverter.cpp b/src/uscxml/plugins/invoker/graphics/openscenegraph/OSGConverter.cpp index 09cf663..93a238c 100644 --- a/src/uscxml/plugins/invoker/graphics/openscenegraph/OSGConverter.cpp +++ b/src/uscxml/plugins/invoker/graphics/openscenegraph/OSGConverter.cpp @@ -270,10 +270,13 @@ void OSGConverter::process(const SendRequest& req) { viewer.getCamera()->setDrawBuffer(pbuffer); viewer.getCamera()->setReadBuffer(pbuffer); + double zoom = 1; + CAST_PARAM(req.params, zoom, "zoom", double); + // set background color viewer.getCamera()->setClearColor(osg::Vec4f(1.0f,1.0f,1.0f,1.0f)); viewer.getCamera()->setClearMask(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT); - viewer.getCameraManipulator()->setByMatrix(osg::Matrix::lookAt(osg::Vec3d(0,0,bs.radius() * -4), // eye + viewer.getCameraManipulator()->setByMatrix(osg::Matrix::lookAt(osg::Vec3d(0,0,bs.radius() * (-3.4 * zoom)), // eye (osg::Vec3d)bs.center(), // center osg::Vec3d(0,1,0))); // up @@ -304,19 +307,17 @@ osg::Matrix OSGConverter::requestToModelPose(const SendRequest& req) { double pitch = 0; double roll = 0; double yaw = 0; - double zoom = 1; double x = 0; double y = 0; double z = 0; CAST_PARAM(req.params, pitch, "pitch", double); CAST_PARAM(req.params, roll, "roll", double); CAST_PARAM(req.params, yaw, "yaw", double); - CAST_PARAM(req.params, zoom, "zoom", double); CAST_PARAM(req.params, x, "x", double); CAST_PARAM(req.params, y, "y", double); CAST_PARAM(req.params, z, "z", double); - osg::Matrix m = osg::Matrix::scale(zoom, zoom, zoom) * eulerToMatrix(pitch, roll, yaw) * osg::Matrix::translate(-1 * x, -1 * y, -1 * z); + osg::Matrix m = eulerToMatrix(pitch, roll, yaw) * osg::Matrix::translate(-1 * x, -1 * y, -1 * z); #if 0 dumpMatrix(m); #endif @@ -335,6 +336,9 @@ osg::Matrix OSGConverter::requestToCamPose(const SendRequest& req) { osg::ref_ptr<osg::Node> OSGConverter::setupGraph(const std::string filename, bool autoRotate) { + // get some privacy + tthread::lock_guard<tthread::recursive_mutex> lock(_cacheMutex); + /** * root (model pose) * - rotate (autoRotate to face largest side) @@ -345,8 +349,6 @@ osg::ref_ptr<osg::Node> OSGConverter::setupGraph(const std::string filename, boo long now = tthread::chrono::system_clock::now(); { - // get some privacy - tthread::lock_guard<tthread::recursive_mutex> lock(_cacheMutex); // do we have it in the cache? if (_models.find(filename) == _models.end()) { diff --git a/src/uscxml/plugins/invoker/http/HTTPServletInvoker.h b/src/uscxml/plugins/invoker/http/HTTPServletInvoker.h index c9fe844..5d2d4b9 100644 --- a/src/uscxml/plugins/invoker/http/HTTPServletInvoker.h +++ b/src/uscxml/plugins/invoker/http/HTTPServletInvoker.h @@ -53,7 +53,7 @@ protected: }; #ifdef BUILD_AS_PLUGINS -PLUMA_INHERIT_PROVIDER(HTTPServletInvoker, Invoker); +PLUMA_INHERIT_PROVIDER(HTTPServletInvoker, InvokerImpl); #endif } diff --git a/src/uscxml/plugins/invoker/miles/MilesSessionInvoker.cpp b/src/uscxml/plugins/invoker/miles/MilesSessionInvoker.cpp new file mode 100644 index 0000000..1a1416a --- /dev/null +++ b/src/uscxml/plugins/invoker/miles/MilesSessionInvoker.cpp @@ -0,0 +1,407 @@ +#include "MilesSessionInvoker.h" +#include <glog/logging.h> + +#ifdef BUILD_AS_PLUGINS +#include <Pluma/Connector.hpp> +#endif + +#include <inttypes.h> + +namespace uscxml { + +#ifdef BUILD_AS_PLUGINS +PLUMA_CONNECTOR +bool connect(pluma::Host& host) { + host.add( new MilesSessionInvokerProvider() ); + return true; +} +#endif + +MilesSessionInvoker::MilesSessionInvoker() { + /* Initalize Miles */ + miles_init(); +} + +MilesSessionInvoker::~MilesSessionInvoker() { +}; + +boost::shared_ptr<IOProcessorImpl> MilesSessionInvoker::create(Interpreter* interpreter) { + boost::shared_ptr<MilesSessionInvoker> invoker = boost::shared_ptr<MilesSessionInvoker>(new MilesSessionInvoker()); + invoker->_interpreter = interpreter; + return invoker; +} + +Data MilesSessionInvoker::getDataModelVariables() { + Data data; + return data; +} + +void MilesSessionInvoker::send(const SendRequest& req) { + std::cout << req; + if (boost::iequals(req.name, "disconnect")) { + std::string reflectorIP = req.params.find("reflectorip")->second; + std::string problemName = req.params.find("problemname")->second; + int rv; + rv = miles_disconnect_reflector_session((char*)reflectorIP.c_str(), (char*)problemName.c_str()); + if (!rv) { + LOG(ERROR) << "Could not disconnect from reflector session"; + return; + } + + } + if (boost::iequals(req.name, "connect")) { + std::string email = req.params.find("email")->second; + std::string reflectorIP = req.params.find("reflectorip")->second; + std::string problemName = req.params.find("problemname")->second; + + int rv; + rv = miles_connect_reflector_session((char*)reflectorIP.c_str(), (char*)problemName.c_str()); + if (!rv) { + LOG(ERROR) << "Could not setup reflector session"; + return; + } + + /* Set up audio and video RTP sockets */ + video_rtp_in_socket = miles_net_setup_udp_socket((char*)reflectorIP.c_str(), video_port, video_port, 10, 16000); + audio_rtp_in_socket = miles_net_setup_udp_socket((char*)reflectorIP.c_str(), audio_port, audio_port, 10, 16000); + video_rtp_out_socket = video_rtp_in_socket; //miles_net_setup_udp_socket((char*)reflectorIP.c_str(), video_port, 0, 10, 16000); + audio_rtp_out_socket = audio_rtp_in_socket; //miles_net_setup_udp_socket((char*)reflectorIP.c_str(), audio_port, 0, 10, 16000); + + /* Set up audio and video RTCP sockets */ + video_rtcp_in_socket = miles_net_setup_udp_socket((char*)reflectorIP.c_str(), video_port+1, video_port+1, 10, 16000); + audio_rtcp_in_socket = miles_net_setup_udp_socket((char*)reflectorIP.c_str(), audio_port+1, audio_port+1, 10, 16000); + video_rtcp_out_socket = video_rtcp_in_socket; //miles_net_setup_udp_socket((char*)reflectorIP.c_str(), video_port+1, 0, 10, 16000); + audio_rtcp_out_socket = audio_rtcp_in_socket; //miles_net_setup_udp_socket((char*)reflectorIP.c_str(), audio_port+1, 0, 10, 16000); + + /* Set up RTP audio and video sessions */ + video_session = miles_rtp_setup_session(video_rtp_in_socket, MILES_RTP_MEDIA_TYPE_VIDEO); + audio_session = miles_rtp_setup_session(audio_rtp_in_socket, MILES_RTP_MEDIA_TYPE_AUDIO); + + /* Set up RTCP audio and video sessions */ + video_session->rtcp_session = miles_rtp_setup_rtcp_session(video_session, video_rtcp_in_socket); + audio_session->rtcp_session = miles_rtp_setup_rtcp_session(audio_session, audio_rtcp_in_socket); + + /* Initialize and configure video encoder */ + video_encoder = miles_video_codec_init_encoder(); + video_encoder->codec_id = miles_video_codec_get_encoder_for_rtp_payload_type(MILES_RTP_PAYLOAD_TYPE_JPEG); + video_encoder->width = 320; + video_encoder->height = 240; + video_encoder->qfactor = 50; + rv = miles_video_codec_setup_encoder(video_encoder); + if (!rv) { + LOG(ERROR) << "Could not setup video encoder"; + return; + } + + /* Set up video grabber */ + rv = miles_video_grabber_get_supported_grabbers(&supported_video_grabbers); + if(rv<=0) { + /* No video grabber available */ + exit(-1); + } + video_grabber = miles_video_grabber_create_context(supported_video_grabbers[0]); + video_grabber->width = video_encoder->width; + video_grabber->height = video_encoder->height; + miles_video_grabber_setup(video_grabber); + free(supported_video_grabbers); + + /* Set up outgoing RTP stream for video */ + out_rtp_video_stream = miles_rtp_setup_outgoing_stream(video_session, video_rtp_out_socket, 0, MILES_RTP_PAYLOAD_TYPE_JPEG); + + /* Initialize and configure audio encoder */ + audio_encoder = miles_audio_codec_init_encoder(); + audio_encoder->codec_id = miles_audio_codec_get_encoder_for_rtp_payload_type(MILES_RTP_PAYLOAD_TYPE_L16); + audio_encoder->sample_rate = 16000; + audio_encoder->bytes_per_sample = 2; + audio_encoder->chunk_size = 320; /* 20 ms */ + audio_encoder->input_format = MILES_AUDIO_FORMAT_PCM; + rv = miles_audio_codec_setup_encoder(audio_encoder); + if(rv == 0) { + /* Couldn't set up audio codec */ + exit(-1); + } + + + /* Set up audio grabber */ + int n = miles_audio_device_get_supported_devices(&supported_audio_devices); + if(n<=0) { + /* No audio device available */ + exit(-1); + } + /* Use first device that supports capture */ + for(int i=0; i<n; i++) { + audio_dev = miles_audio_device_open(supported_audio_devices[i].id, MILES_AUDIO_FORMAT_PCM, 16000, 2, 1, 640, 1); + if(audio_dev) + break; + } + if(audio_dev == NULL) + exit(-1); + + /* Find first audio device that supports playback */ + for(int i=0; i<n; i++) { + audio_dev_playback = miles_audio_device_open(supported_audio_devices[i].id, MILES_AUDIO_FORMAT_PCM, 16000, 2, 1, 640, 0); + if(audio_dev_playback) { + audio_dev_playback_id = supported_audio_devices[i].id; + break; + } + } + if(audio_dev_playback == NULL) + exit(-1); + + /* Set up outgoing RTP stream for audio */ + out_rtp_audio_stream = miles_rtp_setup_outgoing_stream(audio_session, audio_rtp_out_socket, 0, MILES_RTP_PAYLOAD_TYPE_L16); + + /* Associate RTP stream with codec context */ + out_rtp_audio_stream->codec_ctx = audio_encoder; + out_rtp_video_stream->codec_ctx = video_encoder; + + /* Set up outgoing RTCP streams for audio and video */ + out_rtcp_audio_stream = miles_rtp_setup_outgoing_rtcp_stream(audio_session->rtcp_session, audio_rtcp_out_socket, out_rtp_audio_stream->ssrc); + out_rtcp_video_stream = miles_rtp_setup_outgoing_rtcp_stream(video_session->rtcp_session, video_rtcp_out_socket, out_rtp_video_stream->ssrc); + + _isRunning = true; + +// while(true) { +// rtp_video_receiver(video_session); +// video_transmitter(video_grabber, video_encoder, out_rtp_video_stream, out_rtcp_video_stream); +// rtp_audio_receiver(audio_session); +// audio_transmitter(audio_dev, audio_encoder, out_rtp_audio_stream, out_rtcp_audio_stream); +// } + + _audioThread = new tthread::thread(MilesSessionInvoker::runAudio, this); + _videoThread = new tthread::thread(MilesSessionInvoker::runVideo, this); + } +} + +void MilesSessionInvoker::runAudio(void* instance) { + ((MilesSessionInvoker*)instance)->processAudio(); +} + +void MilesSessionInvoker::runVideo(void* instance) { + ((MilesSessionInvoker*)instance)->processVideo(); +} + +void MilesSessionInvoker::processVideo() { + while(_isRunning) { + rtp_video_receiver(video_session); + video_transmitter(video_grabber, video_encoder, out_rtp_video_stream, out_rtcp_video_stream); + } +} + +void MilesSessionInvoker::processAudio() { + while(_isRunning) { + rtp_audio_receiver(audio_session); + audio_transmitter(audio_dev, audio_encoder, out_rtp_audio_stream, out_rtcp_audio_stream); + } +} + +void MilesSessionInvoker::cancel(const std::string sendId) { +} + +void MilesSessionInvoker::invoke(const InvokeRequest& req) { + video_port = 5566; + audio_port = 5568; +} + +/** + * Do something with an image decoded from an RTP stream (e.g. render to screen) + */ +void MilesSessionInvoker::render_video_image(u_int32_t ssrc, char *img, int width, int height, int img_format) { + + if(img_format != MILES_IMAGE_RGBA) { + miles_image_convert(img, render_img, img_format, MILES_IMAGE_RGBA, width, height); + stbi_write_png("/Users/sradomski/Desktop/image.png", width, height, 3, render_img, width * 3); + } else { + stbi_write_png("/Users/sradomski/Desktop/image.png", width, height, 3, img, width * 3); + } + + /* render image... */ +} + +/** + * Send an audio chunk decoded from an RTP stream to an audio device + */ +void MilesSessionInvoker::playback_audio(u_int32_t ssrc, char *buf, int sample_rate, int bps, int audio_format, int size) { + int n; + + if(size<0) + return; + + /* re-configure audio device, if needed */ + if(audio_dev_playback == NULL || audio_dev_playback->chunk_size != size || audio_dev_playback->sample_rate != sample_rate || + audio_dev_playback->format != audio_format || audio_dev_playback->bytes_per_sample != bps) { + if(audio_dev_playback) + miles_audio_device_close(audio_dev_playback, 0); + audio_dev_playback = miles_audio_device_open(audio_dev_playback_id, audio_format, sample_rate, bps, 1, size, 0); + if(audio_dev_playback == NULL) + return; + } + + /* play audio */ + n = miles_audio_device_write(audio_dev_playback, buf, size); +} + +/** + * Handle incoming video streams + */ + +int MilesSessionInvoker::video_receiver(struct miles_rtp_in_stream *rtp_stream, char *data, int bytes_read) { + int status, n; + struct miles_video_codec_decode_context *codec_ctx; + + codec_ctx = (struct miles_video_codec_decode_context *)rtp_stream->codec_ctx; + + if(codec_ctx == NULL || !miles_video_codec_decoder_supports_rtp_payload_type(codec_ctx, rtp_stream->payload_type)) { + if(codec_ctx) + miles_video_codec_destroy_decoder(codec_ctx); + codec_ctx = miles_video_codec_init_decoder(); + codec_ctx->codec_id = miles_video_codec_get_decoder_for_rtp_payload_type(rtp_stream->payload_type); + if(codec_ctx->codec_id == MILES_VIDEO_CODEC_UNKNOWN) { + /* Cannot decode the video stream */ + return 0; + } + + status = miles_video_codec_setup_decoder(codec_ctx); + if(status == 0) { + /* Cannot decode the video stream */ + return 0; + } + rtp_stream->codec_ctx = (void *)codec_ctx; + return 0; + } + n = miles_video_codec_decode(codec_ctx, data, decoded_in_img, bytes_read); + if(n > 0) { + render_video_image(rtp_stream->ssrc, decoded_in_img, codec_ctx->width, codec_ctx->height, codec_ctx->output_format); + } + return n; +} + +/** + * Handle incoming audio streams + */ + +int MilesSessionInvoker::audio_receiver(struct miles_rtp_in_stream *rtp_stream, char *data, int bytes_read) { + int status, size; + struct miles_audio_codec_decode_context *codec_ctx; + + codec_ctx = (struct miles_audio_codec_decode_context *)rtp_stream->codec_ctx; + + if(codec_ctx == NULL || !miles_audio_codec_decoder_supports_rtp_payload_type(codec_ctx, rtp_stream->payload_type)) { + if(codec_ctx) + miles_audio_codec_destroy_decoder(codec_ctx); + codec_ctx = miles_audio_codec_init_decoder(); + codec_ctx->codec_id = miles_audio_codec_get_decoder_for_rtp_payload_type(rtp_stream->payload_type); + if(codec_ctx->codec_id == MILES_AUDIO_CODEC_UNKNOWN) { + /* Cannot decode the audio stream */ + return 0; + } + status = miles_audio_codec_setup_decoder(codec_ctx); + if(status == 0) { + /* Cannot decode the audio stream */ + return 0; + } + rtp_stream->codec_ctx = (void *)codec_ctx; + } + size = miles_audio_codec_decode(codec_ctx, data, audio_in_buf); + if(size > 0) { + playback_audio(rtp_stream->ssrc, audio_in_buf, codec_ctx->sample_rate, codec_ctx->bytes_per_sample, codec_ctx->output_format, size); + } + return size; +} + +/** + * Read and depacketize incoming RTP streams + */ + +void MilesSessionInvoker::rtp_audio_receiver(struct miles_rtp_session *rtp_session) { + int n, m=0; + struct miles_rtp_in_stream *rtp_stream; + + /* Poll RTP socket, read all available RTP packets */ + while (1) { + n = miles_net_poll_socket(rtp_session->socket); + if(n<=0) return; + + /* Read RTP data */ + n = miles_rtp_recv(rtp_session, &rtp_stream, audio_data); + if(n>0) { + m = audio_receiver(rtp_stream, audio_data, n); + } + + /* Poll RTCP socket */ + n = miles_net_poll_socket(rtp_session->rtcp_session->socket); + if(n>0) { + /* Do RTCP packet processing */ + n = miles_rtp_recv_rtcp(rtp_session->rtcp_session); + } + } +} + +void MilesSessionInvoker::rtp_video_receiver(struct miles_rtp_session *rtp_session) { + int n, m=0; + struct miles_rtp_in_stream *rtp_stream; + + /* Poll RTP socket, read all available RTP packets */ + while (1) { + n = miles_net_poll_socket(rtp_session->socket); + if(n<=0) return; + + /* Read RTP data */ + n = miles_rtp_recv(rtp_session, &rtp_stream, video_data); + if(n>0) { + m = video_receiver(rtp_stream, video_data, n); + } + + /* Poll RTCP socket */ + n = miles_net_poll_socket(rtp_session->rtcp_session->socket); + if(n>0) { + /* Do RTCP packet processing */ + n = miles_rtp_recv_rtcp(rtp_session->rtcp_session); + } + } +} + +/** + * Send RTP video stream + */ +int MilesSessionInvoker::video_transmitter(struct miles_video_grabber_context *grabber, struct miles_video_codec_encode_context *codec_ctx, struct miles_rtp_out_stream *rtp_stream, struct miles_rtcp_out_stream *out_rtcp_stream) { + int n; + + /* Send RTCP packets, if due */ + miles_rtp_send_rtcp(out_rtcp_stream); + + n = miles_video_grabber_grab(grabber, video_out_buf); + if(n <= 0) + return 0; + if(grabber->image_format != codec_ctx->input_format) { + /* image conversion ... */ + } + n = miles_video_codec_encode(codec_ctx, video_out_buf, encoded_out_img); + if(n<=0) + return 0; + return miles_rtp_send(rtp_stream, encoded_out_img, n); +} + +/** + * Send RTP audio stream + */ +int MilesSessionInvoker::audio_transmitter(struct miles_audio_device *dev, struct miles_audio_codec_encode_context *codec_ctx, struct miles_rtp_out_stream *rtp_stream, struct miles_rtcp_out_stream *out_rtcp_audio_stream) { + int n; + + /* Send RTCP packets, if due */ + miles_rtp_send_rtcp(out_rtcp_audio_stream); + + n = miles_audio_device_read(dev, audio_read_buf, codec_ctx->chunk_size); + if(n <= 0) + return 0; + if(dev->format != codec_ctx->input_format) { + /* audio conversion needed ... */ + } + n = miles_audio_codec_encode(codec_ctx, audio_read_buf, encoded_out_audio); + if(n<=0) + return 0; + return miles_rtp_send(rtp_stream, encoded_out_audio, n); +} + + +}
\ No newline at end of file diff --git a/src/uscxml/plugins/invoker/miles/MilesSessionInvoker.h b/src/uscxml/plugins/invoker/miles/MilesSessionInvoker.h new file mode 100644 index 0000000..cb3e9ee --- /dev/null +++ b/src/uscxml/plugins/invoker/miles/MilesSessionInvoker.h @@ -0,0 +1,100 @@ +#ifndef MILESSESIONINVOKER_H_W09J90F0 +#define MILESSESIONINVOKER_H_W09J90F0 + +#include <uscxml/Interpreter.h> + +extern "C" { +#include "miles/miles.h" +#include "miles/network.h" +#include "miles/rtp.h" +#include "miles/audio_codec.h" +#include "miles/audio_device.h" +#include "miles/video_codec.h" +#include "miles/video_grabber.h" +#include "miles/session.h" +#include "miles/image.h" +} +#ifdef BUILD_AS_PLUGINS +#include "uscxml/plugins/Plugins.h" +#endif + +namespace uscxml { + +class MilesSessionInvoker : public InvokerImpl { +public: + MilesSessionInvoker(); + virtual ~MilesSessionInvoker(); + virtual boost::shared_ptr<IOProcessorImpl> create(Interpreter* interpreter); + + virtual std::set<std::string> getNames() { + std::set<std::string> names; + names.insert("miles"); + names.insert("http://uscxml.tk.informatik.tu-darmstadt.de/#miles"); + return names; + } + + virtual Data getDataModelVariables(); + virtual void send(const SendRequest& req); + virtual void cancel(const std::string sendId); + virtual void invoke(const InvokeRequest& req); + +protected: + int video_rtp_in_socket, audio_rtp_in_socket; + int video_rtp_out_socket, audio_rtp_out_socket; + int video_rtcp_in_socket, audio_rtcp_in_socket; + int video_rtcp_out_socket, audio_rtcp_out_socket; + struct miles_rtp_session *video_session, *audio_session; + struct miles_video_codec_encode_context *video_encoder; + struct miles_audio_codec_encode_context *audio_encoder; + int *supported_video_grabbers; + struct miles_video_grabber_context *video_grabber; + struct miles_rtp_out_stream *out_rtp_video_stream, *out_rtp_audio_stream; + struct miles_rtcp_out_stream *out_rtcp_video_stream, *out_rtcp_audio_stream; + struct miles_audio_device *audio_dev; + struct miles_audio_device_description *supported_audio_devices; + int video_port, audio_port; + std::string ip_address; + + char video_out_buf[1000000]; + char encoded_out_img[1000000]; + char decoded_in_img[1000000]; + char audio_in_buf[1000000]; + char render_img[1000000]; + char audio_data[1000000]; + char video_data[1000000]; + + char encoded_out_audio[1000000]; + char audio_read_buf[1000000]; + + struct miles_audio_device *audio_dev_playback; + int audio_dev_playback_id; + + static void runAudio(void* instance); + static void runVideo(void* instance); + void processVideo(); + void processAudio(); + + void render_video_image(u_int32_t ssrc, char *img, int width, int height, int img_format); + void playback_audio(u_int32_t ssrc, char *buf, int sample_rate, int bps, int audio_format, int size); + int video_receiver(struct miles_rtp_in_stream *rtp_stream, char *data, int bytes_read); + int audio_receiver(struct miles_rtp_in_stream *rtp_stream, char *data, int bytes_read); + void rtp_audio_receiver(struct miles_rtp_session *rtp_session); + void rtp_video_receiver(struct miles_rtp_session *rtp_session); + int video_transmitter(struct miles_video_grabber_context *grabber, struct miles_video_codec_encode_context *codec_ctx, struct miles_rtp_out_stream *rtp_stream, struct miles_rtcp_out_stream *out_rtcp_stream); + int audio_transmitter(struct miles_audio_device *dev, struct miles_audio_codec_encode_context *codec_ctx, struct miles_rtp_out_stream *rtp_stream, struct miles_rtcp_out_stream *out_rtcp_audio_stream); + + + bool _isRunning; + tthread::thread* _videoThread; + tthread::thread* _audioThread; + tthread::recursive_mutex _mutex; +}; + +#ifdef BUILD_AS_PLUGINS +PLUMA_INHERIT_PROVIDER(MilesSessionInvoker, Invoker); +#endif + +} + + +#endif /* end of include guard: MILESSESIONINVOKER_H_W09J90F0 */ diff --git a/src/uscxml/server/HTTPServer.cpp b/src/uscxml/server/HTTPServer.cpp index ebc5b91..8be45e6 100644 --- a/src/uscxml/server/HTTPServer.cpp +++ b/src/uscxml/server/HTTPServer.cpp @@ -38,12 +38,25 @@ HTTPServer::HTTPServer(unsigned short port) { _port = port; _base = event_base_new(); _http = evhttp_new(_base); + + evhttp_set_allowed_methods(_http, + EVHTTP_REQ_GET | + EVHTTP_REQ_POST | + EVHTTP_REQ_HEAD | + EVHTTP_REQ_PUT | + EVHTTP_REQ_DELETE | + EVHTTP_REQ_OPTIONS | + EVHTTP_REQ_TRACE | + EVHTTP_REQ_CONNECT | + EVHTTP_REQ_PATCH); // allow all methods + _handle = NULL; while((_handle = evhttp_bind_socket_with_handle(_http, INADDR_ANY, _port)) == NULL) { _port++; } determineAddress(); + evhttp_set_timeout(_http, 5); // generic callback evhttp_set_gencb(_http, HTTPServer::httpRecvReqCallback, NULL); } @@ -56,7 +69,7 @@ tthread::recursive_mutex HTTPServer::_instanceMutex; std::map<std::string, std::string> HTTPServer::mimeTypes; HTTPServer* HTTPServer::getInstance(int port) { - tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex); +// tthread::lock_guard<tthread::recursive_mutex> lock(_instanceMutex); if (_instance == NULL) { // this is but a tiny list, supply a content-type <header> yourself @@ -97,9 +110,10 @@ std::string HTTPServer::mimeTypeForExtension(const std::string& ext) { void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackData) { // std::cout << (uintptr_t)req << ": Replying" << std::endl; -// evhttp_send_reply(req, 200, NULL, NULL); +// evhttp_send_error(req, 404, NULL); // return; + evhttp_request_own(req); Request request; request.curlReq = req; @@ -153,6 +167,12 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD request.data.compound["uri"] = Data(HTTPServer::getBaseURL() + req->uri, Data::VERBATIM); request.data.compound["path"] = Data(evhttp_uri_get_path(evhttp_request_get_evhttp_uri(req)), Data::VERBATIM); + // This was used for debugging +// if (boost::ends_with(request.data.compound["path"].atom, ".png")) { +// evhttp_send_error(req, 404, NULL); +// return; +// } + // seperate path into components std::stringstream ss(request.data.compound["path"].atom); std::string item; @@ -175,12 +195,26 @@ void HTTPServer::httpRecvReqCallback(struct evhttp_request *req, void *callbackD // get content buf = evhttp_request_get_input_buffer(req); + if (evbuffer_get_length(buf)) + request.data.compound["content"] = Data("", Data::VERBATIM); while (evbuffer_get_length(buf)) { int n; char cbuf[1024]; n = evbuffer_remove(buf, cbuf, sizeof(buf)-1); if (n > 0) { - request.content.append(cbuf, n); + request.data.compound["content"].atom.append(cbuf, n); + } + } + + // decode content + if (request.data.compound.find("content") != request.data.compound.end() && + request.data.compound["header"].compound.find("Content-Type") != request.data.compound["header"].compound.end()) { + std::string contentType = request.data.compound["header"].compound["Content-Type"].atom; + if (false) { + } else if (boost::iequals(contentType, "application/x-www-form-urlencoded")) { + request.data.compound["content"].atom = evhttp_decode_uri(request.data.compound["content"].atom.c_str()); + } else if (boost::iequals(contentType, "application/json")) { + request.data.compound["content"] = Data::fromJSON(request.data.compound["content"].atom); } } @@ -221,30 +255,44 @@ void HTTPServer::processByMatchingServlet(const Request& request) { } void HTTPServer::reply(const Reply& reply) { - if (reply.content.size() > 0 && reply.headers.find("Content-Type") == reply.headers.end()) { + // we need to reply from the thread calling event_base_dispatch, just add to ist base queue! + Reply* replyCB = new Reply(reply); + HTTPServer* INSTANCE = getInstance(); + event_base_once(INSTANCE->_base, -1, EV_TIMEOUT, HTTPServer::replyCallback, replyCB, NULL); +} + +void HTTPServer::replyCallback(evutil_socket_t fd, short what, void *arg) { + + Reply* reply = (Reply*)arg; + + if (reply->content.size() > 0 && reply->headers.find("Content-Type") == reply->headers.end()) { LOG(INFO) << "Sending content without Content-Type header"; } - std::map<std::string, std::string>::const_iterator headerIter = reply.headers.begin(); - while(headerIter != reply.headers.end()) { - evhttp_add_header(evhttp_request_get_output_headers(reply.curlReq), headerIter->first.c_str(), headerIter->second.c_str()); + std::map<std::string, std::string>::const_iterator headerIter = reply->headers.begin(); + while(headerIter != reply->headers.end()) { + evhttp_add_header(evhttp_request_get_output_headers(reply->curlReq), headerIter->first.c_str(), headerIter->second.c_str()); headerIter++; } - if (reply.status >= 400) { - evhttp_send_error(reply.curlReq, reply.status, NULL); + if (reply->status >= 400) { + evhttp_send_error(reply->curlReq, reply->status, NULL); return; } - struct evbuffer *evb = evbuffer_new(); + struct evbuffer *evb = NULL; - if (!boost::iequals(reply.type, "HEAD")) - evbuffer_add(evb, reply.content.data(), reply.content.size()); + if (!boost::iequals(reply->type, "HEAD") && reply->content.size() > 0) { + evb = evbuffer_new(); + evbuffer_add(evb, reply->content.data(), reply->content.size()); + } - evhttp_send_reply(reply.curlReq, reply.status, NULL, evb); - evbuffer_free(evb); -// evhttp_request_free(reply.curlReq); + evhttp_send_reply(reply->curlReq, reply->status, NULL, evb); + if (evb != NULL) + evbuffer_free(evb); +// evhttp_request_free(reply->curlReq); + delete(reply); } bool HTTPServer::registerServlet(const std::string& path, HTTPServlet* servlet) { diff --git a/src/uscxml/server/HTTPServer.h b/src/uscxml/server/HTTPServer.h index c573422..319b62f 100644 --- a/src/uscxml/server/HTTPServer.h +++ b/src/uscxml/server/HTTPServer.h @@ -55,6 +55,7 @@ private: void determineAddress(); + static void replyCallback(evutil_socket_t fd, short what, void *arg); static void httpRecvReqCallback(struct evhttp_request *req, void *callbackData); void processByMatchingServlet(const Request& request); |