Skip to main content
effect-zio

Streaming

Step 12 of 15

Streaming

Both Effect and ZIO provide streaming for processing large or infinite data sequences.

Stream Type Signature

ZIO (Scala)
// ZIO: ZStream[-R, +E, +O]
// R: Environment
// E: Error type
// O: Output element type

val stream: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)

ZStream[-R, +E, +O]

Effect (TypeScript)
// Effect: Stream<A, E, R>
// A: Output element type
// E: Error type
// R: Requirements

const stream: Stream<number, never, never> =
  Stream.succeed(1, 2, 3, 4, 5)

Stream<A, E, R>

Creating Streams

ZIO (Scala)
// ZIO: Creating streams
val fromValues: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3)

val fromRange: ZStream[Any, Nothing, Int] =
  ZStream.range(1, 10)

val fromEffect: ZStream[Any, Nothing, String] =
  ZStream.fromEffect(ZIO.succeed("Hello"))

val fromChunk: ZStream[Any, Nothing, Int] =
  ZStream.fromChunk(Chunk(1, 2, 3))

val unfold: ZStream[Any, Nothing, Int] =
  ZStream.unfold(0)(n =>
      if (n < 10) Some((n, n + 1))
      else None
    )

ZStream(...), ZStream.range, ZStream.fromEffect

Effect (TypeScript)
// Effect: Creating streams
const fromValues: Stream<number, never, never> =
  Stream.succeed(1, 2, 3)

const fromRange: Stream<number, never, never> =
  Stream.range(1, 10)

const fromEffect: Stream<string, never, never> =
  Stream.fromEffect(Effect.succeed("Hello"))

const fromIterable: Stream<number, never, never> =
  Stream.from([1, 2, 3])

const unfold: Stream<number, never, never> =
  Stream.unfold(0, (n) =>
    n < 10
      ? Option.some([n, n + 1])
      : Option.none()
  )

Stream.succeed, Stream.range, Stream.fromEffect

Transformations

ZIO (Scala)
// ZIO: map, flatMap, filter
val transformed: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)
    .map(_ * 2)
    .filter(_ % 4 === 0)
    .flatMap(n => ZStream(n, n + 1))

// ZStream also has:
// .take(n), .drop(n), .takeWhile(_ < 5)
// .chunk(limit), .sliding(size)

Stream methods on ZStream

Effect (TypeScript)
// Effect: Stream.map, flatMap, filter
const transformed: Stream<number, never, never> =
  Stream.succeed(1, 2, 3, 4, 5).pipe(
    Stream.map((n) => n * 2),
    Stream.filter((n) => n % 4 === 0),
    Stream.flatMap((n) =>
      Stream.succeed(n, n + 1)
    )
  )

// Stream also has:
// Stream.take(stream, n), Stream.drop(stream, n)
// Stream.takeWhile(stream, p)
// Stream.chunk(stream, n), Stream.sliding(stream, size)

Stream functions using pipe syntax

Running Streams

ZIO (Scala)
// ZIO: Running streams
val toList: ZIO[Any, Nothing, List[Int]] =
  ZStream(1, 2, 3).runCollect
  // Result: List(1, 2, 3)

val sum: ZIO[Any, Nothing, Int] =
  ZStream(1, 2, 3).runSum
  // Result: 6

val foreach: ZIO[Any, Nothing, Unit] =
  ZStream(1, 2, 3).runForeach(n =>
    ZIO.succeed(println(s"Item: $n"))
  )

val drain: ZIO[Any, Nothing, Unit] =
  ZStream(1, 2, 3).runDrain

runCollect, runSum, runForeach, runDrain

Effect (TypeScript)
// Effect: Running streams
const toArray: Effect<Array<number>, never, never> =
  Stream.runCollect(
    Stream.succeed(1, 2, 3)
  )
  // Result: [1, 2, 3]

