zio-cats

Concurrent Data Structures

Step 12 of 15

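ZIO and Cats Effect provide concurrent data structures for coordinating work between fibers. ZIO ships them in the core zio library. In Cats Effect 3, Ref and Deferred live in cats-effect-kernel, while Queue and Semaphore live in cats-effect-std; depending on the cats-effect artifact pulls in both.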

Ref: Shared Mutable State

Ref holds a single mutable value that can be safely accessed and modified concurrently.

Cats Effect
import cats.effect._
import cats.effect.std._

// Create and modify a Ref
val program: IO[Int] =
  for {
    counter <- Ref.of[IO, Int](0)
    _       <- counter.update(_ + 1)
    _       <- counter.update(_ + 1)
    value   <- counter.get
  } yield value

Ref lives in cats-effect-kernel and is available as cats.effect.Ref

ZIO
import zio._

// Create and modify a Ref
val program =
  for {
    counter <- Ref.make(0)
    _       <- counter.update(_ + 1)
    _       <- counter.update(_ + 1)
    value   <- counter.get
  } yield value

Ref is built into ZIO

Both libraries provide atomic operations: update, modify, getAndUpdate, updateAndGet.
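
As a rough illustration of why modify is useful, the sketch below hands out sequential IDs from a Cats Effect Ref. The names nextId and reservedIds are hypothetical and not part of either library. modify updates the state and returns a derived value in a single atomic step, so concurrent callers should never receive the same ID.

```scala
import cats.effect._
import cats.syntax.all._
import cats.effect.unsafe.implicits.global // only needed to run the example outside an IOApp

// Hypothetical helper: atomically bump the counter and return the ID that was reserved.
// In Cats Effect, the function passed to modify returns (newState, result).
def nextId(counter: Ref[IO, Int]): IO[Int] =
  counter.modify(current => (current + 1, current))

// 100 fibers each reserve one ID concurrently; the result is sorted for inspection
val reservedIds: IO[List[Int]] =
  for {
    counter  <- Ref.of[IO, Int](0)
    reserved <- (1 to 100).toList.parTraverse(_ => nextId(counter))
  } yield reserved.sorted
```

Running reservedIds.unsafeRunSync() should yield the IDs 0 through 99 with no duplicates. One detail to watch when porting: ZIO's Ref.modify expects the tuple in the opposite order, (result, newState).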

Queue: Work Distribution

Queue distributes work among multiple consumers. Each value is consumed by exactly one fiber.

Cats Effect
import cats.effect._
import cats.effect.std._
import cats.syntax.all._

// Distribute work to two workers
// (work: Int => IO[Unit] is assumed to be defined elsewhere)
val program: IO[Unit] =
  for {
    queue <- Queue.unbounded[IO, Int]
    _     <- queue.take.flatMap(work).foreverM.start
    _     <- queue.take.flatMap(work).foreverM.start
    _     <- (1 to 10).toList.traverse_(queue.offer)
  } yield ()

Cats Effect uses .start for concurrent execution

ZIO
import zio._

// Distribute work to two workers
val program =
  for {
    queue <- Queue.unbounded[Int]
    _     <- queue.take.flatMap(work).forever.fork
    _     <- queue.take.flatMap(work).forever.fork
    _     <- ZIO.foreachDiscard(1 to 10)(queue.offer)
  } yield ()

ZIO uses .fork for concurrent execution

Bounded Queues

Both libraries support bounded queues with different strategies when full:

Cats Effect
import cats.effect._
import cats.effect.std._

// Different queue strategies
val bounded   = Queue.bounded[IO, Int](100)         // Back pressure
val sliding   = Queue.circularBuffer[IO, Int](100)  // Drop oldest
val dropping  = Queue.dropping[IO, Int](100)        // Drop new

CE: bounded, circularBuffer (drops oldest), dropping

ZIO
import zio._

// Different queue strategies
val bounded   = Queue.bounded[Int](100)   // Back pressure
val sliding   = Queue.sliding[Int](100)    // Drop oldest
val dropping  = Queue.dropping[Int](100)   // Drop new

ZIO: bounded, sliding, dropping

TIP:

Cats Effect has no constructor named sliding. The equivalent is Queue.circularBuffer, which drops the oldest element when the queue is full.
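
To make the full-queue strategies concrete, the sketch below offers three values to a capacity-2 Cats Effect queue under the dropping and drop-oldest strategies, then drains it. The helper fillAndDrain is hypothetical. Back pressure is left out because the third offer on a bounded queue would block until a consumer takes a value.

```scala
import cats.effect._
import cats.effect.std.Queue
import cats.syntax.all._
import cats.effect.unsafe.implicits.global // only needed to run the example outside an IOApp

// Hypothetical helper: offer 1, 2, 3 to a capacity-2 queue, then take the two values it kept
def fillAndDrain(makeQueue: IO[Queue[IO, Int]]): IO[List[Int]] =
  for {
    queue  <- makeQueue
    _      <- List(1, 2, 3).traverse_(queue.offer)
    first  <- queue.take
    second <- queue.take
  } yield List(first, second)

// Dropping: the third offer is ignored, so the oldest two values survive
val droppingResult: IO[List[Int]] = fillAndDrain(Queue.dropping[IO, Int](2))

// Circular buffer: the oldest value is evicted, so the newest two values survive
val slidingResult: IO[List[Int]] = fillAndDrain(Queue.circularBuffer[IO, Int](2))
```

droppingResult should evaluate to List(1, 2) and slidingResult to List(2, 3). Neither offer blocks in these two strategies, which is the main trade-off against a back-pressured bounded queue.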

Hub: Broadcasting

Hub broadcasts values to multiple subscribers: every subscriber receives every value published after it subscribes. Hub is specific to ZIO; Cats Effect has no direct counterpart.

Cats Effect
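// No direct Hub equivalent in cats-effect
// Use multiple queues or fs2 broadcast:
import cats.effect.IO
import fs2._

val stream: Stream[IO, Int] = Stream(1, 2, 3)

// One pipe per sink (log and persist are assumed to be Int => IO[Unit])
val logPipe: Pipe[IO, Int, Unit]     = _.evalMap(log)
val persistPipe: Pipe[IO, Int, Unit] = _.evalMap(persist)

// Broadcast to multiple sinks
stream.broadcastThrough(logPipe, persistPipe)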

Use fs2 Stream broadcasting instead

ZIO
import zio._

// Broadcast to multiple subscribers
// Subscribe before publishing: a subscriber only sees values published after it subscribes
val program =
  ZIO.scoped {
    for {
      hub        <- Hub.bounded[Int](100)
      logSub     <- hub.subscribe
      persistSub <- hub.subscribe
      _          <- logSub.take.flatMap(log).forever.fork
      _          <- persistSub.take.flatMap(persist).forever.fork
      _          <- hub.publish(42)
    } yield ()
  }

Hub is built-in, optimized for broadcast

WARNING:

Cats Effect does not have a built-in Hub equivalent. Use fs2 Stream.broadcastThrough or create multiple queues manually.
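
The "multiple queues" option mentioned above can be sketched roughly as follows. The broadcast helper is hypothetical, not a Cats Effect API: it simply offers each value to every subscriber queue in turn.

```scala
import cats.effect._
import cats.effect.std.Queue
import cats.syntax.all._
import cats.effect.unsafe.implicits.global // only needed to run the example outside an IOApp

// Hypothetical helper: deliver one value to every subscriber queue
def broadcast[A](subscribers: List[Queue[IO, A]])(value: A): IO[Unit] =
  subscribers.traverse_(_.offer(value))

// Two subscribers, each backed by its own queue, both receive the published value
val received: IO[(Int, Int)] =
  for {
    logQueue     <- Queue.unbounded[IO, Int]
    persistQueue <- Queue.unbounded[IO, Int]
    _            <- broadcast(List(logQueue, persistQueue))(42)
    logged       <- logQueue.take
    persisted    <- persistQueue.take
  } yield (logged, persisted)
```

received should evaluate to (42, 42). Unlike Hub, this sketch has a fixed subscriber list, and with bounded queues one slow subscriber would block the publisher for everyone.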

Promise: Work Synchronization

Promise allows one fiber to wait for a value to be set by another fiber.

Cats Effect
import cats.effect._
import cats.effect.std._

// Coordinate two fibers
val program: IO[Unit] =
  for {
    deferred <- Deferred[IO, Unit]
    _        <- (IO.print("Hello, ") *> deferred.complete(())).start
    _        <- (deferred.get *> IO.print("World!\n")).start
  } yield ()

Deferred: complete/get

ZIO
import zio._

// Coordinate two fibers
val program =
  for {
    promise <- Promise.make[Nothing, Unit]
    _       <- (Console.print("Hello, ") *> promise.succeed(())).fork
    _       <- (promise.await *> Console.print("World!\n")).fork
  } yield ()

Promise: succeed/await

Key differences:

  • ZIO: Promise.succeed / Promise.await
  • Cats Effect: Deferred.complete / Deferred.get
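
Both structures can be completed only once. A minimal Cats Effect sketch of that behavior (the name completeTwice is illustrative):

```scala
import cats.effect._
import cats.effect.unsafe.implicits.global // only needed to run the example outside an IOApp

val completeTwice: IO[(Boolean, Boolean, Int)] =
  for {
    deferred <- Deferred[IO, Int]
    first    <- deferred.complete(1) // true: the value was set
    second   <- deferred.complete(2) // false: already completed, value unchanged
    value    <- deferred.get         // does not block, the value is already there
  } yield (first, second, value)
```

completeTwice should evaluate to (true, false, 1). ZIO's Promise.succeed likewise returns a Boolean indicating whether this call was the one that completed the promise.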

Semaphore: Work Limiting

Semaphore limits the number of fibers that can access a resource concurrently.

Cats Effect
import cats.effect._
import cats.effect.std._
import cats.syntax.all._

// Limit to 5 concurrent database connections
val program: IO[Unit] =
  for {
    semaphore <- Semaphore[IO](5)
    _         <- (1 to 100).toList.parTraverse_ { id =>
                   semaphore.permit.use { _ =>
                     queryDatabase(id)
                   }
                 }
  } yield ()

permit is a Resource; use holds one permit while the action runs and releases it afterwards

ZIO
import zio._

// Limit to 5 concurrent database connections
val program =
  for {
    semaphore <- Semaphore.make(5)
    _         <- ZIO.foreachPar(1 to 100) { id =>
                   semaphore.withPermit {
                     queryDatabase(id)
                   }
                 }
  } yield ()

withPermit automatically acquires/releases
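
One way to check that the limit actually holds is to record the peak number of tasks running at once. The sketch below does this with Cats Effect; peakConcurrency is a hypothetical helper, and the 10 ms sleep merely stands in for real work such as a database query.

```scala
import cats.effect._
import cats.effect.std.Semaphore
import cats.syntax.all._
import scala.concurrent.duration._
import cats.effect.unsafe.implicits.global // only needed to run the example outside an IOApp

// Hypothetical helper: run `tasks` simulated jobs in parallel behind a semaphore
// and report the highest number of jobs observed inside the permit at the same time
def peakConcurrency(permits: Int, tasks: Int): IO[Int] =
  for {
    semaphore <- Semaphore[IO](permits.toLong)
    running   <- Ref.of[IO, Int](0)
    peak      <- Ref.of[IO, Int](0)
    task       = semaphore.permit.use { _ =>
                   running.updateAndGet(_ + 1).flatMap(n => peak.update(_ max n)) *>
                     IO.sleep(10.millis) *> // stand-in for real work
                     running.update(_ - 1)
                 }
    _         <- (1 to tasks).toList.parTraverse_(_ => task)
    observed  <- peak.get
  } yield observed
```

peakConcurrency(5, 50) should never report more than 5. The exact peak depends on scheduling, so only the upper bound is guaranteed.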

Semaphore Pattern Differences

Cats Effect
import cats.effect._
import cats.effect.std._

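// Cats Effect semaphore patterns
// Semaphore[IO](5) is an IO[Semaphore[IO]]; the patterns below assume
// `semaphore` is the Semaphore[IO] obtained from it (e.g. in a for-comprehension)

// Pattern 1: permit.use (auto release)
semaphore.permit.use { _ => action }

// Pattern 2: manual acquire/release
// (the permit leaks if action fails or is canceled; prefer Pattern 1)
for {
  _ <- semaphore.acquire
  _ <- action
  _ <- semaphore.release
} yield ()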

permit.use takes a function returning IO

ZIO
import zio._

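// ZIO semaphore patterns
// Semaphore.make(5) is a UIO[Semaphore]; the patterns below assume
// `semaphore` is the Semaphore obtained from it (e.g. in a for-comprehension)

// Pattern 1: withPermit (auto release)
semaphore.withPermit(action)

// Pattern 2: scoped permit (released when the scope closes)
// ZIO 2 does not expose separate acquire/release methods on Semaphore
ZIO.scoped {
  semaphore.withPermitScoped *> action
}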

withPermit takes ZIO directly

Choosing the Right Structure

Use Case          | ZIO Structure | Cats Effect Structure
Shared state      | Ref           | Ref (cats-effect-kernel)
Work distribution | Queue         | Queue (cats-effect-std)
Broadcasting      | Hub           | fs2 broadcastThrough
Work sync         | Promise       | Deferred (cats-effect-kernel)
Work limiting     | Semaphore     | Semaphore (cats-effect-std)
Pub/sub           | Hub.subscribe | fs2 broadcast

TIP:

Use Queue for distributing work (each value to one consumer). Use Hub for broadcasting (each value to all consumers). Cats Effect uses fs2 Stream for broadcast patterns.

Key Differences

Feature         | ZIO                | Cats Effect
Dependencies    | Built-in           | cats-effect-kernel and cats-effect-std (both pulled in by cats-effect)
Hub             | Hub for broadcast  | Use fs2 Stream.broadcastThrough
Queue sliding   | Queue.sliding      | Queue.circularBuffer
Promise naming  | Promise.succeed    | Deferred.complete
Semaphore usage | withPermit(action) | permit.use(_ => action)

TIP:

ZIO includes all concurrent data structures in its core library. Cats Effect splits them across cats-effect-kernel (Ref, Deferred) and cats-effect-std (Queue, Semaphore); the cats-effect artifact depends on both.

Next Steps

You've learned the concurrent data structures for coordination. Next: configuration loading.