Managing Reconnections and Error Handling in amqplib Connections

hey!
I'd like to know if it's possible to manage reconnections within an amqplib connection?
I'd actually like to be able to handle errors coming from a connection.on('..') to completely recreate my layer on top with a new connection.

Thanks for your help!

export const createRabbitMQConnection = (config: QueueConfig, logger: Logger) =>
  Effect.acquireRelease(
    Effect.gen(function* () {
      yield* logger.info('Creating RabbitMQ connection');
      const connection = yield* Effect.tryPromise(() => amqp.connect(config.url));
      yield* logger.info('RabbitMQ connection created successfully');
      return connection;
    }).pipe(
      Effect.mapError((error) => {
        const errorMessage = error instanceof Error ? error.cause : String(error);
        return new QueueConnectionFailedError({
          reason: `Failed to create RabbitMQ Connection: ${errorMessage}`,
        });
      })
    ),
    (connection) =>
      Effect.promise(() => connection.close()).pipe(
        Effect.tap(() => logger.info('RabbitMQ connection closed successfully')),
        Effect.tapError((error) => logger.error(`Error closing RabbitMQ connection: ${error}`)),
        Effect.ignore
      )
  );
Was this page helpful?