Streaming
ZIO provides ZStream built-in. Cats Effect uses fs2 as the streaming library.
Creating Streams
import cats.effect.IO
import fs2.Stream
// Stream from values
val stream: Stream[IO, Int] =
Stream(1, 2, 3, 4, 5)
// Stream from range
val range: Stream[IO, Int] =
Stream.range(1, 10)
// Stream from effect
val fromEffect: Stream[IO, Int] =
Stream.eval(IO.pure(42))fs2.Stream - separate library
import zio._
import zio.stream._
// Stream from values
val stream: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3, 4, 5)
// Stream from range
val range: ZStream[Any, Nothing, Int] =
ZStream.range(1, 10)
// Stream from effect
val fromEffect: ZStream[Any, Nothing, Int] =
ZStream.fromZIO(ZIO.succeed(42))ZStream - built into ZIO
Transforming Streams
import cats.effect.IO
import fs2.Stream
// Map elements
val mapped: Stream[IO, Int] =
Stream(1, 2, 3).map(_ * 2)
// Filter elements
val filtered: Stream[IO, Int] =
Stream(1, 2, 3, 4, 5).filter(_ % 2 == 0)
// Take first N
val taken: Stream[IO, Int] =
Stream.range(1, 100).take(5)map / filter / take
import zio._
import zio.stream._
// Map elements
val mapped: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3).map(_ * 2)
// Filter elements
val filtered: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3, 4, 5).filter(_ % 2 == 0)
// Take first N
val taken: ZStream[Any, Nothing, Int] =
ZStream.range(1, 100).take(5)map / filter / take
Running Streams
import cats.effect.IO
import fs2.Stream
// Collect to List
val list: IO[List[Int]] =
Stream(1, 2, 3).compile.toList
// Fold/reduce
val sum: IO[Int] =
Stream(1, 2, 3, 4, 5).compile.fold(0)(_ + _)
// Run for side effects
val printed: IO[Unit] =
Stream(1, 2, 3)
.evalMap(i => IO.println(i))
.compile.draincompile.toList / fold / drain
import zio._
import zio.stream._
// Collect to Chunk (then List)
val list: UIO[List[Int]] =
ZStream(1, 2, 3).runCollect.map(_.toList)
// Fold/reduce
val sum: UIO[Int] =
ZStream(1, 2, 3, 4, 5).runFold(0)(_ + _)
// Run for side effects
val printed: UIO[Unit] =
ZStream(1, 2, 3)
.tap(i => ZIO.succeed(println(i)))
.runDrainrunCollect / runFold / runDrain
Effectful Operations
import cats.effect.IO
import fs2.Stream
// Map with effect
val evalMapped: Stream[IO, Int] =
Stream(1, 2, 3).evalMap { i =>
IO.println(s"Processing $$i").as(i * 2)
}
// Filter with effect
val evalFiltered: Stream[IO, Int] =
Stream(1, 2, 3, 4, 5).evalFilter { i =>
IO.pure(i % 2 == 0)
}evalMap / evalFilter
import zio._
import zio.stream._
// Map with effect
val mapZIO: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3).mapZIO { i =>
ZIO.succeed(println(s"Processing $$i")).as(i * 2)
}
// Filter with effect
val filterZIO: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3, 4, 5).filterZIO { i =>
ZIO.succeed(i % 2 == 0)
}mapZIO / filterZIO
Chunked Processing
import cats.effect.IO
import fs2.{Stream, Chunk}
import scala.concurrent.duration._
// Create from chunk
val chunked: Stream[IO, Int] =
Stream.chunk(Chunk(1, 2, 3, 4, 5))
// Group into chunks
val grouped: Stream[IO, Chunk[Int]] =
Stream.range(1, 100).chunkN(10)
// Group by time or size
val groupWithin: Stream[IO, Chunk[Int]] =
Stream.range(1, 100).groupWithin(10, 5.seconds)Chunk operations
import zio._
import zio.stream._
import zio.Chunk
// Create from chunk
val chunked: ZStream[Any, Nothing, Int] =
ZStream.fromChunk(Chunk(1, 2, 3, 4, 5))
// Group into chunks
val grouped: ZStream[Any, Nothing, Chunk[Int]] =
ZStream.range(1, 100).grouped(10)
// Group by time or size
val groupWithin: ZStream[Any, Nothing, Chunk[Int]] =
ZStream.range(1, 100).groupedWithin(10, 5.seconds)Chunk operations
groupedWithin is useful for batching operations: emit chunks when they reach size N or after time T, whichever comes first. This prevents delays when processing slow streams.
Error Recovery
Streams can fail during processing. Both libraries provide operators to handle errors.
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._
// Catch specific errors
val caught: Stream[IO, Int] =
Stream.eval(IO.raiseError(new Exception("Failed")))
.handleErrorWith {
case _: Exception => Stream(0)
}
// Retry on failure
val retried: Stream[IO, Int] =
Stream.eval(IO(1))
.retry(
delay = 100.millis,
maxAttempts = 3
)
// Catch all errors
val safe: Stream[IO, Either[Throwable, Int]] =
Stream.eval(IO(1)).attempthandleErrorWith / retry / attempt
import zio._
import zio.stream._
// Catch specific errors
val caught: ZStream[Any, Nothing, Int] =
ZStream.fail(new Exception("Failed"))
.catchSome {
case _: Exception => ZStream.succeed(0)
}
// Retry on failure
val retried: ZStream[Any, Nothing, Int] =
ZStream.fromZIO(ZIO.succeed(1))
.retry(Schedule.recurs(3))
// Catch all errors
val safe: ZStream[Any, Nothing, Either[Throwable, Int]] =
ZStream.fromZIO(ZIO.succeed(1))
.eithercatchSome / retry / either
Error recovery in streams applies per-element or per-segment. A failing element doesn't terminate the entire stream if caught.
Backpressure Handling
Both ZStream and fs2 handle backpressure automatically.
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._
// Fast producer, slow consumer
val producer: Stream[IO, Int] =
Stream.range(1, 1000)
val consumer: Stream[IO, Unit] =
producer
.evalMap { i =>
// Simulate slow processing
IO.sleep(100.millis) *> IO.println(i)
}fs2 handles backpressure automatically
import zio._
import zio.stream._
// Fast producer, slow consumer
val producer: ZStream[Any, Nothing, Int] =
ZStream.range(1, 1000)
val consumer: ZStream[Any, Nothing, Unit] =
producer
.mapZIO { i =>
// Simulate slow processing
ZIO.sleep(100.millis) *> ZIO.succeed(println(i))
}ZStream handles backpressure automatically
Pull-based streaming means the consumer controls the flow. Fast producers don't overwhelm slow consumers—elements are only pulled when the consumer is ready.
Concurrent Streams
import cats.effect.IO
import fs2.Stream
// Merge (interleave)
val merged: Stream[IO, Int] =
Stream(1, 2, 3).merge(Stream(4, 5, 6))
// Zip (pair elements)
val zipped: Stream[IO, (Int, String)] =
Stream(1, 2, 3).zip(Stream("a", "b", "c"))
// Parallel map
val parMapped: Stream[IO, Int] =
Stream(1, 2, 3).parEvalMap(4) { i =>
IO.pure(i * 2)
}merge / zip / parEvalMap
import zio._
import zio.stream._
// Merge (interleave)
val merged: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3).merge(ZStream(4, 5, 6))
// Zip (pair elements)
val zipped: ZStream[Any, Nothing, (Int, String)] =
ZStream(1, 2, 3).zip(ZStream("a", "b", "c"))
// Parallel map
val parMapped: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3).mapZIOPar(4) { i =>
ZIO.succeed(i * 2)
}merge / zip / mapZIOPar
ZStream is built into ZIO with no additional dependencies. fs2 is a separate library but integrates seamlessly with Cats Effect.
Resource-Safe Streams
import cats.effect._
import fs2.Stream
import java.io.BufferedReader
// Stream with resource
val lines: Stream[IO, String] =
Stream.resource(
Resource.fromAutoCloseable(
IO.blocking(new BufferedReader(???))
)
).flatMap { reader =>
Stream.repeatEval(IO.blocking(reader.readLine()))
.takeWhile(_ != null)
}Stream.resource for safe acquisition
import zio._
import zio.stream._
import java.io.BufferedReader
// Stream with resource
val lines: ZStream[Any, Throwable, String] =
ZStream.fromZIO(
ZIO.fromAutoCloseable(
ZIO.attemptBlocking(new BufferedReader(???))
)
).flatMap { reader =>
ZStream.repeatZIO(ZIO.attemptBlocking(reader.readLine()))
.takeWhile(_ != null)
}ZStream with scoped resources
Next Steps
Streaming is powerful in both ecosystems. Let's look at application structure.