Lazy evaluation pattern with cancellable task and caller that could be destroyed?



  • Hi,
    I'm porting a Java application that use extensively the lazy evaluation pattern in order to parallelize and postpone heavy computation tasks (using a external application).

    I see how to achieve it using QFuture and QtConcurrent::run but I would like to be able to cancel a task if finally I won't need it (if it has not started could be sufficient)

    I also need to use a QThreadPool with a specific size so I'm looking into QRunnable.
    My problem is that my object Element, that would call the async task will at one point want to fetch a result (even if it has to wait the end of the task) but also could be deleted while the task is running (or before it was scheduled).

    I'm getting headache trying to find a solution without race conditions.
    If I put the QRunnable task not autodelete, I can then use tryTake to remove it but the problem is that if I destruct my Element object, I'm not able to destroy the Task if it is running... thus I could have leak of my QRunnable task.
    If my QRunnable is autodelete, then my issue is how to submit back the result to the Element (I'm having a pointer on it) and be sure it hasn't been deleted.

    Any recommendation?
    Please let me know if you need more clarification on the use case.
    Cheers

    PS: here is what I was doing using QFuture and QtConcurrent::run

    #include <QCoreApplication>
    #include "qtconcurrentrun.h"
    #include <QFuture>
    #include <QString>
    
    class Element;
    double computeMassEvolution(Element *elem, float timeAfter);
    
    class Element {
    public:
        Element(float mass) : _id(nextId++),_mass(mass), _futurMass(){}
        Element(const Element &src) = delete;
    
        Element(Element &src) : _id(nextId++),_mass(src._mass),_futurMass()
        {
            if (src.update())
                _mass = src._mass;
        }
    
        ~Element() = default;
    
        void computeFuturMass(float timeFromNow)
        {
            _futurMass = QtConcurrent::run(&threadPool, computeMassEvolution, this, timeFromNow);
        }
    
        bool update()
        {
            if (!_futurMass.isCanceled())
            {
                _mass = _futurMass.result();
                _futurMass = QFuture<double>();
                return true;
            }
            return false;
        }
    
        QString str() {
            return QString("#%1 <mass %2> <isEvolving: %3>").arg(_id).arg(_mass).arg(!_futurMass.isCanceled());
        }
    
    private:
        ushort          _id;
        double          _mass;
        QFuture<double> _futurMass;
    
    public:
        static QThreadPool threadPool;
        static ushort       nextId;
    };
    ushort Element::nextId = 0;
    QThreadPool Element::threadPool;
    
    double computeMassEvolution(Element *elem, float timeAfter)
    {
        QThread::sleep(2);
        qDebug() << "Updating mass of elem " << elem->str();
        return timeAfter;
    }
    
    void useElem(Element &elem)
    {
        elem.update();
        qDebug() << "use " << elem.str();
    }
    
    void dumpElem(Element &elem)
    {
        qDebug() << "dump: " << elem.str();
    }
    
    void test_using_QFuture()
    {
        qDebug() << ">>>>>>>>>> Test 1 with QFuture";
        Element elemSrc(28);
        dumpElem(elemSrc);
        elemSrc.computeFuturMass(38);
        dumpElem(elemSrc);
    
        // Potentially doing other stuff
    
        Element elem2(elemSrc); // we wait for the task to be finished
    
        elem2.computeFuturMass(42);
        dumpElem(elem2);
    
        for (ushort i=0; i<5; ++i)
        {
            qDebug() << "Main Thread doing stuff...";
            QThread::sleep(1);
        }
    
        dumpElem(elem2); // we don't need to fetch the result
        useElem(elem2); // we want to fetch the result
    
        qDebug() << "<<<<<<<<<<< Test 1 with QFuture";
    }
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        qDebug() << "Ideal nb of Threads: " << QThread::idealThreadCount();
    
        test_using_QFuture();
    
        ushort maxThreads = 2;
        Element::threadPool.setMaxThreadCount(maxThreads);
    
        return a.exec();
    }
    
    


  • I think I've found a solution to do what I need by using a TaskManager (singleton) that will always be alive. That solve the problem of not knowing who from an Element or the task it launches could be destroyed first \o/

    So that this manager that will launch the Tasks for each Element. It keeps a Map "_activeTasks" of the couples (Element, Task).

    When the Task finishes (it inherits from both QObject and QRunnable), it sends a Direct signal to the TaskManager that will remove the entry from the map "_activeTasks" and fetch the results in another one "_results".

    The Task can so be auto delete in the ThreadPool. (The TaskManager own the ThreadPool but never remove tasks)
    Instead in the destructor of an Element, if it has launched a Task (it keeps an AtomicInt to know about it) then it will ask the TaskManager to cancel it. This has for action to remove the entry from the Map "_activeTasks" and set a Cancel AtomicInt on the Task that wouldl return at the beginning of the run method if it didn't start already.

    The TaskManager offer a getResult method that will wait on a Task to finish if needed. So I'm using a mutex and a WaitCondition that I give an handle to the Task to wake the Manager up.
    There is a extra Mutex to protect the 2 Maps _activeTasks and _results.

    It seems to work as expected.
    I still need to test it more.
    If someone is interested by the code, let me know and I'll publish it.
    Cheers



  • Here is the implementation I'll be using.
    Let me know if you've any comments.
    It's not big. The main classes are the LazyComputationManager that can schedule ComputationTasks in a QThreadpool based on requests from the Elements.
    ComputationTask is pure virtual as we could have several kind of computations.
    EvolutionTask is a simple example so I can test the framework in the main.cpp

    The basic idea is that when an Element ask for lazy evaluation computation, the ComputationTask will do a copy of the Element where the result will be stored so it doesn't impact straight away the source Element that will have to fetch the result when/if it wants to use it.


Log in to reply
 

Looks like your connection to Qt Forum was lost, please wait while we try to reconnect.