Losing stream data at time to write it, using threads



  • Hello

    I have been writing a program that receives USB data and then writes the data to a file. I am working with threads, one thread to receive the data and another thread to write it, but I am losing bytes.

    When a transfer is received, the receive thread emits a signal that is connected to a slot in the other thread, which writes the data. The file is filling with data, but some data is being lost.

    For example, I am trying to receive packets of .TS (transport stream) video. The sequence is 204 bytes with 4 bytes of header, then 204 bytes again with 4 bytes of header. The buffer from libusb can deliver 1 Mb really quickly, so when that buffer arrives the same thread emits the signal to the other thread to write that information.

    But the USB device sends data continuously, so I suspect one of two things: either packets are lost while the thread is emitting the signal, because data keeps arriving, or the other thread can't write to the file fast enough, so the signal is emitted again before the previous write has finished.

    I checked with a hex editor and yes, inside each 1 Mb the 204-byte packets repeat as expected, but where the next 1 Mb of data starts the format is lost. The two blocks don't match up.

    Here is the code I am using:

    MAINWINDOW

    cpp

    #include "mainwindow.h"
    #include "ui_mainwindow.h"
    #include "backend.h"
    #include "receive.h"          //Receive usb transfer
    #include "write.h"            //Write to a file the usb transfer
    #include <QThread>
    
    MainWindow::MainWindow(QWidget *parent) :
        QMainWindow(parent),
        ui(new Ui::MainWindow)
    {
        //This creates the Qt UI
        ui->setupUi(this);
    
        //Background of the application
        QMainWindow::setStyleSheet("* { background-color: rgb(250, 250, 250); }");
    
        //create usb_channels object of backend
        backend * usb_channels=new backend;
        
        //Make usb connection
        usb_channels->usb_conecction();
    
        //Connection of the channel buttons Begin
        //521000 frequency Begin
        connect(ui->pushButton,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57856()));
        connect(ui->pushButton_2,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57857()));
        connect(ui->pushButton_3,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57858()));
        connect(ui->pushButton_4,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57859()));
        connect(ui->pushButton_5,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57860()));
        connect(ui->pushButton_6,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57880()));
        //521000 frequency End
        //533000 frequency Begin
        connect(ui->pushButton_7,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57920()));
        connect(ui->pushButton_8,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57921()));
        connect(ui->pushButton_9,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57922()));
        connect(ui->pushButton_10,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57944()));
        //533000 frequency End
        //539000 frequency Begin
        connect(ui->pushButton_11,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57952()));
        connect(ui->pushButton_12,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57953()));
        connect(ui->pushButton_13,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57954()));
        connect(ui->pushButton_14,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57955()));
        connect(ui->pushButton_15,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57956()));
        connect(ui->pushButton_16,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57976()));
        //539000 frequency End
        //527000 frequency Begin
        connect(ui->pushButton_17,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57888()));
        connect(ui->pushButton_18,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57889()));
        connect(ui->pushButton_19,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57890()));
        connect(ui->pushButton_20,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57891()));
        connect(ui->pushButton_21,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57892()));
        connect(ui->pushButton_22,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57912()));
        //527000 frequency End
    
        //Connection of the channel buttons End
    
        //Objects of class receive and write
        receive *receive_usb = new receive();
        write *write_to_file = new write();
        
        //Threads for write and receive usb transfer
        QThread *threadReceive = new QThread();
        QThread *threadWrite = new QThread();
    
        //Pass the USB device handle
        receive_usb->device_handle_receive=usb_channels->device_handle;
    
        //create the file ts
        write_to_file->create_file();
    
        //When the buffer fills, receive emits the signal and write stores the data
        connect(receive_usb,SIGNAL(received(unsigned char*)), write_to_file, SLOT(write_to_file(unsigned char*)));
    
        //Connection from buttons to transfer_usb Begin
        //521000 frequency Begin
        connect(ui->pushButton,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_2,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_3,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_4,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_5,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_6,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        //521000 frequency End
        //533000 frequency Begin
        connect(ui->pushButton_7,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_8,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_9,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_10,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        //533000 frequency End
        //539000 frequency Begin
        connect(ui->pushButton_11,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_12,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_13,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_14,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_15,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_16,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        //539000 frequency End
        //527000 frequency Begin
        connect(ui->pushButton_17,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_18,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_19,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_20,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_21,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        connect(ui->pushButton_22,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb()));
        //527000 frequency End
        //Connection from buttons to transfer_usb End
    
        //Move to threads
        receive_usb->moveToThread(threadReceive);
        write_to_file->moveToThread(threadWrite);
    
        //Start the threads
        threadReceive->start();
        threadWrite->start();
    }
    
    MainWindow::~MainWindow()
    {
        delete ui;
    }
    

    WRITE
    header

    #ifndef WRITE_H
    #define WRITE_H
    #include <QFile>   //To use files
    #include <QDataStream> //To stream to the file
    #include <QObject>     //To use the Q_OBJECT macro
    #include <QByteArray>
    
    class write : public QObject
    {
        Q_OBJECT
    public:
        write();  //constructor
        //--------------FUNCTION TO CREATE A FILE BEGIN-------------
        void create_file(); //Create the File
        //--------------FUNCTION TO CREATE A FILE END---------------
    
    public slots:
        void write_to_file(unsigned char *MPEG2_TS);  //Write to File
    
    private:
        //--------------VARIABLES TO CREATE THE FILE BEGIN-----------
        QFile file;                                      //File to write the ts packets
        QDataStream out;                                 //To write in the file
        //--------------VARIABLES TO CREATE THE FILE END-------------
    
    
    };
    
    #endif // WRITE_H
    
    

    cpp

    #include "write.h"
    
    write::write()
    {
    
    }
    
    void write::create_file()
    {
        //Name of the file
        file.setFileName("data.ts");
    
        //Creation of the file, mode writeonly
        file.open(QIODevice::WriteOnly);
    }
    
    void write::write_to_file(unsigned char *MPEG2_TS)
    {
        out.setDevice(&file); //to handle the file writing
    
        for (unsigned int j=0;j<1024-1;j++)
        {
        out<<MPEG2_TS[j]; //Insert the byte into the file
        }
    }
    
    

    RECEIVE
    header

    #ifndef RECEIVE_H
    #define RECEIVE_H
    #include <libusb-1.0/libusb.h> //Include libusb
    #include <QObject>
    #include <QByteArray>
    
    class receive: public QObject
    {
        Q_OBJECT
    public:
        receive();
        libusb_device_handle *device_handle_receive;
    
    public slots:
        void transfer_usb(); //Receive transfers from USB
    
    private:
        //the buffer to receive 245760 bytes= 240 Kb
        //unsigned char MPEG2_TS[245760];
        void SemcoSleep(unsigned int nSleepTims_ms);                           //Function to make a delay
        //the buffer to send over signals and slot
      //  QByteArray MPEG2_TS_BYTE;
    
        //Variable to register the errors
        unsigned int error_receive;
    
        //Private function to emit the signal; takes the buffer as parameter
        void emitreceived(unsigned char *MPEG2_TS);
    
    signals:
        //Signal emitted
        void received(unsigned char *MPEG2_TS);
    
    };
    
    #endif // RECEIVE_H
    
    

    cpp

    #include "receive.h"
    #include <unistd.h> //JUST FOR SLEEP USING usleep(microseconds);
    
    receive::receive()
    {
    
    }
    
    //-----------------------FUNCTION FOR USB RECEIVING BEGIN----------------
    
    void receive::transfer_usb()
    {
        unsigned char *MPEG2_TS=new unsigned char[1024];
    
        SemcoSleep(4000);
    
        while(1)
        {
    
            //Set by libusb_interrupt_transfer to the number of bytes that were transferred
            int transferred=0;
    
            //Return value of libusb_interrupt_transfer: 0 if the transfer was successful
            int result=0;
    
            //Receiving Transfer
            result=libusb_interrupt_transfer (device_handle_receive,   //The device handle
                                          0x82,                 //Endpoint 2
                                              MPEG2_TS,             //Buffer
                                              1024,              //Size of Buffer
                                              &transferred,         //Return the number of bytes transferred
                                          0                //Timeout in ms (0 = no timeout)
                                              );
    
    
            if (result!=0) error_receive=10;   //For error 10: "Error in transfer in thread one"
            //Private function to emit the received signal
    
            emitreceived(MPEG2_TS);
        }
    }
    
    //-----------------------FUNCTION FOR USB RECEIVING END------------------
    
    //Private function to emit the signal
    void receive::emitreceived(unsigned char *MPEG2_TS)
    {
        emit received(MPEG2_TS);
    }
    
    //Function to make a delay
    void receive::SemcoSleep(unsigned int nSleepTims_ms)
    {
        nSleepTims_ms=nSleepTims_ms*1000;// convert milliseconds to microseconds for usleep
        usleep(nSleepTims_ms);
    }
    
    

    In mainwindow.cpp, the first part is for the USB connection and initialization, up to these lines:

    //Objects of class receive and write
      receive *receive_usb = new receive();
      write *write_to_file = new write();
    

    The many buttons are for different channels, and the many connects all do practically the same thing.

    I have been thinking about this but I don't know how to handle it. I googled to see whether someone else had the same problem, but I couldn't find good information about it.
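
    The hex-editor check described above can also be automated. The following is only a sketch under stated assumptions: it assumes every packet is 204 bytes long and starts with the MPEG-TS sync byte 0x47, and the names find_misaligned_packets and load_file are hypothetical helpers, not part of the program above. It reports each packet boundary where the sync byte is missing, which is where bytes were probably lost or duplicated.

    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <fstream>
    #include <iterator>
    #include <string>
    #include <vector>

    // Assumption: 204-byte packets, each beginning with the sync byte 0x47.
    constexpr std::size_t kPacketSize = 204;
    constexpr std::uint8_t kSyncByte = 0x47;

    // Hypothetical helper: reads a whole file into memory as raw bytes.
    std::vector<std::uint8_t> load_file(const std::string& path)
    {
        std::ifstream in(path, std::ios::binary);
        return std::vector<std::uint8_t>(std::istreambuf_iterator<char>(in),
                                         std::istreambuf_iterator<char>());
    }

    // Returns the absolute offsets of expected packet starts (start,
    // start + 204, start + 408, ...) that do not hold the sync byte.
    // Only complete packets are checked.
    std::vector<std::size_t> find_misaligned_packets(const std::vector<std::uint8_t>& data,
                                                     std::size_t start = 0)
    {
        assert(start <= data.size());
        std::vector<std::size_t> bad;
        for (std::size_t off = start; off + kPacketSize <= data.size(); off += kPacketSize) {
            if (data[off] != kSyncByte) {
                bad.push_back(off);
            }
        }
        return bad;
    }
    ```

    Calling find_misaligned_packets(load_file("data.ts")) on the captured file should give an empty list if nothing was lost. Once a byte is missing, the later boundaries are shifted as well, so the first reported offset is usually the interesting one.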



  • You have a race condition on MPEG2_TS, which you also leak.

    Change unsigned char *MPEG2_TS=new unsigned char[1024]; to QByteArray MPEG2_TS(1024,0) and move the declaration inside the while(1). You can use MPEG2_TS.data() to pass it to libusb.

    You'll have to change the signature of your signals and slots too I'm afraid.
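
    To illustrate why the shared buffer is a problem: the signal carries only a pointer, so by the time the slot runs in the write thread, the receive loop may already have overwritten the same 1024 bytes with a later transfer. Handing over an owned copy per transfer avoids this. The sketch below shows that idea in plain standard C++ rather than Qt, so it can be compiled on its own; std::vector<unsigned char> stands in for QByteArray, ChunkQueue stands in for the queued signal/slot connection, and ReadFn is a hypothetical stand-in for the libusb call. It also writes exactly the number of bytes reported as transferred, whereas the loop in write_to_file above stops at j<1024-1 and so writes only 1023 of the 1024 bytes.

    ```cpp
    #include <cassert>
    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <ostream>
    #include <sstream>
    #include <string>
    #include <thread>
    #include <utility>
    #include <vector>

    using Chunk = std::vector<unsigned char>;

    // Thread-safe FIFO that owns every chunk pushed into it.
    class ChunkQueue {
    public:
        void push(Chunk c) {
            { std::lock_guard<std::mutex> lock(m_); q_.push_back(std::move(c)); }
            cv_.notify_one();
        }
        void close() {
            { std::lock_guard<std::mutex> lock(m_); closed_ = true; }
            cv_.notify_all();
        }
        // Blocks until a chunk is available; returns false once closed and drained.
        bool pop(Chunk& out) {
            std::unique_lock<std::mutex> lock(m_);
            cv_.wait(lock, [this] { return !q_.empty() || closed_; });
            if (q_.empty()) return false;
            out = std::move(q_.front());
            q_.pop_front();
            return true;
        }
    private:
        std::mutex m_;
        std::condition_variable cv_;
        std::deque<Chunk> q_;
        bool closed_ = false;
    };

    // Hypothetical stand-in for the libusb read: fills buf, returns bytes transferred.
    using ReadFn = int (*)(unsigned char* buf, int capacity);

    void receive_loop(ReadFn read_chunk, ChunkQueue& queue, int iterations) {
        for (int i = 0; i < iterations; ++i) {
            Chunk buf(1024);  // fresh buffer for every transfer, never shared
            int transferred = read_chunk(buf.data(), static_cast<int>(buf.size()));
            if (transferred <= 0) continue;
            buf.resize(static_cast<std::size_t>(transferred));  // keep only what arrived
            queue.push(std::move(buf));                         // ownership moves to the queue
        }
        queue.close();
    }

    void write_loop(ChunkQueue& queue, std::ostream& out) {
        Chunk c;
        while (queue.pop(c)) {
            // One call writes the whole chunk, all c.size() bytes.
            out.write(reinterpret_cast<const char*>(c.data()),
                      static_cast<std::streamsize>(c.size()));
        }
    }

    // Runs the writer on its own thread and the receiver on the calling thread.
    void run_pipeline(ReadFn read_chunk, std::ostream& out, int iterations) {
        ChunkQueue queue;
        std::thread writer(write_loop, std::ref(queue), std::ref(out));
        receive_loop(read_chunk, queue, iterations);
        writer.join();
    }

    // Convenience for trying the pipeline without touching a file.
    std::string run_pipeline_to_string(ReadFn read_chunk, int iterations) {
        std::ostringstream out;
        run_pipeline(read_chunk, out, iterations);
        return out.str();
    }
    ```

    In the Qt program the equivalent change would be a signal and slot declared with a QByteArray parameter, as suggested above, so that each emission carries its own data instead of a pointer to a buffer that keeps changing.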



  • @VRonin said in Losing stream data at time to wirte it , using threads:

    You have a race condition on MPEG2_TS, which you also leak.

    Change unsigned char *MPEG2_TS=new unsigned char[1024]; to QByteArray MPEG2_TS(1024,0) and move the declaration inside the while(1). You can use MPEG2_TS.data() to pass it to libusb.

    You'll have to change the signature of your signals and slots too I'm afraid.

    I have a problem with the unsigned type: MPEG2_TS.data() is just char and libusb uses unsigned char.

    /home/pasante/Escritorio/PIZZA_FILE/receive.cpp:29: error: invalid conversion from ‘char*’ to ‘unsigned char*’ [-fpermissive]
    MPEG2_TS.data(), //Buffer
    ~~~~~~~~~~~~~^~

    EDIT:

    I now pass (unsigned char*)MPEG2_TS.data() to libusb. I am debugging to see if the connects work.

    EDIT2: With (unsigned char*)MPEG2_TS.data(), all the data in the file is destroyed when I look at it in the hex editor.



  • @aurquiel said in Losing stream data at time to write it, using threads:

    MPEG2_TS.data() is just char and libusb uses unsigned char

    reinterpret_cast<unsigned char*>(MPEG2_TS.data())
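
    A small illustration of this cast, again in plain C++ so it compiles without Qt: std::vector<char> stands in for QByteArray (both expose a char* through data()), and fill_from_device is a hypothetical stand-in for a C API such as libusb that expects unsigned char*. The cast only changes the pointer type; the bytes themselves are not converted or copied.

    ```cpp
    #include <cassert>
    #include <vector>

    // Hypothetical stand-in for a C function that fills an unsigned char buffer.
    int fill_from_device(unsigned char* buffer, int length)
    {
        for (int i = 0; i < length; ++i) {
            buffer[i] = static_cast<unsigned char>(0x80 + i);  // values above 127 on purpose
        }
        return length;
    }

    // storage.data() is char*, so it is reinterpreted as unsigned char*
    // before being handed to the C-style function.
    int read_into(std::vector<char>& storage)
    {
        unsigned char* raw = reinterpret_cast<unsigned char*>(storage.data());
        return fill_from_device(raw, static_cast<int>(storage.size()));
    }
    ```

    After read_into(buf), the contents of buf are exactly the bytes the function wrote; reading them back as unsigned char gives 0x80, 0x81, and so on.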

