Creating a NATS Stream Wrapper: Seeking Guidance on Subscription Implementation
Hello! I'm looking for some help creating a Stream. I'm trying to write a wrapper for NATS: with the NATS API I can subscribe to events from a subject, and I want to expose those events as a Stream. What am I doing wrong here?
Thank you in advance! I'm still learning this amazing library.
// Define the interface for the resource
/**
 * Handle to a connected NATS client together with the two operations this
 * module performs on it: subscribing to a subject and closing the connection.
 */
export interface NatsStream {
/** The connection object this wrapper is built around. */
readonly natsClient: NatsConnection
/** Takes a subject name and returns the `Subscription` created for it. */
readonly subscription: (subject: string) => Subscription
/** Closes the connection; resolves once closing has finished. */
readonly close: () => Promise<void>
}
// Simulate getting the resource
/**
 * Connects to NATS and wraps the connection in a {@link NatsStream}.
 *
 * @param servers - `host:port` entries; each one is prefixed with `nats://`
 *   before connecting. Defaults to `['0.0.0.0:4222']`.
 * @returns The connected client plus helpers to subscribe and to close it.
 *   The promise rejects when `connect` rejects.
 */
const getNatsStream = async ({ servers = ['0.0.0.0:4222'] }: { servers?: string[] } = {}): Promise<NatsStream> => {
  const options: ConnectionOptions = {
    servers: servers.map((host) => `nats://${host}`),
    timeout: 10 * 1000, // 10s
    maxReconnectAttempts: 10,
  }
  const natsClient = await connect(options)
  return {
    natsClient,
    // Subscribe WITHOUT a callback. The previous callback returned
    // `T.fail` / `T.succeed` values, but nats.js ignores the callback's return
    // value and nothing ever ran those effects, so every message and error was
    // silently dropped. A callback-free subscription is an async iterable of
    // messages, which is what a consumer needs to build a Stream from it
    // (registering a callback disables the iterator interface).
    subscription: (subject: string) => natsClient.subscribe(subject),
    close: () => natsClient.close(),
  }
}
// Define the acquisition of the resource with error handling
/**
 * Effect that reads `EnvConfig`, connects to the comma-separated servers in
 * `natsServers`, and logs once the connection is available.
 *
 * Any failure while connecting is wrapped in a `NatsError` carrying the
 * original error as `cause`.
 */
const acquire = T.gen(function* (_) {
  const env = yield* _(EnvConfig)
  // Run the connection attempt right here instead of returning the effect
  // and flattening it afterwards.
  const connected = yield* _(
    T.tryPromise({
      try: () => getNatsStream({ servers: env.natsServers.split(',') }),
      catch: (cause) => new NatsError({ cause: cause }),
    }),
  )
  yield* _(T.logInfo('Nats acquired.....'))
  return connected
})
// Define the release of the resource
/**
 * Finalizer for the NATS resource: closes the connection, then logs.
 *
 * `T.promise` is used, so a rejected `close()` surfaces as a defect rather
 * than as a typed failure.
 */
const release = (resource: NatsStream) =>
  T.gen(function* (_) {
    yield* _(T.promise(() => resource.close()))
    yield* _(T.logInfo('Nats Resource released'))
  })
/**
 * Stream of messages published on the `Users_findAll` subject.
 *
 * The NATS connection is acquired when the stream starts and released when
 * the stream ends, fails, or is interrupted.
 */
export const NatsStream = Stream.acquireRelease(acquire, release).pipe(
  Stream.flatMap((client) =>
    // `Stream.flatMap` must return a Stream, not a raw nats `Subscription`.
    // A subscription opened without a callback is an async iterable of
    // messages, so it can be adapted with `Stream.fromAsyncIterable`.
    // Errors thrown while iterating are wrapped in `NatsError`, matching the
    // error type produced when acquiring the connection.
    Stream.fromAsyncIterable(
      client.natsClient.subscribe('Users_findAll'),
      (error) => new NatsError({ cause: error }),
    ),
  ),
)
// Define the interface for the resource
/**
 * Handle to a connected NATS client together with the two operations this
 * module performs on it: subscribing to a subject and closing the connection.
 */
export interface NatsStream {
/** The connection object this wrapper is built around. */
readonly natsClient: NatsConnection
/** Takes a subject name and returns the `Subscription` created for it. */
readonly subscription: (subject: string) => Subscription
/** Closes the connection; resolves once closing has finished. */
readonly close: () => Promise<void>
}
// Simulate getting the resource
/**
 * Connects to NATS and wraps the connection in a {@link NatsStream}.
 *
 * @param servers - `host:port` entries; each one is prefixed with `nats://`
 *   before connecting. Defaults to `['0.0.0.0:4222']`.
 * @returns The connected client plus helpers to subscribe and to close it.
 *   The promise rejects when `connect` rejects.
 */
const getNatsStream = async ({ servers = ['0.0.0.0:4222'] }: { servers?: string[] } = {}): Promise<NatsStream> => {
  const options: ConnectionOptions = {
    servers: servers.map((host) => `nats://${host}`),
    timeout: 10 * 1000, // 10s
    maxReconnectAttempts: 10,
  }
  const natsClient = await connect(options)
  return {
    natsClient,
    // Subscribe WITHOUT a callback. The previous callback returned
    // `T.fail` / `T.succeed` values, but nats.js ignores the callback's return
    // value and nothing ever ran those effects, so every message and error was
    // silently dropped. A callback-free subscription is an async iterable of
    // messages, which is what a consumer needs to build a Stream from it
    // (registering a callback disables the iterator interface).
    subscription: (subject: string) => natsClient.subscribe(subject),
    close: () => natsClient.close(),
  }
}
// Define the acquisition of the resource with error handling
/**
 * Effect that reads `EnvConfig`, connects to the comma-separated servers in
 * `natsServers`, and logs once the connection is available.
 *
 * Any failure while connecting is wrapped in a `NatsError` carrying the
 * original error as `cause`.
 */
const acquire = T.gen(function* (_) {
  const env = yield* _(EnvConfig)
  // Run the connection attempt right here instead of returning the effect
  // and flattening it afterwards.
  const connected = yield* _(
    T.tryPromise({
      try: () => getNatsStream({ servers: env.natsServers.split(',') }),
      catch: (cause) => new NatsError({ cause: cause }),
    }),
  )
  yield* _(T.logInfo('Nats acquired.....'))
  return connected
})
// Define the release of the resource
/**
 * Finalizer for the NATS resource: closes the connection, then logs.
 *
 * `T.promise` is used, so a rejected `close()` surfaces as a defect rather
 * than as a typed failure.
 */
const release = (resource: NatsStream) =>
  T.gen(function* (_) {
    yield* _(T.promise(() => resource.close()))
    yield* _(T.logInfo('Nats Resource released'))
  })
/**
 * Stream of messages published on the `Users_findAll` subject.
 *
 * The NATS connection is acquired when the stream starts and released when
 * the stream ends, fails, or is interrupted.
 */
export const NatsStream = Stream.acquireRelease(acquire, release).pipe(
  Stream.flatMap((client) =>
    // `Stream.flatMap` must return a Stream, not a raw nats `Subscription`.
    // A subscription opened without a callback is an async iterable of
    // messages, so it can be adapted with `Stream.fromAsyncIterable`.
    // Errors thrown while iterating are wrapped in `NatsError`, matching the
    // error type produced when acquiring the connection.
    Stream.fromAsyncIterable(
      client.natsClient.subscribe('Users_findAll'),
      (error) => new NatsError({ cause: error }),
    ),
  ),
)
