Converting Kafka Wrapper into a Layer
I'm working on converting an existing wrapper we have around kafka into a Layer.
The library I'm using for Kafka (kafkajs) takes `logCreator`, a factory/callback that lets you customize how it emits its logs, and I'm trying to configure things so the logs go through Effect.
Does this seem like a sane/sensible way to go about this kind of callback/configuration? Nitpicks welcome!
import { Config, Console, Context, Effect, Runtime } from 'effect';
import kafkajs from 'kafkajs';
/**
 * Marker interface used purely as the identifier type of the `KafkaAdapter`
 * tag below; its only member is a `unique symbol` placeholder.
 */
export interface KafkaAdapter {
readonly _: unique symbol;
}
/**
 * Context tag for the Kafka adapter service, keyed by '@context/KafkaAdapter'.
 * The service type is derived from `live`: it is whatever the `live` effect
 * succeeds with, so the two cannot drift apart.
 */
export const KafkaAdapter = Context.GenericTag<
KafkaAdapter,
Effect.Effect.Success<typeof live>
>('@context/KafkaAdapter');
// Combined configuration for the adapter, built with `Config.all`. The field
// definitions are elided in this snippet; `live` reads at least `clientId`
// and `brokers` from the resolved value.
const KafkaAdapterConfig = Config.all({ /* ... */ });
/**
 * Lookup table from every kafkajs log level to the name of the console-style
 * log method (`debug`, `info`, `warn` or `error`) used to emit it.
 *
 * `NOTHING` has no dedicated method, so it is reported with `error`.
 *
 * NOTE(review): in type position `Console` presumably resolves to the ambient
 * `Console` interface rather than the `effect` module import — confirm.
 */
const KAFKAJS_LOG_LEVEL_MAP: Record<
  kafkajs.logLevel,
  Extract<keyof Console, 'debug' | 'error' | 'info' | 'warn'>
> = {
  [kafkajs.logLevel.NOTHING]: 'error',
  [kafkajs.logLevel.ERROR]: 'error',
  [kafkajs.logLevel.WARN]: 'warn',
  [kafkajs.logLevel.INFO]: 'info',
  [kafkajs.logLevel.DEBUG]: 'debug'
};
/**
 * Effect that builds the Kafka adapter.
 *
 * It resolves `KafkaAdapterConfig`, captures the current Effect runtime, and
 * constructs a `kafkajs.Kafka` client whose `logCreator` forwards every
 * kafkajs log entry to Effect's logger. The remainder of the construction is
 * elided in this snippet.
 */
export const live = Effect.gen(function* ($) {
  const config = yield* $(KafkaAdapterConfig);

  // kafkajs invokes its logger as a plain synchronous callback, so capture the
  // runtime here and use it to run logging effects from inside that callback.
  const runtime = yield* $(Effect.runtime<never>());
  const runSync = Runtime.runSync(runtime);

  // Use Effect's Logger API rather than `Console.*`: `Effect.annotateLogs`
  // only affects messages emitted through the Logger, so annotations attached
  // to a `Console.*` effect would never reach the output.
  // NOTE: Effect's default minimum log level filters out Debug messages unless
  // it is lowered (e.g. via `Logger.minimumLogLevel`).
  const effectLoggers = {
    debug: Effect.logDebug,
    error: Effect.logError,
    info: Effect.logInfo,
    warn: Effect.logWarning
  } as const;

  const logCreator: kafkajs.logCreator =
    () =>
    ({ level, log, label, namespace }) => {
      // Everything except the message itself is attached as structured data.
      const { message, ...extra } = log;
      // `??` keeps the fallback for levels missing from the map at runtime.
      const mappedLogLevel = KAFKAJS_LOG_LEVEL_MAP[level] ?? 'debug';
      const emit = effectLoggers[mappedLogLevel];
      runSync(
        emit(message).pipe(
          Effect.annotateLogs('extra', extra),
          Effect.annotateLogs('label', label),
          Effect.annotateLogs('namespace', namespace)
        )
      );
    };

  const kafka = new kafkajs.Kafka({
    clientId: config.clientId,
    brokers: config.brokers,
    logCreator
    // ...
  });
  // ...
});
import { Config, Console, Context, Effect, Runtime } from 'effect';
import kafkajs from 'kafkajs';
/**
 * Marker interface used purely as the identifier type of the `KafkaAdapter`
 * tag below; its only member is a `unique symbol` placeholder.
 */
