import { Chunk, Duration, Effect, GroupBy, pipe, Stream } from "effect"
class Exam {
constructor(readonly person: string, readonly score: number) {}
}
// Define a list of exam results
const examResults = Stream.make(...[
new Exam("Alex", 64),
new Exam("Michael", 97),
new Exam("Bill", 77),
new Exam("John", 78),
new Exam("Bobby", 71)
])
const expensiveTask = (exam: Exam) =>
Effect.gen(function*() {
yield* Effect.log(`Processing ${exam.person}`)
yield* Effect.sleep(Duration.seconds(Math.random() * 5))
})
const streamEffect = pipe(
examResults,
Stream.mapEffect((exam) =>
Effect.zipRight(
expensiveTask(exam),
Effect.succeed(exam)
), { concurrency: 2 }),
Stream.tap((exam) => Effect.log(`Finished ${exam.person}`))
)
const streamTap = pipe(
examResults,
Stream.tap((exam) => expensiveTask(exam)),
Stream.tap((exam) => Effect.log(`Finished ${exam.person}`))
)
// runs each side effect sequentially
Effect.runPromise(Stream.runCollect(streamTap)).then(console.log)
// runs side effects with concurrency
Effect.runPromise(Stream.runCollect(streamEffect)).then(console.log)
import { Chunk, Duration, Effect, GroupBy, pipe, Stream } from "effect"
class Exam {
constructor(readonly person: string, readonly score: number) {}
}
// Define a list of exam results
const examResults = Stream.make(...[
new Exam("Alex", 64),
new Exam("Michael", 97),
new Exam("Bill", 77),
new Exam("John", 78),
new Exam("Bobby", 71)
])
const expensiveTask = (exam: Exam) =>
Effect.gen(function*() {
yield* Effect.log(`Processing ${exam.person}`)
yield* Effect.sleep(Duration.seconds(Math.random() * 5))
})
const streamEffect = pipe(
examResults,
Stream.mapEffect((exam) =>
Effect.zipRight(
expensiveTask(exam),
Effect.succeed(exam)
), { concurrency: 2 }),
Stream.tap((exam) => Effect.log(`Finished ${exam.person}`))
)
const streamTap = pipe(
examResults,
Stream.tap((exam) => expensiveTask(exam)),
Stream.tap((exam) => Effect.log(`Finished ${exam.person}`))
)
// runs each side effect sequentially
Effect.runPromise(Stream.runCollect(streamTap)).then(console.log)
// runs side effects with concurrency
Effect.runPromise(Stream.runCollect(streamEffect)).then(console.log)