Unsolved Losing stream data at time to write it, using threads
-
Hello
I have been writing a program that receives USB data and then write the data to a file, i am working with threads, one thread to receive the data and another thread to write, but i am losing bytes.
In the thread of the receive part emits a signal when the transfer is received to the slot in another thread to write the data, the file is filling whit data, but some data is losing .
For example i am trying to receive packets of video .TS (transport stream) the sequence is 204 bytes whit 4 bytes of header then 204 bytes again with 4 bytes of header, the buffer from the libusb could send me 1 Mb really quick so when that buffer arrives the same thread emits the signal to the other thread for write that information.
But the usb always send me data i am thinking when the thread goes to emit the signal packets are loosing because always coming, or the other thread can't write too fast in the file so when the signal es emitted again the process of writing not even finish.
I check with a hex editor and yes inside the 1Mb the 204 bytes are repetitive until the another 1Mb information stars the format is lost. They don't math between then.
Here is the code i am using
MAINWINDOW
cpp
#include "mainwindow.h" #include "ui_mainwindow.h" #include "backend.h" #include "receive.h" //Receive usb transfer #include "write.h" //Write to a file the usb transfer #include <QThread> MainWindow::MainWindow(QWidget *parent) : QMainWindow(parent), ui(new Ui::MainWindow) { //This create the ui interface of QT ui->setupUi(this); //Background of the application QMainWindow::setStyleSheet("* { background-color: rgb(250, 250, 250); }"); //create usb_channels object of backend backend * usb_channels=new backend; //Make usb connection usb_channels->usb_conecction(); //Conecction of the buttons channel Begin //521000 frequency Begin connect(ui->pushButton,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57856())); connect(ui->pushButton_2,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57857())); connect(ui->pushButton_3,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57858())); connect(ui->pushButton_4,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57859())); connect(ui->pushButton_5,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57860())); connect(ui->pushButton_6,SIGNAL(clicked()),usb_channels,SLOT(channel_521000_57880())); //521000 frequency End //533000 frequency Begin connect(ui->pushButton_7,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57920())); connect(ui->pushButton_8,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57921())); connect(ui->pushButton_9,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57922())); connect(ui->pushButton_10,SIGNAL(clicked()),usb_channels,SLOT(channel_533000_57944())); //533000 frequency End //539000 frequency Begin connect(ui->pushButton_11,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57952())); connect(ui->pushButton_12,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57953())); connect(ui->pushButton_13,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57954())); connect(ui->pushButton_14,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57955())); connect(ui->pushButton_15,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57956())); connect(ui->pushButton_16,SIGNAL(clicked()),usb_channels,SLOT(channel_539000_57976())); //539000 frequency End //527000 frequency Begin connect(ui->pushButton_17,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57888())); connect(ui->pushButton_18,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57889())); connect(ui->pushButton_19,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57890())); connect(ui->pushButton_20,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57891())); connect(ui->pushButton_21,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57892())); connect(ui->pushButton_22,SIGNAL(clicked()),usb_channels,SLOT(channel_527000_57912())); //527000 frequency End //Conecction of the buttons channel End //Objects of calss receive and write receive *receive_usb = new receive(); write *write_to_file = new write(); //Threads for write and receive usb transfer QThread *threadReceive = new QThread(); QThread *threadWrite = new QThread(); //passing the usb handle deveice receive_usb->device_handle_receive=usb_channels->device_handle; //create the file ts write_to_file->create_file(); //When the buffer fills and emited the signal of receive write connect(receive_usb,SIGNAL(received(unsigned char*)), write_to_file, SLOT(write_to_file(unsigned char*))); //Conection from buttons to transfer_usb Begin //521000 frequency Begin connect(ui->pushButton,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_2,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_3,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_3,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_4,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_5,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_6,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); //521000 frequency End //533000 frequency Begin connect(ui->pushButton_7,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_8,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_9,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_10,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); //533000 frequency End //539000 frequency Begin connect(ui->pushButton_11,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_12,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_13,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_14,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_15,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_16,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); //539000 frequency End //527000 frequency Begin connect(ui->pushButton_17,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_18,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_19,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_20,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_21,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); connect(ui->pushButton_22,SIGNAL(clicked()),receive_usb,SLOT(transfer_usb())); //527000 frequency End //Conection from buttons to transfer_usb End //Move to threads receive_usb->moveToThread(threadReceive); write_to_file->moveToThread(threadWrite); //Strat the threads threadReceive->start(); threadWrite->start(); } MainWindow::~MainWindow() { delete ui; }
WRITE
header#ifndef WRITE_H #define WRITE_H #include <QFile> //To use files #include <QDataStream> //To stream to the file #include <QObject> //To use the Macro of Qobject #include <QByteArray> class write : public QObject { Q_OBJECT public: write(); //constructor //--------------FUNCTION TO CREATE A FILE BEGIN------------- void create_file(); //Create the File //--------------FUNCTION TO CREATE A FILE END--------------- public slots: void write_to_file(unsigned char *MPEG2_TS); //Write to File private: //--------------VARIABLES TO CREATE THE FILE BEGIN----------- QFile file; //File to write the ts packets QDataStream out; //To write in the file //--------------VARIABLES TO CREATE THE FILE END------------- }; #endif // WRITE_H
cpp
#include "write.h" write::write() { } void write::create_file() { //Name of the file file.setFileName("data.ts"); //Creation of the file, mode writeonly file.open(QIODevice::WriteOnly); } void write::write_to_file(unsigned char *MPEG2_TS) { out.setDevice(&file); //to hadel the file writing for (unsigned int j=0;j<1024-1;j++) { out<<MPEG2_TS[j]; //Insert the bytte o he file } }
RECEIVE
header#ifndef RECEIVE_H #define RECEIVE_H #include <libusb-1.0/libusb.h> //Include libsub #include <QObject> #include <QByteArray> class receive: public QObject { Q_OBJECT public: receive(); libusb_device_handle *device_handle_receive; public slots: void transfer_usb(); //recieving tranfer from USB private: //the buffer to receive 245760 bytes= 240 Kb //unsigned char MPEG2_TS[245760]; void SemcoSleep(unsigned int nSleepTims_ms); //Function to make a delay //the buffer to send over signals and slot // QByteArray MPEG2_TS_BYTE; //Variable to register the errors unsigned int error_receive; //Provate function to emit the singal, receive parameter buffer void emitreceived(unsigned char *MPEG2_TS); signals: //Signal emited void received(unsigned char *MPEG2_TS); }; #endif // RECEIVE_H
cpp
#include "receive.h" #include <unistd.h> //JUST FOR SLEEP USING usleep(microseconds); receive::receive() { } //-----------------------FUNCTION FOR USB RECEIVING BEGIN---------------- void receive::transfer_usb() { unsigned char *MPEG2_TS=new unsigned char[1024]; SemcoSleep(4000); while(1) { //It's a return of libusb_interrupt_transfer with the number of bytes that were transferred int transferred=0; //It's a return of libusb_interrupt_transfer return 0 if the transfer was successful int result=0; //Receiving Transfer result=libusb_interrupt_transfer (device_handle_receive, //The device handle 0x82, //Endponit 2 MPEG2_TS, //Buffer 1024, //Size of Buffer &transferred, //Return the number of bytes transferred 0 //waiting time to receive the answer ); if (result!=0) error_receive=10; //For error 10: "Error in transfer in thread one" //Private function to emit the signal or received emitreceived(MPEG2_TS); } } //-----------------------FUNCTION FOR USB RECEIVING END------------------ //Private function t emit the signal void receive::emitreceived(unsigned char *MPEG2_TS) { emit received(MPEG2_TS); } //Function to make a delay void receive::SemcoSleep(unsigned int nSleepTims_ms) { nSleepTims_ms=nSleepTims_ms*1000;// to transform to miliseconds usleep(nSleepTims_ms); }
In the mainwindow.cpp the first part is for the usb connection and initialization until yo get to this sentence
//Objects of calss receive and write receive *receive_usb = new receive(); write *write_to_file = new write();
The a lot buttons are for different channels, there are a lot connects practical do the same.
I am thing about this i don't know how to handle, i google it to find if some one could have the same problem but i couldn't find good information about it
-
You have a race condition on
MPEG2_TS
which you also leakchange
unsigned char *MPEG2_TS=new unsigned char[1024];
toQByteArray MPEG2_TS(1024,0)
and move the declaration inside thewhile(1)
. you can useMPEG2_TS.data()
to pass it to libusbYou'll have to change the signature of your signals and slots too I'm afraid.
-
@VRonin said in Losing stream data at time to wirte it , using threads:
You have a race condition on
MPEG2_TS
which you also leakchange
unsigned char *MPEG2_TS=new unsigned char[1024];
toQByteArray MPEG2_TS(1024,0)
and move the declaration inside thewhile(1)
. you can useMPEG2_TS.data()
to pass it to libusbYou'll have to change the signature of your signals and slots too I'm afraid.
I have problem for the unsigned, MPEG2_TS.data() it's just char and libusb uses unsigned char
/home/pasante/Escritorio/PIZZA_FILE/receive.cpp:29: error: invalid conversion from ‘char*’ to ‘unsigned char*’ [-fpermissive]
MPEG2_TS.data(), //Buffer
~~~~~~~~~~~~~^~EDIT:
(unsigned char)MPEG2_TS.data()* , pass this to libusb, i am debuggin now to see if the connects works
EDIT2: (unsigned char)MPEG2_TS.data()* that kills me all the data in the hex file
-
@aurquiel said in Losing stream data at time to write it, using threads:
MPEG2_TS.data() it's just char and libusb uses unsigned char
reinterpret_cast<unsigned char*>(MPEG2_TS.data())