Handling Duplicate RSS Items in Effect: Stream vs. Manual Hash Set Approach

Has anyone built an RSS reader or third-party API poller in Effect that removes duplicates and only shows new items to subscribers?

I have working code that polls the RSS feed and creates a Stream from it, but it seems Stream.changes doesn't filter out the duplicates (as far as I can tell it only compares consecutive elements, so repeats from a later poll slip through).

Maybe a Stream isn't the right tool here and I should handle the RSS feed items manually with a HashSet, pushing an item onto a Queue whenever an unseen hash comes in?

Not sure how to handle this.


import Parser from "rss-parser";
import { RSSFeed, RSSItem } from "../rss-api-schema";
import { Effect, Stream } from "effect";

const parser = new Parser();

const fetchRSSFeed = (feed: RSSFeed) =>
  Effect.gen(function* () {
    // Parse the feed once and map each entry into our RSSItem schema class
    const result = yield* Effect.tryPromise(() => parser.parseURL(feed.url));
    return result.items.map(
      (item) =>
        new RSSItem({
          title: item.title ?? "",
          link: item.link ?? "",
          pubDate: new Date(item.pubDate ?? ""),
          content: item.content ?? "",
        })
    );
  });

const streamRSSFeed = (feed: RSSFeed) =>
  Stream.repeatEffect(fetchRSSFeed(feed)).pipe(
    Stream.mapError(
      (error) =>
        new Error(`Failed to fetch RSS feed ${feed.url}: ${error.message}`)
    ),
    // Flatten each polled batch into a stream of individual items
    Stream.flatMap(Stream.fromIterable),
    // My attempt at de-duplication -- doesn't drop repeats from later polls
    Stream.changes
  );

const subscribeToRSSFeed = (feed: RSSFeed) =>
  streamRSSFeed(feed).pipe(
    Stream.tap((item) => Effect.log(`New item in ${feed.url}: ${item.title}`)),
    Stream.runDrain,
  );

export const subscribeToAllFeeds = (feeds: RSSFeed[]) =>
  Effect.forEach(feeds, subscribeToRSSFeed, { concurrency: "unbounded" }).pipe(
    Effect.catchAll((error) =>
      Effect.log(`Error in RSS subscription: ${error.message}`)
    )
  );