Issues using QUdpSocket::writeDatagram for high volumes of data



  • I'm working on writing a UDP video feed transmission system. The data I'm working with is a special type of video at relatively low resolution at 30 FPS, heavily compressed. I'm experiencing difficulties with using the QUdpSocket::writeDatagram with my data. In the theoretical worst case my data amounts to no more than 56.52 Mbit/s, but in practice this number remains around 30-40 Mbit/s. This probably seems high, but it's the currently best option for getting near-realtime video (x264 and such have issues with seeking frames out of order). The computers I'm using are in the same LAN with a 100MBit/s router in the middle and they are connected using an ethernet cable. The senders are Linux computers (tested on a Raspberry pi model 3 and a reasonably able Debian laptop) and the receiver is Windows 8.1 desktop with a reasonably good processor. The laptop was introduced in testing after we started guessing that the Raspberry USB is causing issues as the Raspberry Ethernet controller shares USB bandwidth.

    1. In the following code segments below, am I doing something wrong in reading or writing my datagrams?
    2. If my data reaches high quantities, writeDatagram is no longer able to output data and needs to try multiple times. Usually the function succeeds within 100-200 tries. Interestingly this problem happens much more on my laptop (with a more powerful network card I'd imagine) than on the Raspberry. Is there some socket option I need to specify to make the function work better with large quantities of data? I understand all this function does is insert my datagram in an internal buffer which the socket has, later to be sent to the operating system's own socket implementation. Should this buffer be increased at the sender or receiver, or both? I've tried adding waitForBytesWritten() and flush() after writeDatagram, but that didn't appear to do anything.
    3. For QTcpSockets it makes sense that the QAbstractSocket::write() returns a positive number that is not equal to the requested amount of bytes that you asked the function to write. However, is it possible for QUdpSocket::writeDatagram to return a positive value that is less than the full size of the datagram? In my tests I didn't see this happen, but the documentation suggests it's possible.
    4. I also attempted using QTcpSocket, but with this the delay in transmitting the data was so large the latency made the video appear sometimes 10 seconds late on my receiver, even if it this solved packet loss.

    Simplified version of the sender:

    qint64 written = 0;
    int wrote = 0;
    int write_bytes = SOCKET_DATA_WRITE_MAX;    // I've tried values from 512 to 50000. At 512 I end up losing more packets than at 50000.
    int read_offset = 0;
    int to_write = ar.size();   // QByteArray containing from around 5000 bytes (minimum case) to around 200 000 (worst case) bytes
    
    while (written < to_write)
    {
        // If the message is larger than the max byte size per write() call, split it in parts
        write_bytes = SOCKET_DATA_WRITE_MAX;
        if (to_write - written < SOCKET_DATA_WRITE_MAX)
            write_bytes = to_write - written;
    
        memset(udp_outputbuffer, 0, SOCKET_MAX_SIZE);
    
        // construct a header that will accompany every message, informing what kind of data
        // this message contains and what range of the full data is included
    
        quint8 magic = 190;
        quint32 written_so_far = written;
        quint32 this_datagram_write = write_bytes;
        quint32 total_write = to_write;
        quint32 ts = timestamp;
    
        char* header_data = udp_outputbuffer;
        memcpy(header_data, &magic, sizeof(quint8));
        header_data += sizeof(quint8);
        memcpy(header_data, &written_so_far, sizeof(quint32));
        header_data += sizeof(quint32);
        memcpy(header_data, &this_datagram_write, sizeof(quint32));
        header_data += sizeof(quint32);
        memcpy(header_data, &total_write, sizeof(quint32));
        header_data += sizeof(quint32);
        memcpy(header_data, &ts, sizeof(quint32));
        header_data += sizeof(quint32);
        memcpy(header_data, &total_frames, sizeof(quint32));
        header_data += sizeof(quint32);
    
        // See how much we can write
        // header is 21 bytes
        memcpy(udp_outputbuffer + SOCKET_HEADER_SIZE, ar.data() + read_offset, write_bytes);
    
        wrote = -1;
        int tries = 0;
        bool keep_trying = true;
    
        wrote = -1;
        tries = 0;
        keep_trying = true;
    
        while (wrote == -1 && keep_trying == true)
        {
            wrote = udp_streamer_socket->writeDatagram(udp_outputbuffer, SOCKET_HEADER_SIZE + write_bytes, stream_ip, stream_port);
    
            if (wrote < 0)
            {
                // The error message this thing prints is: "Unable to send a message"
                std::cout << udp_streamer_socket->errorString().toLatin1().constData() << std::endl;
                tries++;
                if (tries > 1000)
                {
                    keep_trying = false;
                    std::cout << "failed outputting frame: " << total_frames << std::endl;
                }
            }
            else
            {
                // Some amount was written. Remove the header size to get amount of real data that was transmitted
    
                if (wrote != SOCKET_HEADER_SIZE + write_bytes)
                {
                    // Is it possible to less than datagram.size() to be written? How is that handled?
    
                    std::cout << "wrote " << wrote << " out of " << SOCKET_HEADER_SIZE + write_bytes << std::endl;
                    tries++;
                    if (tries > 1000)
                    {
                        keep_trying = false;
                        std::cout << "failed outputting frame: " << total_frames << std::endl;
                    }
                }
            }
        }
        wrote -= SOCKET_HEADER_SIZE;
        written += wrote;
        read_offset += wrote;
    }
    
    // proceed with other things
    
    

    Simplified version of the receiver:

    void FeedNetworkReceiver::udp_readStreamerSocket()
    {
        while (udp_streamer_listen->hasPendingDatagrams())
        {
            QByteArray datagram;
            datagram.resize(udp_streamer_listen->pendingDatagramSize());
            QHostAddress sender;
            quint16 sender_port;
            udp_streamer_listen->readDatagram(datagram.data(), datagram.size(), &sender, &sender_port);
    
            if (datagram.size() < HEADER_SIZE)
            {
                std::cout << "datagram size is smaller than header size " << datagram.size() << std::endl;
                continue;
            }
    
            QByteArray read_data_container;
            read_data_container.resize(HEADER_SIZE);
            char* read_data = read_data_container.data();
            memcpy(read_data, datagram.data(), HEADER_SIZE);
    
            quint8 magic;
            quint32 written;
            quint32 write_bytes;
            quint32 total_size;
            quint32 timestamp;
            quint32 frame_number;
    
            memcpy(&magic, read_data, sizeof(quint8));
            read_data += sizeof(quint8);
            memcpy(&written, read_data, sizeof(quint32));
            read_data += sizeof(quint32);
            memcpy(&write_bytes, read_data, sizeof(quint16));
            read_data += sizeof(quint32);
            memcpy(&total_size, read_data, sizeof(quint32));
            read_data += sizeof(quint32);
            memcpy(&timestamp, read_data, sizeof(quint32));
            read_data += sizeof(quint32);
            memcpy(&frame_number, read_data, sizeof(quint32));
    
            if (magic != HEADER_MAGIC)
            {
                std::cout << "Got invalid magic header: " << magic << std::endl;
                continue;
            }
    
            if (timestamp == 0)
            {
                std::cout << "got zero timestamp" << std::endl;
                continue;
            }
    
            unsigned int current_time = QDateTime::currentDateTime().toTime_t();
    
            access.lock();
            if (timestamp < current_min_timestamp || frame_number < current_min_frame)
            {
                // This frame is so old that we no longer care about it as we have
                // already moved on, processing some frame that comes after this frame
                old_packets++;
                std::cout << "old packet: " << old_packets << std::endl;
                access.unlock();
                continue;
            }
            access.unlock();
            // Remove header section from the datagram
            datagram.remove(0, HEADER_SIZE);
    
            auto iter = std::find_if(pending_frames.begin(), pending_frames.end(),
                        [&](const PendingCompressedFrame& a)
                        {return a.timestamp == timestamp; });
            if (iter == pending_frames.end())
            {
                // No chunks have yet arrived for this frame
                PendingCompressedFrame in;
                in.timestamp = timestamp;
                in.frame_number = frame_number;
                in.data_so_far = datagram.size();
                in.finished_frame_size = total_size;
                in.time_to_live = current_time + 5;
                std::tuple<qint32, QByteArray> data_chunk;
                std::get<0>(data_chunk) = written;
                std::get<1>(data_chunk) = datagram;
                in.data.push_back(data_chunk);
                pending_frames.push_back(in);
            }
            else
            {
                // See if we already have this segment. This happens due to
                // redundancies being used to circumvent UDP packet loss.
                // redundancies are currently disabled
    
                PendingCompressedFrame& p = *iter;
    
                auto iter2 = std::find_if(p.data.begin(), p.data.end(),
                             [&](std::tuple<qint32, QByteArray>& a) { return std::get<0>(a) == written; } );
                if (iter2 != p.data.end())
                {
                    // This segment already arrived safely
                    continue;
                }
    
                auto& t = *iter;
    
                if (t.time_to_live < current_time)
                {
                    // This frame is over 5 seconds old - drop it
                    t.delete_later = true;
                    std::cout << "frame is too old " << timestamp << ", " << t.time_to_live << " / " << current_time << std::endl;
                }
                else
                {
                    std::tuple<qint32, QByteArray> in;
                    std::get<0>(in) = written;
                    std::get<1>(in) = datagram;
                    t.data_so_far += datagram.size();   // datagram.size() no longer includes HEADER_SIZE
                    t.data.push_back(in);
                }
            }
        }
    
        // ... ... code where the frames are sorted and used
    }
    
    

  • Qt Champions 2016

    @Jesse-Kaukonen
    I advise benchmarking the code. For one a datagram will always arrive whole (not like with TCP where you can have the message chopped in pieces), but there's a maximum size to it. Also there's no reception notification (like with TCP) as UDP is a "connectionless" protocol. These two things in mind you either send the datagram or not, you can't have anything else returned for the written bytes. On a related note, I'd remove the copying I saw in your code as much as humanly possible.

    I'd say you can make use of a somewhat "ugly" cast (provided you use a POD structure) instead of copying your data to a byte array (which you shouldn't do like you do in the first place). Consider this:

    QByteArray datagram;
    // ... Insert size checks here as well!
    PendingCompressedFrame & frame = *reinterpret_cast<PendingCompressedFrame *>(datagram.data()); // No copying, valid while the byte array is still there.
    

    EDIT:
    Warning: This will work across some platforms. Alignment and endianness should match for it to have a meaningful result.

    Additionally, why the mutex? What are you protecting with it exactly?

    One good way to deal with such throughput is to have a dedicated thread for the reception that'll put the received datagrams (no processing) into a thread safe queue (or you can also use a signal-slot connection). From there you can have another thread that orders them (they may not come in the order of sending) and ultimately preparing them for display.

    Kind regards.



  • @kshegunov said:

    I advise benchmarking the code. For one a datagram will always arrive whole (not like with TCP where you can have the message chopped in pieces), but there's a maximum size to it. Also there's no reception notification (like with TCP) as UDP is a "connectionless" protocol. These two things in mind you either send the datagram or not, you can't have anything else returned for the written bytes.

    I suspected as much. Good to know.

    QByteArray datagram;
    // ... Insert size checks here as well!
    PendingCompressedFrame & frame = *reinterpret_cast<PendingCompressedFrame *>(datagram.data()); // No copying, valid while the byte array is still there.
    

    As of yet, the receiver's processing time hasn't been a bottleneck so I haven't looked into optimizing it (it completes in around 0-1ms). Thank you for the suggestion though.

    Additionally, why the mutex? What are you protecting with it exactly?

    The mutex is not needed there. There's some other data that is being accessed from a DLL from a separate thread, but I should have removed the mutex from the simplified receiver.

    One good way to deal with such throughput is to have a dedicated thread for the reception that'll put the received datagrams (no processing) into a thread safe queue (or you can also use a signal-slot connection). From there you can have another thread that orders them (they may not come in the order of sending) and ultimately preparing them for display.

    That's a good idea. Keep readyRead() as minimal as possible to get data out of the socket's buffer, allowing more data in.

    The biggest question I'm still wondering is the performance of writedatagram. I've been looking into the assorted socket options, but it will take some reading to really understand what's going on with each flag.



Looks like your connection to Qt Forum was lost, please wait while we try to reconnect.