const sum: Effect<number, never, never> =
  Stream.sum(
    Stream.succeed(1, 2, 3)
  )
  // Result: 6

const forEach: Effect<void, never, never> =
  Stream.forEach(
    Stream.succeed(1, 2, 3),
    (n) =>
      Effect.sync(() =>
        console.log(`Item: ${n}`)
      )
  )

const drain: Effect<void, never, never> =
  Stream.runDrain(
    Stream.succeed(1, 2, 3)
  )

Stream.runCollect, Stream.sum, Stream.forEach, Stream.runDrain

Sinks

ZIO (Scala)
// ZIO: ZSink for aggregating streams
val count: ZIO[Any, Nothing, Long] =
  ZStream(1, 2, 3).run(ZSink.count)
  // Result: 3

val last: ZIO[Any, Option[Nothing], Int] =
  ZStream(1, 2, 3).run(ZSink.last)
  // Result: Some(3)

val fold: ZIO[Any, Nothing, Int] =
  ZStream(1, 2, 3).run(
    ZSink.fold(0)(_ + _)
  )
  // Result: 6

ZSink.count, ZSink.last, ZSink.fold

Effect (TypeScript)
// Effect: Sinks for aggregating streams
const count: Effect<number, never, never> =
  Stream.run(
    Stream.succeed(1, 2, 3),
    Sink.count()
  )
  // Result: 3

const last: Effect<
  Option<number>,
  never,
  never
> = Stream.run(
    Stream.succeed(1, 2, 3),
    Sink.last()
  )
  // Result: Option.some(3)

const fold: Effect<number, never, never> =
  Stream.run(
    Stream.succeed(1, 2, 3),
    Sink.fold(
      0,
      (acc, n) => acc + n
    )
  )
  // Result: 6

Sink.count(), Sink.last(), Sink.fold

Concurrent Streams

ZIO (Scala)
// ZIO: ZStream.merge, ZStream.interleave
val merged: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).merge(
    ZStream(4, 5, 6)
  )

val interleaved: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).interleave(
    ZStream(4, 5, 6)
  )
  // Result: 1, 4, 2, 5, 3, 6

// ZStream.zipPar, ZStream.parJoin
val zipped: ZStream[Any, Nothing, (Int, String)] =
  ZStream(1, 2, 3).zipPar(
    ZStream("a", "b", "c")
  )

ZStream.merge, ZStream.interleave, ZStream.zipPar

Effect (TypeScript)
// Effect: Stream.merge, Stream.interleave
const merged: Stream<number, never, never> =
  Stream.merge(
    Stream.succeed(1, 2, 3),
    Stream.succeed(4, 5, 6)
  )

const interleaved: Stream<number, never, never> =
  Stream.interleave(
    Stream.succeed(1, 2, 3),
    Stream.succeed(4, 5, 6)
  )
  // Result: 1, 4, 2, 5, 3, 6

// Stream.zip for parallel
const zipped: Stream<
  readonly [number, string],
  never,
  never
> = Stream.zip(
    Stream.succeed(1, 2, 3),
    Stream.succeed("a", "b", "c")
  )

Stream.merge, Stream.interleave, Stream.zip

Stream Quick Reference

ZIOEffectPurpose
ZStream(1, 2, 3)Stream.succeed(1, 2, 3)Create from values
ZStream.range(1, 10)Stream.range(1, 10)Range of values
stream.map(f)Stream.map(stream, f)Transform elements
stream.filter(p)Stream.filter(stream, p)Filter elements
stream.runCollectStream.runCollect(stream)Collect to array
stream.runSumStream.sum(stream)Sum all values
stream.runForeach(f)Stream.forEach(stream, f)Side effect per element
stream.run(ZSink.count)Stream.run(stream, Sink.count())Aggregate with sink
TIP:

Streams are lazy and can represent infinite sequences. Use Stream.take to limit elements and Stream.runDrain to execute effects for side effects without collecting results.

Next: Schema (Validation) →