Concurrent Data Structures
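ZIO and Cats Effect both provide concurrent data structures for coordinating work between fibers. ZIO ships them in its core library. Cats Effect keeps Ref and Deferred in the cats.effect package and Queue and Semaphore in cats.effect.std; the standard cats-effect dependency brings in both.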
Ref: Shared Mutable State
Ref holds a single mutable value that can be safely accessed and modified concurrently.
import cats.effect._
// Create and modify a Ref
val program: IO[Int] =
  for {
    counter <- Ref.of[IO, Int](0)
    _       <- counter.update(_ + 1)
    _       <- counter.update(_ + 1)
    value   <- counter.get
  } yield value
Ref lives in the cats.effect package
import zio._
// Create and modify a Ref
val program =
  for {
    counter <- Ref.make(0)
    _       <- counter.update(_ + 1)
    _       <- counter.update(_ + 1)
    value   <- counter.get
  } yield value
Ref is built into ZIO
Both libraries provide atomic operations: update, modify, getAndUpdate, updateAndGet.
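Of these operations, modify is the one that both changes the state and returns a derived result in a single atomic step. The following is a minimal sketch using Cats Effect; the ticket-dispenser scenario and the names nextTicket and tickets are hypothetical, chosen only for illustration.

```scala
import cats.effect.{IO, Ref}

// Hypothetical ticket dispenser: hand out the current number and
// advance the counter in one atomic step.
def nextTicket(counter: Ref[IO, Int]): IO[Int] =
  counter.modify(n => (n + 1, n)) // Cats Effect order: (new state, returned value)

val tickets: IO[(Int, Int, Int)] =
  for {
    counter <- Ref.of[IO, Int](0)
    first   <- nextTicket(counter)
    second  <- nextTicket(counter)
    total   <- counter.updateAndGet(_ + 10) // returns the value after the update
  } yield (first, second, total)
```

One detail to watch when porting code: ZIO's Ref.modify expects the tuple in the opposite order, (returned value, new state).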
Queue: Work Distribution
Queue distributes work among multiple consumers. Each value is consumed by exactly one fiber.
import cats.effect._
import cats.effect.std._
import cats.syntax.all._
// Distribute work to two workers
// work: Int => IO[Unit] is assumed to be defined elsewhere
val program: IO[Unit] =
  for {
    queue <- Queue.unbounded[IO, Int]
    _     <- queue.take.flatMap(work).foreverM.start
    _     <- queue.take.flatMap(work).foreverM.start
    _     <- (1 to 10).toList.traverse_(queue.offer)
  } yield ()
Cats Effect uses .start for concurrent execution
import zio._
// Distribute work to two workers
val program =
  for {
    queue <- Queue.unbounded[Int]
    _     <- queue.take.flatMap(work).forever.fork
    _     <- queue.take.flatMap(work).forever.fork
    _     <- ZIO.foreachDiscard(1 to 10)(queue.offer)
  } yield ()
ZIO uses .fork for concurrent execution
Bounded Queues
Both libraries support bounded queues with different strategies when full:
import cats.effect._
import cats.effect.std._
// Different queue strategies
val bounded = Queue.bounded[IO, Int](100) // Back pressure
val sliding = Queue.circularBuffer[IO, Int](100) // Drop oldest
val dropping = Queue.dropping[IO, Int](100) // Drop new
CE: bounded, circularBuffer (sliding), dropping
import zio._
// Different queue strategies
val bounded = Queue.bounded[Int](100) // Back pressure
val sliding = Queue.sliding[Int](100) // Drop oldest
val dropping = Queue.dropping[Int](100) // Drop new
ZIO: bounded, sliding, dropping
Cats Effect has no constructor named sliding. The equivalent behavior comes from Queue.circularBuffer, which drops the oldest element when the queue is full.
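The difference between the drop-oldest and drop-newest strategies is easiest to see by overfilling a small queue. The sketch below uses ZIO 2; the helper overfill and the value names are hypothetical. A back-pressure queue is left out because its third offer would suspend until a consumer takes an element.

```scala
import zio._

// Offer three values to a queue of capacity 2, then drain whatever survived.
def overfill(make: UIO[Queue[Int]]): UIO[Chunk[Int]] =
  for {
    queue <- make
    _     <- queue.offer(1)
    _     <- queue.offer(2)
    _     <- queue.offer(3) // the queue is already full at this point
    items <- queue.takeAll
  } yield items

// Sliding evicts the oldest element to make room: expected Chunk(2, 3).
val slidingResult: UIO[Chunk[Int]] = overfill(Queue.sliding[Int](2))

// Dropping discards the incoming element instead: expected Chunk(1, 2).
val droppingResult: UIO[Chunk[Int]] = overfill(Queue.dropping[Int](2))
```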
Hub: Broadcasting
Hub broadcasts values to multiple subscribers. Each value is received by every subscriber (ZIO-only feature).
// No direct Hub equivalent in cats-effect
// Use multiple queues or fs2 broadcast:
import cats.effect._
import fs2._
// log and persist are assumed to be functions Int => IO[Unit] defined elsewhere
val stream: Stream[IO, Int] = Stream(1, 2, 3)
// Broadcast to multiple sinks
stream
  .broadcastThrough[IO, Unit](
    _.evalMap(log),
    _.evalMap(persist)
  )
Use fs2 Stream broadcasting instead
import zio._
// Broadcast to multiple subscribers
val program =
  for {
    hub <- Hub.bounded[Int](100)
    // subscribe needs a Scope; ZIO.scoped keeps the subscription open while the fiber runs
    _   <- ZIO.scoped {
             hub.subscribe.flatMap(dequeue => dequeue.take.flatMap(log).forever)
           }.fork
    _   <- ZIO.scoped {
             hub.subscribe.flatMap(dequeue => dequeue.take.flatMap(persist).forever)
           }.fork
    _   <- hub.publish(42)
  } yield ()
Hub is built-in, optimized for broadcast
Cats Effect does not have a built-in Hub equivalent. Use fs2 Stream.broadcastThrough or create multiple queues manually.
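The "multiple queues" option can be sketched as follows: each subscriber owns its own queue, and publishing means offering the value to every queue. This is a simplified illustration, not a full Hub replacement; the broadcast helper is hypothetical, and it does not handle subscribers joining or leaving at runtime.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.syntax.all._

// Hypothetical helper: publish one value to every subscriber queue.
def broadcast[A](subscribers: List[Queue[IO, A]])(value: A): IO[Unit] =
  subscribers.traverse_(_.offer(value))

val received: IO[(Int, Int)] =
  for {
    logQueue     <- Queue.unbounded[IO, Int]
    persistQueue <- Queue.unbounded[IO, Int]
    _            <- broadcast(List(logQueue, persistQueue))(42)
    logged       <- logQueue.take     // each subscriber sees the value
    persisted    <- persistQueue.take
  } yield (logged, persisted)
```

With unbounded queues a slow subscriber never blocks the publisher, at the cost of unbounded memory growth; bounded queues would instead make the publisher wait for the slowest subscriber.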
Promise: Work Synchronization
Promise allows one fiber to wait for a value to be set by another fiber.
import cats.effect._
// Coordinate two fibers
val program: IO[Unit] =
  for {
    deferred <- Deferred[IO, Unit]
    _        <- (IO.print("Hello, ") *> deferred.complete(())).start
    _        <- (deferred.get *> IO.print("World!\n")).start
  } yield ()
Deferred: complete/get
import zio._
// Coordinate two fibers
val program =
  for {
    promise <- Promise.make[Nothing, Unit]
    _       <- (Console.print("Hello, ") *> promise.succeed(())).fork
    _       <- (promise.await *> Console.print("World!\n")).fork
  } yield ()
Promise: succeed/await
Key differences:
- ZIO: Promise.succeed/Promise.await
- Cats Effect: Deferred.complete/Deferred.get
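A Deferred can also carry a value from one fiber to another, and joining the waiting fiber ensures its result is observed before the program finishes. A minimal Cats Effect sketch, with the name handoff and the values chosen only for illustration:

```scala
import cats.effect.{Deferred, IO}

// One fiber waits for a value; the main fiber supplies it and then joins the waiter.
val handoff: IO[Int] =
  for {
    deferred <- Deferred[IO, Int]
    waiter   <- deferred.get.map(_ * 2).start // suspends until the value is set
    _        <- deferred.complete(21)         // wakes the waiting fiber
    result   <- waiter.joinWithNever          // wait for the waiter to finish
  } yield result
```

Fibers started with .start (or .fork in ZIO) run independently, so without a join the enclosing program may complete before they do.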
Semaphore: Work Limiting
Semaphore limits the number of fibers that can access a resource concurrently.
import cats.effect._
import cats.effect.std._
import cats.syntax.all._
// Limit to 5 concurrent database connections
// queryDatabase: Int => IO[Unit] is assumed to be defined elsewhere
val program: IO[Unit] =
  for {
    semaphore <- Semaphore[IO](5)
    _         <- (1 to 100).toList.parTraverse_ { id =>
                   semaphore.permit.use { _ =>
                     queryDatabase(id)
                   }
                 }
  } yield ()
permit.use creates a scoped resource
import zio._
// Limit to 5 concurrent database connections
val program =
  for {
    semaphore <- Semaphore.make(5)
    _         <- ZIO.foreachPar(1 to 100) { id =>
                   semaphore.withPermit {
                     queryDatabase(id)
                   }
                 }
  } yield ()
withPermit automatically acquires/releases
Semaphore Pattern Differences
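import cats.effect._
import cats.effect.std._
// Cats Effect semaphore patterns
// action: IO[Unit] is assumed to be defined elsewhere
// Pattern 1: permit.use (auto release)
def scoped(semaphore: Semaphore[IO]): IO[Unit] =
  semaphore.permit.use { _ => action }
// Pattern 2: manual acquire/release (the permit leaks if action fails or is canceled)
def manual(semaphore: Semaphore[IO]): IO[Unit] =
  for {
    _ <- semaphore.acquire
    _ <- action
    _ <- semaphore.release
  } yield ()
permit.use takes a function returning IO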
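import zio._
// ZIO semaphore patterns
// action is assumed to be a ZIO effect defined elsewhere
// Pattern 1: withPermit (auto release)
def direct(semaphore: Semaphore) =
  semaphore.withPermit(action)
// Pattern 2: withPermitScoped (the permit is held until the enclosing scope closes)
// ZIO's Semaphore does not expose separate acquire/release methods
def scoped(semaphore: Semaphore) =
  ZIO.scoped {
    semaphore.withPermitScoped *> action
  }
withPermit takes ZIO directly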
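One way to check that a semaphore actually caps concurrency is to track how many tasks are running at once. The sketch below uses ZIO 2 with a Ref as the counter; the name peakConcurrency, the permit count of 2, and the task count of 20 are arbitrary choices for illustration.

```scala
import zio._

// Run 20 tasks in parallel behind a 2-permit semaphore and record
// the highest number of tasks observed inside the guarded section.
val peakConcurrency: UIO[Int] =
  for {
    semaphore <- Semaphore.make(2)
    running   <- Ref.make(0)
    peak      <- Ref.make(0)
    task = semaphore.withPermit {
             running.updateAndGet(_ + 1).flatMap(n => peak.update(_ max n)) *>
               ZIO.yieldNow *> // give other fibers a chance to run
               running.update(_ - 1)
           }
    _      <- ZIO.foreachParDiscard(1 to 20)(_ => task)
    result <- peak.get
  } yield result
```

The recorded peak should never exceed the number of permits, however many fibers are started.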
Choosing the Right Structure
| Use Case | ZIO Structure | Cats Effect Structure |
|---|---|---|
| Shared state | Ref | Ref (cats.effect) |
| Work distribution | Queue | Queue (cats.effect.std) |
| Broadcasting | Hub | fs2 broadcastThrough |
| Work sync | Promise | Deferred (cats.effect) |
| Work limiting | Semaphore | Semaphore (cats.effect.std) |
| Pub/sub | Hub.subscribe | fs2 broadcast |
Use Queue for distributing work (each value to one consumer). Use Hub for broadcasting (each value to all consumers). Cats Effect uses fs2 Stream for broadcast patterns.
Key Differences
| Feature | ZIO | Cats Effect |
|---|---|---|
| Dependencies | Built-in | Ref and Deferred in cats.effect; Queue and Semaphore in cats.effect.std |
| Hub | Hub for broadcast | Use fs2 Stream.broadcastThrough |
| Queue sliding | Queue.sliding | Queue.circularBuffer |
| Promise naming | Promise.succeed | Deferred.complete |
| Semaphore usage | withPermit(action) | permit.use(_ => action) |
ZIO includes all of these structures in its core library. Cats Effect splits them between cats.effect (Ref, Deferred) and cats.effect.std (Queue, Semaphore), and relies on fs2 for broadcasting.
Next Steps
You've learned the concurrent data structures for coordination. Next: configuration loading.