Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

Howto push GST thread into QT Main Thread



  • Hello,
    i am working on a QT integration of a gstreamer WebRTC sample for video Streaming based on a gitlab.freedesktop.org/gstreamer/ sample.

    I got to the point where a very basic problem blocks my path. Namingly the QSocketNotifier is being called from a different thread. The Gstreamer initiates a new Thread (gst-pc-ops) in which a callback is called that wants to send data through a websocket living in the Main thread. But this does not work

    Inside my main I run the gst_init

    int main(int argc, char *argv[])
    { 
      QCoreApplication a(argc, argv);
      ros::init(argc, argv, "ros_module_hivecom");
    
      ros::NodeHandle nh; // create a node handle; need to pass this to the class constructor
      
      c_interface_webrtc modul_webrtc(&nh);
      gst_init (&argc, &argv);
      modul_webrtc.startWebRTC(5571);
    
      return a.exec();
    }
    
    

    In the modul_webrtc header I have all elements for the websocket

    #include <ros/ros.h>
    
    #include <QObject>
    #include <QTimer>
    #include <QWebSocketServer>
    #include <QWebSocket>
    #include <QJsonObject>
    #include <QJsonDocument>
    
    #include <gst/gst.h>
    #include <gst/sdp/sdp.h>
    #define GST_USE_UNSTABLE_API
    #include <gst/webrtc/webrtc.h>
    
    
    #define MODULE_NAME "INTERFACE_COM:" 
    
    
    struct _ReceiverEntry{
            QWebSocket *connection;
    
            GstElement *pipeline;
            GstElement *webrtcbin;
        };
    
    class c_interface_webrtc: public QObject
    {
        Q_OBJECT
    
    public:
        c_interface_webrtc(ros::NodeHandle* nodehandle);
        ~c_interface_webrtc();
        int startWebRTC(int port);
    
    private:
        ros::NodeHandle nh;
    
        //Websocket
        std::shared_ptr<QWebSocketServer> websocketServer;
        std::shared_ptr<QWebSocket> webSocket;
    
        //WebRTC
        GstWebRTCRTPTransceiver *trans;
        GArray *transceivers;
        _ReceiverEntry *client;
    
    private slots:
        void onNewConnection();
        void processTextMessage(QString message);
        void processBinaryMessage(QByteArray message);
        void socketDisconnected();
    };
    

    In the source the problem arises when changing the pipeline state to GST_STATE_PLAYING the GSt module initiates a new thread which itself calls the callback on_negotiation_needed_cb. from here on the other callbacks are also in this thread and so the sendTextMessages can not run since the thread is different.

    #include <greenhive/interface_webrtc.h>
    
    
    c_interface_webrtc::c_interface_webrtc(ros::NodeHandle* nodehandle):nh(*nodehandle)
    {
    	ros::NodeHandle nh("~");
    }
    
    c_interface_webrtc::~c_interface_webrtc(){
        
    }
    
    
    
    
    int c_interface_webrtc::startWebRTC(int port){
        
        websocketServer = std::make_shared<QWebSocketServer>(QStringLiteral("WebRTC_Server"), QWebSocketServer::NonSecureMode);
        if(websocketServer->listen(QHostAddress::Any, port))
        {
            ROS_INFO_STREAM(MODULE_NAME << "WebRTC WebSocket listening to port " << port);
            QObject::connect(websocketServer.get(), &QWebSocketServer::newConnection, this, &c_interface_webrtc::onNewConnection);
            return port;
        }
        else
        {
            throw std::runtime_error("InspectionServer: failed to listen");
            return 0;
        }
        //websocketServer->moveToThread(websocketServer.get());
    
    }
    
    
    
    
    void on_ice_candidate_cb(G_GNUC_UNUSED GstElement * webrtcbin, guint mline_index, gchar * candidate, gpointer user_data)
    {
        QJsonObject ice_json;
        QJsonObject ice_data_json;
        gchar *json_string;
        _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
    
        ice_json["type"] = "ice";
        ice_data_json["sdpMLineIndex"] = (int)mline_index;
        ice_data_json["candidate"] = candidate;
        ice_json.insert("data",ice_data_json);
    
        QJsonDocument doc(ice_json);
        receiver_entry->connection->sendTextMessage((QString)doc.toJson(QJsonDocument::Compact));
        ROS_INFO_STREAM(MODULE_NAME << "Ice Candidate " << doc.toJson(QJsonDocument::Compact).toStdString());
    }
    
    
    void on_offer_created_cb (GstPromise * promise, gpointer user_data)
    {
        ROS_INFO_STREAM(MODULE_NAME << "offer created");
    
    
        gchar *sdp_string;
        gchar *json_string;
        QJsonObject sdp_json;
        QJsonObject sdp_data_json;
        GstStructure const *reply;
        GstPromise *local_desc_promise;
        GstWebRTCSessionDescription *offer = NULL;
        _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
        
        reply = gst_promise_get_reply (promise);
        gst_structure_get(reply, "offer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION,&offer, NULL);
        gst_promise_unref(promise);
    
        local_desc_promise = gst_promise_new ();
        g_signal_emit_by_name(receiver_entry->webrtcbin, "set-local-description",offer, local_desc_promise);
        gst_promise_interrupt(local_desc_promise);
        gst_promise_unref(local_desc_promise);
    
        sdp_string = gst_sdp_message_as_text(offer->sdp);
        gst_print("Negotiation offer created:\n%s\n", sdp_string);
    
        sdp_json["type"] = "sdp";
    
        sdp_data_json["type"] = "offer";
        sdp_data_json["sdp"] = sdp_string;
        
        sdp_json.insert("data",sdp_data_json);
    
        QJsonDocument doc(sdp_json);
        receiver_entry->connection->sendTextMessage(doc.toJson(QJsonDocument::Compact));
    
        gst_webrtc_session_description_free (offer);
    }
    
    
    void on_negotiation_needed_cb (GstElement * webrtcbin, gpointer user_data)
    {
        ROS_INFO_STREAM(MODULE_NAME << "Negotiation Needed");
    
      GstPromise *promise;
      _ReceiverEntry *receiver_entry = (_ReceiverEntry *) user_data;
    
      gst_print ("Creating negotiation offer\n");
    
      promise = gst_promise_new_with_change_func((GstPromiseChangeFunc) on_offer_created_cb, receiver_entry, NULL);
      g_signal_emit_by_name (G_OBJECT (webrtcbin), "create-offer", NULL, promise);
    
    }
    
    
    void c_interface_webrtc::onNewConnection()
    {
        ROS_INFO_STREAM(MODULE_NAME << "NewMessage");
    
        QWebSocket *pSocket = websocketServer->nextPendingConnection();
    
    
        QObject::connect(pSocket, &QWebSocket::textMessageReceived, this, &c_interface_webrtc::processTextMessage);
        QObject::connect(pSocket, &QWebSocket::binaryMessageReceived, this, &c_interface_webrtc::processBinaryMessage);
        QObject::connect(pSocket, &QWebSocket::disconnected, this, &c_interface_webrtc::socketDisconnected);
        client->connection = pSocket;
        client->connection->sendTextMessage(QStringLiteral("Hello, world!"));
    
    
        GError *error = NULL;
    
        client->pipeline = 
            gst_parse_launch("webrtcbin "
            "name=webrtcbin stun-server=stun:// "
            "videotestsrc ! videorate ! "
            "video/x-raw,"
            "width=1280,"
            "height=720,"
            "framerate=15/1 "
            "! videoconvert ! queue max-size-buffers=1 ! "
            "x264enc bitrate=600 speed-preset=ultrafast tune=zerolatency key-int-max=15 ! "
            "video/x-h264,profile=constrained-baseline ! queue max-size-time=100000000 ! h264parse ! "
            "rtph264pay config-interval=-1 name=payloader ! "
            "application/x-rtp,"
            "media=video,"
            "encoding-name=H264,"
            "payload=96 ! webrtcbin. ", &error);
        
        if (error != NULL) {
            ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message);
            return;
        }
    
        client->webrtcbin = gst_bin_get_by_name(GST_BIN(client->pipeline), "webrtcbin");
        if (client->webrtcbin == NULL){
            ROS_INFO_STREAM(MODULE_NAME << "Could not create WebRTC pipeline: "<< error->message);
            return;
        }
    
        g_signal_emit_by_name (client->webrtcbin, "get-transceivers",
            &transceivers);
        if (transceivers == NULL && transceivers->len <1 ){
            ROS_INFO_STREAM(MODULE_NAME << "Could not get transceivers");
    
        }
        trans = g_array_index (transceivers, GstWebRTCRTPTransceiver *, 0);
        trans->direction = GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY;
        
        //Link the signals
        g_signal_connect(client->webrtcbin, "on-negotiation-needed", G_CALLBACK(on_negotiation_needed_cb), (gpointer) client);
    
        g_signal_connect(client->webrtcbin, "on-ice-candidate", G_CALLBACK(on_ice_candidate_cb), (gpointer) client);
    
    
        //Start the Pipeline
        if (gst_element_set_state(client->pipeline, GST_STATE_PLAYING)==0){
            GstState cur_state;
            gst_element_get_state(client->pipeline, &cur_state,NULL ,GST_CLOCK_TIME_NONE);
            while (cur_state != GST_STATE_PLAYING ){
                gst_element_set_state(client->pipeline, GST_STATE_PLAYING);
                ROS_INFO_STREAM(MODULE_NAME << "Could not Start Pipeline " << cur_state);
                usleep(5000000);
            }
        }
    }
    
    
    void c_interface_webrtc::processTextMessage(QString message)
    {
            ROS_INFO_STREAM(MODULE_NAME << "TextMessage");
    
        QWebSocket *pClient = qobject_cast<QWebSocket *>(sender());
        QJsonDocument inbox = QJsonDocument::fromJson(message.toUtf8());
        if (!inbox.isObject()) {
            ROS_INFO_STREAM(MODULE_NAME << " Error parsing JSON");
        }else{
            QJsonObject json = inbox.object();
    
            switch(json["cmd"].toInt()){
                case 1:
                break;
                case 2:
                break;
                case 3:
                break;
            }
        }
    }
    
    
    void c_interface_webrtc::processBinaryMessage(QByteArray message)
    {
        QWebSocket *pClient = qobject_cast<QWebSocket *>(sender());
    }
    
    void c_interface_webrtc::socketDisconnected()
    {
            ROS_INFO_STREAM(MODULE_NAME << "Disconnecting");
    }
    

    I cold not figure out how to push the GST part into the main thread and also was unable to bring the callbacks into the same.

    I am sure this is very basic but after starring on the code for so long i am braindead and stuck.
    any help is much appreciated !!

    PS.:
    @Christian-Ehrlicher this is the project where your information about the g_main_context_iteration helped me to get further. If you would also have an idea on this issue it would be amazing.


  • Lifetime Qt Champion

    @monkfood said in Howto push GST thread into QT Main Thread:

    a callback is called that wants to send data through a websocket living in the Main thread.

    Then don't use that websocket in the thread, but emit a signal in this callback instead. Connect this signal to a slot in main thread where you then use the websocket.


Log in to reply