QT 6.x QAudioSink potential bug?
-
I've been developing a Qt 6.x multimedia audio application based on the concepts in the Qt 6 version of the 'audiooutput' multimedia sample application. I need to send blocks of 16-bit PCM audio to the speaker from a dedicated worker thread. The audio is pulled from a std::unique_ptr<SynthDevice>, where SynthDevice is a type of QIODevice.
All the audio processing runs in a separate worker thread using the Active Object pattern suggested at about 5 minutes into this KDAB tutorial (linked in the code comment below).
In order to send the audio to the speaker, the worker thread object needs a QAudioSink object (on Windows implemented as QWindowsAudioSink). Unfortunately, this class contains a QTimer field (not a QTimer pointer field).
When I follow the Active Object described in the KDAB video to run the audio processing on a dedicated worker thread, I cannot initialize the QAudioSink in the constructor followed by the canonical moveToThread to get the QAudioSink or even std::unique_ptr<QAudioSink> to play nice in this worker object. Constructing the QAudioSink or QAudioSink pointer field in the RtpWorker's constructor also constructs the QTimer field in the main thread context.
Below is my partially stripped RtpWorker class, which works because I lazily construct the QAudioSink (and its corresponding QTimer field) in the handleStart slot, which runs in the worker thread context.
Note: the correct way (I think) would be for the QWindowsAudioSink class to use a pointer to a QTimer field (like I do in this worker class). I do not have problems starting and restarting or stopping my timers.
On a separate note, it would be great if someone could explain when and how to correctly use the parent argument in the constructor of the RtpWorker (forgetting for the time being that there is a bug in QWindowsAudioSink). The correct use of the parent, and specifically when to pass this or nullptr, confuses me.
If I pass this for the socket member it works fine. However, if I pass this for the QAudioSink object there can be many problems, including a double delete (once from the worker context and again from the main thread), which causes a segfault.
Thanks
```
#pragma once

// SYSTEM INCLUDES
#include <QObject>
#include <QThread>
#include <QTimerEvent>
#include <QTimer>
#include <QUdpSocket>
#include <QBasicTimer>
#include <QAudioSink>
#include <QMediaDevices>
#include <QNetworkDatagram>
#include <spdlog/common.h>
//#include <rtaudio/RtAudio.h>

// APPLICATION INCLUDES
#include "rtp/rtp.h"
#include "SynthDevice.h"

// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
// TYPEDEFS
// FORWARD DECLARATIONS

/**
 * Active Object QT pattern - see tutorial https://www.youtube.com/watch?v=SncJ3D-fO7g
 */
class RtpWorker final : public QObject {
    Q_OBJECT
public:
    RtpWorker(const RtpWorker&) = delete;
    RtpWorker(RtpWorker&&) noexcept = delete;
    RtpWorker& operator=(const RtpWorker&) = delete;
    RtpWorker& operator=(RtpWorker&&) noexcept = delete;

    //! very important that mThread does not have parent as we need to move this object
    //! Note that members need this as parent for thread affinity to work
    RtpWorker()
        : mpThread{ std::make_unique<QThread>() } //!< must not pass parent
        , mpSocket{ std::make_unique<QUdpSocket>(this) } //!< 'this' parent for thread affinity
        , mpWatchdog { std::make_unique<QTimer>(this) } //!< 'this' parent for thread affinity
        , mpTimer{ std::make_unique<QBasicTimer>() }
        , mpAudioSink{} //!< do not initialize here. Certainly do not pass 'this' for parent thread affinity
        , mpSynthDevice{}
        , mTXBufferList{}
        , mTargetIP{}
        , mTargetPort{}
        , mTSIncrement{}
        , mTXCounter{0}
    {
        connect(this, &RtpWorker::start, this, &RtpWorker::handleStart);
        connect(this, &RtpWorker::stop, this, &RtpWorker::handleStop);
        connect(this, &RtpWorker::updateAudio, this, &RtpWorker::handleAudioUpdated);
        connect(mpSocket.get(), &QUdpSocket::readyRead, this, &RtpWorker::readyRead);
        connect(mpSocket.get(), &QUdpSocket::bytesWritten, this, &RtpWorker::handleBytesWritten);
        connect(mpWatchdog.get(), &QTimer::timeout, [&]() {
            // update log tab
            Q_EMIT addLogEntry(spdlog::level::info, "RTP input disconnected");
            // notify the main thread
            Q_EMIT watchdogStatusChanged(true, "input disconnected");
        });
        // thread affinity magic
        moveToThread(mpThread.get());
        mpThread->start();
    }

    ~RtpWorker() override {
        QMetaObject::invokeMethod(this, "cleanup");
        mpThread->wait();
    }

Q_SIGNALS:
    void stop();
    void start(const qint64, const QString&, const quint16,
        const qint64, const QByteArray&, const QByteArrayList&);
    void datagramsReady(const QByteArrayList&);
    void bytesWritten(qint64, bool);
    void watchdogStatusChanged(bool, const QString&);
    //! Signal to asynchronously notify application thread
    void workerStatusChange(const QString& rStatusText,
        const QString& rStyleSheet, int timeout = 0) const;
    //! IPC Signal from worker thread used to safely update mTXBufferList
    void updateAudio(const qint64, const QByteArrayList&);
    //! Add log entry
    void addLogEntry(const spdlog::level::level_enum, const QString&);

// slots are all called in the worker thread context
private Q_SLOTS:
    /**
     * Start signal handler.
     *
     * <p>This slot runs in the worker thread context. Creates a
     * buffered list of transmit UDP datagrams. Each datagram is prefixed
     * with an RTP header (including a sequence# & timestamp).
     *
     * <p>The first datagram will always have the marker bit set.
     *
     * @param aTSIncrement [in] Timestamp increment per datagram.
     *
     * @param rTargetIP [in] Target IP address.
     *
     * @param aTargetPort [in] Target UDP port.
     *
     * @param aDurationUsecs
     *        [in] Duration (specified in microseconds) to
     *        sleep between recurring timer callbacks.
     *        This time represents the duration for all
     *        <code>mTXBufferList</code> datagrams.
     *
     * @param rHeader [in] RTP header consisting of a QByteArray
     *        containing an rtp_hdr_t with optional contributing
     *        sources (up to 16) & rtp_hdr_ext_t fields & data if
     *        the x bit of the rtp_hdr_t is set.
     *
     * @param rAudioData [in] raw multichannel audio.
     *
     */
    void handleStart(
        const qint64 aTSIncrement,
        const QString& rTargetIP,
        const quint16 aTargetPort,
        const qint64 aDurationUsecs,
        const QByteArray& rHeader,
        const QByteArrayList& rAudioData) {
        mTXCounter = 0;
        mTSIncrement = aTSIncrement;
        mTargetIP = QHostAddress(rTargetIP);
        mTargetPort = aTargetPort;
        mTXBufferList.clear();
        if (mTargetIP.isMulticast()) {
            mpSocket->bind(QHostAddress(
                QHostAddress::AnyIPv4), 0);
        }
        const auto headerLen = gHeaderLenLambda(rHeader);
        if (headerLen >= sizeof(rtp_hdr_t)) {
            // copy construct the existing header
            const auto rawHeader = std::make_unique<QByteArray>(rHeader);
            const auto rtpHeader = reinterpret_cast<rtp_hdr_t*>(rawHeader->data());
            auto seqNum = rtpHeader->getSeqNum();
            auto timeStamp = rtpHeader->getTimestamp();
            for (const auto& next : rAudioData) {
                // set the marker indicating a significant boundary condition
                if (mTXBufferList.empty()) {
                    rtpHeader->m = 1u;
                } else {
                    rtpHeader->m = 0u;
                }
                rtpHeader->setSeqNum(seqNum);
                rtpHeader->setTimestamp(timeStamp);
                // each datagram consists of an rtp_hdr_t (12 bytes +
                // optional extensions enclosed via headerLen) followed by
                // the raw audio data.
                QByteArrayList temp = { *rawHeader, next };
                mTXBufferList.emplace_back(temp.join());
                seqNum++;
                timeStamp += mTSIncrement;
            }
        }
        mpSynthDevice = std::make_unique<SynthDevice>(rAudioData);
        mpSynthDevice->start();
        // contains QTimer (non pointer), must create in context of new thread
        // this is a bug in QT as far as I can tell.
        if (!mpAudioSink) {
            mpAudioSink = std::make_unique<QAudioSink>(
                QMediaDevices::defaultAudioOutput(),
                QMediaDevices::defaultAudioOutput().
                preferredFormat());
        }
        // this will cause the pushservice to pull from
        // the SynthDevice as required.
        mpAudioSink->start(mpSynthDevice.get());
        // update log tab
        Q_EMIT addLogEntry(spdlog::level::info, "RTP thread started");
        // notify main thread that we successfully started
        Q_EMIT workerStatusChange("RTP thread started",
            "color:blue;font-weight:bold;", 4000);
        mpTimer->start(static_cast<int>(aDurationUsecs / 1000),
            Qt::PreciseTimer, this);
    }

    /**
     * Audio updated slot handler.
     *
     * @param aTSIncrement [in] Timestamp increment per datagram.
     *
     * @param rAudioBuffers [in] pure multichannel audio data.
     */
    void handleAudioUpdated(
        const qint64 aTSIncrement,
        const QByteArrayList& rAudioBuffers) {
        mTSIncrement = aTSIncrement;
        // must have a previous set of buffers so we can pick up
        // from the last known good sequence number sent.
        // not sure what happens if this slot is called
        // while the RtpWorker thread is stopped
        if (!mTXBufferList.isEmpty()) {
            // use last rtp header to continue sequence number from
            // the last transmitted sequence number
            const auto headerLen = gHeaderLenLambda(mTXBufferList.back());
            const auto rawHeader = std::make_unique<QByteArray>(
                mTXBufferList.back().sliced(0, headerLen));
            const auto rtpHeader = reinterpret_cast<rtp_hdr_t*>(rawHeader->data());
            auto seqNum = rtpHeader->getSeqNum() + 1;
            auto timeStamp = rtpHeader->getTimestamp() + mTSIncrement;
            mTXBufferList.clear();
            for (const auto& next : rAudioBuffers) {
                if (mTXBufferList.empty()) {
                    rtpHeader->m = 1u;
                } else {
                    rtpHeader->m = 0u;
                }
                rtpHeader->setSeqNum(seqNum);
                rtpHeader->setTimestamp(timeStamp);
                // each datagram consists of an rtp_hdr_t (12 bytes +
                // optional extensions) followed by
                // the raw audio data.
                QByteArrayList temp = { *rawHeader, next };
                mTXBufferList.emplace_back(temp.join());
                seqNum++;
                timeStamp += mTSIncrement;
            }
        }
    }

    /**
     * QUdpSocket slot readyRead handler. This is where we read the
     * RTP data echoed back from the target.
     */
    void readyRead() {
        QByteArrayList datagrams;
        while (mpSocket->hasPendingDatagrams()) {
            // each datagram consists of an RTP header followed by audio data
            const auto& networkDatagram = mpSocket->receiveDatagram();
            if (networkDatagram.isValid()) {
                datagrams.emplace_back(networkDatagram.data());
            }
        }
        // send whatever we have so far. Note that the datagram sizes might
        // not all be the same size if the audio configuration was changed
        // while still running. This will be handled in the main thread
        if (!datagrams.isEmpty()) {
            Q_EMIT datagramsReady(datagrams);
            // handle watchdog
            if (!mpWatchdog->isActive()) {
                // start watchdog
                mpWatchdog->setSingleShot(true);
                mpWatchdog->start(2000);
            } else {
                // pet the watchdog
                mpWatchdog->start(2000);
            }
        }
    }

    //! slot to handle bytes written
    void handleBytesWritten(qint64 aBytesWritten) {
        // forward to GUI
        Q_EMIT bytesWritten(aBytesWritten, false);
    }

    //! Stop slot handler
    void handleStop() {
        mpTimer->stop();
        mpSocket->close();
        mpWatchdog->stop();
        mTXBufferList.clear();
        mTSIncrement = 0;
        mTXCounter = 0;
        mpSynthDevice->stop();
        mpAudioSink->stop();
        // notify main thread that we successfully stopped
        Q_EMIT addLogEntry(spdlog::level::info, "RTP thread stopped");
        Q_EMIT workerStatusChange("RTP thread stopped",
            "color:blue;font-weight:bold;", 4000);
    }

    //! cleanup method - called in worker thread context
    void cleanup() {
        // delete timer, socket... etc.
        mpTimer.reset();
        mpSocket.reset();
        mpWatchdog.reset();
        mpSynthDevice.reset();
        mpAudioSink.reset();
        mTSIncrement = 0;
        mTXCounter = 0;
        // calling thread quit makes exec() event loop return
        mpThread->quit();
    }

    /**
     * Timer callback event handler.
     *
     * @param event [in] timer event.
     */
    void timerEvent(QTimerEvent* event) override {
        // send all mTXData datagrams
        if (event->timerId() == mpTimer->timerId()) {
            // blast out the entire block queued up datagrams at once
            for (auto& next : mTXBufferList) {
                mpSocket->writeDatagram(
                    next, mTargetIP, mTargetPort);
            }
            mTXCounter += mTXBufferList.size();
            // TODO change to use a sent packet stats counter
            const auto lastTxHeader = reinterpret_cast<rtp_hdr_t*>(
                mTXBufferList.back().data());
            const auto lastSentSeq = lastTxHeader->getSeqNum();
            const auto lastSentTS = lastTxHeader->getTimestamp();
            // update all rtp headers in the mTXBufferList
            for (auto i = 0u; i < mTXBufferList.size(); ++i) {
                // update each header in the mTXBufferList
                // making sure to increment the timestamp, sequence number and
                // reset the marker bit as it has just been transmitted
                const auto rtpHeader = reinterpret_cast<
                    rtp_hdr_t*>(mTXBufferList[i].data());
                // increment the sequence numbers
                rtpHeader->setSeqNum(lastSentSeq + i + 1);
                // increment timestamp
                rtpHeader->setTimestamp(lastSentTS + (i + 1) * mTSIncrement);
                // reset the marker bit so it doesn't keep getting transmitted
                rtpHeader->m = 0u;
            }
        } else {
            // pass unhandled timer event up the chain
            QObject::timerEvent(event);
        }
    }

private:
    std::unique_ptr<QThread> mpThread;
    std::unique_ptr<QUdpSocket> mpSocket;
    std::unique_ptr<QTimer> mpWatchdog;
    std::unique_ptr<QBasicTimer> mpTimer;
    //! Audio Output
    std::unique_ptr<QAudioSink> mpAudioSink;
    //! Audio Generator
    std::unique_ptr<SynthDevice> mpSynthDevice;
    QByteArrayList mTXBufferList;
    QHostAddress mTargetIP;
    quint16 mTargetPort;
    qint64 mTSIncrement;
    qint64 mTXCounter;
};
```
-
Bug for reference: https://bugreports.qt.io/browse/QTBUG-108187
-
@johnco3 said in QT 6.x QAudioSink potential bug?:
I cannot initialize the QAudioSink in the constructor followed by the canonical moveToThread
what does this mean?
-
@Christian-Ehrlicher Christian, from the included code you will see that the default (empty) constructor for a std::unique_ptr<QAudioSink> is called (this happens from the Qt main GUI thread). If the default constructor for QAudioSink were called here, QAudioSink's nested QTimer field would be initialized in the main Qt GUI thread context. This becomes a problem later when cleaning up the RtpWorker in the handleStop slot (called in the worker thread context). I saw the following console error: QObject::killTimer: Timers cannot be stopped from another thread. I tracked this down to the call to mpAudioSink->stop() in the aforementioned slot. Under the covers, this calls QWindowsAudioSink::close(), which calls deviceStateChange(QAudio::StoppedState, QAudio::NoError), which in turn raises this error message. By 'canonical' I mean the de facto way of doing this right in future. For me that means adopting the Active Object approach where all member objects (that behave well (unlike QAudioSink))
-
I don't really see it - a std::unique_ptr() does not call anything during creation.
But when you don't pass a parent to an object, it is created in the current thread and you have to move it to another thread yourself if you want that. When you pass a parent and then move the parent to another thread, all children are moved too.
-
Hi,
On the parent topic. You would pass this when you want the created object to be handled through the parent / child tree from the creator object. Note that reparenting can happen. For example when you set a widget in a layout.
-
@Christian-Ehrlicher Hello again Christian, sorry, I was trying to edit the above reply but the edits are not going through. Let's try pasting here to hopefully clarify the exact source of the issues: from the included code you will see that the default (empty) constructor for a std::unique_ptr<QAudioSink> is called (this happens from the Qt main GUI thread). If the default constructor for QAudioSink were called here, QAudioSink's nested QTimer field would be initialized in the main Qt GUI thread context. This becomes a problem later when cleaning up the RtpWorker in the handleStop slot (called in the worker thread context). I saw the following console error: QObject::killTimer: Timers cannot be stopped from another thread. I tracked this down to the call to mpAudioSink->stop() in the aforementioned slot. Under the covers, this calls QWindowsAudioSink::close(), which calls deviceStateChange(QAudio::StoppedState, QAudio::NoError), which in turn raises this error message. By 'canonical' I mean the de facto way of doing this right in future. For me that means adopting the Active Object approach where all member objects (that behave well (unlike QAudioSink))
-
@SGaist and I already told you what happens and what you did wrong.
-
@SGaist This sounds promising. Perhaps the QAudioSink does not have a bug then. So, given that I want the QAudioSink to be constructed in the main thread (in my RtpWorker constructor) and later moved to operate in the worker thread context, where it can be safely opened, closed, reset etc., the recommended approach would be to pass 'this' to the QAudioSink's constructor in my constructor. Presumably the call to moveToThread(mpThread.get()) right before mpThread->start() would then move it along with the worker. This sounded great, however after I tried it I got the same thread ownership errors. Not sure what I am doing wrong. Here is the updated constructor if it helps:
```
RtpWorker()
    : mpThread{ std::make_unique<QThread>() } //!< must not pass parent
    , mpSocket{ std::make_unique<QUdpSocket>(this) } //!< 'this' parent for thread affinity
    , mpWatchdog{ std::make_unique<QTimer>(this) } //!< 'this' parent for thread affinity
    , mpTimer{ std::make_unique<QBasicTimer>() }
    , mpAudioSink{}
    , mpSynthDevice{}
    , mTXBufferList{}
    , mTargetIP{}
    , mTargetPort{}
    , mTSIncrement{}
    , mTXCounter{0}
{
    mpAudioSink = std::make_unique<QAudioSink>(
        QMediaDevices::defaultAudioOutput(),
        QMediaDevices::defaultAudioOutput().
        preferredFormat(), this);
    connect(this, &RtpWorker::start, this, &RtpWorker::handleStart);
    connect(this, &RtpWorker::stop, this, &RtpWorker::handleStop);
    connect(this, &RtpWorker::updateAudio, this, &RtpWorker::handleAudioUpdated);
    connect(mpSocket.get(), &QUdpSocket::readyRead, this, &RtpWorker::readyRead);
    connect(mpSocket.get(), &QUdpSocket::bytesWritten, this, &RtpWorker::handleBytesWritten);
    connect(mpWatchdog.get(), &QTimer::timeout, [&]() {
        // update log tab
        Q_EMIT addLogEntry(spdlog::level::info, "RTP input disconnected");
        // notify the main thread
        Q_EMIT watchdogStatusChanged(true, "input disconnected");
    });
    // thread affinity magic
    moveToThread(mpThread.get());
    mpThread->start();
}
```
-
@Christian-Ehrlicher I think I found a very useful piece of information that kind of points towards the canonical way that these Active Object worker objects should be used. I think my mistake is that I allocate things in the RtpWorker constructor. These objects are owned by the main QT thread and I am not sure if moveToThread does anything with these. I found in https://wiki.qt.io/QThreads_general_usage the following important note:
By the way, one extremely important thing to note here is that you should NEVER allocate heap objects (using new) in the constructor of the QObject class as this allocation is then performed on the main thread and not on the new QThread instance, meaning that the newly created object is then owned by the main thread and not the QThread instance. This will make your code fail to work. Instead, allocate such resources in the main function slot such as process() in this case as when that is called the object will be on the new thread instance and thus it will own the resource.
I think I got lucky with the other fields in the worker's constructor (the timer and udpsocket) but the QAudioSink exposed the flaw.
-
@johnco3 said in QT 6.x QAudioSink potential bug?:
These objects are owned by the main QT thread and I am not sure if moveToThread does anything with these.
Third time: since you don't pass a proper parent you have to call moveToThread() on those objects by yourself.
-
@Christian-Ehrlicher Christian, I did not miss or ignore your comments/suggestions. I tried an explicit moveToThread on the std::unique_ptr<QAudioSink> member and it gave me thread ownership issues. Would you be able to point to a specific change in the code above to address this?
I am not sure I completely understand what you mean by a proper parent; I assume you mean 'this' in the context of the constructor.
It seems that the pattern I followed to initialize the mpSocket and mpWatchdog smart pointers above gives the right kind of thread affinity parenting. I noticed that the moveToThread code under the covers sees that the RtpWorker object contains 2 children being moved. After I added a full unique_ptr<QAudioSink>(..., this) member initializer for mpAudioSink, the child count increased to 3 in the d->moveToThread_helper() call inside the QObject::moveToThread implementation. Judging by the increased child count, I thought I had correctly addressed the affinity of the QAudioSink and that things should work after this change.
Unfortunately I got errors indicating that timers could not be started and stopped from different threads. The timer starts and stops occur in the handleStart and handleStop worker thread slots, where mpAudioSink->start() or mpAudioSink->stop() is called. The mpAudioSink (which is a QWindowsAudioSink) contains a QTimer field (whose thread affinity is presumably questionable).
I did get the code to work by following the guideline I mentioned from the QT thread wiki (constructors should steer clear of fully constructing member objects), but it would be great if I could get to the bottom of why I was not able to move the thread affinity of the QAudioSink like I was able to do with the QUdpSocket and QTimer fields above. Here is my current working code, where I initialize the sink only in the worker thread context. I don't even pass 'this' to it when doing so, as by default the thread affinity is the current thread (the worker in this case).
/**
 * Active Object QT pattern - see tutorial https://www.youtube.com/watch?v=SncJ3D-fO7g
 */
class RtpWorker final : public QObject
{
    Q_OBJECT
public:
    RtpWorker(const RtpWorker&) = delete;
    RtpWorker(RtpWorker&&) noexcept = delete;
    RtpWorker& operator=(const RtpWorker&) = delete;
    RtpWorker& operator=(RtpWorker&&) noexcept = delete;

    //! very important that mThread does not have parent as we need to move this object
    //! Note that members need this as parent for thread affinity to work
    RtpWorker()
        : mpThread{ std::make_unique<QThread>() } //!< must not pass parent
        , mpSocket{ std::make_unique<QUdpSocket>(this) } //!< 'this' parent for thread affinity
        , mpWatchdog{ std::make_unique<QTimer>(this) } //!< 'this' parent for thread affinity
        , mpTimer{ std::make_unique<QBasicTimer>() }
        , mpAudioSink{}
        , mpSynthDevice{}
        , mTXBufferList{}
        , mTargetIP{}
        , mTargetPort{}
        , mTSIncrement{}
        , mTXCounter{0}
    {
        connect(this, &RtpWorker::start, this, &RtpWorker::handleStart);
        connect(this, &RtpWorker::stop, this, &RtpWorker::handleStop);
        connect(this, &RtpWorker::updateAudio, this, &RtpWorker::handleAudioUpdated);
        connect(mpSocket.get(), &QUdpSocket::readyRead, this, &RtpWorker::readyRead);
        connect(mpSocket.get(), &QUdpSocket::bytesWritten, this, &RtpWorker::handleBytesWritten);
        connect(mpWatchdog.get(), &QTimer::timeout, [&]() {
            // update log tab
            Q_EMIT addLogEntry(spdlog::level::info, "RTP input disconnected");
            // notify the main thread
            Q_EMIT watchdogStatusChanged(true, "input disconnected");
        });
        // thread affinity magic
        moveToThread(mpThread.get());
        mpThread->start();
    }

    ~RtpWorker() override
    {
        QMetaObject::invokeMethod(this, "cleanup");
        mpThread->wait();
    }

Q_SIGNALS:
    void stop();
    void start(const qint64, const QString&, const quint16, const qint64,
        const QByteArray&, const QByteArrayList&);
    void datagramsReady(const QByteArrayList&);
    void bytesWritten(qint64, bool);
    void watchdogStatusChanged(bool, const QString&);
    //! Signal to asynchronously notify application thread
    void workerStatusChange(const QString& rStatusText,
        const QString& rStyleSheet, int timeout = 0) const;
    //! IPC Signal from worker thread used to safely update mTXBufferList
    void updateAudio(const qint64, const QByteArrayList&);
    //! Add log entry
    void addLogEntry(const spdlog::level::level_enum, const QString&);

    // slots are all called in the worker thread context
private Q_SLOTS:
    void handleStart(
        const qint64 aTSIncrement,
        const QString& rTargetIP,
        const quint16 aTargetPort,
        const qint64 aDurationUsecs,
        const QByteArray& rHeader,
        const QByteArrayList& rAudioData)
    {
        mTXCounter = 0;
        mTSIncrement = aTSIncrement;
        mTargetIP = QHostAddress(rTargetIP);
        mTargetPort = aTargetPort;
        mTXBufferList.clear();
        if (mTargetIP.isMulticast()) {
            mpSocket->bind(QHostAddress(QHostAddress::AnyIPv4), 0);
        }
        const auto headerLen = gHeaderLenLambda(rHeader);
        if (headerLen >= sizeof(rtp_hdr_t)) {
            // copy construct the existing header
            const auto rawHeader = std::make_unique<QByteArray>(rHeader);
            const auto rtpHeader = reinterpret_cast<rtp_hdr_t*>(rawHeader->data());
            auto seqNum = rtpHeader->getSeqNum();
            auto timeStamp = rtpHeader->getTimestamp();
            for (const auto& next : rAudioData) {
                // set the marker indicating a significant boundary condition
                if (mTXBufferList.empty()) {
                    rtpHeader->m = 1u;
                } else {
                    rtpHeader->m = 0u;
                }
                rtpHeader->setSeqNum(seqNum);
                rtpHeader->setTimestamp(timeStamp);
                // each datagram consists of an rtp_hdr_t (12 bytes +
                // optional extensions enclosed via headerLen) followed by
                // the raw audio data.
                QByteArrayList temp = { *rawHeader, next };
                mTXBufferList.emplace_back(temp.join());
                seqNum++;
                timeStamp += mTSIncrement;
            }
        }
        mpSynthDevice = std::make_unique<SynthDevice>(rAudioData);
        mpAudioSink = std::make_unique<QAudioSink>(
            QMediaDevices::defaultAudioOutput(),
            QMediaDevices::defaultAudioOutput().preferredFormat());
        //connect(mpAudioSink.get(), &QAudioSink::stateChanged, [&](QAudio::State state) {
        //    // update log tab
        //    Q_EMIT addLogEntry(spdlog::level::info, "QAudioSink::stateChanged");
        //});
        // opens the device for read only
        mpSynthDevice->start();
        // starts the streaming using the pull service
        mpAudioSink->start(mpSynthDevice.get());
        // update log tab
        Q_EMIT addLogEntry(spdlog::level::info, "RTP thread started");
        // notify main thread that we successfully started
        Q_EMIT workerStatusChange("RTP thread started",
            "color:blue;font-weight:bold;", 4000);
        mpTimer->start(static_cast<int>(aDurationUsecs / 1000),
            Qt::PreciseTimer, this);
    }

    void handleAudioUpdated(
        const qint64 aTSIncrement,
        const QByteArrayList& rAudioData)
    {
        mTSIncrement = aTSIncrement;
        // must have a previous set of buffers so we can pick up
        // from the last known good sequence number sent.
        // Not sure what happens if this slot is called
        // while the RtpWorker thread is stopped
        if (!mTXBufferList.isEmpty()) {
            // use last rtp header to continue sequence number from
            // the last transmitted sequence number
            const auto headerLen = gHeaderLenLambda(mTXBufferList.back());
            const auto rawHeader = std::make_unique<QByteArray>(
                mTXBufferList.back().sliced(0, headerLen));
            const auto rtpHeader = reinterpret_cast<rtp_hdr_t*>(rawHeader->data());
            auto seqNum = rtpHeader->getSeqNum() + 1;
            auto timeStamp = rtpHeader->getTimestamp() + mTSIncrement;
            mTXBufferList.clear();
            for (const auto& next : rAudioData) {
                if (mTXBufferList.empty()) {
                    rtpHeader->m = 1u;
                } else {
                    rtpHeader->m = 0u;
                }
                rtpHeader->setSeqNum(seqNum);
                rtpHeader->setTimestamp(timeStamp);
                // each datagram consists of an rtp_hdr_t (12 bytes +
                // optional extensions) followed by
                // the raw audio data.
                QByteArrayList temp = { *rawHeader, next };
                mTXBufferList.emplace_back(temp.join());
                seqNum++;
                timeStamp += mTSIncrement;
            }
        }
        // if the sink & synth device are started and open
        if (mpSynthDevice && mpSynthDevice->isOpen()) {
            const auto rawAudio = rAudioData.join();
            mpSynthDevice->writeData(rawAudio.constData(), rawAudio.size());
            //mpAudioSink->stop();
            //mpSynthDevice->stop();
            //mpSynthDevice = std::make_unique<SynthDevice>(rAudioData);
            //mpAudioSink = std::make_unique<QAudioSink>(
            //    QMediaDevices::defaultAudioOutput(),
            //    QMediaDevices::defaultAudioOutput().
            //    preferredFormat());
            // opens the device for read only
            //mpSynthDevice->start();
            //// starts the streaming using the pull service
            //mpAudioSink->start(mpSynthDevice.get());
        }
    }

    //! Stop slot handler
    void handleStop()
    {
        mpTimer->stop();
        mpSocket->close();
        mpWatchdog->stop();
        mTXBufferList.clear();
        mTSIncrement = 0;
        mTXCounter = 0;
        if (mpSynthDevice) {
            mpSynthDevice->stop();
        }
        if (mpAudioSink) {
            mpAudioSink->stop();
        }
    }

    //! cleanup method - called in worker thread context
    void cleanup()
    {
        // delete timer, socket... etc.
        mpTimer.reset();
        mpSocket.reset();
        mpWatchdog.reset();
        mpAudioSink.reset();
        mpSynthDevice.reset();
        mTSIncrement = 0;
        mTXCounter = 0;
        // calling thread quit makes exec() event loop return
        mpThread->quit();
    }

private:
    std::unique_ptr<QThread> mpThread;
    std::unique_ptr<QUdpSocket> mpSocket;
    std::unique_ptr<QTimer> mpWatchdog;
    std::unique_ptr<QBasicTimer> mpTimer;
    //! Audio Output
    std::unique_ptr<QAudioSink> mpAudioSink;
    //! Audio Generator
    std::unique_ptr<SynthDevice> mpSynthDevice;
    QByteArrayList mTXBufferList;
    QHostAddress mTargetIP;
    quint16 mTargetPort;
    qint64 mTSIncrement;
    qint64 mTXCounter;
};
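For readers who want to try the datagram-building loop from handleStart outside of Qt, here is a minimal standard C++ sketch. It does not use the poster's rtp_hdr_t or gHeaderLenLambda (neither is shown in the thread); instead it assumes the fixed 12-byte RTP header layout from RFC 3550 with no extensions. The names Bytes, makeRtpHeader and packetize are hypothetical, chosen only for this illustration.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Builds the fixed 12-byte RTP header (RFC 3550): version 2, no padding,
// no extension, no CSRC entries. Multi-byte fields are big-endian.
std::array<std::uint8_t, 12> makeRtpHeader(bool marker, std::uint8_t payloadType,
                                           std::uint16_t seq, std::uint32_t timestamp,
                                           std::uint32_t ssrc)
{
    assert(payloadType <= 0x7F); // payload type is a 7-bit field
    std::array<std::uint8_t, 12> h{};
    h[0] = 0x80; // V=2, P=0, X=0, CC=0
    h[1] = static_cast<std::uint8_t>((marker ? 0x80 : 0x00) | (payloadType & 0x7F));
    h[2] = static_cast<std::uint8_t>(seq >> 8);
    h[3] = static_cast<std::uint8_t>(seq & 0xFF);
    for (std::size_t i = 0; i < 4; ++i) {
        h[4 + i] = static_cast<std::uint8_t>(timestamp >> (24 - 8 * i));
        h[8 + i] = static_cast<std::uint8_t>(ssrc >> (24 - 8 * i));
    }
    return h;
}

// One datagram per payload: header followed by raw audio. The marker bit is
// set on the first datagram only; the sequence number advances by one and the
// timestamp by tsIncrement per datagram (both wrap around on overflow).
std::vector<Bytes> packetize(const std::vector<Bytes>& payloads,
                             std::uint8_t payloadType, std::uint16_t firstSeq,
                             std::uint32_t firstTimestamp, std::uint32_t tsIncrement,
                             std::uint32_t ssrc)
{
    std::vector<Bytes> datagrams;
    datagrams.reserve(payloads.size());
    std::uint16_t seq = firstSeq;
    std::uint32_t ts = firstTimestamp;
    for (const Bytes& payload : payloads) {
        const auto header = makeRtpHeader(datagrams.empty(), payloadType, seq, ts, ssrc);
        Bytes datagram(header.begin(), header.end());
        datagram.insert(datagram.end(), payload.begin(), payload.end());
        datagrams.push_back(std::move(datagram));
        ++seq;
        ts += tsIncrement;
    }
    return datagrams;
}
```

Working on a stack copy of the header, as here, also sidesteps the std::make_unique<QByteArray> copy in the posted code.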
-
@johnco3 said in QT 6.x QAudioSink potential bug?:
I thought things should work fine after this change as I correctly addressed the affinity for the QAudioSink at least from the increased child count, I thought so.
Your assumption is correct and your description gave me the right hint to the problematic place. You're right - it's a child of QAudioSink which is not properly parented and therefore lives in the wrong thread. You have to live with the workaround until the bug gets fixed.
btw:
RtpWorker(const RtpWorker&) = delete;
RtpWorker(RtpWorker&&) noexcept = delete;
RtpWorker& operator=(const RtpWorker&) = delete;
RtpWorker& operator=(RtpWorker&&) noexcept = delete;
Not needed since you derive from QObject, which is also non-copyable.
const auto rawHeader = std::make_unique<QByteArray>(rHeader);
Why mess around with a unique_ptr here? Simply create a QByteArray on the stack. And why do you copy it at all?
/Edit: Ok, QElapsedTimer is no QObject so the warning can not come from there. Please run your app in the debugger with either a custom message handler or the environment variable QT_FATAL_WARNINGS as described here: https://doc.qt.io/qt-6/debug.html
Then look at the backtrace to see which timer triggers this warning.
-
@Christian-Ehrlicher Thanks for sticking with the question and investigating it further. I appreciate that.
I remember encountering the error message while debugging the QWindowsAudioSink state change processing. It was triggered by calling start/stop on the QWindowsAudioSink. As you can see, there is a QTimer member (a QObject subclass) that seems to be the source of this issue. The m_timer field is default constructed without a parent.
So as a general rule for thread affinity, I guess I should look for all members that are subclasses of QObject, including those held in std::unique_ptr or QScopedPointer (more in line with how QT does things). Then, assuming each of these is well behaved (unlike QAudioSink), passing 'this' to their constructor should be the way to go. Right? (Assuming well behaved classes expose an optional 'parent = nullptr' constructor parameter.)
Edit: Assuming this is the bug, is there anything I have to do to ensure it is fixed? I'm a newbie in these forums. It would be nice to have it fixed, but I have a workaround meanwhile. I think I found another bug in the QIODevice bytesAvailable base class implementation for non sequential devices (where it calls a virtual size function, which ends up doubling the bytes available) - this also might need to be addressed - but that's probably another separate thread :)
QWindowsAudioSink::QWindowsAudioSink(QWindowsIUPointer<IMMDevice> device)
    : m_pushSource(new OutputPrivate(*this)),
      m_device(std::move(device))
{
    m_pushSource->open(QIODevice::WriteOnly|QIODevice::Unbuffered);
    m_timer.setSingleShot(true);
    m_timer.setTimerType(Qt::PreciseTimer);
}
Here is the class in question
class QWindowsAudioSink : public QPlatformAudioSink
{
    Q_OBJECT
public:
    QWindowsAudioSink(QWindowsIUPointer<IMMDevice> device);
    ~QWindowsAudioSink();

    void setFormat(const QAudioFormat& fmt) override;
    QAudioFormat format() const override;
    QIODevice* start() override;
    void start(QIODevice* device) override;
    void stop() override { close(); }
    void reset() override;
    void suspend() override;
    void resume() override;
    qsizetype bytesFree() const override;
    void setBufferSize(qsizetype value) override;
    qsizetype bufferSize() const override { return m_bufferSize; }
    qint64 processedUSecs() const override;
    QAudio::Error error() const override { return errorState; }
    QAudio::State state() const override { return deviceState; }
    void setVolume(qreal) override;
    qreal volume() const override { return m_volume; }

private:
    friend class OutputPrivate;
    qint64 write(const char *data, qint64 len);
    qint64 push(const char *data, qint64 len);

    bool open();
    void close();

    void deviceStateChange(QAudio::State, QAudio::Error);

    void pullSource();
    qint64 remainingPlayTimeUs();

    QAudioFormat m_format;
    QAudio::Error errorState = QAudio::NoError;
    QAudio::State deviceState = QAudio::StoppedState;

    qsizetype m_bufferSize = 0;
    qreal m_volume = 1.0;
    QTimer m_timer;
    QScopedPointer<QIODevice> m_pushSource;
    QIODevice *m_pullSource = nullptr;
    QWindowsIUPointer<IMMDevice> m_device;
    QWindowsIUPointer<IAudioClient> m_audioClient;
    QWindowsIUPointer<IAudioRenderClient> m_renderClient;
    QWindowsResampler m_resampler;
};
This is one of the 2 sources of the ownership error messages.
void QWindowsAudioSink::deviceStateChange(QAudio::State state, QAudio::Error error)
{
    if (state != deviceState) {
        if (state == QAudio::ActiveState) {
            m_audioClient->Start();
            qCDebug(qLcAudioOutput) << "Audio client started";
        } else if (deviceState == QAudio::ActiveState) {
            m_timer.stop();
            m_audioClient->Stop();
            qCDebug(qLcAudioOutput) << "Audio client stopped";
        }

        deviceState = state;
        emit stateChanged(deviceState);
    }

    if (error != errorState) {
        errorState = error;
        emit errorChanged(error);
    }
}
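The failure mode discussed in this post can be modelled without Qt. The sketch below is only an analogy, not Qt's implementation: AffineTimer and Sink are hypothetical stand-ins, where the timer remembers the thread that constructed it and refuses to start from any other thread, and the sink holds that timer by value with no parent link that could carry it to another thread. It shows why building the sink inside the worker-thread slot works while building it in the constructor does not.

```cpp
#include <cassert>
#include <thread>

// Hypothetical stand-in for a QObject-based timer: it records the thread that
// constructed it and refuses to start from any other thread.
class AffineTimer {
public:
    AffineTimer() : mOwner{std::this_thread::get_id()} {}
    bool start()
    {
        if (std::this_thread::get_id() != mOwner) {
            return false; // analogous to "timers cannot be started from another thread"
        }
        mActive = true;
        return true;
    }
    bool isActive() const { return mActive; }
private:
    std::thread::id mOwner;
    bool mActive = false;
};

// Hypothetical sink that owns its timer by value. Nothing links the timer to
// the sink's owner, so the timer stays tied to the constructing thread.
class Sink {
public:
    bool start() { return mTimer.start(); }
private:
    AffineTimer mTimer;
};

// Eager construction: the sink (and its timer) is built on the calling thread
// and only used on the worker. The start request is rejected.
bool startSinkBuiltOnCaller()
{
    Sink sink;
    bool started = false;
    std::thread worker([&] { started = sink.start(); });
    worker.join();
    return started;
}

// Lazy construction: the sink is built inside the worker, as the handleStart
// workaround does, so its timer belongs to the worker thread.
bool startSinkBuiltOnWorker()
{
    bool started = false;
    std::thread worker([&] {
        Sink sink;
        started = sink.start();
    });
    worker.join();
    return started;
}

// Runs both scenarios and checks the outcome of each.
void compareConstructionSites()
{
    assert(!startSinkBuiltOnCaller());
    assert(startSinkBuiltOnWorker());
}
```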
-
This was fast. It was added in Qt6.3 (and Qt5.15.0) with 234f3b042dce93d261a67c3c79bec3ba2912ae9e. Do you want to write the bug report or should I?
-
@Christian-Ehrlicher Please go ahead. I would appreciate it if you could send me the link so I can keep an eye on it. BTW not sure if it affects other platforms. Thanks again.
-
Bug for reference: https://bugreports.qt.io/browse/QTBUG-108187
-
@Christian-Ehrlicher Thanks for the follow up.
-
Hi @johnco3, it's possible the handling of the timer is technically a bug. It certainly means the QAudioSink class does not play well with threads.
My app requires low latency audio streaming such that changes in the UI result in responsive changes to the sound. However running QAudioSink in the main thread means having a long buffer length to avoid audio glitches (and even then they're not totally avoided). Ironically the Qt example glitches all over the place when you click the volume slider even if you don't change the value.
My solution was to instantiate the QAudioSink in a separate thread but this requires that the QIODevice also lives in that thread. I struggled with reparenting errors and in the end I use a couple of layers of container to handle things. Perhaps this diagram will explain. (If not, let me know and I'll use words!)
NB: most of the signal/slot connections must be Qt::QueuedConnection for thread safety.
-
A curious observation... I had to delve deep into the source code to get a handle on how things were working in order to find a solution. (Read: days of investigation and experiments) Taking the macOS platform as my guide, it's curious that even if you choose pull mode, behind the scenes QAudioSink creates a ring buffer governed by a timer (i.e. push style).
This actually adds latency of around 10ms as the buffer length is hard-coded. Maybe it was intended as a safety net?? Fortunately in my tests so far it does not affect responsiveness noticeably. I presume the same mechanism is used for all platforms but I haven't checked.
If I was rewriting QAudioSink, I'd get rid of the ring buffer and make it the responsibility of the developer to deliver data on time. I'm just hoping that when I do the final tests I don't find myself having to fork QtMultimedia and make this change.
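To put the 10ms figure in perspective, here is a small sketch of the arithmetic that relates a PCM buffer size to the latency it adds. The PcmFormat struct and both helper names are hypothetical, and the 48 kHz stereo 16-bit format is only an assumed example; the actual buffer size used inside QAudioSink is not stated in this thread.

```cpp
#include <cstdint>

// Hypothetical description of an uncompressed PCM stream.
struct PcmFormat {
    std::int64_t sampleRate;     // frames per second
    std::int64_t channels;       // samples per frame
    std::int64_t bytesPerSample; // 2 for 16-bit PCM
};

constexpr std::int64_t bytesPerSecond(const PcmFormat& f)
{
    return f.sampleRate * f.channels * f.bytesPerSample;
}

// Playback time, in microseconds, represented by a buffer of the given size.
constexpr std::int64_t latencyUsForBytes(const PcmFormat& f, std::int64_t bytes)
{
    return bytes * 1'000'000 / bytesPerSecond(f);
}

// Buffer size, rounded down to whole frames, that holds the given duration.
constexpr std::int64_t bytesForLatencyUs(const PcmFormat& f, std::int64_t microseconds)
{
    const std::int64_t frames = f.sampleRate * microseconds / 1'000'000;
    return frames * f.channels * f.bytesPerSample;
}

// Assumed example: 10 ms of 48 kHz stereo 16-bit audio is 480 frames of 4 bytes.
static_assert(bytesForLatencyUs(PcmFormat{48000, 2, 2}, 10'000) == 1920,
              "10 ms at 48 kHz stereo 16-bit");
```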
-
@paulmasri What a great and informative post!!! I'm very impressed.
I solved the challenge of the QIODevice living in a separate thread by following the Active Object pattern I mentioned. However, getting this to work correctly required working around a nasty, deep-rooted QWindowsAudioSink bug caused by a default constructed QTimer member. Thankfully @Christian-Ehrlicher kindly reported this bug to the QT developers and they have already patched it.
I am not sure if there are similar timer or other field bugs lurking on the Mac platform. More worrisome is whether there are other badly parented members that prevent multimedia sources and sinks from playing nicely with moveToThread.
Basically the trick to getting all this to work in a single thread was to make sure that the QAudioSink member is default constructed empty. Also, the QIODevice classes needed to have a (QObject* parent = nullptr) constructor to work well with moveToThread. Shown below are the most important details - note that I have 2 preprocessor macros that are currently undefined pending, hopefully, the next release. It is unbelievable that there are so many bugs in the sources and sinks in QT Multimedia!!! I certainly would not blame you for forking it!!!
The worked around code is shown above in one of the posts - from a few days ago, when I had it self contained in a single header file. I have subsequently refactored it a bit and added cpp files.
In the RtpWorker::handleStart slot (running in the worker thread context), I lazily instantiate the QAudioSink (QWindowsAudioSink in my case). When it comes to cleaning up, the RtpWorker::handleStop slot and RtpWorker::cleanup take care of it.
I'm not sure, but by following the Active Object pattern you might be able to save a bit of complexity.
Here is my sink IO device that feeds the speaker
// Modeled after the Generator class example from QT 6.x audio output example.
class IODeviceSink : public QIODevice
{
    Q_OBJECT
public:
    /**
     * Explicit constructor
     *
     * @param rAudioBuffers [in] array of multi-channel audio
     *                      samples.
     * @param parent [in] parent.
     */
    explicit IODeviceSink(
        const QByteArrayList& rAudioBuffers,
        QObject* parent = nullptr)
        : QIODevice(parent)
        , mAudioData{ rAudioBuffers.join() }
    {}

    // Audio device should give sequential access
    [[nodiscard]] bool isSequential() const override { return true; }

    void start();
    void stop();
    qint64 setData(const QByteArray& rRawAudio);
    qint64 readData(char* data, qint64 maxlen) override;
    qint64 writeData(const char* data, qint64 len) override;
    qint64 bytesAvailable() const override;
    qint64 size() const override;

private:
    qint64 m_pos = 0;
    QByteArray mAudioData;
};
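The class above only declares readData, so here is a Qt-free sketch of what a pull-mode read over a fixed buffer might look like. It assumes the looping behaviour I recall from the Generator class in the Qt audio output example (wrap around to the start when the end of the buffer is reached); whether IODeviceSink does the same is not shown in this thread. LoopingSource is a hypothetical name.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>

// Hypothetical pull source: hands out the same audio buffer over and over.
class LoopingSource {
public:
    explicit LoopingSource(std::string data) : mData{std::move(data)} {}

    // Copies exactly maxLen bytes into out, wrapping around at the end of the
    // buffer. Returns the number of bytes copied (0 if the buffer is empty).
    std::size_t readData(char* out, std::size_t maxLen)
    {
        if (mData.empty()) {
            return 0;
        }
        std::size_t total = 0;
        while (total < maxLen) {
            const std::size_t chunk = std::min(mData.size() - mPos, maxLen - total);
            std::memcpy(out + total, mData.data() + mPos, chunk);
            mPos = (mPos + chunk) % mData.size();
            total += chunk;
        }
        return total;
    }

    // Read position inside the buffer for the next call.
    std::size_t position() const { return mPos; }

private:
    std::string mData;
    std::size_t mPos = 0;
};
```

Because the read never comes up short, the consumer is always given a full buffer, which is the property a pull-mode audio sink relies on to avoid underruns.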
Here is my RtpWorker Active Object class. I removed the implementation details here; the constructor follows in the next listing.
/**
 * Active Object QT pattern - see tutorial https://www.youtube.com/watch?v=SncJ3D-fO7g
 * See also: https://forum.qt.io/topic/140465/qt-6-x-qaudiosink-potential-bug
 */
class RtpWorker final : public QObject
{
    Q_OBJECT
public:
    //! mpThread cannot have parent, as we need to move this object
    //! Also, note most QT class members need 'this' as parent for
    //! thread affinity to work correctly.
    RtpWorker();

    //! these deleted constructors and assignment operators are not
    //! strictly required as the QObject superclass prevents copy
    //! but left in for clarity
    RtpWorker(const RtpWorker&) = delete;
    RtpWorker(RtpWorker&&) noexcept = delete;
    RtpWorker& operator=(const RtpWorker&) = delete;
    RtpWorker& operator=(RtpWorker&&) noexcept = delete;

    //! virtual destructor override
    ~RtpWorker() override
    {
        QMetaObject::invokeMethod(this, "cleanup");
        mpThread->wait();
    }

Q_SIGNALS:
    void stop();
    void start(const qint64, const QString&, const quint16,
        const qint64, const QByteArray&, const QByteArrayList&);
    void datagramsReady(const QByteArrayList&);
    void bytesWritten(qint64, bool);
    void watchdogStatusChanged(bool, const QString&);

    /**
     * Signal to asynchronously notify application thread.
     *
     * @param rStatusText [in] Status area notification text.
     * @param rStyleSheet [in] Text stylesheet.
     * @param timeout [in] Timeout after which the text
     *                disappears.
     */
    void workerStatusChange(const QString& rStatusText,
        const QString& rStyleSheet, int timeout = 0) const;

    /**
     * IPC Signal from worker thread used to safely update
     * mTXBufferList.
     *
     * @param aTSIncrement [in] Timestamp increment per datagram.
     * @param rAudioData [in] raw multichannel audio.
     */
    void updateAudio(const qint64 aTSIncrement,
        const QByteArrayList& rAudioData);

    //! Add log entry
    void addLogEntry(const spdlog::level::level_enum, const QString&);

// slots are all called while in the worker thread context
private Q_SLOTS:
    /**
     * Start signal handler.
     *
     * <p>This slot runs in the worker thread context. Creates a
     * buffered list of transmit UDP datagrams. Each datagram is
     * prefixed with an RTP header (including a sequence# &
     * timestamp).
     *
     * <p>The first datagram will always have the marker bit set.
     *
     * @param aTSIncrement [in] Timestamp increment per datagram.
     * @param rTargetIP [in] Target IP address.
     * @param aTargetPort [in] Target UDP port.
     * @param aDurationUsecs
     *                  [in] Duration (specified in microseconds) to
     *                  sleep between recurring timer callbacks.
     *                  This time represents the duration for all
     *                  <code>mTXBufferList</code> datagrams.
     * @param rHeader [in] RTP header consisting of a QByteArray
     *                containing an rtp_hdr_t with optional contributing
     *                sources (up to 16) & rtp_hdr_ext_t fields & data if
     *                the x bit of the rtp_hdr_t is set.
     * @param rAudioData [in] raw multichannel audio.
     */
    void handleStart(
        const qint64 aTSIncrement,
        const QString& rTargetIP,
        const quint16 aTargetPort,
        const qint64 aDurationUsecs,
        const QByteArray& rHeader,
        const QByteArrayList& rAudioData);

    /**
     * Slot to handle captured audio data.
     *
     * <p>Saves the audio data to a QIODevice.
     *
     * @param rAudioBuffer [in] audio data (containing valid
     *                     QAudioFormat).
     */
    void handleAudioCaptured(const QAudioBuffer& rAudioBuffer);

    /**
     * Audio updated slot handler.
     *
     * @param aTSIncrement [in] Timestamp increment per datagram.
     * @param rAudioData [in] raw multichannel audio.
     */
    void handleAudioUpdated(
        const qint64 aTSIncrement,
        const QByteArrayList& rAudioData);

    /**
     * QUdpSocket slot readyRead handler.
     *
     * <p>This is where we read the RTP data echoed back from the
     * target.
     */
    void readyRead();

    /**
     * Slot to handle bytes written.
     *
     * <p>Updates stats in the GUI
     *
     * @param aBytesWritten [in] number of bytes written to socket.
     */
    void handleBytesWritten(qint64 aBytesWritten)
    {
        // forward to GUI
        Q_EMIT bytesWritten(aBytesWritten, false);
    }

    //! Stop slot handler
    void handleStop();

    //! cleanup method - called in worker thread context
    void cleanup();

    /**
     * Timer callback event handler.
     *
     * @param event [in] timer event.
     */
    void timerEvent(QTimerEvent* event) override;

private:
    std::unique_ptr<QThread> mpThread;
    std::unique_ptr<QBasicTimer> mpTimer;
    std::unique_ptr<QTimer> mpWatchdog;
    std::unique_ptr<QUdpSocket> mpSocket;
    //! Audio Source (microphone).
    std::unique_ptr<QAudioSource> mpAudioSource;
    //! IODevice connected to mpAudioSource
    std::unique_ptr<IODeviceSource> mpIODeviceSource;
    //! Audio Sink (speaker).
    std::unique_ptr<QAudioSink> mpAudioSink;
    //! IODevice connected to mpAudioSink
    std::unique_ptr<IODeviceSink> mpIODeviceSink;
    QByteArrayList mTXBufferList;
    QHostAddress mTargetIP;
    quint16 mTargetPort;
    qint64 mTSIncrement;
    qint64 mTXCounter;
};
And finally, here is the constructor implementation (followed by handleStop and cleanup), with the RESAMPLER_BUG_FIXED & QAUDIOSINK_BUG_FIXED macros undefined pending upstream fixes:
//! Constructor
RtpWorker::RtpWorker()
    : mpThread{ std::make_unique<QThread>() } //!< must not pass parent
    , mpTimer{ std::make_unique<QBasicTimer>() }
    , mpWatchdog{ std::make_unique<QTimer>(this) } //!< 'this' parent for thread affinity
    , mpSocket{ std::make_unique<QUdpSocket>(this) } //!< 'this' parent for thread affinity
#if defined (RESAMPLER_BUG_FIXED)
    // "qt.multimedia.audiooutput: Failed to setup resampler" on console
    , mpAudioSource{ std::make_unique<QAudioSource>(
        QMediaDevices::defaultAudioInput(),
        QMediaDevices::defaultAudioInput().
        preferredFormat(), this) }
    , mpIODeviceSource{ std::make_unique<IODeviceSource>(
        QMediaDevices::defaultAudioInput().
        preferredFormat(), this) }
#else
    // cannot use either of these objects as instantiating them prevents the resampler from working
    , mpAudioSource{}
    , mpIODeviceSource{}
#endif
#if defined (QAUDIOSINK_BUG_FIXED)
    , mpAudioSink{ std::make_unique<QAudioSink>(
        QMediaDevices::defaultAudioOutput(),
        QMediaDevices::defaultAudioOutput().
        preferredFormat(), this) }
#else
    , mpAudioSink{}
#endif
    , mpIODeviceSink{ std::make_unique<IODeviceSink>(
        QByteArrayList{}, this) }
    , mTXBufferList{}
    , mTargetIP{}
    , mTargetPort{}
    , mTSIncrement{}
    , mTXCounter{0}
{
    connect(this, &RtpWorker::start, this, &RtpWorker::handleStart);
    connect(this, &RtpWorker::stop, this, &RtpWorker::handleStop);
    connect(this, &RtpWorker::updateAudio, this, &RtpWorker::handleAudioUpdated);
    connect(mpSocket.get(), &QUdpSocket::readyRead, this, &RtpWorker::readyRead);
    connect(mpSocket.get(), &QUdpSocket::bytesWritten, this, &RtpWorker::handleBytesWritten);
    connect(mpWatchdog.get(), &QTimer::timeout, [&]() {
        Q_EMIT addLogEntry(spdlog::level::warn, "RTP network disconnected");
        Q_EMIT watchdogStatusChanged(true, "RTP network disconnected");
    });
#if defined (RESAMPLER_BUG_FIXED)
    connect(mpIODeviceSource.get(), &IODeviceSource::audioCaptured,
        this, &RtpWorker::handleAudioCaptured);
#endif
#if defined (QAUDIOSINK_BUG_FIXED)
    connect(mpAudioSink.get(), &QAudioSink::stateChanged,
        [&](QAudio::State state) {
            // update log tab
            Q_EMIT addLogEntry(spdlog::level::info,
                "QAudioSink::stateChanged");
        });
#endif
    // thread affinity magic
    moveToThread(mpThread.get());
    mpThread->start();
}

//! Stop slot handler
void RtpWorker::handleStop()
{
    mpTimer->stop();
    mpSocket->close();
    mpWatchdog->stop();
    mTXBufferList.clear();
    mTSIncrement = 0;
    mTXCounter = 0;
    if (mpAudioSource) {
        mpAudioSource->stop();
    }
    if (mpIODeviceSource) {
        mpIODeviceSource->stop();
    }
    if (mpAudioSink) {
        mpAudioSink->stop();
    }
    if (mpIODeviceSink) {
        mpIODeviceSink->stop();
    }
    // update log tab
    Q_EMIT addLogEntry(spdlog::level::info, "RTP thread stopped");
    // notify main thread that we successfully stopped
    Q_EMIT workerStatusChange("RTP thread stopped",
        "color:blue;font-weight:bold;", 4000);
}

//! cleanup method - called in worker thread context
void RtpWorker::cleanup()
{
    // delete timer, socket... etc.
    mpTimer.reset();
    mpWatchdog.reset();
    mpSocket.reset();
    mpAudioSource.reset();
    mpIODeviceSource.reset();
    mpAudioSink.reset();
    mpIODeviceSink.reset();
    mTSIncrement = 0;
    mTXCounter = 0;
    // quit makes exec() return from the event loop
    mpThread->quit();
}