import Parser from "rss-parser";
import { RSSFeed, RSSItem } from "../rss-api-schema";
import { Effect, Stream } from "effect";
// Module-level rss-parser instance, constructed with default options and used
// by fetchRSSFeed for every feed download.
const parser = new Parser();
/**
 * Downloads and parses a single RSS feed and converts every entry into an
 * `RSSItem`.
 *
 * Missing `title`, `link` and `content` fields default to the empty string.
 *
 * @param feed - Feed descriptor; only `feed.url` is read here.
 * @returns An Effect that succeeds with the parsed items, or fails with an
 *   `Error` describing why the download/parse was rejected.
 */
const fetchRSSFeed = (feed: RSSFeed) =>
  Effect.gen(function* (_) {
    const result = yield* _(
      Effect.tryPromise({
        try: () => parser.parseURL(feed.url),
        // Without an explicit `catch`, the rejection is wrapped in an
        // UnknownException whose `message` does not carry the underlying
        // reason, so callers reading `error.message` would lose the cause.
        catch: (cause) =>
          cause instanceof Error ? cause : new Error(String(cause)),
      })
    );

    return result.items.map(
      (item) =>
        new RSSItem({
          title: item.title ?? "",
          link: item.link ?? "",
          // NOTE(review): a missing or unparseable `pubDate` produces an
          // Invalid Date here; confirm that `RSSItem` tolerates that.
          pubDate: new Date(item.pubDate ?? ""),
          content: item.content ?? "",
        })
    );
  });
/**
 * Polls a feed on a fixed interval and emits each item the first time it is
 * observed.
 *
 * Items are identified by their `link` and `title`; an item already emitted by
 * an earlier poll (or appearing twice in the same poll) is not emitted again.
 *
 * @param feed - Feed descriptor; `feed.url` is fetched and used in error text.
 * @param pollIntervalMillis - Spacing between fetches in milliseconds.
 *   Defaults to five minutes.
 * @returns A Stream of `RSSItem`s that fails with an `Error` naming the feed
 *   URL as soon as a fetch fails.
 */
const streamRSSFeed = (feed: RSSFeed, pollIntervalMillis = 5 * 60 * 1000) =>
  // Drive the fetch from a ticking stream rather than `Stream.repeatEffect`,
  // which would re-request the feed back-to-back with no pause.
  Stream.tick(pollIntervalMillis).pipe(
    Stream.mapEffect(() => fetchRSSFeed(feed)),
    Stream.mapError(
      (error) =>
        new Error(`Failed to fetch RSS feed ${feed.url}: ${error.message}`)
    ),
    // `Stream.changes` only suppresses *consecutive* equal elements, so a feed
    // with more than one item would be re-emitted in full on every poll.
    // Track what has been seen across polls instead.
    Stream.mapAccum(new Set<string>(), (seen, items) => {
      // Copy rather than mutate so the initial state is never shared between
      // separate runs of this stream.
      const nextSeen = new Set(seen);
      const fresh = items.filter((item) => {
        const key = `${item.link}\n${item.title}`;
        if (nextSeen.has(key)) {
          return false;
        }
        nextSeen.add(key);
        return true;
      });
      return [nextSeen, fresh] as const;
    }),
    Stream.flatMap((freshItems) => Stream.fromIterable(freshItems))
  );
/**
 * Consumes the item stream of one feed, writing a log line for every item it
 * emits and discarding the items themselves.
 *
 * @param feed - Feed descriptor; `feed.url` appears in each log line.
 * @returns An Effect that runs the feed's stream to completion for its side
 *   effects only.
 */
const subscribeToRSSFeed = (feed: RSSFeed) => {
  const loggedItems = Stream.tap(streamRSSFeed(feed), (item) =>
    Effect.log(`New item in ${feed.url}: ${item.title}`)
  );
  return Stream.runDrain(loggedItems);
};
/**
 * Subscribes to every given feed concurrently.
 *
 * Failures are handled per feed: when one subscription fails, the error is
 * logged and only that subscription ends. Recovering inside the loop matters
 * because a failure escaping a concurrent `Effect.forEach` would interrupt all
 * of the other, still healthy, subscriptions.
 *
 * @param feeds - Feeds to subscribe to.
 * @returns An Effect that completes once every subscription has ended; typed
 *   failures from individual feeds are logged rather than propagated.
 */
export const subscribeToAllFeeds = (feeds: readonly RSSFeed[]) =>
  Effect.forEach(
    feeds,
    (feed) =>
      subscribeToRSSFeed(feed).pipe(
        Effect.catchAll((error) =>
          Effect.log(`Error in RSS subscription: ${error.message}`)
        )
      ),
    // The per-feed results carry no information, so do not collect them.
    { concurrency: "unbounded", discard: true }
  );
