Streaming
Both Effect and ZIO provide streaming for processing large or infinite data sequences.
Stream Type Signature
ZIO (Scala)
// ZIO: ZStream[-R, +E, +O]
// R: Environment
// E: Error type
// O: Output element type
import zio.stream._
val stream: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)
ZStream[-R, +E, +O]
Effect (TypeScript)
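// Effect: Stream<A, E, R>
// A: Output element type
// E: Error type
// R: Requirements
import { Stream } from "effect"
// Stream.make takes several values; Stream.succeed emits exactly one
const stream: Stream.Stream<number, never, never> =
  Stream.make(1, 2, 3, 4, 5)
Stream<A, E, R>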
Creating Streams
ZIO (Scala)
// ZIO: Creating streams
import zio._
import zio.stream._
val fromValues: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3)
val fromRange: ZStream[Any, Nothing, Int] =
  ZStream.range(1, 10) // 1 to 9: the upper bound is exclusive
// ZIO 2.x name; ZIO 1.x called this ZStream.fromEffect
val fromEffect: ZStream[Any, Nothing, String] =
  ZStream.fromZIO(ZIO.succeed("Hello"))
val fromChunk: ZStream[Any, Nothing, Int] =
  ZStream.fromChunk(Chunk(1, 2, 3))
val unfold: ZStream[Any, Nothing, Int] =
  ZStream.unfold(0)(n =>
    if (n < 10) Some((n, n + 1))
    else None
  )
ZStream(...), ZStream.range, ZStream.fromZIO
Effect (TypeScript)
// Effect: Creating streams
import { Effect, Option, Stream } from "effect"
const fromValues: Stream.Stream<number, never, never> =
  Stream.make(1, 2, 3)
// In recent Effect releases both endpoints are included (1 to 10)
const fromRange: Stream.Stream<number, never, never> =
  Stream.range(1, 10)
const fromEffect: Stream.Stream<string, never, never> =
  Stream.fromEffect(Effect.succeed("Hello"))
const fromIterable: Stream.Stream<number, never, never> =
  Stream.fromIterable([1, 2, 3])
const unfold: Stream.Stream<number, never, never> =
  Stream.unfold(0, (n) =>
    n < 10
      ? Option.some([n, n + 1] as const)
      : Option.none()
  )
Stream.make, Stream.range, Stream.fromEffect
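Of the constructors above, unfold is the least obvious, so here is a minimal library-free sketch of how it threads state from one step to the next. The `unfold` generator below is a hypothetical helper written for illustration, not the Effect or ZIO function, and it uses `undefined` as a stand-in for Option.none / None.

```typescript
// Library-free model of unfold; not the Effect or ZIO implementation.
// `step` returns [valueToEmit, nextState], or undefined to end the sequence.
function* unfold<S, A>(
  initial: S,
  step: (state: S) => readonly [A, S] | undefined
): Generator<A> {
  let state = initial
  while (true) {
    const result = step(state)
    if (result === undefined) return // no more elements
    const [value, next] = result
    yield value // emit the element
    state = next // carry the new state into the next iteration
  }
}

// Same shape as the unfold examples above: emit n, then continue with n + 1.
const digits: number[] = Array.from(
  unfold(0, (n: number) => (n < 10 ? ([n, n + 1] as const) : undefined))
)
console.log(digits) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Each call to the step function sees only the current state, which is why unfold can describe sequences without any mutable variable in user code.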
Transformations
ZIO (Scala)
// ZIO: map, flatMap, filter
import zio.stream._
val transformed: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)
    .map(_ * 2)
    .filter(_ % 4 == 0)
    .flatMap(n => ZStream(n, n + 1))
// Emits: 4, 5, 8, 9
// ZStream also has:
// .take(n), .drop(n), .takeWhile(_ < 5)
// .rechunk(n), .sliding(size)
Stream methods on ZStream
Effect (TypeScript)
// Effect: Stream.map, flatMap, filter
import { Stream } from "effect"
const transformed: Stream.Stream<number, never, never> =
  Stream.make(1, 2, 3, 4, 5).pipe(
    Stream.map((n) => n * 2),
    Stream.filter((n) => n % 4 === 0),
    Stream.flatMap((n) =>
      Stream.make(n, n + 1)
    )
  )
// Emits: 4, 5, 8, 9
// Stream also has:
// Stream.take(stream, n), Stream.drop(stream, n)
// Stream.takeWhile(stream, p)
// Stream.rechunk(stream, n), Stream.sliding(stream, size)
Stream functions using pipe syntax
Running Streams
ZIO (Scala)
// ZIO: Running streams
import zio._
import zio.stream._
// runCollect yields a Chunk; convert it when a List is needed
val toList: ZIO[Any, Nothing, List[Int]] =
  ZStream(1, 2, 3).runCollect.map(_.toList)
// Result: List(1, 2, 3)
val sum: ZIO[Any, Nothing, Int] =
  ZStream(1, 2, 3).runSum
// Result: 6
val foreach: ZIO[Any, Nothing, Unit] =
  ZStream(1, 2, 3).runForeach(n =>
    ZIO.succeed(println(s"Item: $n"))
  )
val drain: ZIO[Any, Nothing, Unit] =
  ZStream(1, 2, 3).runDrain
runCollect, runSum, runForeach, runDrain
Effect (TypeScript)
// Effect: Running streams
import { Chunk, Effect, Stream } from "effect"
// runCollect yields a Chunk; convert it when an array is needed
const toArray: Effect.Effect<ReadonlyArray<number>, never, never> =
  Stream.runCollect(
    Stream.make(1, 2, 3)
  ).pipe(Effect.map((chunk) => Chunk.toReadonlyArray(chunk)))
// Result: [1, 2, 3]
const sum: Effect.Effect<number, never, never> =
  Stream.runSum(
    Stream.make(1, 2, 3)
  )
// Result: 6
const forEach: Effect.Effect<void, never, never> =
  Stream.runForEach(
    Stream.make(1, 2, 3),
    (n) =>
      Effect.sync(() =>
        console.log(`Item: ${n}`)
      )
  )
const drain: Effect.Effect<void, never, never> =
  Stream.runDrain(
    Stream.make(1, 2, 3)
  )
Stream.runCollect, Stream.runSum, Stream.runForEach, Stream.runDrain
Sinks
ZIO (Scala)
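// ZIO: ZSink for aggregating streams
import zio._
import zio.stream._
val count: ZIO[Any, Nothing, Long] =
  ZStream(1, 2, 3).run(ZSink.count)
// Result: 3
// ZSink.last succeeds with an Option; it does not fail on an empty stream
val last: ZIO[Any, Nothing, Option[Int]] =
  ZStream(1, 2, 3).run(ZSink.last[Int])
// Result: Some(3)
// foldLeft consumes every element; ZSink.fold also takes a continue predicate
val fold: ZIO[Any, Nothing, Int] =
  ZStream(1, 2, 3).run(
    ZSink.foldLeft[Int, Int](0)(_ + _)
  )
// Result: 6
ZSink.count, ZSink.last, ZSink.foldLeft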
Effect (TypeScript)
// Effect: Sinks for aggregating streams
import { Effect, Option, Sink, Stream } from "effect"
const count: Effect.Effect<number, never, never> =
  Stream.run(
    Stream.make(1, 2, 3),
    Sink.count
  )
// Result: 3
const last: Effect.Effect<
  Option.Option<number>,
  never,
  never
> = Stream.run(
  Stream.make(1, 2, 3),
  Sink.last<number>()
)
// Result: Option.some(3)
// foldLeft consumes every element; Sink.fold also takes a continue predicate
const fold: Effect.Effect<number, never, never> =
  Stream.run(
    Stream.make(1, 2, 3),
    Sink.foldLeft(
      0,
      (acc, n: number) => acc + n
    )
  )
// Result: 6
Sink.count, Sink.last(), Sink.foldLeft
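Conceptually, a sink is a consumer that folds a stream's elements into a single result. The sketch below models that idea in plain TypeScript, independent of either library. `SimpleSink`, `runSink`, `countSink`, `lastSink`, and `foldLeftSink` are hypothetical names introduced for illustration; real sinks in ZIO and Effect also deal with early termination, leftovers, errors, and effects, none of which is modeled here.

```typescript
// Library-free model of a sink as a left fold; not the Effect or ZIO API.
interface SimpleSink<In, State, Out> {
  readonly initial: State
  readonly step: (state: State, input: In) => State
  readonly done: (state: State) => Out
}

// Feed every element of `source` into the sink and return its final result.
function runSink<In, State, Out>(
  source: Iterable<In>,
  sink: SimpleSink<In, State, Out>
): Out {
  let state = sink.initial
  for (const input of source) {
    state = sink.step(state, input)
  }
  return sink.done(state)
}

// Counts elements and ignores their values.
const countSink: SimpleSink<unknown, number, number> = {
  initial: 0,
  step: (n) => n + 1,
  done: (n) => n
}

// Remembers the most recent element; undefined models an empty stream.
const lastSink = <A>(): SimpleSink<A, A | undefined, A | undefined> => ({
  initial: undefined,
  step: (_, input) => input,
  done: (state) => state
})

// General left fold from an initial state and a combining function.
const foldLeftSink = <In, S>(
  initial: S,
  f: (state: S, input: In) => S
): SimpleSink<In, S, S> => ({ initial, step: f, done: (s) => s })

const sumSink = foldLeftSink<number, number>(0, (acc, n) => acc + n)

console.log(runSink([1, 2, 3], countSink)) // 3
console.log(runSink([1, 2, 3], lastSink<number>())) // 3
console.log(runSink([1, 2, 3], sumSink)) // 6
```

Seen this way, count and last are special cases of a fold, which is why one `run` operation can serve all three.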
Concurrent Streams
ZIO (Scala)
// ZIO: ZStream.merge, ZStream.interleave
import zio.stream._
// merge runs both sources concurrently; output order is nondeterministic
val merged: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).merge(
    ZStream(4, 5, 6)
  )
// interleave alternates deterministically between the two sources
val interleaved: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).interleave(
    ZStream(4, 5, 6)
  )
// Result: 1, 4, 2, 5, 3, 6
// zip pairs elements point-wise; ZStream.mergeAll merges many streams
val zipped: ZStream[Any, Nothing, (Int, String)] =
  ZStream(1, 2, 3).zip(
    ZStream("a", "b", "c")
  )
ZStream.merge, ZStream.interleave, ZStream.zip
Effect (TypeScript)
// Effect: Stream.merge, Stream.interleave
import { Stream } from "effect"
// merge runs both sources concurrently; output order is nondeterministic
const merged: Stream.Stream<number, never, never> =
  Stream.merge(
    Stream.make(1, 2, 3),
    Stream.make(4, 5, 6)
  )
// interleave alternates deterministically between the two sources
const interleaved: Stream.Stream<number, never, never> =
  Stream.interleave(
    Stream.make(1, 2, 3),
    Stream.make(4, 5, 6)
  )
// Result: 1, 4, 2, 5, 3, 6
// Stream.zip pairs elements point-wise
const zipped: Stream.Stream<
  readonly [number, string],
  never,
  never
> = Stream.zip(
  Stream.make(1, 2, 3),
  Stream.make("a", "b", "c")
)
Stream.merge, Stream.interleave, Stream.zip
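The fixed 1, 4, 2, 5, 3, 6 ordering of interleave can be reproduced with a small library-free sketch. The `interleave` generator below is a hypothetical helper, not either library's implementation, and it assumes that once one side is exhausted the remaining elements of the other side are emitted as-is. Merge is not modeled because its output order depends on timing.

```typescript
// Library-free model of deterministic interleaving; not the Effect or ZIO API.
function* interleave<A>(left: Iterable<A>, right: Iterable<A>): Generator<A> {
  const l = left[Symbol.iterator]()
  const r = right[Symbol.iterator]()
  let leftDone = false
  let rightDone = false
  let pullLeft = true // start with the left source, then alternate
  while (!leftDone || !rightDone) {
    if (pullLeft && !leftDone) {
      const next = l.next()
      if (next.done) leftDone = true
      else yield next.value
    } else if (!pullLeft && !rightDone) {
      const next = r.next()
      if (next.done) rightDone = true
      else yield next.value
    }
    pullLeft = !pullLeft // switch sides on every turn
  }
}

console.log(Array.from(interleave([1, 2, 3], [4, 5, 6]))) // [1, 4, 2, 5, 3, 6]
console.log(Array.from(interleave([1], [4, 5, 6]))) // [1, 4, 5, 6]
```

Because each turn pulls from exactly one side, the output order depends only on the inputs, never on scheduling.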
Stream Quick Reference
| ZIO | Effect | Purpose |
|---|---|---|
| ZStream(1, 2, 3) | Stream.make(1, 2, 3) | Create from values |
| ZStream.range(1, 10) | Stream.range(1, 10) | Range of values |
| stream.map(f) | Stream.map(stream, f) | Transform elements |
| stream.filter(p) | Stream.filter(stream, p) | Filter elements |
| stream.runCollect | Stream.runCollect(stream) | Collect all elements into a Chunk |
| stream.runSum | Stream.runSum(stream) | Sum all values |
| stream.runForeach(f) | Stream.runForEach(stream, f) | Side effect per element |
| stream.run(ZSink.count) | Stream.run(stream, Sink.count) | Aggregate with sink |
TIP:
Streams are lazy and can represent infinite sequences: nothing is produced until the stream is run. Use Stream.take to bound the number of elements, and Stream.runDrain to run a stream purely for its side effects without collecting results.
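To make the laziness point concrete without depending on either library, here is a minimal sketch built on a plain TypeScript generator. The helpers `naturals`, `take`, and `drain` are hypothetical names chosen to mirror Stream.take and Stream.runDrain; they are not Effect APIs, and real streams additionally handle errors, resources, and concurrency.

```typescript
// Library-free model of a lazy stream; not Effect's implementation.

// An infinite sequence: no element is computed until a consumer pulls it.
function* naturals(start = 0): Generator<number> {
  let n = start
  while (true) {
    yield n++
  }
}

// Bound a (possibly infinite) sequence to its first `count` elements.
function* take<A>(source: Iterable<A>, count: number): Generator<A> {
  if (count <= 0) return
  let taken = 0
  for (const value of source) {
    yield value
    taken += 1
    if (taken >= count) return // stop pulling from the source
  }
}

// Run a sequence only for its side effects, keeping no results.
function drain<A>(source: Iterable<A>, onElement: (a: A) => void): void {
  for (const value of source) {
    onElement(value)
  }
}

const seen: number[] = []
// Building the pipeline computes nothing; drain is what pulls elements.
drain(take(naturals(1), 3), (n) => {
  seen.push(n)
})
console.log(seen) // [1, 2, 3]
```

Without the `take`, the call to `drain` would never return, which is the same reason an unbounded stream needs a limit before it is run to completion.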