export async function* createRedisSubscription({
redis,
signal,
channel,
}: {
redis: Redis;
signal?: AbortSignal;
channel: string;
}) {
const subscriber = redis.duplicate();
await subscriber.subscribe(channel);
console.log(`Subscribed to channel: ${channel}`);
try {
const handleMessage = (channelId: string, message: string) => {
if (channelId === channel) {
console.log(`Received message on channel ${channel}:`, message);
return JSON.parse(message);
}
console.log(`Ignored message from channel ${channelId}`);
return null;
};
while (!signal?.aborted) {
const message = await new Promise((resolve) => {
const messageHandler = (channelId: string, message: string) => {
const parsedMessage = handleMessage(channelId, message);
if (parsedMessage) {
resolve(parsedMessage);
subscriber.off("message", messageHandler);
}
};
subscriber.on("message", messageHandler);
});
if (message) {
yield message;
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
} finally {
console.log(`Cleaning up subscription for channel: ${channel}`);
await subscriber.unsubscribe(channel);
await subscriber.quit();
}
}
export async function* createRedisSubscription({
redis,
signal,
channel,
}: {
redis: Redis;
signal?: AbortSignal;
channel: string;
}) {
const subscriber = redis.duplicate();
await subscriber.subscribe(channel);
console.log(`Subscribed to channel: ${channel}`);
try {
const handleMessage = (channelId: string, message: string) => {
if (channelId === channel) {
console.log(`Received message on channel ${channel}:`, message);
return JSON.parse(message);
}
console.log(`Ignored message from channel ${channelId}`);
return null;
};
while (!signal?.aborted) {
const message = await new Promise((resolve) => {
const messageHandler = (channelId: string, message: string) => {
const parsedMessage = handleMessage(channelId, message);
if (parsedMessage) {
resolve(parsedMessage);
subscriber.off("message", messageHandler);
}
};
subscriber.on("message", messageHandler);
});
if (message) {
yield message;
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
} finally {
console.log(`Cleaning up subscription for channel: ${channel}`);
await subscriber.unsubscribe(channel);
await subscriber.quit();
}
}