AMQP-CPP: Broken pipe error in TCP Handler

Multi tool use
AMQP-CPP: Broken pipe error in TCP Handler
Unfortunately, in my project I always end up in the onError function in the event handler with the error message "Broken pipe". Unfortunately, I never get into the onConnected state. The monitor Funktion in the Event Handler is called twice with the Flag AMQP::readable. After that, it is called with no flags set, that is the time when my pipe gets broken.
Here is what I do in my code.
First I open the connection:
int Communicator_RabbitMQ::Open(string device)
{
AMQP::Address address(AMQP::Address("amqp://test:test@localhost/"));
// make a connection
m_connection = std::make_shared< AMQP::TcpConnection> (&oCommunicator_RabbitMQ_Handler, address);
// we need a channel too
m_channel = std::make_shared <AMQP::TcpChannel> (m_connection.get());
m_channel->declareExchange("my-exchange", AMQP::fanout);
m_channel->declareQueue("my-queue");
m_channel->bindQueue("my-exchange", "my-queue", "my-routing-key");
m_channel->declareExchange("cyos_tx_exchange", AMQP::direct);
m_channel->declareQueue("cyos_queue");
m_channel->bindQueue("cyos_tx_exchange", "cyos_queue", "");
return true;
}
then I call the read function cyclically in my thread:
string Communicator_RabbitMQ::Read()
{
int result = 0;
int maxfd = 1;
struct timeval tv
{
1, 0
};
string returnValue; //Rückgabe der Methode
string message; // Nachricht aus RabbitMQ
try
{
FD_ZERO(&oCommunicator_RabbitMQ_Handler.m_set);
FD_SET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set);
if (oCommunicator_RabbitMQ_Handler.m_fd != -1)
{
maxfd = oCommunicator_RabbitMQ_Handler.m_fd + 1;
}
result = select(FD_SETSIZE, &oCommunicator_RabbitMQ_Handler.m_set, NULL, NULL, &tv);
if ((result == -1) && errno == EINTR)
{
TRACE(L"Error in socket");
}
else if (result > 0)
{
if (oCommunicator_RabbitMQ_Handler.m_flags & AMQP::readable)
TRACE(L"Got something");
if (FD_ISSET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set))
{
m_connection->process(oCommunicator_RabbitMQ_Handler.m_fd, oCommunicator_RabbitMQ_Handler.m_flags);
}
}
}
catch (exception e)
{
cout << e.what();
}
return "";
}
Here is the TCP Event Handler:
#pragma once
class Communicator_RabbitMQ_Handler : public AMQP::TcpHandler
{
private:
/**
* Method that is called when the connection succeeded
* @param socket Pointer to the socket
*/
virtual void onConnected(AMQP::TcpConnection* connection)
{
std::cout << "connected" << std::endl;
}
/**
* When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol
*
* After this method is called, the connection no longer is in a valid
* state and can be used. In normal circumstances this method is not called.
*
* @param connection The connection that entered the error state
* @param message Error message
*/
virtual void onError(AMQP::TcpConnection* connection, const char* message)
{
// report error
std::cout << "AMQP TCPConnection error: " << message << std::endl;
}
/**
* Method that is called when the connection was closed.
* @param connection The connection that was closed and that is now unusable
*/
virtual void onClosed(AMQP::TcpConnection* connection)
{
std::cout << "closed" << std::endl;
}
/**
* Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
* @param connection The TCP connection object that is reporting
* @param fd The filedescriptor to be monitored
* @param flags Should the object be monitored for readability or writability?
*/
virtual void monitor(AMQP::TcpConnection* connection, int fd, int flags)
{
//TRACE(L"Communicator_RabbitMQ_Handler, monitor called, %d, %d, %x", fd, flags, &m_set);
// we did not yet have this watcher - but that is ok if no filedescriptor was registered
if (flags == 0)
return;
if (flags & AMQP::readable)
{
FD_SET(fd, &m_set);
m_fd = fd;
m_flags = flags;
}
}
public:
Communicator_RabbitMQ_Handler() = default;
int m_fd = -1;
int m_flags = 0;
fd_set m_set;
};
RabbitMQ Log entry:
2018-07-02 07:04:50.272 [info] <0.9653.0> accepting AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672)
2018-07-02 07:04:50.273 [warning] <0.9653.0> closing AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672):
{handshake_timeout,handshake}
1 Answer
1
I finally fixed this problem by increasing the hanshake timeout to 20 seconds in the rabbitmq.config file. I just added the following in that file:
handshake_timeout = 20000
The value is given in milliseconds, an the default is 10 seconds, which seems not enough for my solution.
No, it runs on a Pi 3 Model B. How can I detect a possible performance problem?
– selbolder
Jul 4 at 6:05
Which is running on the Pi - RabbitMQ or your C++ app? If it's RabbitMQ, starting a new AMQP connection may be slow that your fix is required.
– Luke Bakken
Jul 5 at 13:29
Both are running on the Pi.
– selbolder
Jul 5 at 13:46
By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.
That is an unusual fix - is your RabbitMQ server or client application running on a slow virtual machine?
– Luke Bakken
Jul 3 at 14:50