Creating a NATS Stream Wrapper: Seeking Guidance on Subscription Implementation

Hello! looking some help on creating an Stream, im trying to do a wrapper for Nats, the API of nat i can subscribe to events from a Subject what i am doing wrong here ?
thank you in advance!! keep learning this amazing library 🚀
// Define the interface for the resource
export interface NatsStream {
  readonly natsClient: NatsConnection
  readonly subscription: (subject: string) => Subscription
  readonly close: () => Promise<void>
}

// Simulate getting the resource
const getNatsStream = async ({ servers = ['0.0.0.0:4222'] }: { servers?: string[] } = {}): Promise<NatsStream> => {
  const options: ConnectionOptions = {
    servers: servers.map((host) => `nats://${host}`),
    timeout: 10 * 1000, // 10s
    maxReconnectAttempts: 10,
  }

  const natsClient = await connect(options)

  return {
    natsClient,
    subscription: (subject: string) => {
      return natsClient.subscribe(subject, {
        callback: (err, msg) => {
          if (err) {
            return T.fail(err)
          }
          return T.succeed(msg)
        }
      })
    },
    close: () => natsClient.close(),
  }
}

// Define the acquisition of the resource with error handling
const acquire = T.gen(function* (_) {
  const config = yield* _(EnvConfig)
  return T.tryPromise({
    try: () => getNatsStream({ servers: config.natsServers.split(',') }),
    catch: (error) => new NatsError({ cause: error }),
  })
}).pipe(
  T.flatMap((e) => e),
  T.tap(() => T.logInfo('Nats acquired.....')),
)

// Define the release of the resource
const release = (resource: NatsStream) =>
  T.promise(() => resource.close()).pipe(T.tap(() => T.logInfo('Nats Resource released')))

export const NatsStream = Stream.acquireRelease(acquire, release).pipe(
  Stream.flatMap((client) => {
    return client.subscription('Users_findAll')
  })
)
Screenshot_2024-06-06_at_8.46.32_p.m..png
Was this page helpful?