QUdpSocket: has no more pending datagrams



  • Hi all,

    I use QUdpSocket from Qt5.6.2 on Ubuntu 16.04 to get 4 channels audio stream from a specific device, sending Udp datagrams.

    Everything works fine until it reaches the 11402th audio datagram of the 4th channel, every time, at each test. I represent about 23 seconds. After this 11402th audio datagram, QUdpSocket->hasPendingDatagrams() return false whereas the audio device sending audio stream continues to send datagrams. In parallel, Wireshark notices me that other datagrams come in.

    I don't know what to do. I guess I could use an other library for this task...
    What do you think.
    Thanks



  • can you show us your code? hard to guess for now



  • Of course, here it is the processPendingDatagram() function:

    ///
    /// \brief AudioUdpRecorder::processPendingDatagrams Fonction appelée à chaque paquet réseau
    /// qui interprète les différents messages.
    ///
    void AudioCoreRecorder::processPendingDatagrams()
    {
        while (udpSocket->hasPendingDatagrams())
    	{
            QByteArray datagram;
    		datagram.resize(udpSocket->pendingDatagramSize());
            qint64 coreMsgByteSize = udpSocket->readDatagram(datagram.data(), datagram.size());
            // Sécurité: on vérifie si le type de message du Core Module existe:
            if (coreMsgByteSize < 1)
            {
                message.write(Message::WARNING, QString("Received null Core datagram"));
                return;
            }
            ////////////////////////////////////////////////////////////////////
            // On attend un START pour récupérrer les données du Core Module:
            ////////////////////////////////////////////////////////////////////
            if (getLastCoreRecorderMsgType() == CoreRecorderMsg::STOP)
            {
                // On informe qu'on n'enregistre pas
                emit recordingChanged(false);
                // On interprète le type de message du Core Module:
                setLastCoreRecorderMsgType(datagram.at(0));
                message.write(Message::DEBUG, QString("Received Core %1 datagram")
                              .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                /////////////////////////////////////////////////////////////
                // Il y a 2 possibilités:
                // o OK: on reçoit un message de type START du Core Module
                //   --> on répond START,
                // o KO: on reçoit un autre type de message du Core Module
                //   --> --
                /////////////////////////////////////////////////////////////
                // o OK: on reçoit un message de type START du Core Module
                //   --> on répond START,
                //   --> modèle: [uint8 uint8 uint8[256]]
                if (getLastCoreRecorderMsgType() == CoreRecorderMsg::START)
                {
                    if (coreMsgByteSize < 2)
                    {
                        message.write(Message::WARNING, QString(" --> Bad channel Core flag"));
                        // datagramMutex.unlock();
                        return;
                    }
                    // On actualise l'état des channels en fonction du channel flag du message du Core Module,
                    // qui informe à son tour l'audioFileRecorder sur le nombre de voies à réceptionner:
                    setChannelFlags(datagram.at(1));
                    // On informe le audioFileRecorder de créer un fichier:
                    audioFileRecorder->wavFileOpen();
                    // On reset les pageIndex de chaque channel pour pouvoir tester la continuité des samples:
                    resetPageIndex();
                    // Si tout va bien, qu'on est prêt, on renvoie le même message
                    emit messageTypeToSend(CoreRecorderMsg::START);
                }
                // o KO: on reçoit un autre type de message du Core Module
                //   --> --
                else
                {
                    message.write(Message::WARNING, QString(" --> inapropriate message type: %")
                                  .arg(coreRecorderMsgTypeStringMap[getLastCoreRecorderMsgType()]));
                }
            }
            ////////////////////////////////////////
            // On attend les data du Core Module:
            ////////////////////////////////////////
            else if (getLastCoreRecorderMsgType() == CoreRecorderMsg::START
                     || getLastCoreRecorderMsgType() == CoreRecorderMsg::DATA)
            {
                // On interprète le type de message du Core Module:
                setLastCoreRecorderMsgType(datagram.at(0));
                /////////////////////////////////////////////////////////////
                // Il y a 3 possibilités:
                // o OK: on reçoit un message de type DATA du Core Module
                //   --> on enregistre les DATA,
                // o OK: on reçoit un message de type STOP du Core Module
                //   --> on répond STOP,
                // o KO: on reçoit un message de type START du Core Module
                //   --> on répond START,
                // o KO: on reçoit un autre type de message du Core Module
                //   --> --,
                /////////////////////////////////////////////////////////////
                // o OK: on reçoit un message de type DATA du Core Module
                //   --> on enregistre les DATA,
                //   --> modèle: [uint8 uint8 uint16 uint32 uint16[512]],
                if (getLastCoreRecorderMsgType() == CoreRecorderMsg::DATA)
                {
                    if (coreMsgByteSize < 2)
                    {
                        // On informe qu'on n'enregistre pas
                        emit recordingChanged(false);
                        message.write(Message::WARNING, QString(" --> Bad Core channel flag"));
                        // datagramMutex.unlock();
                        return;
                    }
                    // Sécurité ?????:
                    ushort sharedVectorSizeShort = ushort(coreMsgByteSize-CORE_MODULE_OFFSET_DATA)/2; // Taille un short ;-)
                    if (coreMsgByteSize < CORE_MODULE_DATA_SIZE)
                    {
                        message.write(Message::DEBUG, QString(" --> incomplete data frame?"));
                    }
                    ///////////////////////////////////////
                    // on acquiert les données (16 bits):
                    ///////////////////////////////////////
                    // On informe qu'on enregistre
                    emit recordingChanged(true);
                    // On pointe sur le 8 ème élément (arithmétique de pointeurs),
                    // la où commencent le sample frame:
                    const ushort *ptr =
                            reinterpret_cast<const ushort*>(datagram.constData()+CORE_MODULE_OFFSET_DATA);
                    ///////////////////////////////////
                    // THREAD CONCURRENCY MANAGEMENT //
                    // o    Prend un jeton pour Fill (nécessite qu'un jeton ait été libéré
                    //      par le wavRecorder)
                    bufferFilling.acquire(1);
                    // On actualise l'état des channels en fonction du channel flag du message du Core Module
                    // qui prévient également l'audioFileRecorder du numéro de la voie.
                    // NOTE: il est important d'informer l'audioFileRecorder APRES le semaphore pour qu'il reste
                    // identique dans la phase de "filling" et la phase de "Recording".
                    setChannelOnTheAir(datagram.at(1));
                    // Test de continuité des DATA: les indexes doivent être consécutifs, sans quoi on perd des données...
                    if (!updateActualPageIndex(datagram.mid(CORE_MODULE_OFFSET_INDEX,4)))
                        message.write(Message::ERROR, QString(" --> samples not successive"));
                    // DEBUG:
                    message.write(Message::DEBUG, QString("%1/%2/%3/%4")
                                  .arg(getPageIndexList().at(0))
                                  .arg(getPageIndexList().at(1))
                                  .arg(getPageIndexList().at(2))
                                  .arg(getPageIndexList().at(3)));
                    ///////////////////////////////////
                    if (getChannelOnTheAir() == 0)
                    {
                        // On remplit le buffer avec les samples:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel0[i] = *(ptr+i); // Arithmetique de pointeurs
                        // On met à 0 les non remplis:
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel0[i] = 0; // Arithmetique de pointeurs
                    }
                    if (getChannelOnTheAir() == 1)
                    {
                        // On remplit le buffer:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel1[i] = *(ptr+i); // Arithmetique de pointeurs
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel1[i] = 0; // Arithmetique de pointeurs
                    }
                    if (getChannelOnTheAir() == 2)
                    {
                        // On remplit le buffer:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel2[i] = *(ptr+i); // Arithmetique de pointeurs
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel2[i] = 0; // Arithmetique de pointeurs
                    }
                    if (getChannelOnTheAir() == 3)
                    {
                        // On remplit le buffer:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel3[i] = *(ptr+i); // Arithmetique de pointeurs
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel3[i] = 0; // Arithmetique de pointeurs
                    }
                    ///////////////////////////////////
                    // THREAD CONCURRENCY MANAGEMENT //
                    // o     Rend un jeton pour Record
                    bufferRecording.release(1);
                    ///////////////////////////////////
                }
                // o OK: on reçoit un message de type STOP du Core Module
                //   --> on répond STOP,
                else if (getLastCoreRecorderMsgType() == CoreRecorderMsg::STOP)
                {
                    message.write(Message::DEBUG, QString("Received Core %1 datagram")
                                  .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                    // On informe qu'on n'enregistre pas
                    emit recordingChanged(false);
                    // On ferme le fichier:
                    audioFileRecorder->wavFileClose();
                }
                // o KO: on reçoit un message de type START du Core Module
                //   --> on répond START,
                else if (getLastCoreRecorderMsgType() == CoreRecorderMsg::START)
                {
                    message.write(Message::DEBUG, QString("Received Core %1 datagram")
                                  .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                    // On informe qu'on n'enregistre pas
                    emit recordingChanged(false);
                    // On indique que c'est pas normal:
                    message.write(Message::WARNING, QString(" --> Unexpected because already recording"));
                    if (coreMsgByteSize < 2)
                    {
                        message.write(Message::WARNING, QString(" --> Bad channel Core flag"));
                        // datagramMutex.unlock();
                        return;
                    }
                    setLastCoreRecorderMsgType(datagram.at(0));
                    // On actualise l'état des channels en fonction du channel flag  du message du Core Module:
                    setChannelFlags(datagram.at(1));
                    // Si tout va bien, qu'on est prêt, on renvoie le même message
                    emit messageTypeToSend(CoreRecorderMsg::START);
                }
                // o KO: on reçoit un autre type de message du Core Module
                //   --> --
                else
                {
                    message.write(Message::DEBUG, QString("Received Core %1 datagram")
                                  .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                    // On informe qu'on n'enregistre pas
                    emit recordingChanged(false);
                    message.write(Message::WARNING, QString(" --> inapropriate message type: %")
                                  .arg(coreRecorderMsgTypeStringMap[getLastCoreRecorderMsgType()]));
                }
            }
        }
        message.write(Message::DEBUG, QString("exit()"));
    }
    


  • Sorry for my last post. Here I post a new one with a simple test which lead to the same problem:

    main

    #include <QCoreApplication>
    #include <QDebug>
    #include "socketmanager.h"
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        SocketManager coreModule;
        coreModule.bind();
    
        return a.exec();
    }
    
    **socketmanager.h**
    #ifndef SOCKETMANAGER_H
    #define SOCKETMANAGER_H
    
    #include <QObject>
    #include <QDebug>
    #include <QUdpSocket>
    #include <QThread>
    
    class SocketManager: public QObject
    {
        Q_OBJECT
    public:
                            SocketManager();
        QUdpSocket          *udpSocket;
        bool                started;
        unsigned int        count;
    
        void                bind();
    public slots:
        void                processPendingDatagrams();
        void                sendPendingDatagrams();
    };
    
    #endif // SOCKETMANAGER_H
    

    socketmanager.cpp

    #include "socketmanager.h"
    
    SocketManager::SocketManager()
    {
        started = false;
        count = 0;
        udpSocket = new QUdpSocket;
    }
    
    
    void SocketManager::bind()
    {
        qDebug() << "Wait for binding";
        connect(udpSocket,SIGNAL(readyRead()), this,SLOT(processPendingDatagrams()));
        while(!udpSocket->bind(QHostAddress("192.168.1.167"),8998))
        {
            QThread::currentThread()->msleep(1000);
        }
    
        qDebug() << "Binded";
    }
    
    void SocketManager::processPendingDatagrams()
    {
        while (udpSocket->hasPendingDatagrams())
        {
            QByteArray datagram;
            datagram.resize(udpSocket->pendingDatagramSize());
            qint64 coreMsgByteSize = udpSocket->readDatagram(datagram.data(), datagram.size());
            if (!started)
            {
                sendPendingDatagrams();
                qDebug() << "Started";
                started = true;
                return;
            }
            if (coreMsgByteSize > 500)
            {
                count++;
                if (count%4 == 0)
                    qDebug() << count/4;
            }
        }
    }
    
    
    void SocketManager::sendPendingDatagrams()
    {
        QByteArray block;
        block.append(char(0x40));
        block.append(char(0x0F));
        block.append(char(0xFF));   // Ici on configure le temps d'enregistrement sur
        block.append(char(0xFF));   // 4 octets, pour l'instant on met le max!
        block.append(char(0xFF));
        block.append(char(0xFF));
        if (udpSocket->writeDatagram(block,
                                     QHostAddress("192.168.1.166"), 8888) < 0)
        {
            qDebug() << "Failed to send datagram";
        }
        else
        {
            qDebug() << "Datagram sent";
        }
    }
    

    Result
    The last console output is (always, whatever the test): 11402

    Thank you



  • Could you try to change if (coreMsgByteSize > 500) to if (coreMsgByteSize > 0)

    You are leaking the socket. udpSocket = new QUdpSocket; should be udpSocket = new QUdpSocket(this);



  • Hi VRonin,

    Given that this code is a simple test, I do quickly and then bad, so leaks... ;-)
    Anyway, I first made a filter with "if (coreMsgByteSize > 500)" because the first datagram from the device is of size 30 bytes whereas the audio stream datagrams are of size 1032 bytes.

    I will do changes on monday.

    I have seen several QtBugs about QUdpSocket: bug1, bug2 around Qt version I use. Although I updated Qt version from 5.5.1 to 5.6.2 to check if it could be the solution, but... no.



  • Hi,

    here is the console output from the last code, with change from if (coreMsgByteSize > 500) to if (coreMsgByteSize > 0):
    Wait for binding
    Binded
    Datagram sent
    Started
    1
    2
    3
    4
    ...
    11400



  • So very bizarre...

    • Could you try with Qt 5.11 and see if it's the same?
    • In SocketManager::bind() could you add, as first line:
    QTimer* backupTimer = new QTimer(this);
    connect(backupTimer,&QTimer::timeout,this,&SocketManager::processPendingDatagrams);
    backupTimer->start(2000);
    


  • In my last test, my device sent me 4 audio channels, each datagram representing a 512 samples buffer from one channel.
    I just have done a test with only one channel, that means each datagram is from the channel one, now. The recording stops at the same iteration, but after 4 times more time. The console output is now:
    Wait for binding
    Binded
    Datagram sent
    Started
    1
    2
    3
    4
    ...
    11401



  • @VRonin said in QUdpSocket: has no more pending datagrams:

    Could you try with Qt 5.11 and see if it's the same?
    In SocketManager::bind() could you add, as first line:

    I tried on Ubuntu with Qt5.6 & 5.11 and on Win10 with Qt5.6:

    • On Ubuntu the iteration stops at 11402 whatever the Qt version
    • On Win10 the iteration stops with a random number (between 300 and 400).

    I also used your timer to trigger the processPendingDatagrams() function but it seems that udpSocket->hasPendingDatagrams() return false because nothing happens.



  • Hello, hello,

    I have tried to manage the audio stream with another library (ASIO) and the result is the same as I got with QtNetwork. I conclude the problem comes from the device, not from the software I develop.

    Sorry for wasting your time and thank you for trying to help me...


Log in to reply
 

Looks like your connection to Qt Forum was lost, please wait while we try to reconnect.