Effect Community · 2mo ago
2 replies
Lloyd

Creating an Atom with `AtomRpc` for Push-Based Subscription in Effect TypeScript

I have a question about creating an atom using AtomRpc with a push-based subscription. I have an RPC that streams event data from the server to the client. The first time I set this up, I had to poll the atom in order to call pullNext():
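import { AtomRpc, Result, useAtom } from "@effect-atom/atom-react";
import { BrowserSocket } from "@effect/platform-browser";
import { RpcClient, RpcSerialization } from "@effect/rpc";
import { Layer } from "effect";
import { useEffect } from "react";
// WebSocketRpc (the RPC group) and WS_URL are defined elsewhere in the project.

export class WebSocketClient extends AtomRpc.Tag<WebSocketClient>()(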
  "WebSocketClient",
  {
    group: WebSocketRpc,
    protocol: RpcClient.layerProtocolSocket({
      retryTransientErrors: true,
    }).pipe(
      Layer.provide(BrowserSocket.layerWebSocket(WS_URL)),
      Layer.provide(RpcSerialization.layerNdjson),
    ),
  },
) {}

export function PresencePanel() {
  const [eventsResult, pullNext] = useAtom(WebSocketClient.query("subscribe", {}));

  useEffect(() => {
    // Keep pulling until the stream reports that it is done.
    if (Result.isSuccess(eventsResult) && !eventsResult.value.done) {
      const timeoutId = setTimeout(() => {
        pullNext();
      }, 100);
      return () => clearTimeout(timeoutId);
    }
    return;
  }, [eventsResult, pullNext]);
  // ...
}

But I was able to use WebSocketClient.runtime.fn to unwrap the stream:
export const presenceSubscriptionAtom = WebSocketClient.runtime.fn(() =>
  Effect.gen(function* () {
    yield* Effect.log("Starting presence subscription stream");
    const client = yield* WebSocketClient;
    return client("subscribe", {});
  }).pipe(
    Effect.map((stream) =>
      stream.pipe(
        Stream.scan<WebSocketEvent[], WebSocketEvent>([], (acc, event) => [
          ...acc,
          event,
        ]),
      ),
    ),
    Stream.unwrap,
  ),
);

export function PresencePanel() {
  const [eventsResult, startSubscription] = useAtom(presenceSubscriptionAtom);

  useEffect(() => {
    startSubscription();
  }, [startSubscription]);
  //...
}
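
For anyone reading along, here is a rough sketch of what the scan step in presenceSubscriptionAtom is doing, written in plain TypeScript with async generators instead of Effect so it runs on its own. DemoEvent, demoEvents, scanEvents and collectSnapshots are all hypothetical names made up for this illustration; they are not part of Effect or effect-atom. The sketch assumes Stream.scan emits the initial state first and then one accumulated value per incoming element, so each pushed event produces a fresh snapshot without any polling.

```typescript
// Hypothetical event shape, standing in for WebSocketEvent from the post.
type DemoEvent = { readonly user: string; readonly kind: "join" | "leave" };

// Emits the initial state, then the running accumulation after every element.
// This mirrors the assumed behaviour of Stream.scan; it is not Effect's code.
async function* scanEvents<A, S>(
  source: AsyncIterable<A>,
  initial: S,
  step: (acc: S, value: A) => S,
): AsyncGenerator<S> {
  let acc = initial;
  yield acc;
  for await (const value of source) {
    acc = step(acc, value);
    yield acc;
  }
}

// Hypothetical push source standing in for the server's "subscribe" stream.
async function* demoEvents(): AsyncGenerator<DemoEvent> {
  yield { user: "ada", kind: "join" };
  yield { user: "bob", kind: "join" };
  yield { user: "ada", kind: "leave" };
}

// Collects every snapshot the scan produces, the way a component would see
// a new array each time an event is pushed.
async function collectSnapshots(): Promise<DemoEvent[][]> {
  const snapshots: DemoEvent[][] = [];
  const scanned = scanEvents(
    demoEvents(),
    [] as DemoEvent[],
    (acc, event) => [...acc, event],
  );
  for await (const snapshot of scanned) {
    snapshots.push(snapshot);
  }
  return snapshots;
}
```

Because the step function builds a new array with the spread operator, every snapshot is a distinct array, which is what lets a React component notice the change. The trade-off is that the array grows without bound for a long-lived subscription, so a real version might cap its length.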

There is a complete example here. Maybe I'm missing something obvious, or is there some config I can change to clean this up?