Important: Please read the Qt Code of Conduct -

Howto push GST thread into QT Main Thread

  • Hello,
    i am working on a QT integration of a gstreamer WebRTC sample for video Streaming based on a sample.

    I got to the point where a very basic problem blocks my path. Namingly the QSocketNotifier is being called from a different thread. The Gstreamer initiates a new Thread (gst-pc-ops) in which a callback is called that wants to send data through a websocket living in the Main thread. But this does not work

    Inside my main I run the gst_init

    int main(int argc, char *argv[])
      QCoreApplication a(argc, argv);
      ros::init(argc, argv, "ros_module_hivecom");
      ros::NodeHandle nh; // create a node handle; need to pass this to the class constructor
      c_interface_webrtc modul_webrtc(&nh);
      gst_init (&argc, &argv);
      return a.exec();

    In the modul_webrtc header I have all elements for the websocket

    #include <ros/ros.h>
    #include <QObject>
    #include <QTimer>
    #include <QWebSocketServer>
    #include <QWebSocket>
    #include <QJsonObject>
    #include <QJsonDocument>
    #include <gst/gst.h>
    #include <gst/sdp/sdp.h>
    #include <gst/webrtc/webrtc.h>
    struct _ReceiverEntry{
            QWebSocket *connection;
            GstElement *pipeline;
            GstElement *webrtcbin;
    class c_interface_webrtc: public QObject
        c_interface_webrtc(ros::NodeHandle* nodehandle);
        int startWebRTC(int port);
        ros::NodeHandle nh;
        std::shared_ptr<QWebSocketServer> websocketServer;
        std::shared_ptr<QWebSocket> webSocket;
        GstWebRTCRTPTransceiver *trans;
        GArray *transceivers;
        _ReceiverEntry *client;
    private slots:
        void onNewConnection();
        void processTextMessage(QString message);
        void processBinaryMessage(QByteArray message);
        void socketDisconnected();

    In the source the problem arises when changing the pipeline state to GST_STATE_PLAYING the GSt module initiates a new thread which itself calls the callback on_negotiation_needed_cb. from here on the other callbacks are also in this thread and so the sendTextMessages can not run since the thread is different.

    #include <greenhive/interface_webrtc.h>
    c_interface_webrtc::c_interface_webrtc(ros::NodeHandle* nodehandle):nh(*nodehandle)
    	ros::NodeHandle nh("~");
    int c_interface_webrtc::startWebRTC(int port){
        websocketServer = std::make_shared<QWebSocketServer>(QStringLiteral("WebRTC_Server"), QWebSocketServer::NonSecureMode);
        if(websocketServer->listen(QHostAddress::Any, port))
            ROS_INFO_STREAM(MODULE_NAME << "WebRTC WebSocket listening to port " << port);
            QObject::connect(websocketServer.get(), &QWebSocketServer::newConnection, this, &c_interface_webrtc::onNewConnection);
            return port;
            throw std::runtime_error("InspectionServer: failed to listen");
            return 0;
    void on_ice_candidate_cb(G_GNUC_UNUSED GstElement * webrtcbin, guint mline_index, gchar * candidate, gpointer user_data)
        QJsonObject ice_json;
        QJsonObject ice_data_json;
        gchar *json_string;
        _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
        ice_json["type"] = "ice";
        ice_data_json["sdpMLineIndex"] = (int)mline_index;
        ice_data_json["candidate"] = candidate;
        QJsonDocument doc(ice_json);
        ROS_INFO_STREAM(MODULE_NAME << "Ice Candidate " << doc.toJson(QJsonDocument::Compact).toStdString());
    void on_offer_created_cb (GstPromise * promise, gpointer user_data)
        ROS_INFO_STREAM(MODULE_NAME << "offer created");
        gchar *sdp_string;
        gchar *json_string;
        QJsonObject sdp_json;
        QJsonObject sdp_data_json;
        GstStructure const *reply;
        GstPromise *local_desc_promise;
        GstWebRTCSessionDescription *offer = NULL;
        _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
        reply = gst_promise_get_reply (promise);
        gst_structure_get(reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION,&offer, NULL);
        local_desc_promise = gst_promise_new ();
        g_signal_emit_by_name(receiver_entry->webrtcbin, "set-local-description",offer, local_desc_promise);
        sdp_string = gst_sdp_message_as_text(offer->sdp);
        gst_print("Negotiation offer created:\n%s\n", sdp_string);
        sdp_json["type"] = "sdp";
        sdp_data_json["type"] = "offer";
        sdp_data_json["sdp"] = sdp_string;
        QJsonDocument doc(sdp_json);
        gst_webrtc_session_description_free (offer);
    void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data)
        ROS_INFO_STREAM(MODULE_NAME << "Negotiation Needed");
      GstPromise *promise;
      _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
      gst_print ("Creating negotiation offer\n");
      promise = gst_promise_new_with_change_func((GstPromiseChangeFunc) on_offer_created_cb, receiver_entry, NULL);
      g_signal_emit_by_name (G_OBJECT (webrtcbin), "create-offer", NULL, promise);
    void c_interface_webrtc::onNewConnection()
        ROS_INFO_STREAM(MODULE_NAME << "NewMessage");
        QWebSocket *pSocket = websocketServer->nextPendingConnection();
        QObject::connect(pSocket, &QWebSocket::textMessageReceived, this, &c_interface_webrtc::processTextMessage);
        QObject::connect(pSocket, &QWebSocket::binaryMessageReceived, this, &c_interface_webrtc::processBinaryMessage);
        QObject::connect(pSocket, &QWebSocket::disconnected, this, &c_interface_webrtc::socketDisconnected);
        client->connection = pSocket;
        client->connection->sendTextMessage(QStringLiteral("Hello, world!"));
        GError *error = NULL;
        client->pipeline = 
            gst_parse_launch("webrtcbin "
            "name=webrtcbin stun-server=stun:// "
            "videotestsrc ! videorate ! "
            "framerate=15/1 "
            "! videoconvert ! queue max-size-buffers=1 ! "
            "x264enc bitrate=600 speed-preset=ultrafast tune=zerolatency key-int-max=15 ! "
            "video/x-h264,profile=constrained-baseline ! queue max-size-time=100000000 ! h264parse ! "
            "rtph264pay config-interval=-1 name=payloader ! "
            "payload=96 ! webrtcbin. ", &error);
        if (error != NULL) {
            ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message);
        client->webrtcbin = gst_bin_get_by_name(GST_BIN(client->pipeline), "webrtcbin");
        if (client->webrtcbin == NULL){
            ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message);
        g_signal_emit_by_name (client->webrtcbin, "get-transceivers",
        if (transceivers == NULL && transceivers->len <1 ){
            ROS_INFO_STREAM(MODULE_NAME << "Could not get transceivers");
        trans = g_array_index (transceivers, GstWebRTCRTPTransceiver *, 0);
        //Link the signals
        g_signal_connect(client->webrtcbin, "on-negotiation-needed", G_CALLBACK(on_negotiation_needed_cb), (gpointer) client);
        g_signal_connect(client->webrtcbin, "on-ice-candidate", G_CALLBACK(on_ice_candidate_cb), (gpointer) client);
        //Start the Pipeline
        if (gst_element_set_state(client->pipeline, GST_STATE_PLAYING)==0){
            GstState cur_state;
            gst_element_get_state(client->pipeline, &cur_state,NULL ,GST_CLOCK_TIME_NONE);
            while (cur_state != GST_STATE_PLAYING ){
                gst_element_set_state(client->pipeline, GST_STATE_PLAYING);
                ROS_INFO_STREAM(MODULE_NAME << "Could not Start Pipeline " << cur_state);
    void c_interface_webrtc::processTextMessage(QString message)
            ROS_INFO_STREAM(MODULE_NAME << "TextMessage");
        QWebSocket *pClient = qobject_cast<QWebSocket *>(sender());
        QJsonDocument inbox = QJsonDocument::fromJson(message.toUtf8());
        if (!inbox.isObject()) {
            ROS_INFO_STREAM(MODULE_NAME << " Error parsing JSON");
            QJsonObject json = inbox.object();
                case 1:
                case 2:
                case 3:
    void c_interface_webrtc::processBinaryMessage(QByteArray message)
        QWebSocket *pClient = qobject_cast<QWebSocket *>(sender());
    void c_interface_webrtc::socketDisconnected()
            ROS_INFO_STREAM(MODULE_NAME << "Disconnecting");

    I cold not figure out how to push the GST part into the main thread and also was unable to bring the callbacks into the same.

    I am sure this is very basic but after starring on the code for so long i am braindead and stuck.
    any help is much appreciated !!

    @Christian-Ehrlicher this is the project where your information about the g_main_context_iteration helped me to get further. If you would also have an idea on this issue it would be amazing.

  • Lifetime Qt Champion

    @monkfood said in Howto push GST thread into QT Main Thread:

    a callback is called that wants to send data through a websocket living in the Main thread.

    Then don't use that websocket in the thread, but emit a signal in this callback instead. Connect this signal to a slot in main thread where you then use the websocket.

Log in to reply