Effect CommunityEC
Effect Community•3y ago•
22 replies
sampotter

Achieving Concurrency on a Per Second Basis

Is there a way to achieve concurrency on a per second basis?

I'm new to effect and am trying to migrate some code so that it's functionally equivalent. The following code is rate limiting requests to an external service so that it will never exceed 30 requests per second.

import { RateLimit } from 'async-sema';
import { viem } from 'utils';
import { toHex } from 'viem';

const limit = RateLimit(30);

export async function getBlock(blockNumber: number) {
  const block = await limit().then(() =>
    viem.request({
      method: 'eth_getBlockByNumber',
      params: [toHex(blockNumber), true],
    }),
  );

  if (!block) throw new Error(`Block ${blockNumber} not found`);

  const transactions = await Promise.all(
    block.transactions.map(async tx => {
      if (typeof tx === 'string') throw new Error(`Transaction not found in block ${blockNumber}`);

      const receipt = await limit().then(() =>
        viem.request({
          params: [tx.hash],
          method: 'eth_getTransactionReceipt',
        }),
      );

      return { ...tx, ...receipt };
    }),
  );

  return { ...block, transactions };
}


And so far this is my implementation with effect:

Effect.gen(function* (_) {
  const block = yield* _(
    Effect.retry(
      Effect.tryPromise(() => {
        return viem.request({
          method: 'eth_getBlockByNumber',
          params: [toHex(blockNumber), true],
        });
      }),
      Schedule.intersect(Schedule.jittered(Schedule.exponential(1000)), Schedule.recurs(5)),
    ),
  );

  if (!block) return yield* _(Effect.fail(new Error(`Block ${blockNumber} not found`)));

  const transactions = yield* _(
    Effect.forEach(
      block.transactions as RpcTransaction[],
      tx => {
        return Effect.gen(function* (_) {
          const receipt = yield* _(
            Effect.retry(
              Effect.tryPromise(() => {
                return viem.request({
                  params: [tx.hash],
                  method: 'eth_getTransactionReceipt',
                });
              }),
              Schedule.intersect(Schedule.jittered(Schedule.exponential(1000)), Schedule.recurs(5)),
            ),
          );

          if (!receipt) return yield* _(Effect.fail(new Error(`No transaction`)));

          return yield* _(Effect.succeed({ ...tx, ...receipt }));
        });
      },
      { concurrency: 30 },
    ),
  );

  return yield* _(Effect.succeed({ ...block, transactions }));
})


How would I achieve time based concurrency? This might be a dumb question but the documentation doesn't have much on concurrency at the moment, so any help would be appreciated 🙂
Was this page helpful?