Solved AMQP-CPP ack not work
-
#include <QCoreApplication>
#include <QDebug>
#include <QString>
#include <QSocketNotifier>
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>class MyTcpHandler : public AMQP::TcpHandler
{
virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
{
auto tun_fd_monitor = new QSocketNotifier(fd, QSocketNotifier::Read, nullptr);
QObject::connect(tun_fd_monitor, &QSocketNotifier::activated, [connection, flags](int fd){
connection->process(fd, flags);
});
tun_fd_monitor = new QSocketNotifier(fd, QSocketNotifier::Write, nullptr);
QObject::connect(tun_fd_monitor, &QSocketNotifier::activated, [connection, flags](int fd){
connection->process(fd, AMQP::writable);
});
}
};
int main(int argc, char *argv[])
{QCoreApplication a(argc, argv); MyTcpHandler myHandler; // address of the server AMQP::Address address("amqp://guest:guest@localhost/"); // create a AMQP connection object AMQP::TcpConnection connection(&myHandler, address); // and create a channel AMQP::TcpChannel channel(&connection); // use the channel object to call the AMQP method you like channel.declareExchange("my-exchange", AMQP::fanout); channel.declareQueue("my-queue"); channel.bindQueue("my-exchange", "my-queue", ""); // callback function that is called when the consume operation starts auto startCb = [](const std::string &consumertag) { std::cout << "consume operation started" << std::endl; }; // callback function that is called when the consume operation failed auto errorCb = [](const char *message) { std::cout << "consume operation failed" << std::endl; }; // callback operation when a message was received auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::cout << "message received: " << message.body()<<std::endl; // acknowledge the message qWarning()<<channel.ack(deliveryTag); }; // start consuming from the queue, and install the callbacks channel.consume("my-queue") .onReceived(messageCb) .onSuccess(startCb) .onError(errorCb); // start a transaction channel.startTransaction(); // publish a number of messages channel.publish("my-exchange", "my-key", "my first message"); channel.publish("my-exchange", "my-key", "another message"); // commit the transactions, and set up callbacks that are called when // the transaction was successful or not channel.commitTransaction() .onSuccess([]() { qWarning()<<__LINE__; }) .onError([](const char *message) { // none of the messages were published // now we have to do it all over again qWarning()<<__LINE__<<message; }); return a.exec();
}
Here is my code. I can consume the msg, but rabbitmq-server docs not remove the msg from queue. I guess channel.ack(deliveryTag) has something wrong.
-
Hi,
Do you have the same code not using Qt working ?