Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

AMQP-CPP ack does not work



  • #include <QCoreApplication>
    #include <QDebug>
    #include <QSocketNotifier>
    #include <QString>
    #include <amqpcpp.h>
    #include <amqpcpp/linux_tcp.h>
    #include <iostream>
    #include <map>
    #include <string>

    class MyTcpHandler : public AMQP::TcpHandler
    {
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
    {
    auto tun_fd_monitor = new QSocketNotifier(fd, QSocketNotifier::Read, nullptr);
    QObject::connect(tun_fd_monitor, &QSocketNotifier::activated, [connection, flags](int fd){
    connection->process(fd, flags);
    });
    tun_fd_monitor = new QSocketNotifier(fd, QSocketNotifier::Write, nullptr);
    QObject::connect(tun_fd_monitor, &QSocketNotifier::activated, [connection, flags](int fd){
    connection->process(fd, AMQP::writable);
    });
    }
    };
    int main(int argc, char *argv[])
    {

    QCoreApplication a(argc, argv);
    MyTcpHandler myHandler;
    
    // address of the server
    AMQP::Address address("amqp://guest:guest@localhost/");
    
    // create a AMQP connection object
    AMQP::TcpConnection connection(&myHandler, address);
    
    // and create a channel
    AMQP::TcpChannel channel(&connection);
    
    // use the channel object to call the AMQP method you like
    channel.declareExchange("my-exchange", AMQP::fanout);
    channel.declareQueue("my-queue");
    channel.bindQueue("my-exchange", "my-queue", "");
    
    // callback function that is called when the consume operation starts
    auto startCb = [](const std::string &consumertag) {
        
        std::cout << "consume operation started" << std::endl;
    };
    
    // callback function that is called when the consume operation failed
    auto errorCb = [](const char *message) {
        
        std::cout << "consume operation failed" << std::endl;
    };
    
    // callback operation when a message was received
    auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) {
        
        std::cout << "message received: " << message.body()<<std::endl;
        
        // acknowledge the message
        qWarning()<<channel.ack(deliveryTag);
    };
    
    // start consuming from the queue, and install the callbacks
    channel.consume("my-queue")
    .onReceived(messageCb)
    .onSuccess(startCb)
    .onError(errorCb);
    
    // start a transaction
    channel.startTransaction();
    
    // publish a number of messages
    channel.publish("my-exchange", "my-key", "my first message");
    channel.publish("my-exchange", "my-key", "another message");
    
    // commit the transactions, and set up callbacks that are called when
    // the transaction was successful or not
    channel.commitTransaction()
    .onSuccess([]() {
        qWarning()<<__LINE__;
    })
    .onError([](const char *message) {
        // none of the messages were published
        // now we have to do it all over again
        qWarning()<<__LINE__<<message;
    });
    return a.exec();
    

    }

    Here is my code. I can consume the messages, but rabbitmq-server does not remove them from the queue. I guess something is wrong with channel.ack(deliveryTag).


  • Lifetime Qt Champion

    Hi,

    Do you have the same code working without Qt?


Log in to reply