Spring RabbitMQ - How to perform application recovery, on RabbitMQ reconnection

I connect to RabbitMQ through a CachingConnectionFactory. This has a simple, built in - forever retry mechanism to recover. To support dynamic queue management (creation, listening via API), I manipulate the RabbitListenerEndpointRegistry registry; When the connection to RabbitMQ is interrupted (any network outage), the recovery mechanism is successful. However, the registry is initialized, by Spring. I have been unable to find a place to hook into the reconnection flow, such that I can rebuild my registry on reconnection. The registry is leveraged by a single @RabbitListener Where can I execute my own recovery function, eg dbDao.getActiveQueues().forEach(queueName -> getMessageListenerContainerById(RECEIVED_FROM_CLIENT).addQueueNames(queueName));
8 Replies
JavaBot
JavaBot3w ago
This post has been reserved for your question.
Hey @Jack9! Please use /close or the Close Post button above when your problem is solved. Please remember to follow the help guidelines. This post will be automatically marked as dormant after 300 minutes of inactivity.
TIP: Narrow down your issue to simple and precise questions to maximize the chance that others will reply in here.
Hosti
Hosti3w ago
Maybe something added as an annotation to the @RabbitListener(id = RECEIVED_FROM_CLIENT, autoStartup = "true", containerFactory = "rabbitListenerContainerFactory") Then defined: @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory, ObjectMapper objectMapper
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter(objectMapper));
factory.setPrefetchCount(0);
factory.setConcurrentConsumers(rabbitMqConcurrentConsumers);
factory.setMaxConcurrentConsumers(rabbitMqMaxConcurrentConsumers);
factory.setAdviceChain(appRecovery()); <-----
return factory;
}
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter(objectMapper));
factory.setPrefetchCount(0);
factory.setConcurrentConsumers(rabbitMqConcurrentConsumers);
factory.setMaxConcurrentConsumers(rabbitMqMaxConcurrentConsumers);
factory.setAdviceChain(appRecovery()); <-----
return factory;
}
This message has been formatted automatically. You can disable this using /preferences.
daysling
daysling3w ago
@Jack9 Have you tried to listen to ConnectionRecoveredEvent? You can rebuild your registry in the listener it's called connectionlistener or snth afaik remember Ugh I am in my bed rn otherwise I'd have checked it out for you
Hosti
Hosti3w ago
I gave it a try. No dice.
public class RabbitConnectionListener implements RecoveryListener {
@Override
public void handleRecovery(Recoverable recoverable) {
System.out.println("RabbitConnectionListener handleRecovery");
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println("RabbitConnectionListener handleRecoveryStarted");
}
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitMqHost);
connectionFactory.setPort(Integer.parseInt(rabbitMqPort));
connectionFactory.setVirtualHost(rabbitMqVirtualhost);
connectionFactory.setUsername(rabbitMqUsername);
connectionFactory.setPassword(rabbitMqPassword);
connectionFactory.setRecoveryListener(recoveryListener()); <--- bean for class above
return connectionFactory;
}
public class RabbitConnectionListener implements RecoveryListener {
@Override
public void handleRecovery(Recoverable recoverable) {
System.out.println("RabbitConnectionListener handleRecovery");
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println("RabbitConnectionListener handleRecoveryStarted");
}
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitMqHost);
connectionFactory.setPort(Integer.parseInt(rabbitMqPort));
connectionFactory.setVirtualHost(rabbitMqVirtualhost);
connectionFactory.setUsername(rabbitMqUsername);
connectionFactory.setPassword(rabbitMqPassword);
connectionFactory.setRecoveryListener(recoveryListener()); <--- bean for class above
return connectionFactory;
}
These functions do not trigger on recovery.
This message has been formatted automatically. You can disable this using /preferences.
Jack9
Jack9OP3w ago
Maybe I need a diff version with @EventListener, but I have nothing to go on The ConnectionEventListener interface is only for connectionClosed() and connectionErrorOccurred()
JavaBot
JavaBot3w ago
If you are finished with your post, please close it. If you are not, please ignore this message. Note that you will not be able to send further messages here after this post have been closed but you will be able to create new posts.
Hosti
Hosti3w ago
The answer is: @Component
public class AmqConnectionRecoveryListener implements ConnectionListener, ApplicationListener<ContextRefreshedEvent> {

@Autowired
private final ConnectionFactory connectionFactory;

@PostConstruct
void registerConnectionListener() {
try {
connectionFactory.addConnectionListener(this);
log.info("Registered ConnectionListener with RabbitMQ ConnectionFactory");
} catch (Exception e) {
log.warn("Unable to register ConnectionListener: {}", e.getMessage());
}
}

@Override
public void onCreate(org.springframework.amqp.rabbit.connection.Connection connection) {
rebuildDynamicQueues();
}

@Override
public void onClose(org.springframework.amqp.rabbit.connection.Connection connection) {

}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
rebuildDynamicQueues();
}

public void rebuildDynamicQueues() { .... }
}
public class AmqConnectionRecoveryListener implements ConnectionListener, ApplicationListener<ContextRefreshedEvent> {

@Autowired
private final ConnectionFactory connectionFactory;

@PostConstruct
void registerConnectionListener() {
try {
connectionFactory.addConnectionListener(this);
log.info("Registered ConnectionListener with RabbitMQ ConnectionFactory");
} catch (Exception e) {
log.warn("Unable to register ConnectionListener: {}", e.getMessage());
}
}

@Override
public void onCreate(org.springframework.amqp.rabbit.connection.Connection connection) {
rebuildDynamicQueues();
}

@Override
public void onClose(org.springframework.amqp.rabbit.connection.Connection connection) {

}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
rebuildDynamicQueues();
}

public void rebuildDynamicQueues() { .... }
}
This message has been formatted automatically. You can disable this using /preferences.
JavaBot
JavaBot3w ago
Post Closed
This post has been closed by <@374014726348210176>.

Did you find this page helpful?