QThread sub-millisecond timer problems



  • I am building a program to stream video packets over the network. My code functions properly for the most part, but the part of my code that I use for timing has problems. Frequently, datagrams are sent 50-200+ ms late. I am using a QThread spinning in a while loop that checks a QElapsedTimer. Any gap between datagrams that is greater than 200 ms causes problems with the mux hardware downstream, so that problem is unacceptable. I have tested to make sure the send-datagram function is executed in the correct thread.

    @
    #ifndef STREAM_H
    #define STREAM_H

    #include <atomic>

    #include "QtCore"
    #include <QUdpSocket>
    #include "QThread"

    // Ordered collection of datagram payloads, one QByteArray per datagram.
    using BufferList = QList<QByteArray>;

    // Both types travel as arguments of the start_stream signal, so they are
    // declared to the meta-type system here (and registered at run time in
    // the stream constructor).
    Q_DECLARE_METATYPE(BufferList)
    Q_DECLARE_METATYPE(QHostAddress)

    /// Sends a prepared list of datagrams over UDP at a fixed interval.
    ///
    /// The object is moved to a dedicated thread by `stream`; `send_datagrams`
    /// then runs there and busy-waits on `elapsed_timer` between sends.
    class Worker : public QObject
    {
        Q_OBJECT

    public:
        explicit Worker();
        ~Worker();

        /// Run flag polled by send_datagrams(). It is cleared from the owning
        /// thread while the worker thread is reading it, so it must be atomic;
        /// a plain bool here is a data race.
        std::atomic<bool> loop{true};

    public slots:
        /// Streams every entry of `datagram_buffer` to `stream_addr:stream_port`,
        /// waiting `timer_freq` nanoseconds between datagrams.
        void send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer);

    signals:
        /// Emitted once the last datagram in the buffer has been sent.
        void done_streaming();

    private:
        /// Dumps the raw datagram payloads to a file.
        void save_file(BufferList datagram_buffer);

        // Created inside send_datagrams(); null until the first call so that
        // the destructor can tell whether there is anything to close.
        QUdpSocket *udp_streaming_socket = nullptr;

        int datagram_index = 0;   // index of the next datagram to send
        int socket_state = 0;     // result of the most recent writeDatagram()

        QElapsedTimer elapsed_timer;
        QHostAddress ip_stream_address;
        qint16 ip_stream_port = 0;
    };

    /// Collects packets into fixed-size datagrams and hands the finished
    /// buffer to a Worker that lives on `workerThread`.
    class stream : public QObject
    {
        Q_OBJECT

        // Thread that hosts the Worker (private: declared before any access
        // specifier of a class).
        QThread workerThread;

    public:
        explicit stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram, int pkt_size, QObject *parent);
        ~stream();

    public slots:
        /// Emits start_stream with the datagrams accumulated so far.
        void send_udp_packet();
        /// Appends one packet to the datagram currently being assembled.
        void make_udp_packet(QByteArray packet);
        /// Relays the worker's completion as done_with_stream.
        void done_with_worker();

    signals:
        void start_stream(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq, BufferList datagram_buffer);
        void done_with_stream();

    private:
        // Destination of the stream.
        QHostAddress ip_stream_address;
        qint16 ip_stream_port;

        // Completed datagrams, the datagram under construction, and a
        // packet-sized scratch buffer filled in the constructor.
        BufferList datagram_buffer;
        QByteArray datagram;
        QByteArray packet;

        Worker *worker;

        // Interval between datagrams handed to the worker (nanoseconds).
        qint64 timer_freq;

        int packet_index;   // packets already appended to `datagram`
        int KbitRate;
        int pkts_PerDgram;
        int packet_size;
        int dgram_size;
    };
    #endif // STREAM_H

    @



  • @
    #include "stream.h"
    #include "windows.h"
    /// ========================================================================================================
    /// Sets up the stream: registers the signal argument types, derives the
    /// datagram interval from the bit rate, and starts the worker thread.
    ///
    /// @param stream_addr   destination address
    /// @param stream_port   destination UDP port
    /// @param kBitRate      target rate in kbit/s; must be positive
    /// @param pktsPerDgram  number of packets combined into one datagram
    /// @param pkt_size      size of one packet in bytes
    /// @param parent        QObject parent
    stream::stream(QHostAddress stream_addr, qint16 stream_port, int kBitRate, int pktsPerDgram,int pkt_size, QObject *parent) :
        QObject(parent)
    {
        // Needed so both types can be carried by the queued start_stream signal.
        qRegisterMetaType<QHostAddress>("QHostAddress");
        qRegisterMetaType<BufferList>("BufferList");

        KbitRate = kBitRate;
        ip_stream_address = stream_addr;
        ip_stream_port = stream_port;

        // Interval between datagrams in NANOSECONDS:
        //   bits_per_datagram / (kBitRate * 1000 bit/s) * 1e9 ns/s
        //     = bits_per_datagram * 1e6 / kBitRate
        // The product is formed in 64 bits so large packet sizes cannot
        // overflow int before the widening.
        const qint64 bits_per_datagram = Q_INT64_C(8) * pkt_size * pktsPerDgram;
        if (kBitRate > 0)
        {
            timer_freq = bits_per_datagram * 1000000 / kBitRate;
        }
        else
        {
            // A zero rate would divide by zero. Fall back to "no pacing",
            // which is also what a negative rate effectively produced before.
            qWarning("stream: bit rate must be positive (got %d kbit/s); datagram pacing disabled", kBitRate);
            timer_freq = 0;
        }

        worker = new Worker;

        // The worker is deleted on its own thread once that thread finishes.
        connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
        connect(this, &stream::start_stream , worker, &Worker::send_datagrams );
        connect(worker,SIGNAL(done_streaming()),this,SLOT(done_with_worker()));
        worker->moveToThread(&workerThread);
        workerThread.start(QThread::TimeCriticalPriority);

        // Prepare constants to build datagram.
        pkts_PerDgram = pktsPerDgram;
        packet_size = pkt_size;
        dgram_size = pkt_size*pktsPerDgram;
        qDebug()<< "thread id = " << QThread::currentThreadId();
        //qDebug()<< "workerthread id = "<< workerThread.currentThreadId();
        qDebug()<< "priority is " <<workerThread.priority();
        packet.fill('1',packet_size);
        datagram.clear();
        packet_index=0;
    }

    // Stops the worker and blocks until its thread has finished.
    stream::~stream()
    {
    qDebug("stopping stream");
    // Ends the while(loop) in Worker::send_datagrams. The flag is written from
    // this thread while the worker thread polls it.
    worker->loop=false;
    // Ask the thread's event loop to exit, then wait for the thread to end.
    // The worker itself is deleted via the finished -> deleteLater connection
    // made in the constructor.
    workerThread.quit();
    workerThread.wait();
    }

    /// Adds one packet to the datagram being assembled. When the datagram
    /// reaches pkts_PerDgram packets it is moved into datagram_buffer and a
    /// fresh datagram is started.
    void stream::make_udp_packet(QByteArray packet)
    {
        // QByteArray((char*)pktBuffer,packet_size);

        // Nothing is accepted unless there is still room in the datagram.
        if (packet_index >= pkts_PerDgram)
            return;

        datagram += packet;
        ++packet_index;

        // Datagram not full yet: wait for more packets.
        if (packet_index != pkts_PerDgram)
            return;

        datagram_buffer << datagram;
        datagram.clear();
        packet_index = 0;
    }

    void stream::send_udp_packet()
    {
    qDebug()<< "stream thread id = " << QThread::currentThreadId();
    start_stream( ip_stream_address, ip_stream_port , timer_freq ,datagram_buffer);
    }

    void stream::done_with_worker()
    {
    qDebug("done with worker");
    emit done_with_stream();
    }

    /// Creates an idle worker. The socket pointer starts out null because the
    /// socket itself is only created when send_datagrams() runs.
    Worker::Worker()
        : loop(true)
        , udp_streaming_socket(nullptr)
    {
        qDebug("Constructor");
    }

    /// Closes the streaming socket, if send_datagrams() ever created one.
    /// Without the null check, destroying a worker that never streamed would
    /// dereference an uninitialised pointer.
    Worker::~Worker()
    {
        qDebug("Closing Socket");
        if (udp_streaming_socket != nullptr)
        {
            udp_streaming_socket->close();
        }
    }
    /// Sends every datagram in `datagram_buffer` to `stream_addr:stream_port`,
    /// one every `timer_freq` nanoseconds, busy-waiting between sends.
    ///
    /// Runs until the buffer is exhausted (then emits done_streaming and dumps
    /// the buffer via save_file) or until `loop` is cleared by another thread.
    ///
    /// @param stream_addr      destination address
    /// @param stream_port      destination UDP port
    /// @param timer_freq       interval between datagrams, in nanoseconds
    /// @param datagram_buffer  payloads to send, in order
    void Worker::send_datagrams(QHostAddress stream_addr, qint16 stream_port, qint64 timer_freq , BufferList datagram_buffer)
    {
        // Parented to this worker, so it is destroyed together with it.
        udp_streaming_socket = new QUdpSocket(this); // switch to QUdpSocket and remove winsock when Qt is patched

        ip_stream_address = stream_addr;
        ip_stream_port = stream_port;
        datagram_index = 0;
        elapsed_timer.start();
        qDebug()<< "worker thread id = " << QThread::currentThreadId();
        //qDebug()<< "thread priority = " << QThread::Priority;

        while (loop)
        {
            while (elapsed_timer.nsecsElapsed() <= timer_freq)
            {
                // spin until time to send next packet
            }

            if (datagram_index >= datagram_buffer.size())
            {
                emit done_streaming();
                save_file(datagram_buffer);
                qDebug("done with stream");
                break;
            }

            // Restart the interval timer BEFORE sending. Restarting after the
            // send (as before) made every period "interval + send time", so
            // the stream ran slower than the configured rate.
            const qint64 elapsed_ns = elapsed_timer.nsecsElapsed();
            elapsed_timer.restart();

            const QByteArray &payload = datagram_buffer.at(datagram_index);
            const qint64 bytes_sent = udp_streaming_socket->writeDatagram(payload.constData(), payload.size(), ip_stream_address, ip_stream_port);
            socket_state = static_cast<int>(bytes_sent);

            // NOTE(review): writeDatagram() hands the datagram to the OS
            // directly, so this wait is probably unnecessary and may itself
            // block - confirm and consider removing.
            udp_streaming_socket->waitForBytesWritten();

            const qint64 late_ns = elapsed_ns - timer_freq;
            if (late_ns > 50000)
            {
                qDebug()<< "Late by = " << late_ns << " ns at datagram " << datagram_index ;
            }

            // writeDatagram() reports failure as -1; 0 bytes is a valid
            // (empty) datagram, not an error.
            if (bytes_sent < 0)
            {
                qDebug()<< "Socket error "<< udp_streaming_socket->error();
            }
            datagram_index++;
        }

        udp_streaming_socket->close();
        qDebug()<< "worker thread id = " << QThread::currentThreadId();
        //qDebug()<< "thread priority = " << QThread::Priority;
    }

    /// Writes the raw bytes of every datagram, back to back, to
    /// c:/3abn/qudp_test.ts (the file is truncated first).
    ///
    /// @param datagram_buffer  payloads to dump, in order
    void Worker::save_file(BufferList datagram_buffer)
    {
        // Forward slashes: in the old literal "c:\3abn\qudp_test.ts" the
        // backslashes were escape sequences ("\3" is an octal escape, "\q" is
        // not a valid one), so the path was never the intended one. QFile
        // accepts '/' as the separator on Windows.
        QFile file("c:/3abn/qudp_test.ts");
        if (!file.open(QIODevice::WriteOnly))
        {
            qWarning() << "Could not open" << file.fileName() << ":" << file.errorString();
            return;
        }

        // Iterate through a const reference so the range-for cannot detach
        // the implicitly shared list.
        const BufferList &payloads = datagram_buffer;
        for (const QByteArray &payload : payloads)
        {
            if (file.write(payload) < 0)
            {
                qWarning() << "Write to" << file.fileName() << "failed:" << file.errorString();
                break;
            }
        }
        file.close();
    }

    @



  • Might this have to do with the process priority?

    http://msdn.microsoft.com/en-us/library/windows/desktop/ms685100(v=vs.85).aspx

    I noticed this becoming a problem most when I moved from a very simple GUI to one with a system tray icon and other stuff going on in the main GUI thread. I need real-time priority for the worker thread, but not for the GUI. I can enforce that this will be running on a multi-core machine, so consuming all cycles of one core for this isn't a problem. Time accuracy is most important, although going with a real-time OS is something I would like to avoid. Is there a cross-platform way to manage the process priority, or do I need to make blocks of code for each platform I want to support? Also, if I set the process priority as real time, will that basically mean I will be tying up at least two CPU cores even if I demote the thread priority of the GUI thread? It seems a little wrong to assign real-time priority to the whole process of a program with a system tray icon.

    PS: the link function of this forum is broken for the above link.


Log in to reply
 

Looks like your connection to Qt Forum was lost, please wait while we try to reconnect.