Howto push GST thread into QT Main Thread
-
Hello,
i am working on a QT integration of a gstreamer WebRTC sample for video Streaming based on a gitlab.freedesktop.org/gstreamer/ sample.I got to the point where a very basic problem blocks my path. Namingly the QSocketNotifier is being called from a different thread. The Gstreamer initiates a new Thread (gst-pc-ops) in which a callback is called that wants to send data through a websocket living in the Main thread. But this does not work
Inside my main I run the gst_init
int main(int argc, char *argv[]) { QCoreApplication a(argc, argv); ros::init(argc, argv, "ros_module_hivecom"); ros::NodeHandle nh; // create a node handle; need to pass this to the class constructor c_interface_webrtc modul_webrtc(&nh); gst_init (&argc, &argv); modul_webrtc.startWebRTC(5571); return a.exec(); }
In the modul_webrtc header I have all elements for the websocket
#include <ros/ros.h> #include <QObject> #include <QTimer> #include <QWebSocketServer> #include <QWebSocket> #include <QJsonObject> #include <QJsonDocument> #include <gst/gst.h> #include <gst/sdp/sdp.h> #define GST_USE_UNSTABLE_API #include <gst/webrtc/webrtc.h> #define MODULE_NAME "INTERFACE_COM:" struct _ReceiverEntry{ QWebSocket *connection; GstElement *pipeline; GstElement *webrtcbin; }; class c_interface_webrtc: public QObject { Q_OBJECT public: c_interface_webrtc(ros::NodeHandle* nodehandle); ~c_interface_webrtc(); int startWebRTC(int port); private: ros::NodeHandle nh; //Websocket std::shared_ptr<QWebSocketServer> websocketServer; std::shared_ptr<QWebSocket> webSocket; //WebRTC GstWebRTCRTPTransceiver *trans; GArray *transceivers; _ReceiverEntry *client; private slots: void onNewConnection(); void processTextMessage(QString message); void processBinaryMessage(QByteArray message); void socketDisconnected(); };
In the source the problem arises when changing the pipeline state to GST_STATE_PLAYING the GSt module initiates a new thread which itself calls the callback on_negotiation_needed_cb. from here on the other callbacks are also in this thread and so the sendTextMessages can not run since the thread is different.
#include <greenhive/interface_webrtc.h> c_interface_webrtc::c_interface_webrtc(ros::NodeHandle* nodehandle):nh(*nodehandle) { ros::NodeHandle nh("~"); } c_interface_webrtc::~c_interface_webrtc(){ } int c_interface_webrtc::startWebRTC(int port){ websocketServer = std::make_shared<QWebSocketServer>(QStringLiteral("WebRTC_Server"), QWebSocketServer::NonSecureMode); if(websocketServer->listen(QHostAddress::Any, port)) { ROS_INFO_STREAM(MODULE_NAME << "WebRTC WebSocket listening to port " << port); QObject::connect(websocketServer.get(), &QWebSocketServer::newConnection, this, &c_interface_webrtc::onNewConnection); return port; } else { throw std::runtime_error("InspectionServer: failed to listen"); return 0; } //websocketServer->moveToThread(websocketServer.get()); } void on_ice_candidate_cb(G_GNUC_UNUSED GstElement * webrtcbin, guint mline_index, gchar * candidate, gpointer user_data) { QJsonObject ice_json; QJsonObject ice_data_json; gchar *json_string; _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data; ice_json["type"] = "ice"; ice_data_json["sdpMLineIndex"] = (int)mline_index; ice_data_json["candidate"] = candidate; ice_json.insert("data",ice_data_json); QJsonDocument doc(ice_json); receiver_entry->connection->sendTextMessage((QString)doc.toJson(QJsonDocument::Compact)); ROS_INFO_STREAM(MODULE_NAME << "Ice Candidate " << doc.toJson(QJsonDocument::Compact).toStdString()); } void on_offer_created_cb (GstPromise * promise, gpointer user_data) { ROS_INFO_STREAM(MODULE_NAME << "offer created"); gchar *sdp_string; gchar *json_string; QJsonObject sdp_json; QJsonObject sdp_data_json; GstStructure const *reply; GstPromise *local_desc_promise; GstWebRTCSessionDescription *offer = NULL; _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data; reply = gst_promise_get_reply (promise); gst_structure_get(reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION,&offer, NULL); gst_promise_unref(promise); local_desc_promise = gst_promise_new (); g_signal_emit_by_name(receiver_entry->webrtcbin, "set-local-description",offer, local_desc_promise); gst_promise_interrupt(local_desc_promise); gst_promise_unref(local_desc_promise); sdp_string = gst_sdp_message_as_text(offer->sdp); gst_print("Negotiation offer created:\n%s\n", sdp_string); sdp_json["type"] = "sdp"; sdp_data_json["type"] = "offer"; sdp_data_json["sdp"] = sdp_string; sdp_json.insert("data",sdp_data_json); QJsonDocument doc(sdp_json); receiver_entry->connection->sendTextMessage(doc.toJson(QJsonDocument::Compact)); gst_webrtc_session_description_free (offer); } void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data) { ROS_INFO_STREAM(MODULE_NAME << "Negotiation Needed"); GstPromise *promise; _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data; gst_print ("Creating negotiation offer\n"); promise = gst_promise_new_with_change_func((GstPromiseChangeFunc) on_offer_created_cb, receiver_entry, NULL); g_signal_emit_by_name (G_OBJECT (webrtcbin), "create-offer", NULL, promise); } void c_interface_webrtc::onNewConnection() { ROS_INFO_STREAM(MODULE_NAME << "NewMessage"); QWebSocket *pSocket = websocketServer->nextPendingConnection(); QObject::connect(pSocket, &QWebSocket::textMessageReceived, this, &c_interface_webrtc::processTextMessage); QObject::connect(pSocket, &QWebSocket::binaryMessageReceived, this, &c_interface_webrtc::processBinaryMessage); QObject::connect(pSocket, &QWebSocket::disconnected, this, &c_interface_webrtc::socketDisconnected); client->connection = pSocket; client->connection->sendTextMessage(QStringLiteral("Hello, world!")); GError *error = NULL; client->pipeline = gst_parse_launch("webrtcbin " "name=webrtcbin stun-server=stun:// " "videotestsrc ! videorate ! " "video/x-raw," "width=1280," "height=720," "framerate=15/1 " "! videoconvert ! queue max-size-buffers=1 ! " "x264enc bitrate=600 speed-preset=ultrafast tune=zerolatency key-int-max=15 ! " "video/x-h264,profile=constrained-baseline ! queue max-size-time=100000000 ! h264parse ! " "rtph264pay config-interval=-1 name=payloader ! " "application/x-rtp," "media=video," "encoding-name=H264," "payload=96 ! webrtcbin. ", &error); if (error != NULL) { ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message); return; } client->webrtcbin = gst_bin_get_by_name(GST_BIN(client->pipeline), "webrtcbin"); if (client->webrtcbin == NULL){ ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message); return; } g_signal_emit_by_name (client->webrtcbin, "get-transceivers", &transceivers); if (transceivers == NULL && transceivers->len <1 ){ ROS_INFO_STREAM(MODULE_NAME << "Could not get transceivers"); } trans = g_array_index (transceivers, GstWebRTCRTPTransceiver *, 0); trans->direction = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY; //Link the signals g_signal_connect(client->webrtcbin, "on-negotiation-needed", G_CALLBACK(on_negotiation_needed_cb), (gpointer) client); g_signal_connect(client->webrtcbin, "on-ice-candidate", G_CALLBACK(on_ice_candidate_cb), (gpointer) client); //Start the Pipeline if (gst_element_set_state(client->pipeline, GST_STATE_PLAYING)==0){ GstState cur_state; gst_element_get_state(client->pipeline, &cur_state,NULL ,GST_CLOCK_TIME_NONE); while (cur_state != GST_STATE_PLAYING ){ gst_element_set_state(client->pipeline, GST_STATE_PLAYING); ROS_INFO_STREAM(MODULE_NAME << "Could not Start Pipeline " << cur_state); usleep(5000000); } } } void c_interface_webrtc::processTextMessage(QString message) { ROS_INFO_STREAM(MODULE_NAME << "TextMessage"); QWebSocket *pClient = qobject_cast<QWebSocket *>(sender()); QJsonDocument inbox = QJsonDocument::fromJson(message.toUtf8()); if (!inbox.isObject()) { ROS_INFO_STREAM(MODULE_NAME << " Error parsing JSON"); }else{ QJsonObject json = inbox.object(); switch(json["cmd"].toInt()){ case 1: break; case 2: break; case 3: break; } } } void c_interface_webrtc::processBinaryMessage(QByteArray message) { QWebSocket *pClient = qobject_cast<QWebSocket *>(sender()); } void c_interface_webrtc::socketDisconnected() { ROS_INFO_STREAM(MODULE_NAME << "Disconnecting"); }
I cold not figure out how to push the GST part into the main thread and also was unable to bring the callbacks into the same.
I am sure this is very basic but after starring on the code for so long i am braindead and stuck.
any help is much appreciated !!PS.:
@Christian-Ehrlicher this is the project where your information about the g_main_context_iteration helped me to get further. If you would also have an idea on this issue it would be amazing. -
@monkfood said in Howto push GST thread into QT Main Thread:
a callback is called that wants to send data through a websocket living in the Main thread.
Then don't use that websocket in the thread, but emit a signal in this callback instead. Connect this signal to a slot in main thread where you then use the websocket.