Howto push GST thread into QT Main Thread

    monkfood
    wrote on 6 Dec 2021, 12:40
    #1

    Hello,
    I am working on a Qt integration of a GStreamer WebRTC sample for video streaming, based on a sample from gitlab.freedesktop.org/gstreamer/.

    I have reached a point where a very basic problem blocks my path: the QSocketNotifier is being called from a different thread. GStreamer starts a new thread (gst-pc-ops) in which a callback is called that wants to send data through a websocket living in the main thread, but this does not work.

    Inside my main I call gst_init:

    int main(int argc, char *argv[])
    { 
      QCoreApplication a(argc, argv);
      ros::init(argc, argv, "ros_module_hivecom");
    
      ros::NodeHandle nh; // create a node handle; need to pass this to the class constructor
      
      c_interface_webrtc modul_webrtc(&nh);
      gst_init (&argc, &argv);
      modul_webrtc.startWebRTC(5571);
    
      return a.exec();
    }
    
    

    In the modul_webrtc header I have all the elements for the websocket:

    #include <ros/ros.h>
    
    #include <QObject>
    #include <QTimer>
    #include <QWebSocketServer>
    #include <QWebSocket>
    #include <QJsonObject>
    #include <QJsonDocument>
    
    #include <gst/gst.h>
    #include <gst/sdp/sdp.h>
    #define GST_USE_UNSTABLE_API
    #include <gst/webrtc/webrtc.h>
    
    
    #define MODULE_NAME "INTERFACE_COM:" 
    
    
    struct _ReceiverEntry{
            QWebSocket *connection;
    
            GstElement *pipeline;
            GstElement *webrtcbin;
        };
    
    class c_interface_webrtc: public QObject
    {
        Q_OBJECT
    
    public:
        c_interface_webrtc(ros::NodeHandle* nodehandle);
        ~c_interface_webrtc();
        int startWebRTC(int port);
    
    private:
        ros::NodeHandle nh;
    
        //Websocket
        std::shared_ptr<QWebSocketServer> websocketServer;
        std::shared_ptr<QWebSocket> webSocket;
    
        //WebRTC
        GstWebRTCRTPTransceiver *trans;
        GArray *transceivers;
        _ReceiverEntry *client;
    
    private slots:
        void onNewConnection();
        void processTextMessage(QString message);
        void processBinaryMessage(QByteArray message);
        void socketDisconnected();
    };
    

    In the source, the problem arises when changing the pipeline state to GST_STATE_PLAYING: GStreamer starts a new thread, which itself calls the callback on_negotiation_needed_cb. From here on, the other callbacks also run in this thread, so the sendTextMessage calls cannot run because the thread is different.

    #include <greenhive/interface_webrtc.h>
    
    
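    c_interface_webrtc::c_interface_webrtc(ros::NodeHandle* nodehandle):nh(*nodehandle)
    {
    	ros::NodeHandle nh("~");
    	// Allocate the receiver entry up front; onNewConnection() dereferences it
    	client = new _ReceiverEntry();
    }
    
    c_interface_webrtc::~c_interface_webrtc(){
        delete client;
    }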
    
    
    
    
    int c_interface_webrtc::startWebRTC(int port){
        
        websocketServer = std::make_shared<QWebSocketServer>(QStringLiteral("WebRTC_Server"), QWebSocketServer::NonSecureMode);
        if(websocketServer->listen(QHostAddress::Any, port))
        {
            ROS_INFO_STREAM(MODULE_NAME << "WebRTC WebSocket listening to port " << port);
            QObject::connect(websocketServer.get(), &QWebSocketServer::newConnection, this, &c_interface_webrtc::onNewConnection);
            return port;
        }
        else
        {
            throw std::runtime_error("InspectionServer: failed to listen");
            return 0;
        }
        //websocketServer->moveToThread(websocketServer.get());
    
    }
    
    
    
    
    void on_ice_candidate_cb(G_GNUC_UNUSED GstElement * webrtcbin, guint mline_index, gchar * candidate, gpointer user_data)
    {
        QJsonObject ice_json;
        QJsonObject ice_data_json;
        gchar *json_string;
        _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
    
        ice_json["type"] = "ice";
        ice_data_json["sdpMLineIndex"] = (int)mline_index;
        ice_data_json["candidate"] = candidate;
        ice_json.insert("data",ice_data_json);
    
        QJsonDocument doc(ice_json);
        receiver_entry->connection->sendTextMessage((QString)doc.toJson(QJsonDocument::Compact));
        ROS_INFO_STREAM(MODULE_NAME << "Ice Candidate " << doc.toJson(QJsonDocument::Compact).toStdString());
    }
    
    
    void on_offer_created_cb (GstPromise * promise, gpointer user_data)
    {
        ROS_INFO_STREAM(MODULE_NAME << "offer created");
    
    
        gchar *sdp_string;
        gchar *json_string;
        QJsonObject sdp_json;
        QJsonObject sdp_data_json;
        GstStructure const *reply;
        GstPromise *local_desc_promise;
        GstWebRTCSessionDescription *offer = NULL;
        _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
        
        reply = gst_promise_get_reply (promise);
        gst_structure_get(reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION,&offer, NULL);
        gst_promise_unref(promise);
    
        local_desc_promise = gst_promise_new ();
        g_signal_emit_by_name(receiver_entry->webrtcbin, "set-local-description",offer, local_desc_promise);
        gst_promise_interrupt(local_desc_promise);
        gst_promise_unref(local_desc_promise);
    
        sdp_string = gst_sdp_message_as_text(offer->sdp);
        gst_print("Negotiation offer created:\n%s\n", sdp_string);
    
        sdp_json["type"] = "sdp";
    
        sdp_data_json["type"] = "offer";
        sdp_data_json["sdp"] = sdp_string;
        
        sdp_json.insert("data",sdp_data_json);
    
        QJsonDocument doc(sdp_json);
        receiver_entry->connection->sendTextMessage(doc.toJson(QJsonDocument::Compact));
    
        // gst_sdp_message_as_text() returns a newly allocated string
        g_free(sdp_string);
        gst_webrtc_session_description_free (offer);
    }
    
    
    void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data)
    {
        ROS_INFO_STREAM(MODULE_NAME << "Negotiation Needed");
    
      GstPromise *promise;
      _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
    
      gst_print ("Creating negotiation offer\n");
    
      promise = gst_promise_new_with_change_func((GstPromiseChangeFunc) on_offer_created_cb, receiver_entry, NULL);
      g_signal_emit_by_name (G_OBJECT (webrtcbin), "create-offer", NULL, promise);
    
    }
    
    
    void c_interface_webrtc::onNewConnection()
    {
        ROS_INFO_STREAM(MODULE_NAME << "NewMessage");
    
        QWebSocket *pSocket = websocketServer->nextPendingConnection();
    
    
        QObject::connect(pSocket, &QWebSocket::textMessageReceived, this, &c_interface_webrtc::processTextMessage);
        QObject::connect(pSocket, &QWebSocket::binaryMessageReceived, this, &c_interface_webrtc::processBinaryMessage);
        QObject::connect(pSocket, &QWebSocket::disconnected, this, &c_interface_webrtc::socketDisconnected);
        client->connection = pSocket;
        client->connection->sendTextMessage(QStringLiteral("Hello, world!"));
    
    
        GError *error = NULL;
    
        client->pipeline = 
            gst_parse_launch("webrtcbin "
            "name=webrtcbin stun-server=stun:// "
            "videotestsrc ! videorate ! "
            "video/x-raw,"
            "width=1280,"
            "height=720,"
            "framerate=15/1 "
            "! videoconvert ! queue max-size-buffers=1 ! "
            "x264enc bitrate=600 speed-preset=ultrafast tune=zerolatency key-int-max=15 ! "
            "video/x-h264,profile=constrained-baseline ! queue max-size-time=100000000 ! h264parse ! "
            "rtph264pay config-interval=-1 name=payloader ! "
            "application/x-rtp,"
            "media=video,"
            "encoding-name=H264,"
            "payload=96 ! webrtcbin. ", &error);
        
        if (error != NULL) {
            ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message);
            return;
        }
    
        client->webrtcbin = gst_bin_get_by_name(GST_BIN(client->pipeline), "webrtcbin");
        if (client->webrtcbin == NULL){
            // error is NULL on this path, so it must not be dereferenced here
            ROS_INFO_STREAM(MODULE_NAME << "Could not find webrtcbin in the pipeline");
            return;
        }
    
        g_signal_emit_by_name (client->webrtcbin, "get-transceivers",
            &transceivers);
        // Use ||: with && a NULL array would be dereferenced by the second test
        if (transceivers == NULL || transceivers->len <1 ){
            ROS_INFO_STREAM(MODULE_NAME << "Could not get transceivers");
            return;
        }
        trans = g_array_index (transceivers, GstWebRTCRTPTransceiver *, 0);
        trans->direction = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY;
        
        //Link the signals
        g_signal_connect(client->webrtcbin, "on-negotiation-needed", G_CALLBACK(on_negotiation_needed_cb), (gpointer) client);
    
        g_signal_connect(client->webrtcbin, "on-ice-candidate", G_CALLBACK(on_ice_candidate_cb), (gpointer) client);
    
    
        //Start the Pipeline
        if (gst_element_set_state(client->pipeline, GST_STATE_PLAYING)==GST_STATE_CHANGE_FAILURE){
            GstState cur_state;
            gst_element_get_state(client->pipeline, &cur_state,NULL ,GST_CLOCK_TIME_NONE);
            while (cur_state != GST_STATE_PLAYING ){
                ROS_INFO_STREAM(MODULE_NAME << "Could not Start Pipeline " << cur_state);
                usleep(5000000);
                gst_element_set_state(client->pipeline, GST_STATE_PLAYING);
                // Refresh cur_state, otherwise the loop can never terminate
                gst_element_get_state(client->pipeline, &cur_state,NULL ,GST_CLOCK_TIME_NONE);
            }
        }
    }
    
    
    void c_interface_webrtc::processTextMessage(QString message)
    {
            ROS_INFO_STREAM(MODULE_NAME << "TextMessage");
    
        QWebSocket *pClient = qobject_cast<QWebSocket *>(sender());
        QJsonDocument inbox = QJsonDocument::fromJson(message.toUtf8());
        if (!inbox.isObject()) {
            ROS_INFO_STREAM(MODULE_NAME << " Error parsing JSON");
        }else{
            QJsonObject json = inbox.object();
    
            switch(json["cmd"].toInt()){
                case 1:
                break;
                case 2:
                break;
                case 3:
                break;
            }
        }
    }
    
    
    void c_interface_webrtc::processBinaryMessage(QByteArray message)
    {
        QWebSocket *pClient = qobject_cast<QWebSocket *>(sender());
    }
    
    void c_interface_webrtc::socketDisconnected()
    {
            ROS_INFO_STREAM(MODULE_NAME << "Disconnecting");
    }
    

    I could not figure out how to push the GST part into the main thread, and I was also unable to bring the callbacks into it.

    I am sure this is very basic, but after staring at the code for so long I am braindead and stuck.
    Any help is much appreciated!

    PS:
    @Christian-Ehrlicher this is the project where your information about g_main_context_iteration helped me get further. If you also have an idea on this issue, that would be amazing.

      jsulm
      Lifetime Qt Champion
      wrote on 6 Dec 2021, 12:43, last edited by jsulm on 6 Dec 2021, 12:43
      #2

      @monkfood said in Howto push GST thread into QT Main Thread:

      a callback is called that wants to send data through a websocket living in the Main thread.

      Then don't use that websocket in the thread, but emit a signal in this callback instead. Connect this signal to a slot in main thread where you then use the websocket.
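
The hand-off described above can be sketched without Qt, using only the C++ standard library. In Qt itself the same effect would normally come from a signal/slot connection that ends up queued (the default Qt::AutoConnection is queued when the emitting thread is not the receiver's thread), or from QMetaObject::invokeMethod with Qt::QueuedConnection. The names MainLoop, FakeSocket, on_ice_candidate and demo_all_sent_on_main_thread below are hypothetical stand-ins for the Qt event loop, QWebSocket, the GStreamer callback and a small driver; this is an illustration of the pattern under those assumptions, not the code from the question.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Qt event loop: closures posted from any
// thread are executed later on the thread that calls run().
class MainLoop {
public:
    // Thread-safe: may be called from the streaming thread.
    void post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Makes run() return once all previously posted tasks have executed.
    void quit() { post([this] { running_ = false; }); }

    // Must be called on the owning ("main") thread.
    void run() {
        while (running_) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return !tasks_.empty(); });
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // executed on the thread that called run()
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    bool running_ = true;  // only touched on the loop thread
};

// Hypothetical stand-in for QWebSocket: records what was sent and by whom.
struct FakeSocket {
    std::vector<std::string> sent;
    std::vector<std::thread::id> sender_threads;
    void sendTextMessage(const std::string& msg) {
        sent.push_back(msg);
        sender_threads.push_back(std::this_thread::get_id());
    }
};

// Plays the role of a GStreamer callback: it never touches the socket
// directly, it only posts the message to the loop that owns the socket.
void on_ice_candidate(MainLoop& loop, FakeSocket& socket, std::string candidate) {
    loop.post([&socket, candidate = std::move(candidate)] {
        socket.sendTextMessage(candidate);
    });
}

// Returns true if every message was sent from the thread that ran the loop.
bool demo_all_sent_on_main_thread() {
    MainLoop loop;
    FakeSocket socket;
    std::thread streaming([&] {
        on_ice_candidate(loop, socket, "candidate-1");
        on_ice_candidate(loop, socket, "candidate-2");
        loop.quit();
    });
    loop.run();  // the calling thread acts as the main thread
    streaming.join();
    assert(socket.sent.size() == 2);
    for (const auto& id : socket.sender_threads) {
        if (id != std::this_thread::get_id()) return false;
    }
    return true;
}
```

Calling demo_all_sent_on_main_thread() from main() runs the whole exchange. The point of the design is that the callback only enqueues work; the socket is touched exclusively by the thread that owns it, which is what a queued connection gives you in Qt.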