export interface KafkaAdapter {
readonly _: unique symbol;
}
/**
 * Context tag for the Kafka adapter service, keyed by '@context/KafkaAdapter'.
 * The service type is derived from `live`: it is whatever the `live` effect
 * succeeds with, so the two cannot drift apart.
 */
export const KafkaAdapter = Context.GenericTag<
KafkaAdapter,
Effect.Effect.Success<typeof live>
>('@context/KafkaAdapter');
// Combined configuration for the adapter, built with `Config.all`. The field
// definitions are elided in this snippet; `live` reads at least `clientId`
// and `brokers` from the resolved value.
const KafkaAdapterConfig = Config.all({ /* ... */ });
/**
 * Lookup table from every kafkajs log level to the name of the console-style
 * log method (`debug`, `info`, `warn` or `error`) used to emit it.
 *
 * `NOTHING` has no dedicated method, so it is reported with `error`.
 *
 * NOTE(review): in type position `Console` presumably resolves to the ambient
 * `Console` interface rather than the `effect` module import — confirm.
 */
const KAFKAJS_LOG_LEVEL_MAP: Record<
  kafkajs.logLevel,
  Extract<keyof Console, 'debug' | 'error' | 'info' | 'warn'>
> = {
  [kafkajs.logLevel.NOTHING]: 'error',
  [kafkajs.logLevel.ERROR]: 'error',
  [kafkajs.logLevel.WARN]: 'warn',
  [kafkajs.logLevel.INFO]: 'info',
  [kafkajs.logLevel.DEBUG]: 'debug'
};
/**
 * Effect that builds the Kafka adapter.
 *
 * It resolves `KafkaAdapterConfig`, captures the current Effect runtime, and
 * constructs a `kafkajs.Kafka` client whose `logCreator` forwards every
 * kafkajs log entry to Effect's logger. The remainder of the construction is
 * elided in this snippet.
 */
export const live = Effect.gen(function* ($) {
  const config = yield* $(KafkaAdapterConfig);

  // kafkajs invokes its logger as a plain synchronous callback, so capture the
  // runtime here and use it to run logging effects from inside that callback.
  const runtime = yield* $(Effect.runtime<never>());
  const runSync = Runtime.runSync(runtime);

  // Use Effect's Logger API rather than `Console.*`: `Effect.annotateLogs`
  // only affects messages emitted through the Logger, so annotations attached
  // to a `Console.*` effect would never reach the output.
  // NOTE: Effect's default minimum log level filters out Debug messages unless
  // it is lowered (e.g. via `Logger.minimumLogLevel`).
  const effectLoggers = {
    debug: Effect.logDebug,
    error: Effect.logError,
    info: Effect.logInfo,
    warn: Effect.logWarning
  } as const;

  const logCreator: kafkajs.logCreator =
    () =>
    ({ level, log, label, namespace }) => {
      // Everything except the message itself is attached as structured data.
      const { message, ...extra } = log;
      // `??` keeps the fallback for levels missing from the map at runtime.
      const mappedLogLevel = KAFKAJS_LOG_LEVEL_MAP[level] ?? 'debug';
      const emit = effectLoggers[mappedLogLevel];
      runSync(
        emit(message).pipe(
          Effect.annotateLogs('extra', extra),
          Effect.annotateLogs('label', label),
          Effect.annotateLogs('namespace', namespace)
        )
      );
    };

  const kafka = new kafkajs.Kafka({
    clientId: config.clientId,
    brokers: config.brokers,
    logCreator
    // ...
  });
  // ...
});