AsyncFuture v0.3 Released - Use QFuture like a Promise Object


  • Qt Champions 2016

    Hello,

    AynscFuture v0.3 has been released. It is a C++ library that could convert a signal into a QFuture type and use it like a Promise object in Javascript. It could chain multiple QFuture into a sequence for advanced workflow.

    Project Site
    https://github.com/benlau/asyncfuture

    API Document and example code are available


    QFuture is usually used together with QtConcurrent to represent the result of an asynchronous computation. It is a powerful component for multi-thread programming. But its usage is limited to the result of threads. And QtConcurrent only provides MapReduce / FilterReduce model that may not fit your use cases. Moreover, it doesn't work with the asynchronous signal emitted by QObject. And it is a bit trouble to setup the listener function via QFutureWatcher.

    AsyncFuture is designed to enhance the function to offer a better way to use it for asynchronous programming. It provides a Promise object like interface. This project is inspired by AsynQt and RxCpp.

    Remarks: You may use this project together with QuickFuture for QML programming.

    Features

    1. Convert a signal from QObject into a QFuture object

    
    #include "asyncfuture.h"
    using namespace AsyncFuture;
    
    // Convert a signal from QObject into a QFuture object
    
    QFuture<void> future = observe(timer,
                                   &QTimer::timeout).future();
    
    /* Listen from the future without using QFutureWatcher<T>*/
    observe(future).subscribe([]() {
        // onCompleted. It is invoked when the observed future is finished successfully
        qDebug() << "onCompleted";
    },[]() {
        // onCanceled
        qDebug() << "onCancel";
    });
    
    /* It is chainable. Listen from a timeout signal only once */
    observe(timer, &QTimer::timeout).subscribe([=]() { /*…*/ });
    

    2. Combine multiple futures with different type into a single future object

    /* Combine multiple futures with different type into a single future */
    
    QFuture<QImage> f1 = QtConcurrent::run(readImage, QString("image.jpg"));
    
    QFuture<void> f2 = observe(timer, &QTimer::timeout).future();
    
    QFuture<QImage> result = (combine() << f1 << f2).subscribe([=](){
        // Read an image but do not return before timeout
        return f1.result();
    }).future();
    
    

    3. Use QFuture like a Promise object

    Create a QFuture and Complete / cancel it by yourself.

    // Complete / cancel a future on your own choice
    auto d = deferred<bool>();
    
    observe(d.future()).subscribe([]() {
        qDebug() << "onCompleted";
    }, []() {
        qDebug() << "onCancel";
    });
    
    d.complete(true); // or d.cancel();
    
    QCOMPARE(d.future().isFinished(), true);
    QCOMPARE(d.future().isCanceled(), false);
    
    

    Complete / cancel a future according to another future object.

    // Complete / cancel a future according to another future object.
    
    auto d = deferred<void>();
    
    d.complete(QtConcurrent::run(timeout));
    
    QCOMPARE(d.future().isFinished(), false);
    QCOMPARE(d.future().isCanceled(), false);
    
    

    Read a file. If timeout, cancel it.

    
    auto timeout = observe(timer, &QTimer::timeout).future();
    
    auto defer = deferred<QString>();
    
    defer.complete(QtConcurrent::run(readFileworker, fileName));
    defer.cancel(timeout);
    
    return defer.future();
    

    4. Chainable Future - Advanced multi-threading programming model

    Futures can be chained into a sequence of process. And represented by a single future object.

    /* Start a thread and process its result in main thread */
    
    QFuture<QImage> reading = QtConcurrent::run(readImage, QString("image.jpg"));
    
    QFuture<bool> validating = observe(reading).context(contextObject, validator).future();
    
        // Read image by a thread, when it is ready, run the validator function
        // in the thread of the contextObject(e.g main thread)
        // And it return another QFuture to represent the final result.
    
    /* Start a thread and process its result in main thread, then start another thread. */
    
    QFuture<int> f1 = QtConcurrent::mapped(input, mapFunc);
    
    QFuture<int> f2 = observe(f1).context(contextObject, [=](QFuture<int> future) {
        // You may use QFuture as the input argument of your callback function
        // It will be set to the observed future object. So that you may obtain
        // the value of results()
    
        qDebug() << future.results();
    
        // Return another QFuture is possible.
        return QtConcurrent::run(reducerFunc, future.results());
    }).future();
    
    // f2 is constructed before the QtConcurrent::run statement
    // But its value is equal to the result of reducerFunc
    
    

    More examples are available at : asyncfuture/example.cpp at master · benlau/asyncfuture

    Installation

    AsyncFuture is a single header library. You could just download the asyncfuture.h in your source tree or install it via qpm

    qpm install async.future.pri
    

    or

    wget https://raw.githubusercontent.com/benlau/asyncfuture/master/asyncfuture.h
    

    API

    Still under construction

    AsyncFuture::observe(QObject* object, PointerToMemberFunc signal)

    This function creates an Observable<ARG> object which contains a future to represent the result of the signal. You could obtain the future by the future() method. And observe the result by subscribe() / context() methods

    The ARG type is equal to the first parameter of the signal. If the signal does not contain any argument, ARG will be void. In case it has more than one argument, the rest will be ignored.

    QFuture<void> f1 = observe(timer, &QTimer::timeout).future();
    QFuture<bool> f2 = observe(button, &QAbstractButton::toggled).future();
    

    See Observable<T>

    AsyncFuture::observe(QFuture<T> future)

    This function creates an Observable<T> object which provides an interface for observing the input future. See Observable<T>

    // Convert a signal from QObject into QFuture
    QFuture<bool> future = observe(button, &QAbstractButton::toggled).future();
    
    
    // Listen from the future without using QFutureWatcher<T>
    observe(future).subscribe([](bool toggled) {
        // onCompleted. It is invoked when the observed future is finished successfully
        qDebug() << "onCompleted";
    },[]() {
        // onCanceled
        qDebug() << "onCancel";
    });
    
    observe(future).context(context, [](bool toggled) {
        // simialr to subscribe, but this function will not be invoked if context object
        // is destroyed.
    });
    
    

    AsyncFuture::combine(CombinatorMode mode = FailFast)

    This function creates a Combinator object (inherit Observable<void>) for combining multiple future objects with different type into a single future.

    For example:

    
    QFuture<QImage> f1 = QtConcurrent::run(readImage, QString("image.jpg"));
    QFuture<void> f2 = observe(timer, &QTimer::timeout).future();
    
    QFuture<QImage> result = (combine(AllSettled) << f1 << f2).subscribe([=](){
        // Read an image but do not return before timeout
        return f1.result();
    }).future();
    
    

    Once all the observed futures finished, the contained future will be finished too. And it will be cancelled immediately if any one of the observed future is cancelled in fail fast mode. In case you wish the cancellation take place after all the futures finished, you should set mode to AllSettled.

    AsyncFuture::deferred<T>()

    The deferred() function return a Deferred object that allows you to set QFuture completed/cancelled manually.

    auto d = deferred<bool>();
    
    observe(d.future()).subscribe([]() {
        qDebug() << "onCompleted";
    }, []() {
        qDebug() << "onCancel";
    });
    
    d.complete(true); // or d.cancel();
    

    See Deferred<T>

    AsyncFuture Class Diagram

    Observable<T>

    Obsevable<T> is a chainable utility for observing a QFuture object. It is created by the observe() function. It can register multiple callbacks to be triggered in different situations. And that will create a new Observable<T> / QFuture object to represent the result of the callback function. It may even call QtConcurrent::run() within the callback function to create a thread. Therefore, it could create a more complex/flexible workflow.

    QFuture<T> future()

    Obtain the QFuture object to represent the result.

    Observable<R> context(QObject* contextObject, Completed onCompleted)

    Add a callback function that listens to the finished signal from the observing QFuture object. The callback won't be triggered if the future is cancelled.

    The callback is invoked in the thread of the context object, In case the context object is destroyed before the finished signal, the callback function won't be triggered and the returned Observable object will cancel its future.

    The return value is an Observable<R> object where R is the return type of the onCompleted callback.

    
    auto validator = [](QImage input) -> bool {
       /* A dummy function. Return true for any case. */
       return true;
    };
    
    QFuture<QImage> reading = QtConcurrent::run(readImage, QString("image.jpg"));
    
    QFuture<bool> validating = observe(reading).context(contextObject, validator).future();
    

    In the above example, the result of validating is supposed to be true. However, if the contextObject is destroyed before reading future finished, it will be cancelled and the result will become undefined.

    Observable<T> subscribe(Completed onCompleted, Canceled onCanceled)

    Observable<T> subscribe(Completed onCompleted);
    Observable<T> subscribe(Completed onCompleted, Canceled onCanceled);
    

    Register a onCompleted and/or onCanceled callback to the observed QFuture object. Unlike the context() function, the callbacks will be triggered as long as the current thread exists. The return value is an Observable<R> object where R is the return type of the onCompleted callback.

    QFuture<bool> future = observe(button, &QAbstractButton::toggled).future();
    
    // Listen from the future without using QFutureWatcher<T>
    observe(future).subscribe([](bool toggled) {
        // onCompleted. It is invoked when the observed future is finished successfully
        qDebug() << "onCompleted";
    },[]() {
        // onCanceled
        qDebug() << "onCancel";
    });
    
    

    Completed Callback Funcion

    In subscribe() / context(), you may pass a function with zero or one argument as the onCompleted callback. If you give it an argument, the type must be either of T or QFuture<T>. That would obtain the result or the observed future itself.

    QFuture<QImage> reading = QtConcurrent::run(readImage, QString("image.jpg"));
    
    observe(reading).subscribe([]() {
    });
    
    observe(reading).subscribe([](QImage result) {
    });
    
    observe(reading).subscribe([](QFuture<QImage> future) {
      // In case you need to get future.results
    });
    

    The return type can be none or any kind of value. That would determine what type of Observable<R> generated by context()/subscribe().

    In case, you return a QFuture object. Then the new Observable<R> object will be deferred to complete/cancel until your future object is resolved. Therefore, you could run QtConcurrent::run within your callback function to make a more complex/flexible multi-threading programming models.

    
    QFuture<int> f1 = QtConcurrent::mapped(input, mapFunc);
    
    QFuture<int> f2 = observe(f1).context(contextObject, [=](QFuture<int> future) {
        // You may use QFuture as the input argument of your callback function
        // It will be set to the observed future object. So that you may obtain
        // the value of results()
    
        qDebug() << future.results();
    
        // Return another thread is possible.
        return QtConcurrent::run(reducerFunc, future.results());
    }).future();
    
    // f2 is constructed before the QtConcurrent::run statement
    // But its value is equal to the result of reducerFunc
    
    

    Deferred<T>

    The deferred<T>() function return a Deferred<T> object that allows you to manipulate a QFuture manually. The future() function return a running QFuture<T>. You have to call Deferred.complete() / Deferred.cancel() to trigger the status changes.

    The usage of complete/cancel in a Deferred object is pretty similar to the resolve/reject in a Promise object. You could complete a future by calling complete with a result value. If you give it another future, then it will observe the input future and change status once that is finished.

    Auto Cancellation

    The Deferred<T> object is an explicitly shared class. You may own multiple copies and they are pointed to the same piece of shared data. In case, all of the instances are destroyed, it will cancel its future automatically.

    But there has an exception if you have even called Deferred.complete(QFuture<T>) / Deferred.cancel(QFuture<ANY>) then it won't cancel its future due to destruction. That will leave to the observed future to determine the final state.

      QFuture<void> future;
      {
    
        auto defer = deferred<void>();
        future = defer.future();
      }
      QCOMPARE(future.isCanceled(), true); // Due to auto cancellation
    
      QFuture<void> future;
      {
    
        auto defer = deferred<void>();
        future = defer.future();
        defer.complete(QtConcurrent::run(worker));
      }
      QCOMPARE(future.isCanceled(), false);
    

    complete(T) / complete()

    Complete this future object with the given arguments

    complete(QFuture<T>)

    This future object is deferred to complete/cancel. It will adopt the state from the input future. If the input future is completed, then it will be completed too. That is same for cancel.

    cancel()

    Cancel the future object

    cancel(QFuture<ANY>)

    This future object is deferred to cancel according to the input future. Once it is completed, this future will be cancelled. However, if the input future is cancelled. Then this future object will just ignore it. Unless it fulfils the auto-cancellation rule.


  • Qt Champions 2017

    Hi Ben,
    This looks pretty useful, but as usual I'm about to be the voice of dissent ...
    I'm not template savvy enough to give you a detailed annotations on the code, but I'd like to point out what I'd like/expect of the API as an user.

    Convert a signal from QObject into a QFuture object

    observe(future).subscribe([]() {
        // onCompleted. It is invoked when the observed future is finished successfully
        qDebug() << "onCompleted";
    },[]() {
        // onCanceled
        qDebug() << "onCancel";
    });
    

    seems a bit inflexible. I'd go with providing a separate method for each of the callbacks. This way you can add also onError at some point if that makes sense. I'd much prefer to have it like this:

    observe(future)
        .onCompleted( [] () {
           // ... some stuff
        })
        .onCanceled( [] () {
            // ... other stuff
        });
    

    This should be easily achievable by just returning a reference to this from the onWhateverHappened functions.

    Combine multiple futures with different type into a single future object

    I'd really love to use this:

    QFuture<QImage> result = (combine() << f1 << f2).subscribe([=](){
        // Read an image but do not return before timeout
        return f1.result();
    }).future();
    

    as:

    QFuture<QImage> someFuture = QtConcurrent::run(readImage, QString("image.jpg"));
    
    observe(f1).observe(timer, &QTimer::timeout).onCompleted([=] ()  {
        // This would be equivalent to calling: combine() << ... .subscribe
    });
    

    again, just chaining the operations seems much more intuitive.

    Use QFuture like a Promise object

    Is there a specific reason why there is a different way to handle this case? You could in principle provide this as a standard functionality, couldn't you?

    For example,

    Observable<bool> promise;
    promise.observe( ... ).onCompleted( [] () {
    });
    
    promise.cancel(); // No argument, or future, or Observable< ...> or signal
    

    Or modifying the other example:

    Observable<QString> promise;
    
    promise
        .onCompleted([] () {
            // We read whatever it is we read
        })
        .complete(QtConcurrent::run(readFileworker, fileName))
        .cancel(timer, &QTimer::timeout);
    

    Chainable Future - Advanced multi-threading programming model

    This gets rather complicated, but I believe you're better of using your own class throughout and use the QFuture only to integrate into the existing framework. By the way here I don't quite get what's the purpose of the context object. Could you elaborate a bit?

    In conclusion

    All in all I think you could safely aggregate all the functionality under one roof and it seems to be a useful abstraction over the typical threading techniques.


  • Qt Champions 2016

    Hi @kshegunov,

    Thanks for suggestion.

    @kshegunov said in AsyncFuture v0.3 Released - Use QFuture like a Promise Object:

    Hi Ben,
    This looks pretty useful, but as usual I'm about to be the voice of dissent ...
    I'm not template savvy enough to give you a detailed annotations on the code, but I'd like to point out what I'd like/expect of the API as an user.

    Convert a signal from QObject into a QFuture object

    observe(future).subscribe([]() {
        // onCompleted. It is invoked when the observed future is finished successfully
        qDebug() << "onCompleted";
    },[]() {
        // onCanceled
        qDebug() << "onCancel";
    });
    

    seems a bit inflexible. I'd go with providing a separate method for each of the callbacks. This way you can add also onError at some point if that makes sense. I'd much prefer to have it like this:

    observe(future)
        .onCompleted( [] () {
           // ... some stuff
        })
        .onCanceled( [] () {
            // ... other stuff
        });
    

    This should be easily achievable by just returning a reference to this from the onWhateverHappened functions.

    It may add an onCanceled() function but onCompleted and onCanceled can not be chained. Because all the observer function (subscribe and context) will return a new QFuture to represent the result.

    auto future = observe(input).subscribe().future(); // still use subscribe, as onCompleted is not supported yet.
    

    The output future represents the result of subscribe. So future.onCanceled is not observing the first future.

    Combine multiple futures with different type into a single future object

    I'd really love to use this:

    QFuture<QImage> result = (combine() << f1 << f2).subscribe([=](){
        // Read an image but do not return before timeout
        return f1.result();
    }).future();
    

    as:

    QFuture<QImage> someFuture = QtConcurrent::run(readImage, QString("image.jpg"));
    
    observe(f1).observe(timer, &QTimer::timeout).onCompleted([=] ()  {
        // This would be equivalent to calling: combine() << ... .subscribe
    });
    

    again, just chaining the operations seems much more intuitive.

    I have considered similar design like observe(f1).combine(f2).combine(f3) .... but I have given up the idea. Firstly, it violates the rule of returning a new future mentioned in the previous answer. And then, when a user is doing a chain, they would assume the process imply a sequence but a combination is not a sequence. It is a parallel process and support fail fast mode / all settled mode.

    Although "operator<<" is also a kind of chaining, I think that it is less confusing as the syntax is different.

    Use QFuture like a Promise object

    Is there a specific reason why there is a different way to handle this case? You could in principle provide this as a standard functionality, couldn't you?

    For example,

    Observable<bool> promise;
    promise.observe( ... ).onCompleted( [] () {
    });
    
    promise.cancel(); // No argument, or future, or Observable< ...> or signal
    

    What I want to achieve is using a same mechanism to handle concurrent and asynchronous function. AsyncFuture does not deal with multi-threading issue directly as QtConcurrent already do it best. If a user returns an Observable of their function, then the behaviour of an asynchronous function is different then a concurrent function (e.g the function may be a library not written by the user).

    Therefore, it is always suggested to return a QFuture from a function instead of Observable. Observable should be an interface class to manipulate QFuture only.

    Or modifying the other example:

    Observable<QString> promise;
    
    promise
        .onCompleted([] () {
            // We read whatever it is we read
        })
        .complete(QtConcurrent::run(readFileworker, fileName))
        .cancel(timer, &QTimer::timeout);
    

    The complete(QtConcurrent::run(readFileworker, fileName)) will be executed immediately instead of waiting the result of previous future. To avoid this problem, it need the mechanism mentioned in feature 4 - "Chainable Future"

    Chainable Future - Advanced multi-threading programming model

    This gets rather complicated, but I believe you're better of using your own class throughout and use the QFuture only to integrate into the existing framework. By the way here I don't quite get what's the purpose of the context object. Could you elaborate a bit?

    The callback registered by context is invoked in the thread of the context object, In case the context object is destroyed before the finished signal of the observed future, the callback function won't be triggered and the returned Observable object will cancel its future.

    It is supposed to use for a fast exit condition. For example, if you have a FileReader object in a page of QML application. User quit the current page before the reader is finished. Then the FileReader object will be destroyed before the callback registered by subscribe Use context could avoid extra checking.

    Anyway, I think that is quite confusing. I would take it out of the example code.

    In conclusion

    All in all I think you could safely aggregate all the functionality under one roof and it seems to be a useful abstraction over the typical threading techniques.


  • Qt Champions 2017

    @benlau said in AsyncFuture v0.3 Released - Use QFuture like a Promise Object:

    Firstly, it violates the rule of returning a new future mentioned in the previous answer.

    Well I think this is where we disagree actually. I firmly believe you should not really abide by such a rule, but rather return your object(s) directly and if you need to be compatible with some QFuture expecting function perhaps you could add an implicit cast operator (i.e. your type can decay to QFuture<> whenever that's required).


  • Qt Champions 2016

    @kshegunov said in AsyncFuture v0.3 Released - Use QFuture like a Promise Object:

    @benlau said in AsyncFuture v0.3 Released - Use QFuture like a Promise Object:

    Firstly, it violates the rule of returning a new future mentioned in the previous answer.

    Well I think this is where we disagree actually. I firmly believe you should not really abide by such a rule, but rather return your object(s) directly and if you need to be compatible with some QFuture expecting function perhaps you could add an implicit cast operator (i.e. your type can decay to QFuture<> whenever that's required).

    The problem is not on it uses an own custom object or QFuture. Because the real assumption is - Calling a chain function represents a new step of a flow. Therefore observe(f1).onCompleted().onCancel() and observe(f1).combine(f2).combine(f3) are not a good way to use. The first statement breaks the assumption totally (no matter do I use a custom object/ QFuture). The second statement should represent a sequence of steps, but a combination is a parallel process which has no dependence on each other.


  • Qt Champions 2016

    Let's take an example to show why the rule/design principle is important.

    Requirements: Read a file by a thread, then upload to a server. It should show a loading spinner until it is finished.

    QFuture<void> ExampleClass::readAndUpload(QString file) {
      auto read = [](QString file) -> QByteArray {
        // ...
      };
    
     auto upload = [](QByteArray data) {
       // ...
       return observe(reply, &QNetworkReply::finished)
     };
     
    auto cleanup = []() {
        qDebug() << "Done"; // Now it only print a done on finished, but it could be changed to do validation / state changes 
    };
    
    return observe(QtConcurrent::run(read, file)).subscribe(upload).subscribe(cleanup).future()
    }
    

    That is a typical example of an asynchronous flow with multiple steps. That may look complicated, but this kind of problem is complicated anyway. Alternative solutions (except async/await) are usually more troublesome to manage.

    AsyncFuture solves this by chaining the callback functions. A callback function may take input from the previous step and return another QFuture for next step to wait for (optional).

    Therefore, it is very important that a chain function (subscribe) should return a new object to represent a new step. As it doesn't know how many steps are needed, user terminate the chain when he/she think that is enough.

    observe(f1).combine(f2).combine(f3) and observe(future).onCompleted().onCanceled() are bad because they returns an existing object instead of create a new (No matter that is QFuture or custom Promise). The new API are inconsistent with subscribe . But subscribe is more important because it solves the most troublesome problem in asynchronous programming.

    p.s That should be fine.

    auto observer = observe(future);
    observer.onCompleted(callback1);
    observer.onCanceled(callback2);
    // observer.onCompleted(callback1).onCanceled(callback2) are not equivalent to above code
    

Log in to reply