Effect Community•3y ago•
Nomadin

Handling Cleanup in Stream.flatMap with `{ switch: true }`

How do I perform cleanup in Stream.flatMap when using it with { switch: true }?



Use case:

Say there is an input field that accepts user input and turns it into a Stream<never, never, input>. Whenever the input changes (debounced), we run multiple asynchronous validations on the input value in parallel. To show the validation results as quickly as possible, we emit a new value on the resulting stream whenever any validation produces a result.
However, if a new input arrives while some of the validations are still running, we should cancel them and start over with the new input.

type Validator = 'a' | 'b' | 'c';
type Result = 'loading' | 'pass' | 'fail'
// Stream<never, never, T> is abbreviated to Stream<T> for simplicity.
declare const validationProgram: (input: Stream<string>) => Stream<Record<Validator, Result>>
// Example
// input: x ------ start validate('a', x) validate('b', x) validate('c', x) in parallel
// output: { a: 'loading', b: 'loading', c: 'loading' }
// ... 
// output: { a: 'loading', b: 'pass',    c: 'loading' }
// input: y ----- cancel validate('a', x) and validate('c', x) and start over with y
// output: { a: 'loading', b: 'loading', c: 'loading' }
// ...
// output: { a: 'loading', b: 'pass',    c: 'loading' }
// ...
// output: { a: 'fail'   , b: 'pass',    c: 'loading' }
// ...
// output: { a: 'fail'   , b: 'pass',    c: 'pass'    }


function validationProgram(
  input: Stream<string>
) : Stream<Record<Validator, Result>> {
  return pipe(
    input,
    Stream.debounce(300),
    Stream.flatMap((input) => {
      const resultStream = /* how to do this with proper cleanup */
      return pipe(
        Stream.sync(() => ({
          a: 'loading', b: 'loading', c: 'loading'
        })),
        Stream.concat(resultStream)
      )
    }, { switch: true })
  )
}
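
For reference, the switch-and-cancel behaviour described above can be sketched without Effect at all. This is a plain-TypeScript illustration of the intended semantics, not Effect's API and not an answer to how Stream.flatMap handles it. The names `StartValidation` and `createSwitchingValidator` are hypothetical, and it assumes each validator returns its own cleanup function.

```typescript
type Validator = "a" | "b" | "c";
type Result = "loading" | "pass" | "fail";
type State = Record<Validator, Result>;

// Hypothetical validator shape (an assumption, not an Effect API):
// starts one validation and returns a function that cancels / cleans it up.
type StartValidation = (
  input: string,
  done: (result: "pass" | "fail") => void
) => () => void;

function createSwitchingValidator(
  validators: Record<Validator, StartValidation>,
  emit: (state: State) => void
): (input: string) => void {
  // Cleanup for the run started by the previous input, if any.
  let cancelCurrent: () => void = () => {};

  return (input) => {
    // "Switch": tear down whatever the previous input started.
    cancelCurrent();

    let active = true;
    const finished: Partial<Record<Validator, boolean>> = {};
    let state: State = { a: "loading", b: "loading", c: "loading" };
    emit(state);

    const names: Validator[] = ["a", "b", "c"];
    const cleanups = names.map((name) =>
      validators[name](input, (result) => {
        // Ignore late results from a cancelled run and duplicate results.
        if (!active || finished[name]) return;
        finished[name] = true;
        state = { ...state, [name]: result };
        emit(state);
      })
    );

    cancelCurrent = () => {
      active = false;
      // Only validations that are still running need to be cancelled.
      names.forEach((name, i) => {
        if (!finished[name]) cleanups[i]();
      });
    };
  };
}
```

Calling the returned function with `x`, letting `b` finish, and then calling it with `y` runs the cleanup for `a` and `c` of the `x` run only, which matches the trace in the example comments. In Effect terms, the open question is where that per-validator cleanup should live so that `{ switch: true }` triggers it.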