zio-cats

Streaming

Step 8 of 15

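ZIO provides ZStream in its zio-streams module, which is maintained and released alongside the core library. Cats Effect has no streaming type of its own and uses fs2 as its streaming library.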

Creating Streams

Cats Effect
import cats.effect.IO
import fs2.Stream

// Stream from values
val stream: Stream[IO, Int] =
  Stream(1, 2, 3, 4, 5)

// Stream from range
val range: Stream[IO, Int] =
  Stream.range(1, 10)

// Stream from effect
val fromEffect: Stream[IO, Int] =
  Stream.eval(IO.pure(42))

fs2.Stream - separate library

ZIO
import zio._
import zio.stream._

// Stream from values
val stream: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5)

// Stream from range
val range: ZStream[Any, Nothing, Int] =
  ZStream.range(1, 10)

// Stream from effect
val fromEffect: ZStream[Any, Nothing, Int] =
  ZStream.fromZIO(ZIO.succeed(42))

ZStream - part of ZIO (zio-streams module)

Transforming Streams

Cats Effect
import cats.effect.IO
import fs2.Stream

// Map elements
val mapped: Stream[IO, Int] =
  Stream(1, 2, 3).map(_ * 2)

// Filter elements
val filtered: Stream[IO, Int] =
  Stream(1, 2, 3, 4, 5).filter(_ % 2 == 0)

// Take first N
val taken: Stream[IO, Int] =
  Stream.range(1, 100).take(5)

map / filter / take

ZIO
import zio._
import zio.stream._

// Map elements
val mapped: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).map(_ * 2)

// Filter elements
val filtered: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5).filter(_ % 2 == 0)

// Take first N
val taken: ZStream[Any, Nothing, Int] =
  ZStream.range(1, 100).take(5)

map / filter / take

Running Streams

Cats Effect
import cats.effect.IO
import fs2.Stream

// Collect to List
// (covary[IO] lifts the pure stream so that compile yields an IO)
val list: IO[List[Int]] =
  Stream(1, 2, 3).covary[IO].compile.toList

// Fold/reduce
val sum: IO[Int] =
  Stream(1, 2, 3, 4, 5).covary[IO].compile.fold(0)(_ + _)

// Run for side effects
val printed: IO[Unit] =
  Stream(1, 2, 3)
    .evalMap(i => IO.println(i))
    .compile.drain

compile.toList / fold / drain

ZIO
import zio._
import zio.stream._

// Collect to Chunk (then List)
val list: UIO[List[Int]] =
  ZStream(1, 2, 3).runCollect.map(_.toList)

// Fold/reduce
val sum: UIO[Int] =
  ZStream(1, 2, 3, 4, 5).runFold(0)(_ + _)

// Run for side effects
val printed: UIO[Unit] =
  ZStream(1, 2, 3)
    .tap(i => ZIO.succeed(println(i)))
    .runDrain

runCollect / runFold / runDrain

Effectful Operations

Cats Effect
import cats.effect.IO
import fs2.Stream

// Map with effect
val evalMapped: Stream[IO, Int] =
  Stream(1, 2, 3).evalMap { i =>
    IO.println(s"Processing $i").as(i * 2)
  }

// Filter with effect
val evalFiltered: Stream[IO, Int] =
  Stream(1, 2, 3, 4, 5).evalFilter { i =>
    IO.pure(i % 2 == 0)
  }

evalMap / evalFilter

ZIO
import zio._
import zio.stream._

// Map with effect
val mapZIO: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).mapZIO { i =>
    ZIO.succeed(println(s"Processing $i")).as(i * 2)
  }

// Filter with effect
val filterZIO: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4, 5).filterZIO { i =>
    ZIO.succeed(i % 2 == 0)
  }

mapZIO / filterZIO

Chunked Processing

Cats Effect
import cats.effect.IO
import fs2.{Stream, Chunk}
import scala.concurrent.duration._

// Create from chunk
val chunked: Stream[IO, Int] =
  Stream.chunk(Chunk(1, 2, 3, 4, 5))

// Group into chunks
val grouped: Stream[IO, Chunk[Int]] =
  Stream.range(1, 100).chunkN(10)

// Group by time or size
val groupWithin: Stream[IO, Chunk[Int]] =
  Stream.range(1, 100).groupWithin(10, 5.seconds)

Chunk operations

ZIO
import zio._
import zio.stream._
import zio.Chunk

// Create from chunk
val chunked: ZStream[Any, Nothing, Int] =
  ZStream.fromChunk(Chunk(1, 2, 3, 4, 5))

// Group into chunks
val grouped: ZStream[Any, Nothing, Chunk[Int]] =
  ZStream.range(1, 100).grouped(10)

// Group by time or size
val groupWithin: ZStream[Any, Nothing, Chunk[Int]] =
  ZStream.range(1, 100).groupedWithin(10, 5.seconds)

Chunk operations

TIP:

groupWithin (fs2) and groupedWithin (ZStream) are useful for batching: they emit a chunk when it reaches size N or when time T elapses, whichever comes first. On a slow stream, this keeps a partially filled batch from waiting indefinitely for more elements.
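The batching pattern from the tip can be sketched with ZStream as follows. This is a minimal illustration, not a prescribed design: `saveBatch` is a hypothetical stand-in for a bulk write and is not part of ZIO, and the batch size and timeout are arbitrary values.

```scala
import zio._
import zio.stream._

object BatchingSketch {
  // Hypothetical bulk write (an assumption for illustration):
  // reports how many items it was handed.
  def saveBatch(batch: Chunk[Int]): UIO[Int] =
    ZIO.succeed(batch.size)

  // Emit a batch at 3 elements or after 1 second, whichever comes first.
  val batches: ZStream[Any, Nothing, Chunk[Int]] =
    ZStream.range(1, 8).groupedWithin(3, 1.second)

  // One saveBatch call per batch instead of one call per element.
  val savedCounts: UIO[Chunk[Int]] =
    batches.mapZIO(saveBatch).runCollect
}
```

With a slow source the timeout side takes over: a partially filled batch is emitted once the second elapses instead of waiting for three elements.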

Error Recovery

Streams can fail during processing. Both libraries provide operators to handle errors.

Cats Effect
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._

// Catch specific errors
val caught: Stream[IO, Int] =
  Stream.eval(IO.raiseError(new Exception("Failed")))
    .handleErrorWith {
      case _: Exception => Stream(0)
    }

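// Retry on failure
// (retry is defined on the Stream companion and takes the effect to retry)
val retried: Stream[IO, Int] =
  Stream.retry(
    IO(1),
    delay = 100.millis,
    nextDelay = identity, // keep the same delay between attempts
    maxAttempts = 3
  )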

// Catch all errors
val safe: Stream[IO, Either[Throwable, Int]] =
  Stream.eval(IO(1)).attempt

handleErrorWith / retry / attempt

ZIO
import zio._
import zio.stream._

// Catch specific errors
// (catchSome keeps the error type, since unmatched errors still propagate)
val caught: ZStream[Any, Exception, Int] =
  ZStream.fail(new Exception("Failed"))
    .catchSome {
      case _: Exception => ZStream.succeed(0)
    }

// Retry on failure
// (ZIO.attempt gives the stream a Throwable error channel to retry on)
val retried: ZStream[Any, Throwable, Int] =
  ZStream.fromZIO(ZIO.attempt(1))
    .retry(Schedule.recurs(3))

// Catch all errors
// (either needs a stream that can fail, hence ZIO.attempt)
val safe: ZStream[Any, Nothing, Either[Throwable, Int]] =
  ZStream.fromZIO(ZIO.attempt(1))
    .either

catchSome / retry / either

WARNING:

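Where you recover determines what survives. Stream-level operators (handleErrorWith in fs2, catchSome in ZStream) run after the failing stream has stopped: elements already emitted are kept, and the fallback stream replaces the rest. To replace or skip a single failing element and keep going, handle the error inside the per-element effect, for example within evalMap or mapZIO.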
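To make the scope of recovery concrete, here is a small fs2 sketch contrasting recovery on the stream with recovery inside the per-element effect. `step` is a hypothetical processing function, introduced only for this illustration, that fails on the element 2.

```scala
import cats.effect.IO
import fs2.Stream

object RecoveryScopeSketch {
  // Hypothetical per-element step (an assumption): fails on 2.
  def step(i: Int): IO[Int] =
    if (i == 2) IO.raiseError(new RuntimeException(s"bad element $i"))
    else IO.pure(i * 10)

  // Recovery on the stream: processing stops at the failure and the
  // fallback stream replaces everything after it.
  val streamLevel: IO[List[Int]] =
    Stream(1, 2, 3)
      .evalMap(step)
      .handleErrorWith(_ => Stream(-1))
      .compile
      .toList // List(10, -1)

  // Recovery inside the effect: only the failing element is replaced
  // and the stream carries on with the next one.
  val elementLevel: IO[List[Int]] =
    Stream(1, 2, 3)
      .evalMap(i => step(i).handleError(_ => -1))
      .compile
      .toList // List(10, -1, 30)
}
```

The same split should carry over to ZStream: handling the error inside mapZIO keeps the stream running, while catchSome on the stream swaps in a fallback.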

Backpressure Handling

Both ZStream and fs2 handle backpressure automatically.

Cats Effect
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._

// Fast producer, slow consumer
val producer: Stream[IO, Int] =
  Stream.range(1, 1000)

val consumer: Stream[IO, Unit] =
  producer
    .evalMap { i =>
      // Simulate slow processing
      IO.sleep(100.millis) *> IO.println(i)
    }

fs2 handles backpressure automatically

ZIO
import zio._
import zio.stream._

// Fast producer, slow consumer
val producer: ZStream[Any, Nothing, Int] =
  ZStream.range(1, 1000)

val consumer: ZStream[Any, Nothing, Unit] =
  producer
    .mapZIO { i =>
      // Simulate slow processing
      ZIO.sleep(100.millis) *> ZIO.succeed(println(i))
    }

ZStream handles backpressure automatically

TIP:

Pull-based streaming means the consumer controls the flow. Fast producers cannot overwhelm slow consumers, because an element is only pulled when the consumer is ready for it.
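One way to observe the pull model is to count how many elements an unbounded producer actually generates when the consumer only asks for a few. The sketch below uses fs2 with a Cats Effect `Ref` as the counter. The names are illustrative, and the count holds for this particular pipeline, where elements are emitted one at a time.

```scala
import cats.effect.{IO, Ref}
import fs2.Stream

object PullSketch {
  // Returns the elements the consumer took and how many the producer made.
  val takenAndProduced: IO[(List[Int], Int)] =
    for {
      counter <- Ref[IO].of(0)
      taken <- Stream
        .iterate(1)(_ + 1)                   // unbounded producer
        .evalTap(_ => counter.update(_ + 1)) // count each produced element
        .take(3)                             // consumer asks for three
        .compile
        .toList
      produced <- counter.get
    } yield (taken, produced) // (List(1, 2, 3), 3)
}
```

The producer is never run ahead of demand: once the third element has been taken, nothing further is pulled, so the counter stops at three even though the source is infinite.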

Concurrent Streams

Cats Effect
import cats.effect.IO
import fs2.Stream

// Merge (interleave)
val merged: Stream[IO, Int] =
  Stream(1, 2, 3).merge(Stream(4, 5, 6))

// Zip (pair elements)
val zipped: Stream[IO, (Int, String)] =
  Stream(1, 2, 3).zip(Stream("a", "b", "c"))

// Parallel map
val parMapped: Stream[IO, Int] =
  Stream(1, 2, 3).parEvalMap(4) { i =>
    IO.pure(i * 2)
  }

merge / zip / parEvalMap

ZIO
import zio._
import zio.stream._

// Merge (interleave)
val merged: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).merge(ZStream(4, 5, 6))

// Zip (pair elements)
val zipped: ZStream[Any, Nothing, (Int, String)] =
  ZStream(1, 2, 3).zip(ZStream("a", "b", "c"))

// Parallel map
val parMapped: ZStream[Any, Nothing, Int] =
  ZStream(1, 2, 3).mapZIOPar(4) { i =>
    ZIO.succeed(i * 2)
  }

merge / zip / mapZIOPar

TIP:

ZStream ships in the zio-streams module, which is released together with ZIO and added as its own dependency. fs2 is a separate project, but it is built on Cats Effect and integrates seamlessly with it.

Resource-Safe Streams

Cats Effect
import cats.effect._
import fs2.Stream
import java.io.BufferedReader

// Stream with resource
val lines: Stream[IO, String] =
  Stream.resource(
    Resource.fromAutoCloseable(
      IO.blocking(new BufferedReader(???))
    )
  ).flatMap { reader =>
    Stream.repeatEval(IO.blocking(reader.readLine()))
      .takeWhile(_ != null)
  }

Stream.resource for safe acquisition

ZIO
import zio._
import zio.stream._
import java.io.BufferedReader

// Stream with resource
// (ZStream.scoped closes the reader when the stream ends)
val lines: ZStream[Any, Throwable, String] =
  ZStream.scoped(
    ZIO.fromAutoCloseable(
      ZIO.attemptBlocking(new BufferedReader(???))
    )
  ).flatMap { reader =>
    ZStream.repeatZIO(ZIO.attemptBlocking(reader.readLine()))
      .takeWhile(_ != null)
  }

ZStream with scoped resources

Next Steps

Streaming is powerful in both ecosystems. Let's look at application structure.

Next: Application Structure →