Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

QThread sub-millisecond timer problems



  • I am building a program to stream video packets over the network. My code functions properly for the most part, but the part of my code that I use for timing has problems. Frequently, datagrams are sent 50-200+ ms late. I am using a QThread spinning in a while loop that checks a QElapsedTimer. Any gap between datagrams that is greater than 200 ms causes problems with the mux hardware downstream, so that problem is unacceptable. I have tested to make sure the send datagram function is executed in the correct thread.

    @
    #ifndef STREAM_H
    #define STREAM_H

    #include <atomic>

    #include "QtCore"
    #include <QUdpSocket>
    #include "QThread"

    // List of byte arrays; in this file each element is sent as one UDP datagram.
    typedef QList< QByteArray > BufferList;
    // Declare both types to Qt's meta-type system. The stream constructor also
    // registers them by name with qRegisterMetaType, and both appear as arguments
    // of the start_stream() signal / send_datagrams() slot pair.
    Q_DECLARE_METATYPE(BufferList);
    Q_DECLARE_METATYPE(QHostAddress);

    /// Sends a prepared list of datagrams over UDP at a fixed interval.
    /// In this file an instance is moved onto stream's worker thread and started
    /// through the start_stream() -> send_datagrams() connection.
    class Worker : public QObject
    {
        Q_OBJECT

    public:
        explicit Worker();
        ~Worker();

        /// Keep-running flag polled by the send loop in send_datagrams().
        /// stream's destructor clears it from a different thread while that loop
        /// may still be running, so it is atomic instead of a plain bool.
        std::atomic<bool> loop;

    public slots:
        /// Sends every entry of datagram_buffer to stream_addr:stream_port,
        /// waiting timer_freq nanoseconds (busy-wait) before each send.
        void send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);

    signals:
        /// Emitted by send_datagrams() once the whole buffer has been sent.
        void done_streaming();

    private:
        /// Dumps the raw bytes of every datagram into a file on disk.
        void save_file(BufferList datagram_buffer);

        QUdpSocket *udp_streaming_socket;   ///< created inside send_datagrams()
        int datagram_index , socket_state;  ///< next entry to send / last writeDatagram() result

        QElapsedTimer elapsed_timer;        ///< measures the gap since the previous send
        QHostAddress        ip_stream_address;
        qint16              ip_stream_port;
    };

    /// Groups incoming packets into datagrams of a fixed packet count and hands the
    /// resulting list to a Worker that lives on workerThread.
    class stream : public QObject
    {
        Q_OBJECT

        // Kept as the first data member, exactly as before.
        QThread workerThread;

    public:
        explicit stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram, int pkt_size, QObject *parent);
        ~stream();

    public slots:
        /// Emits start_stream() with the datagrams collected so far.
        void send_udp_packet();
        /// Appends one packet to the datagram currently being assembled.
        void make_udp_packet(QByteArray packet);
        /// Relays the worker's completion as done_with_stream().
        void done_with_worker();

    signals:
        void start_stream(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq, BufferList datagram_buffer);
        void done_with_stream();

    private:
        QHostAddress ip_stream_address;
        qint16 ip_stream_port;

        BufferList datagram_buffer;  ///< completed datagrams, in arrival order
        QByteArray datagram;         ///< datagram under construction
        QByteArray packet;           ///< filled with '1' bytes by the constructor
        Worker *worker;
        qint64 timer_freq;           ///< interval handed to the worker via start_stream()
        int packet_index;            ///< packets already placed in `datagram`
        int KbitRate;
        int pkts_PerDgram;
        int packet_size;
        int dgram_size;
    };
    #endif // STREAM_H

    @



  • @
    #include "stream.h"
    #include "windows.h"
    /// ========================================================================================================
    /// Sets up the streaming front end: computes the send interval, creates the
    /// Worker, wires the signals and starts the worker thread at
    /// QThread::TimeCriticalPriority.
    /// @param stream_addr  destination address, later forwarded to the worker
    /// @param stream_port  destination UDP port, later forwarded to the worker
    /// @param kBitRate     stream rate (presumably kbit/s, going by the name); it is
    ///                     used as a divisor below, so it must not be zero
    /// @param pktsPerDgram number of packets concatenated into one datagram
    /// @param pkt_size     size of one packet in bytes
    /// @param parent       QObject parent
    stream::stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent) :
    QObject(parent)
    {
        qRegisterMetaType<QHostAddress>("QHostAddress");
        qRegisterMetaType<BufferList>("BufferList");

        KbitRate = kBitRate;
        ip_stream_address = stream_addr;
        ip_stream_port = stream_port;

        // Interval between datagrams. The worker compares this value against
        // QElapsedTimer::nsecsElapsed(), so the unit is NANOSECONDS, not ms:
        //   bits_per_datagram / (kBitRate * 1000 bit/s) * 1e9 ns/s
        //     = bits_per_datagram * 1e6 / kBitRate
        // The first operand is widened so the whole product is evaluated in
        // 64 bits instead of overflowing in int for large packet counts/sizes.
        const qint64 bits_per_datagram = static_cast<qint64>(8) * pkt_size * pktsPerDgram;
        timer_freq = (bits_per_datagram * 1000000) / kBitRate;

        worker = new Worker;

        // All three connections use the compile-time-checked pointer syntax.
        connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
        connect(this, &stream::start_stream, worker, &Worker::send_datagrams);
        connect(worker, &Worker::done_streaming, this, &stream::done_with_worker);
        worker->moveToThread(&workerThread);
        workerThread.start(QThread::TimeCriticalPriority);

        // Prepare constants to build datagram.
        pkts_PerDgram = pktsPerDgram;
        packet_size = pkt_size;
        dgram_size = pkt_size*pktsPerDgram;
        qDebug()<< "thread id = " << QThread::currentThreadId();
        qDebug()<< "priority is " <<workerThread.priority();
        packet.fill('1',packet_size);
        datagram.clear();
        packet_index=0;
    }

    /// Shuts the worker down: clears its run flag, asks the thread's event loop to
    /// quit and blocks until the thread has finished.
    stream::~stream()
    {
        qDebug("stopping stream");

        // Clear the flag first: send_datagrams() only leaves its send loop when
        // the flag is false or the buffer is exhausted.
        worker->loop = false;

        workerThread.quit();
        workerThread.wait();
    }

    /// Adds one packet to the datagram being assembled; once pkts_PerDgram packets
    /// have been collected the datagram is moved to datagram_buffer and a new one
    /// is started.
    void stream::make_udp_packet(QByteArray packet)
    {
        // Nothing is accepted while the counter is not below the per-datagram limit.
        if (packet_index >= pkts_PerDgram)
            return;

        datagram.append(packet);
        ++packet_index;

        // Datagram still incomplete: wait for more packets.
        if (packet_index != pkts_PerDgram)
            return;

        datagram_buffer.append(datagram);
        datagram.clear();
        packet_index = 0;
    }

    void stream::send_udp_packet()
    {
    qDebug()<< "stream thread id = " << QThread::currentThreadId();
    start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
    }

    void stream::done_with_worker()
    {
    qDebug("done with worker");
    emit done_with_stream();
    }

    /// Creates an idle worker: run flag set, no socket yet, counters zeroed.
    /// Every scalar/pointer member gets a defined value here so none of them is
    /// ever read while indeterminate (the socket itself is only created inside
    /// send_datagrams()).
    Worker::Worker() :
        loop(true),
        udp_streaming_socket(nullptr),
        datagram_index(0),
        socket_state(0),
        ip_stream_port(0)
    {
        qDebug("Constructor");
    }

    /// Logs teardown of the worker.
    /// No explicit socket close is needed: send_datagrams() creates the socket
    /// with this Worker as its QObject parent, so Qt destroys it together with the
    /// Worker, and send_datagrams() already closes it when its loop ends.
    Worker::~Worker()
    {
        // Deliberately no udp_streaming_socket->close() here. The pointer is only
        // assigned inside send_datagrams(), so dereferencing it is undefined
        // behaviour whenever the Worker is destroyed before that slot ever ran.
        qDebug("Closing Socket");
    }
    /// Sends every entry of datagram_buffer as one UDP datagram, pacing the sends
    /// with a busy-wait on a QElapsedTimer.
    /// @param stream_addr     destination address
    /// @param stream_port     destination UDP port
    /// @param timer_freq      minimum gap before each send, in nanoseconds
    ///                        (compared against QElapsedTimer::nsecsElapsed())
    /// @param datagram_buffer datagrams to send, in order
    /// Emits done_streaming() and writes the buffer to disk via save_file() once
    /// the last datagram has gone out. Returns early, without emitting, if `loop`
    /// is cleared in the meantime.
    void Worker::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer)
    {
        udp_streaming_socket = new QUdpSocket(this); // switch to QUdpSocket and remove winsock when Qt is patched

        ip_stream_address = stream_addr;
        ip_stream_port = stream_port;
        datagram_index = 0;
        elapsed_timer.start();
        qDebug()<< "worker thread id = " << QThread::currentThreadId();
        //qDebug()<< "thread priority = " << QThread::Priority;
        while(loop)
        {
            while(elapsed_timer.nsecsElapsed() <= timer_freq)
            {
                // spin until time to send next packet
            }

            // Buffer exhausted: report completion, dump the data, stop.
            if(datagram_index >= datagram_buffer.size())
            {
                emit done_streaming();
                save_file(datagram_buffer);
                qDebug("done with stream");
                break;
            }

            // Time since the previous send finished; anything beyond timer_freq
            // is lateness introduced by the wait above.
            const qint64 start_time = elapsed_timer.nsecsElapsed();
            const QByteArray &payload = datagram_buffer.at(datagram_index);
            socket_state = udp_streaming_socket->writeDatagram(payload.data(), payload.size() ,ip_stream_address , ip_stream_port);
            udp_streaming_socket->waitForBytesWritten();
            if(start_time - timer_freq > 50000)
            {
                qDebug()<< "Late by = " << start_time - timer_freq << " ns at datagram " << datagram_index ;
            }
            // The next interval is measured from the end of this send.
            elapsed_timer.restart();     // start elapsed timer
            if(socket_state<=0)
            {
                qDebug()<< "Socket error "<< udp_streaming_socket->error();
            }
            datagram_index++;
        }
        udp_streaming_socket->close();
        qDebug()<< "worker thread id = " << QThread::currentThreadId();
        //qDebug()<< "thread priority = " << QThread::Priority;
    }

    /// Writes the raw bytes of every datagram, back to back, to
    /// c:/3abn/qudp_test.ts, replacing any existing file.
    /// @param datagram_buffer datagrams to dump, in order
    /// Logs a warning and writes nothing if the file cannot be opened.
    void Worker::save_file(BufferList datagram_buffer)
    {
        // Backslashes inside a C++ string literal start escape sequences ("\3" is
        // an octal escape, "\q" is not a valid one), which silently corrupts the
        // path. Qt accepts '/' as the separator on every platform, Windows included.
        QFile file("c:/3abn/qudp_test.ts");
        if(!file.open(QIODevice::WriteOnly))
        {
            qWarning() << "Could not open" << file.fileName() << "for writing:" << file.errorString();
            return;
        }

        QDataStream out(&file);
        for(int i=0; i<datagram_buffer.size(); ++i)
        {
            const QByteArray &chunk = datagram_buffer.at(i);
            out.writeRawData(chunk.data(), chunk.size());
        }
        file.close();
    }

    @



  • Might this have to do with the process priority?

    http://msdn.microsoft.com/en-us/library/windows/desktop/ms685100(v=vs.85).aspx

    I noticed this becoming a problem most when I moved from a very simple GUI to one with a system tray icon and other stuff going on in the main GUI thread. I need real-time priority for the worker thread, but not for the GUI. I can enforce that this will be running on a multi-core machine, so consuming all cycles of one core for this isn't a problem. Time accuracy is most important, although going with a real-time OS is something I would like to avoid. Is there a cross-platform way to manage the process priority, or do I need to make blocks of code for each platform I want to support? Also, if I set the process priority to real time, will that basically mean I will be tying up at least two CPU cores even if I demote the thread priority of the GUI thread? It seems a little wrong to assign real-time priority to the whole process of a program with a system tray icon.

    PS: the link function of this forum is broken for the above link.


Log in to reply