import Parser from "rss-parser";
import { RSSFeed, RSSItem } from "../rss-api-schema";
import { Effect, Stream } from "effect";
// Module-level rss-parser instance, constructed with default options and used
// by fetchRSSFeed for every feed download.
const parser = new Parser();
/**
 * Downloads and parses a single RSS feed and converts every entry into an
 * `RSSItem`.
 *
 * Missing `title`, `link` and `content` fields default to the empty string.
 *
 * @param feed - Feed descriptor; only `feed.url` is read here.
 * @returns An Effect that succeeds with the parsed items, or fails with an
 *   `Error` describing why the download/parse was rejected.
 */
const fetchRSSFeed = (feed: RSSFeed) =>
  Effect.gen(function* (_) {
    const result = yield* _(
      Effect.tryPromise({
        try: () => parser.parseURL(feed.url),
        // Without an explicit `catch`, the rejection is wrapped in an
        // UnknownException whose `message` does not carry the underlying
        // reason, so callers reading `error.message` would lose the cause.
        catch: (cause) =>
          cause instanceof Error ? cause : new Error(String(cause)),
      })
    );

    return result.items.map(
      (item) =>
        new RSSItem({
          title: item.title ?? "",
          link: item.link ?? "",
          // NOTE(review): a missing or unparseable `pubDate` produces an
          // Invalid Date here; confirm that `RSSItem` tolerates that.
          pubDate: new Date(item.pubDate ?? ""),
          content: item.content ?? "",
        })
    );
  });
/**
 * Polls a feed on a fixed interval and emits each item the first time it is
 * observed.
 *
 * Items are identified by their `link` and `title`; an item already emitted by
 * an earlier poll (or appearing twice in the same poll) is not emitted again.
 *
 * @param feed - Feed descriptor; `feed.url` is fetched and used in error text.
 * @param pollIntervalMillis - Spacing between fetches in milliseconds.
 *   Defaults to five minutes.
 * @returns A Stream of `RSSItem`s that fails with an `Error` naming the feed
 *   URL as soon as a fetch fails.
 */
const streamRSSFeed = (feed: RSSFeed, pollIntervalMillis = 5 * 60 * 1000) =>
  // Drive the fetch from a ticking stream rather than `Stream.repeatEffect`,
  // which would re-request the feed back-to-back with no pause.
  Stream.tick(pollIntervalMillis).pipe(
    Stream.mapEffect(() => fetchRSSFeed(feed)),
    Stream.mapError(
      (error) =>
        new Error(`Failed to fetch RSS feed ${feed.url}: ${error.message}`)
    ),
    // `Stream.changes` only suppresses *consecutive* equal elements, so a feed
    // with more than one item would be re-emitted in full on every poll.
    // Track what has been seen across polls instead.
    Stream.mapAccum(new Set<string>(), (seen, items) => {
      // Copy rather than mutate so the initial state is never shared between
      // separate runs of this stream.
      const nextSeen = new Set(seen);
      const fresh = items.filter((item) => {
        const key = `${item.link}\n${item.title}`;
        if (nextSeen.has(key)) {
          return false;
        }
        nextSeen.add(key);
        return true;
      });
      return [nextSeen, fresh] as const;
    }),
    Stream.flatMap((freshItems) => Stream.fromIterable(freshItems))
  );
/**
 * Consumes the item stream of one feed, writing a log line for every item it
 * emits and discarding the items themselves.
 *
 * @param feed - Feed descriptor; `feed.url` appears in each log line.
 * @returns An Effect that runs the feed's stream to completion for its side
 *   effects only.
 */
const subscribeToRSSFeed = (feed: RSSFeed) => {
  const loggedItems = Stream.tap(streamRSSFeed(feed), (item) =>
    Effect.log(`New item in ${feed.url}: ${item.title}`)
  );
  return Stream.runDrain(loggedItems);
};
/**
 * Subscribes to every given feed concurrently.
 *
 * Failures are handled per feed: when one subscription fails, the error is
 * logged and only that subscription ends. Recovering inside the loop matters
 * because a failure escaping a concurrent `Effect.forEach` would interrupt all
 * of the other, still healthy, subscriptions.
 *
 * @param feeds - Feeds to subscribe to.
 * @returns An Effect that completes once every subscription has ended; typed
 *   failures from individual feeds are logged rather than propagated.
 */
export const subscribeToAllFeeds = (feeds: readonly RSSFeed[]) =>
  Effect.forEach(
    feeds,
    (feed) =>
      subscribeToRSSFeed(feed).pipe(
        Effect.catchAll((error) =>
          Effect.log(`Error in RSS subscription: ${error.message}`)
        )
      ),
    // The per-feed results carry no information, so do not collect them.
    { concurrency: "unbounded", discard: true }
  );