Fiber Supervision
Both libraries use lightweight fibers for concurrency. ZIO has built-in supervision, while Cats Effect uses spawn with manual supervision.
Forking Fibers
import cats.effect.{IO, Fiber => CEFiber}
// Start a fiber
val fiber: IO[CEFiber[IO, Throwable, Int]] =
IO.pure(42).start
// Fire and forget
val background: IO[Unit] =
IO.println("Running in background")
.start
.voidstart - spawn a fiber
import zio._
// Fork a fiber
val fiber: UIO[Fiber[Nothing, Int]] =
ZIO.succeed(42).fork
// Fork daemon (outlives parent)
val daemon: UIO[Fiber[Nothing, Unit]] =
ZIO.succeed(println("Background"))
.forkDaemonfork / forkDaemon - start concurrent execution
Fork Variants
ZIO provides three fork strategies for different concurrency needs:
import cats.effect.IO
import cats.effect.Resource
// In current scope
val inScope = IO.println("Working").start
// Background requires explicit lifecycle
def backgroundProcess =
Resource.make(IO.println("Starting")) { _ =>
IO.println("Stopping")
}start + Resource for supervision
import zio._
// In current scope (interrupted with parent)
val inScope = Console.printLine("Working").fork
// Daemon (outlives parent - use sparingly)
val daemon = Console.printLine("Background").forkDaemon
// Scoped (explicit lifecycle management)
val scoped = ZIO.scoped {
for {
fiber <- Console.printLine("Working").forkScoped
_ <- ZIO.sleep(1.second)
} yield ()
}fork / forkDaemon / forkScoped - scope control
Fork strategy guide: Use fork by default (auto-supervision is safer). Use forkDaemon only for background processes that must outlive their parent. Use forkScoped when you need explicit scope management or manual resource cleanup.
Joining Fibers
import cats.effect.IO
import cats.effect.Outcome
// Join fiber and get result
val joined: IO[Int] = for {
fib <- IO.pure(42).start
result <- fib.joinWithNever
} yield result
// Join with outcome
val withOutcome: IO[Outcome[IO, Throwable, Int]] =
IO.pure(42).start.flatMap(_.join)joinWithNever / join
import zio._
// Join fiber and get result
val joined: UIO[Int] = for {
fib <- ZIO.succeed(42).fork
result <- fib.join
} yield result
// Await with Exit
val withExit: UIO[Exit[Nothing, Int]] =
ZIO.succeed(42).fork.flatMap(_.await)join / await
Racing
import cats.effect.{IO, Fiber => CEFiber, Outcome}
// First to complete wins, loser canceled
val raced: IO[Either[Int, String]] =
IO.pure(42).race(IO.pure("hello"))
// Both results
val both: IO[Either[
(Outcome[IO, Throwable, Int], CEFiber[IO, Throwable, String]),
(CEFiber[IO, Throwable, Int], Outcome[IO, Throwable, String])
]] = IO.pure(42).racePair(IO.pure("hello"))race / racePair
import zio._
// First to complete wins, loser canceled
val raced: UIO[Either[Int, String]] =
ZIO.succeed(42).race(ZIO.succeed("hello"))
// Get both (winner first)
val both: UIO[(Int, Fiber[Nothing, String])] =
ZIO.succeed(42).raceWith(ZIO.succeed("hello"))(
(exit, fib) => exit.fold(_ => fib.join.map(s => (0, ???)), a => ZIO.succeed((a, fib))),
(exit, fib) => exit.fold(_ => fib.join.map(a => (a, ???)), _ => ZIO.never)
)race / raceWith
Race with Type Information
When you need to know which effect won, use raceEither instead of race:
import cats.effect.IO
// race gives you the value, but which side won?
val raced: IO[String] =
IO.pure("A").race(IO.pure("B"))
// raceEither preserves type information
val whichWon: IO[Either[String, String]] =
IO.pure("left").raceEither(IO.pure("right"))race / raceEither - winner with type info
import zio._
// race loses type info (both sides must conform)
val raced: UIO[String] =
ZIO.succeed("A").race(ZIO.succeed("B"))
// raceEither keeps track of which side won
val whichWon: UIO[Either[String, Int]] =
ZIO.succeed("left").raceEither(ZIO.succeed(42))race / raceEither - winner with type info
raceEither semantics: First successful result wins and the loser is interrupted. If the first to complete fails, it waits for the second effect. If both fail, the error contains both causes.
Timeout
import cats.effect.IO
import scala.concurrent.duration._
// Timeout returns Option
val withTimeout: IO[Option[Int]] =
IO.sleep(10.seconds).as(42)
.timeout(1.second)
// Timeout or raise
val timeoutOrFail: IO[Int] =
IO.sleep(10.seconds).as(42)
.timeoutTo(1.second, IO.raiseError(
new java.util.concurrent.TimeoutException
))timeout / timeoutTo
import zio._
import zio.Duration._
// Timeout returns Option
val withTimeout: UIO[Option[Int]] =
ZIO.sleep(10.seconds).as(42)
.timeout(1.second)
// Timeout or fail
val timeoutOrFail: IO[java.util.concurrent.TimeoutException, Int] =
ZIO.sleep(10.seconds).as(42)
.timeoutFail(new java.util.concurrent.TimeoutException)(1.second)timeout / timeoutFail
Cancellation
import cats.effect.IO
// Cancel a fiber
val canceled: IO[Unit] = for {
fib <- IO.never[Int].start
_ <- fib.cancel
} yield ()
// Uncancelable region
val uncancelable: IO[Int] =
IO.uncancelable { poll =>
// poll(io) makes io cancelable again
IO.pure(42)
}cancel / uncancelable
import zio._
// Interrupt a fiber
val interrupted: UIO[Unit] = for {
fib <- ZIO.never.fork
_ <- fib.interrupt
} yield ()
// Uninterruptible region
val uninterruptible: UIO[Int] =
ZIO.uninterruptible {
ZIO.succeed(42)
}interrupt / uninterruptible
Cancellation/interruption is cooperative. Long-running computations should
check for cancellation periodically using IO.canceled or ZIO.checkInterrupted.
Supervision
import cats.effect._
import scala.concurrent.duration._
// Supervise with Resource
def supervised[A](io: IO[A]): Resource[IO, Fiber[IO, Throwable, A]] =
Resource.make(io.start)(_.cancel)
// All fibers canceled when Resource releases
val program: IO[Unit] =
supervised(IO.never[Int]).use { fib =>
IO.sleep(1.second)
}Resource-based supervision
import zio._
// Supervised scope - children interrupted on completion
val program: UIO[Unit] = ZIO.scoped {
for {
_ <- ZIO.never.forkScoped
_ <- ZIO.sleep(1.second)
} yield ()
}
// All forkScoped fibers interrupted when scope exitsforkScoped - automatic supervision
Next Steps
With concurrency covered, let's explore streaming with ZStream and fs2.