QUdpSocket: has no more pending datagrams



  • Hi all,

    I use QUdpSocket from Qt 5.6.2 on Ubuntu 16.04 to receive a 4-channel audio stream from a specific device that sends UDP datagrams.

    Everything works fine until it reaches the 11402nd audio datagram of the 4th channel, every time, in every test. That represents about 23 seconds. After this 11402nd audio datagram, QUdpSocket->hasPendingDatagrams() returns false even though the audio device keeps sending datagrams. In parallel, Wireshark shows me that further datagrams are still coming in.

    I don't know what to do. I guess I could use another library for this task...
    What do you think?
    Thanks



  • Can you show us your code? It is hard to guess for now.



  • Of course, here is the processPendingDatagrams() function:

    ///
    /// \brief AudioCoreRecorder::processPendingDatagrams Function called for each network packet;
    /// it interprets the different messages.
    ///
    void AudioCoreRecorder::processPendingDatagrams()
    {
        while (udpSocket->hasPendingDatagrams())
        {
            QByteArray datagram;
            datagram.resize(udpSocket->pendingDatagramSize());
            qint64 coreMsgByteSize = udpSocket->readDatagram(datagram.data(), datagram.size());
            // Safety check: make sure the Core Module message type is present:
            if (coreMsgByteSize < 1)
            {
                message.write(Message::WARNING, QString("Received null Core datagram"));
                return;
            }
            ////////////////////////////////////////////////////////////////////
            // Wait for a START before retrieving data from the Core Module:
            ////////////////////////////////////////////////////////////////////
            if (getLastCoreRecorderMsgType() == CoreRecorderMsg::STOP)
            {
                // Signal that we are not recording
                emit recordingChanged(false);
                // Interpret the Core Module message type:
                setLastCoreRecorderMsgType(datagram.at(0));
                message.write(Message::DEBUG, QString("Received Core %1 datagram")
                              .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                /////////////////////////////////////////////////////////////
                // There are 2 possibilities:
                // o OK: we receive a START message from the Core Module
                //   --> we reply START,
                // o KO: we receive another message type from the Core Module
                //   --> --
                /////////////////////////////////////////////////////////////
                // o OK: we receive a START message from the Core Module
                //   --> we reply START,
                //   --> layout: [uint8 uint8 uint8[256]]
                if (getLastCoreRecorderMsgType() == CoreRecorderMsg::START)
                {
                    if (coreMsgByteSize < 2)
                    {
                        message.write(Message::WARNING, QString(" --> Bad channel Core flag"));
                        // datagramMutex.unlock();
                        return;
                    }
                    // Update the channel state from the channel flag of the Core Module message,
                    // which in turn tells the audioFileRecorder how many channels to expect:
                    setChannelFlags(datagram.at(1));
                    // Tell the audioFileRecorder to create a file:
                    audioFileRecorder->wavFileOpen();
                    // Reset each channel's pageIndex so that sample continuity can be checked:
                    resetPageIndex();
                    // If all is well and we are ready, send the same message back
                    emit messageTypeToSend(CoreRecorderMsg::START);
                }
                // o KO: we receive another message type from the Core Module
                //   --> --
                else
                {
                    message.write(Message::WARNING, QString(" --> inappropriate message type: %1")
                                  .arg(coreRecorderMsgTypeStringMap[getLastCoreRecorderMsgType()]));
                }
            }
            ////////////////////////////////////////
            // Wait for data from the Core Module:
            ////////////////////////////////////////
            else if (getLastCoreRecorderMsgType() == CoreRecorderMsg::START
                     || getLastCoreRecorderMsgType() == CoreRecorderMsg::DATA)
            {
                // Interpret the Core Module message type:
                setLastCoreRecorderMsgType(datagram.at(0));
                /////////////////////////////////////////////////////////////
                // There are 4 possibilities:
                // o OK: we receive a DATA message from the Core Module
                //   --> we record the DATA,
                // o OK: we receive a STOP message from the Core Module
                //   --> we reply STOP,
                // o KO: we receive a START message from the Core Module
                //   --> we reply START,
                // o KO: we receive another message type from the Core Module
                //   --> --,
                /////////////////////////////////////////////////////////////
                // o OK: we receive a DATA message from the Core Module
                //   --> we record the DATA,
                //   --> layout: [uint8 uint8 uint16 uint32 uint16[512]],
                if (getLastCoreRecorderMsgType() == CoreRecorderMsg::DATA)
                {
                    if (coreMsgByteSize < 2)
                    {
                        // Signal that we are not recording
                        emit recordingChanged(false);
                        message.write(Message::WARNING, QString(" --> Bad Core channel flag"));
                        // datagramMutex.unlock();
                        return;
                    }
                    // Safety check ?????:
                    ushort sharedVectorSizeShort = ushort(coreMsgByteSize-CORE_MODULE_OFFSET_DATA)/2; // Size in shorts ;-)
                    if (coreMsgByteSize < CORE_MODULE_DATA_SIZE)
                    {
                        message.write(Message::DEBUG, QString(" --> incomplete data frame?"));
                    }
                    ///////////////////////////////////////
                    // Acquire the data (16 bits):
                    ///////////////////////////////////////
                    // Signal that we are recording
                    emit recordingChanged(true);
                    // Point to the 8th element (pointer arithmetic),
                    // where the sample frame starts:
                    const ushort *ptr =
                            reinterpret_cast<const ushort*>(datagram.constData()+CORE_MODULE_OFFSET_DATA);
                    ///////////////////////////////////
                    // THREAD CONCURRENCY MANAGEMENT //
                    // o    Take a token for Fill (requires that a token has been released
                    //      by the wavRecorder)
                    bufferFilling.acquire(1);
                    // Update the channel state from the channel flag of the Core Module message,
                    // which also tells the audioFileRecorder the channel number.
                    // NOTE: it is important to inform the audioFileRecorder AFTER the semaphore so that it stays
                    // identical during the "filling" phase and the "recording" phase.
                    setChannelOnTheAir(datagram.at(1));
                    // DATA continuity test: indexes must be consecutive, otherwise data is being lost...
                    if (!updateActualPageIndex(datagram.mid(CORE_MODULE_OFFSET_INDEX,4)))
                        message.write(Message::ERROR, QString(" --> samples not successive"));
                    // DEBUG:
                    message.write(Message::DEBUG, QString("%1/%2/%3/%4")
                                  .arg(getPageIndexList().at(0))
                                  .arg(getPageIndexList().at(1))
                                  .arg(getPageIndexList().at(2))
                                  .arg(getPageIndexList().at(3)));
                    ///////////////////////////////////
                    if (getChannelOnTheAir() == 0)
                    {
                        // Fill the buffer with the samples:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel0[i] = *(ptr+i); // Pointer arithmetic
                        // Zero the unfilled entries:
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel0[i] = 0;
                    }
                    if (getChannelOnTheAir() == 1)
                    {
                        // Fill the buffer:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel1[i] = *(ptr+i); // Pointer arithmetic
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel1[i] = 0;
                    }
                    if (getChannelOnTheAir() == 2)
                    {
                        // Fill the buffer:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel2[i] = *(ptr+i); // Pointer arithmetic
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel2[i] = 0;
                    }
                    if (getChannelOnTheAir() == 3)
                    {
                        // Fill the buffer:
                        for (ushort i = 0; i < sharedVectorSizeShort; i++)
                            sharedVectorChannel3[i] = *(ptr+i); // Pointer arithmetic
                        for (ushort i = sharedVectorSizeShort; i < CORE_MODULE_BUFFER_SIZE; i++)
                            sharedVectorChannel3[i] = 0;
                    }
                    ///////////////////////////////////
                    // THREAD CONCURRENCY MANAGEMENT //
                    // o     Give back a token for Record
                    bufferRecording.release(1);
                    ///////////////////////////////////
                }
                // o OK: we receive a STOP message from the Core Module
                //   --> we reply STOP,
                else if (getLastCoreRecorderMsgType() == CoreRecorderMsg::STOP)
                {
                    message.write(Message::DEBUG, QString("Received Core %1 datagram")
                                  .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                    // Signal that we are not recording
                    emit recordingChanged(false);
                    // Close the file:
                    audioFileRecorder->wavFileClose();
                }
                // o KO: we receive a START message from the Core Module
                //   --> we reply START,
                else if (getLastCoreRecorderMsgType() == CoreRecorderMsg::START)
                {
                    message.write(Message::DEBUG, QString("Received Core %1 datagram")
                                  .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                    // Signal that we are not recording
                    emit recordingChanged(false);
                    // Flag this as abnormal:
                    message.write(Message::WARNING, QString(" --> Unexpected because already recording"));
                    if (coreMsgByteSize < 2)
                    {
                        message.write(Message::WARNING, QString(" --> Bad channel Core flag"));
                        // datagramMutex.unlock();
                        return;
                    }
                    setLastCoreRecorderMsgType(datagram.at(0));
                    // Update the channel state from the channel flag of the Core Module message:
                    setChannelFlags(datagram.at(1));
                    // If all is well and we are ready, send the same message back
                    emit messageTypeToSend(CoreRecorderMsg::START);
                }
                // o KO: we receive another message type from the Core Module
                //   --> --
                else
                {
                    message.write(Message::DEBUG, QString("Received Core %1 datagram")
                                  .arg(coreRecorderMsgTypeStringMap.value(getLastCoreRecorderMsgType())));
                    // Signal that we are not recording
                    emit recordingChanged(false);
                    message.write(Message::WARNING, QString(" --> inappropriate message type: %1")
                                  .arg(coreRecorderMsgTypeStringMap[getLastCoreRecorderMsgType()]));
                }
            }
        }
        message.write(Message::DEBUG, QString("exit()"));
    }
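
    A note on the DATA branch above: its comment gives the datagram layout as [uint8 uint8 uint16 uint32 uint16[512]], which adds up to 1 + 1 + 2 + 4 + 1024 = 1032 bytes. Below is a minimal sketch of decoding such a datagram in plain C++ (no Qt), without reinterpret_cast. The offsets 4 and 8, the little-endian byte order, and the names DataPage and parseDataPage are assumptions made for illustration; they are not taken from the original project.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Assumed layout, read from the "[uint8 uint8 uint16 uint32 uint16[512]]" comment:
// byte 0 = message type, byte 1 = channel, bytes 2-3 = unspecified,
// bytes 4-7 = page index, bytes 8.. = 16-bit samples.
constexpr std::size_t kIndexOffset = 4;      // assumption (stands in for CORE_MODULE_OFFSET_INDEX)
constexpr std::size_t kDataOffset = 8;       // assumption (stands in for CORE_MODULE_OFFSET_DATA)
constexpr std::size_t kSamplesPerPage = 512; // from the layout comment

struct DataPage {
    std::uint8_t type = 0;
    std::uint8_t channel = 0;
    std::uint32_t pageIndex = 0;
    std::vector<std::uint16_t> samples; // always kSamplesPerPage long, zero-padded
};

// Decodes one datagram. Returns false if it is too short to hold the header.
bool parseDataPage(const unsigned char *bytes, std::size_t size, DataPage &out)
{
    if (bytes == nullptr || size < kDataOffset)
        return false;

    out.type = bytes[0];
    out.channel = bytes[1];

    // Assemble the index byte by byte (little-endian is an assumption);
    // this sidesteps alignment and host byte-order questions.
    out.pageIndex = 0;
    for (std::size_t i = 0; i < 4; ++i)
        out.pageIndex |= std::uint32_t(bytes[kIndexOffset + i]) << (8 * i);

    // Never read more samples than the datagram holds or the page can store.
    const std::size_t available = (size - kDataOffset) / 2;
    const std::size_t n = available < kSamplesPerPage ? available : kSamplesPerPage;

    out.samples.assign(kSamplesPerPage, 0); // unfilled entries stay at 0
    for (std::size_t i = 0; i < n; ++i) {
        const std::size_t o = kDataOffset + 2 * i;
        out.samples[i] = static_cast<std::uint16_t>(bytes[o] | (bytes[o + 1] << 8));
    }
    return true;
}

// Continuity test in the spirit of updateActualPageIndex(): pages are
// consecutive when the index grows by exactly one.
bool isNextPage(std::uint32_t previous, std::uint32_t current)
{
    return current == previous + 1;
}
```

    As far as can be seen from the posted code, the DATA branch only rejects datagrams shorter than 2 bytes before computing the sample count. The sketch rejects anything shorter than the header and clamps the count to 512, so a short or oversized datagram cannot index outside the buffer.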
    


  • Sorry for my last post. Here is a new one with a simple test that leads to the same problem:

    main

    #include <QCoreApplication>
    #include <QDebug>
    #include "socketmanager.h"
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        SocketManager coreModule;
        coreModule.bind();
    
        return a.exec();
    }
    

    socketmanager.h

    #ifndef SOCKETMANAGER_H
    #define SOCKETMANAGER_H
    
    #include <QObject>
    #include <QDebug>
    #include <QUdpSocket>
    #include <QThread>
    
    class SocketManager: public QObject
    {
        Q_OBJECT
    public:
                            SocketManager();
        QUdpSocket          *udpSocket;
        bool                started;
        unsigned int        count;
    
        void                bind();
    public slots:
        void                processPendingDatagrams();
        void                sendPendingDatagrams();
    };
    
    #endif // SOCKETMANAGER_H
    

    socketmanager.cpp

    #include "socketmanager.h"
    
    SocketManager::SocketManager()
    {
        started = false;
        count = 0;
        udpSocket = new QUdpSocket;
    }
    
    
    void SocketManager::bind()
    {
        qDebug() << "Wait for binding";
        connect(udpSocket,SIGNAL(readyRead()), this,SLOT(processPendingDatagrams()));
        while(!udpSocket->bind(QHostAddress("192.168.1.167"),8998))
        {
            QThread::currentThread()->msleep(1000);
        }
    
        qDebug() << "Binded";
    }
    
    void SocketManager::processPendingDatagrams()
    {
        while (udpSocket->hasPendingDatagrams())
        {
            QByteArray datagram;
            datagram.resize(udpSocket->pendingDatagramSize());
            qint64 coreMsgByteSize = udpSocket->readDatagram(datagram.data(), datagram.size());
            if (!started)
            {
                sendPendingDatagrams();
                qDebug() << "Started";
                started = true;
                return;
            }
            if (coreMsgByteSize > 500)
            {
                count++;
                if (count%4 == 0)
                    qDebug() << count/4;
            }
        }
    }
    
    
    void SocketManager::sendPendingDatagrams()
    {
        QByteArray block;
        block.append(char(0x40));
        block.append(char(0x0F));
        block.append(char(0xFF));   // The recording duration is configured here on
        block.append(char(0xFF));   // 4 bytes; for now we use the maximum!
        block.append(char(0xFF));
        block.append(char(0xFF));
        if (udpSocket->writeDatagram(block,
                                     QHostAddress("192.168.1.166"), 8888) < 0)
        {
            qDebug() << "Failed to send datagram";
        }
        else
        {
            qDebug() << "Datagram sent";
        }
    }
    

    Result
    The last console output is always 11402, whatever the test.

    Thank you



  • Could you try to change if (coreMsgByteSize > 500) to if (coreMsgByteSize > 0)

    You are leaking the socket. udpSocket = new QUdpSocket; should be udpSocket = new QUdpSocket(this);



  • Hi VRonin,

    Since this code is only a quick test, I wrote it fast and therefore badly, hence the leak... ;-)
    Anyway, I first filtered with "if (coreMsgByteSize > 500)" because the first datagram from the device is 30 bytes long, whereas the audio stream datagrams are 1032 bytes long.

    I will make the changes on Monday.

    I have seen several Qt bug reports about QUdpSocket (bug1, bug2) around the Qt version I use. I updated Qt from 5.5.1 to 5.6.2 to check whether that would solve it, but... no.



  • Hi,

    Here is the console output from the last code, after changing if (coreMsgByteSize > 500) to if (coreMsgByteSize > 0):
    Wait for binding
    Binded
    Datagram sent
    Started
    1
    2
    3
    4
    ...
    11400



  • So very bizarre...

    • Could you try with Qt 5.11 and see if it's the same?
    • In SocketManager::bind() could you add, as first line:
    QTimer* backupTimer = new QTimer(this);
    connect(backupTimer,&QTimer::timeout,this,&SocketManager::processPendingDatagrams);
    backupTimer->start(2000);
    


  • In my last test, the device sent 4 audio channels, each datagram carrying a 512-sample buffer from one channel.
    I have just run a test with only one channel, so every datagram now comes from channel one. The recording stops at the same iteration, but after 4 times as long. The console output is now:
    Wait for binding
    Binded
    Datagram sent
    Started
    1
    2
    3
    4
    ...
    11401
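
    A quick back-of-the-envelope check of the two runs, assuming the printed value is count/4 as in the test program above and that every counted datagram is a 1032-byte audio datagram (both figures come from earlier posts; the helper names are made up for illustration):

```cpp
#include <cassert>
#include <cstdint>

// Audio datagram size reported earlier in the thread.
constexpr std::uint64_t kDatagramBytes = 1032;

// The test program prints count / 4, so the number of datagrams received
// is at least four times the last printed value.
constexpr std::uint64_t totalDatagrams(std::uint64_t lastPrinted)
{
    return lastPrinted * 4;
}

// Payload volume corresponding to that many audio datagrams.
constexpr std::uint64_t totalBytes(std::uint64_t lastPrinted)
{
    return totalDatagrams(lastPrinted) * kDatagramBytes;
}
```

    With these assumptions, the 4-channel run (last output 11402) gives 45608 datagrams and 47067456 bytes, and the 1-channel run (last output 11401) gives 45604 datagrams and 47063328 bytes. Both runs stop after nearly the same amount of data even though the second one takes four times longer, which suggests that the limit depends on how much has been sent rather than on elapsed time.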



  • @VRonin said in QUdpSocket: has no more pending datagrams:

    Could you try with Qt 5.11 and see if it's the same?
    In SocketManager::bind() could you add, as first line:

    I tried on Ubuntu with Qt 5.6 and 5.11, and on Win10 with Qt 5.6:

    • On Ubuntu the iteration stops at 11402 whatever the Qt version.
    • On Win10 the iteration stops at a random number (between 300 and 400).

    I also used your timer to trigger the processPendingDatagrams() function, but nothing happens, so it seems that udpSocket->hasPendingDatagrams() returns false.



  • Hello, hello,

    I have tried handling the audio stream with another library (ASIO), and the result is the same as with QtNetwork. I conclude that the problem comes from the device, not from the software I am developing.

    Sorry for wasting your time and thank you for trying to help me...
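
    For anyone hitting a similar symptom: the same kind of cross-check (same stream, different receive path) can be done with no library at all. Below is a minimal sketch using plain POSIX sockets on Linux. The function names are hypothetical, and the sketch only counts datagrams; it does not send the start datagram that the device in this thread expects.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cstdint>

// Opens a UDP socket bound to ip:port.
// Returns the descriptor, or -1 on failure.
int openBoundUdpSocket(const char *ip, std::uint16_t port)
{
    const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
        return -1;

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (::inet_pton(AF_INET, ip, &addr.sin_addr) != 1 ||
        ::bind(fd, reinterpret_cast<const sockaddr *>(&addr), sizeof(addr)) != 0) {
        ::close(fd);
        return -1;
    }
    return fd;
}

// Reads and counts datagrams until none arrives for idleTimeoutMs milliseconds.
long countDatagramsUntilIdle(int fd, int idleTimeoutMs)
{
    long count = 0;
    char buffer[2048]; // larger than the 1032-byte audio datagrams

    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;

    // poll() returns 0 on timeout and -1 on error; both end the loop.
    while (::poll(&pfd, 1, idleTimeoutMs) > 0) {
        if (::recv(fd, buffer, sizeof(buffer), 0) < 0)
            break;
        ++count;
    }
    return count;
}
```

    Calling countDatagramsUntilIdle(openBoundUdpSocket("192.168.1.167", 8998), 2000) after the device has been started would give a count to compare with the Qt one. If it stops at the same point, the receiving library is unlikely to be the cause.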

