Learn Zig Series (#30) - Thread Safety and Atomics
What will I learn
- You will learn how to write solutions for the Episode 29 exercises;
- You will learn creating threads with std.Thread.spawn;
- You will learn joining threads and collecting results;
- You will learn data races: why shared mutable state is dangerous;
- You will learn std.atomic.Value for lock-free atomic operations;
- You will learn atomic ordering: Relaxed, Acquire, Release, SeqCst;
- You will learn std.Thread.Mutex and std.Thread.Condition for synchronization;
- You will learn thread-safe data structures with mutex wrappers;
- You will learn a practical example: building a concurrent counter and a producer-consumer queue.
Requirements
- A working modern computer running macOS, Windows or Ubuntu;
- An installed Zig 0.14+ distribution (download from ziglang.org);
- The ambition to learn Zig programming.
Difficulty
- Intermediate
Curriculum (of the Learn Zig Series):
- Zig Programming Tutorial - ep001 - Intro
- Learn Zig Series (#2) - Hello Zig, Variables and Types
- Learn Zig Series (#3) - Functions and Control Flow
- Learn Zig Series (#4) - Error Handling (Zig's Best Feature)
- Learn Zig Series (#5) - Arrays, Slices, and Strings
- Learn Zig Series (#6) - Structs, Enums, and Tagged Unions
- Learn Zig Series (#7) - Memory Management and Allocators
- Learn Zig Series (#8) - Pointers and Memory Layout
- Learn Zig Series (#9) - Comptime (Zig's Superpower)
- Learn Zig Series (#10) - Project Structure, Modules, and File I/O
- Learn Zig Series (#11) - Mini Project: Building a Step Sequencer
- Learn Zig Series (#12) - Testing and Test-Driven Development
- Learn Zig Series (#13) - Interfaces via Type Erasure
- Learn Zig Series (#14) - Generics with Comptime Parameters
- Learn Zig Series (#15) - The Build System (build.zig)
- Learn Zig Series (#16) - Sentinel-Terminated Types and C Strings
- Learn Zig Series (#17) - Packed Structs and Bit Manipulation
- Learn Zig Series (#18) - Async Concepts and Event Loops
- Learn Zig Series (#18b) - Addendum: Async Returns in Zig 0.16
- Learn Zig Series (#19) - SIMD with @Vector
- Learn Zig Series (#20) - Working with JSON
- Learn Zig Series (#21) - Networking and TCP Sockets
- Learn Zig Series (#22) - Hash Maps and Data Structures
- Learn Zig Series (#23) - Iterators and Lazy Evaluation
- Learn Zig Series (#24) - Logging, Formatting, and Debug Output
- Learn Zig Series (#25) - Mini Project: HTTP Status Checker
- Learn Zig Series (#26) - Writing a Custom Allocator
- Learn Zig Series (#27) - C Interop: Calling C from Zig
- Learn Zig Series (#28) - C Interop: Exposing Zig to C
- Learn Zig Series (#29) - Inline Assembly and Low-Level Control
- Learn Zig Series (#30) - Thread Safety and Atomics (this post)
Solutions to Episode 29 Exercises
Exercise 1 - cpuBrand() function using extended CPUID leaves:
const std = @import("std");
fn cpuid(leaf: u32, subleaf: u32) struct { eax: u32, ebx: u32, ecx: u32, edx: u32 } {
var eax: u32 = undefined;
var ebx: u32 = undefined;
var ecx: u32 = undefined;
var edx: u32 = undefined;
asm volatile (
"cpuid"
: [eax] "={eax}" (eax),
[ebx] "={ebx}" (ebx),
[ecx] "={ecx}" (ecx),
[edx] "={edx}" (edx)
: [leaf] "{eax}" (leaf),
[sub] "{ecx}" (subleaf)
:
);
return .{ .eax = eax, .ebx = ebx, .ecx = ecx, .edx = edx };
}
fn cpuBrand() [48]u8 {
var brand: [48]u8 = undefined;
for (0..3) |i| {
const leaf: u32 = 0x80000002 + @as(u32, @intCast(i));
const r = cpuid(leaf, 0);
const offset = i * 16;
@memcpy(brand[offset..][0..4], std.mem.asBytes(&r.eax));
@memcpy(brand[offset + 4 ..][0..4], std.mem.asBytes(&r.ebx));
@memcpy(brand[offset + 8 ..][0..4], std.mem.asBytes(&r.ecx));
@memcpy(brand[offset + 12 ..][0..4], std.mem.asBytes(&r.edx));
}
return brand;
}
pub fn main() void {
const brand = cpuBrand();
std.debug.print("CPU Brand: {s}\n", .{&brand});
}
Leaves 0x80000002, 0x80000003, and 0x80000004 each return 16 bytes of the brand string spread across all four registers. Unlike the vendor string (leaf 0) which has EBX-EDX-ECX in that odd order, the brand string registers go in straight EAX-EBX-ECX-EDX order. The result is typically padded with null bytes or spaces at the end. Simple loop, three iterations, 16 bytes each = 48 byte brand string.
Exercise 2 - Benchmarking three summation strategies with RDTSCP:
const std = @import("std");
fn rdtscp() u64 {
var lo: u32 = undefined;
var hi: u32 = undefined;
asm volatile ("rdtscp"
: [lo] "={eax}" (lo),
[hi] "={edx}" (hi)
:
: "ecx"
);
return (@as(u64, hi) << 32) | @as(u64, lo);
}
fn sumPlain(data: []const u32) u64 {
var total: u64 = 0;
for (data) |v| total += v;
return total;
}
fn sumUnrolled(data: []const u32) u64 {
var total: u64 = 0;
var i: usize = 0;
while (i + 4 <= data.len) : (i += 4) {
total += data[i];
total += data[i + 1];
total += data[i + 2];
total += data[i + 3];
}
while (i < data.len) : (i += 1) {
total += data[i];
}
return total;
}
fn sumSimd(data: []const u32) u64 {
var acc: @Vector(4, u64) = @splat(0);
var i: usize = 0;
while (i + 4 <= data.len) : (i += 4) {
const chunk: @Vector(4, u32) = data[i..][0..4].*;
const wide: @Vector(4, u64) = @intCast(chunk);
acc += wide;
}
var total: u64 = @reduce(.Add, acc);
while (i < data.len) : (i += 1) {
total += data[i];
}
return total;
}
fn measure(comptime f: anytype, data: []const u32) u64 {
// warm up
for (0..100) |_| _ = f(data);
var min: u64 = std.math.maxInt(u64);
for (0..1000) |_| {
const s = rdtscp();
_ = f(data);
const e = rdtscp();
const d = e - s;
if (d < min) min = d;
}
return min;
}
pub fn main() void {
var data: [1024]u32 = undefined;
for (&data, 0..) |*slot, i| {
slot.* = @as(u32, @truncate(i *% 6971 +% 7919));
}
const slice: []const u32 = &data;
const c_plain = measure(sumPlain, slice);
const c_unrolled = measure(sumUnrolled, slice);
const c_simd = measure(sumSimd, slice);
std.debug.print("Plain: {d} cycles\n", .{c_plain});
std.debug.print("Unrolled: {d} cycles ({d:.2}x)\n", .{
c_unrolled,
@as(f64, @floatFromInt(c_plain)) / @as(f64, @floatFromInt(c_unrolled)),
});
std.debug.print("SIMD: {d} cycles ({d:.2}x)\n", .{
c_simd,
@as(f64, @floatFromInt(c_plain)) / @as(f64, @floatFromInt(c_simd)),
});
}
The SIMD version uses @Vector(4, u64) accumulators widened from the u32 data so we don't overflow during reduction. On modern x86_64 hardware with AVX2 you should see a meaningful speedup for the SIMD path. The unrolled version helps the compiler pipeline the additions but won't match true SIMD width. Exact speedup ratios vary wildly by CPU -- on my machine the SIMD path is roughly 2-3x faster than plain for this array size.
Exercise 3 - spinWait with RDTSC busy loop:
const std = @import("std");
fn rdtsc() u64 {
var lo: u32 = undefined;
var hi: u32 = undefined;
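// RDTSC writes only EDX:EAX, and both are already declared as outputs,
// so no extra clobbers are needed (RDTSCP additionally writes ECX)
asm volatile ("rdtsc"
: [lo] "={eax}" (lo),
[hi] "={edx}" (hi)
);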
return (@as(u64, hi) << 32) | @as(u64, lo);
}
fn rdtscp() u64 {
var lo: u32 = undefined;
var hi: u32 = undefined;
asm volatile ("rdtscp"
: [lo] "={eax}" (lo),
[hi] "={edx}" (hi)
:
: "ecx"
);
return (@as(u64, hi) << 32) | @as(u64, lo);
}
fn spinWait(target_cycles: u64) void {
const start = rdtsc();
while (true) {
const now = rdtsc();
if (now - start >= target_cycles) break;
}
}
pub fn main() void {
const targets = [_]u64{ 1000, 10000, 100000 };
for (targets) |target| {
const before = rdtscp();
spinWait(target);
const after = rdtscp();
const actual = after - before;
const diff: i64 = @as(i64, @intCast(actual)) - @as(i64, @intCast(target));
std.debug.print("Target: {d:>6} | Actual: {d:>6} | Diff: {d:>+6}\n", .{
target, actual, diff,
});
}
// Why this isn't reliable for wall-clock timing:
// TSC counts cycles (or fixed-frequency ticks), NOT wall-clock time.
// On modern CPUs with turbo boost, the TSC frequency may differ from
// the actual clock speed. The OS can also migrate this thread to a
// different core mid-wait (different TSC on some older CPUs). Context
// switches add unpredictable delays. std.time.Timer uses the OS
// monotonic clock (clock_gettime CLOCK_MONOTONIC on Linux) which
// accounts for all of these issues and gives reliable wall-clock
// durations regardless of CPU frequency scaling or core migration.
}
The key insight: TSC is a hardware counter that counts at a fixed rate (on modern CPUs, at least -- older ones scaled with CPU frequency). It's good for measuring how many cycles something took, but not for "wait exactly 5 microseconds." For wall-clock timing, always use the OS timer facilities.
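For comparison, here is a minimal sketch of timing a piece of work with the OS-backed std.time.Timer instead of the TSC. The busyWork function is a made-up stand-in for whatever you want to measure, and the elapsed time you see will of course depend on your machine and build mode.

```zig
const std = @import("std");

// Hypothetical workload for illustration: sums 0..999_999 with wrapping adds.
fn busyWork() u64 {
    var sum: u64 = 0;
    var i: u64 = 0;
    while (i < 1_000_000) : (i += 1) sum +%= i;
    return sum;
}

pub fn main() !void {
    // Timer.start() can fail if the platform has no monotonic clock.
    var timer = try std.time.Timer.start();
    const result = busyWork();
    const elapsed_ns = timer.read(); // nanoseconds since start()
    std.debug.print("result = {d}, took {d} ns\n", .{ result, elapsed_ns });
}
```

Printing the result also keeps the optimizer from throwing the whole loop away in release builds.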
Here we go! Last episode we dropped all the way down to the CPU instruction level with inline assembly -- talking directly to the silicon. That was fun, but also very much single-threaded territory. One CPU core, one sequence of instructions, deterministic execution. Today we go sideways: we're adding more CPUs to the picture. Multiple threads, shared memory, and all the spectacular ways things break when two cores try to touch the same data at the same time ;-)
Now, if you've been following this series from the beginning, you know Zig is very explicit about things other languages hide from you. Memory allocation? You pass allocators explicitly (episode 7). Error handling? You propagate errors explicitly (episode 4). Threading is no different. Zig gives you raw OS threads with std.Thread, atomic operations for lock-free programming, and mutexes for everything else. No hidden garbage collector threads, no runtime scheduler, no magic. Just you, the operating system, and the hardware.
This is where systems programming gets real. And also where it gets really dangerous if you don't pay attention.
Creating threads with std.Thread.spawn
In Zig, spawning a thread is straightforward. You call std.Thread.spawn with a config, a function, and the arguments to pass to that function. The function runs on a new OS thread:
const std = @import("std");
fn workerTask(id: usize) void {
std.debug.print("Thread {d} starting on OS thread\n", .{id});
// Simulate some work
var sum: u64 = 0;
for (0..1_000_000) |i| {
sum +%= i *% (id + 1);
}
std.debug.print("Thread {d} finished: sum = {d}\n", .{ id, sum });
}
pub fn main() !void {
const num_threads = 4;
var threads: [num_threads]std.Thread = undefined;
// Spawn threads
for (0..num_threads) |i| {
threads[i] = try std.Thread.spawn(.{}, workerTask, .{i});
}
std.debug.print("Main thread: all workers spawned\n", .{});
// Join all threads (wait for them to finish)
for (&threads) |*t| {
t.join();
}
std.debug.print("Main thread: all workers done\n", .{});
}
The .{} first argument is std.Thread.SpawnConfig -- it lets you set the stack size and, optionally, an allocator for platforms that need one to set up the thread. (Thread names are not part of the config; you set them afterwards with setName on the returned std.Thread.) The default config gives each thread a 16 MiB stack, which is plenty for most work. If you're spawning thousands of threads (don't -- use a thread pool instead), you might want to shrink the stack.
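A minimal sketch of a non-default config, assuming a worker small enough to be comfortable with a 256 KiB stack. The sumTo helper and the chosen size are illustrative assumptions, not recommendations.

```zig
const std = @import("std");

// Hypothetical worker for illustration: sums 0..n-1 into `out`.
fn sumTo(n: u64, out: *u64) void {
    var total: u64 = 0;
    var i: u64 = 0;
    while (i < n) : (i += 1) total += i;
    out.* = total;
}

pub fn main() !void {
    var result: u64 = 0;
    // Ask for a 256 KiB stack instead of the default size.
    const t = try std.Thread.spawn(
        .{ .stack_size = 256 * 1024 },
        sumTo,
        .{ @as(u64, 1000), &result },
    );
    t.join();
    std.debug.print("sum = {d}\n", .{result});
}
```

Deep recursion or large local arrays need more room, so size the stack for the worst case of the function you actually run.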
One thing that's important: std.Thread.spawn returns a std.Thread value that you MUST eventually call .join() on (or .detach(), if you never intend to wait for that thread). Joining waits for the thread to complete and cleans up its resources. If you drop a std.Thread without joining or detaching it, you leak an OS thread handle. Zig won't crash on you for this, but it's a resource leak that adds up if you're spawning threads in a loop.
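The function you pass to spawn can be any function known at comptime -- a top-level function, a function declared inside a struct, or a method (in which case the receiver goes first in the argument tuple). Zig has no lambda syntax, so "anonymous" functions are really just functions declared inside an anonymous struct. The arguments are passed as a tuple. Zig figures out the types at comptime (we covered comptime parameter resolution in episode 9 and episode 14).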
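To make the "method on a struct" case concrete, here is a small sketch. Accumulator and its run method are made-up names for illustration; the point is only that the receiver is passed explicitly as the first element of the args tuple.

```zig
const std = @import("std");

// Hypothetical type for illustration.
const Accumulator = struct {
    total: u64 = 0,

    // A method is just a function whose first parameter is the receiver.
    fn run(self: *Accumulator, n: u64) void {
        var i: u64 = 1;
        while (i <= n) : (i += 1) self.total += i;
    }
};

pub fn main() !void {
    var acc = Accumulator{};
    // Receiver first, then the remaining arguments.
    const t = try std.Thread.spawn(.{}, Accumulator.run, .{ &acc, @as(u64, 100) });
    t.join();
    std.debug.print("total = {d}\n", .{acc.total});
}
```

Only one thread touches acc here, and main reads it after join(), so no extra synchronization is needed.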
Joining threads and collecting results
Joining is how you synchronize. The main thread calls .join() on each worker, which blocks until that worker finishes. But what if you want threads to return values? std.Thread.spawn hands you back a thread handle, not the function's result, and .join() itself returns nothing -- so there is no channel for a return value. Instead, you pass a pointer to shared state that the thread writes its result into:
const std = @import("std");
fn computeSum(data: []const u64, result: *u64) void {
var total: u64 = 0;
for (data) |v| {
total +%= v;
}
result.* = total;
}
pub fn main() !void {
// Create some data to process in parallel
var data: [1_000_000]u64 = undefined;
for (&data, 0..) |*slot, i| {
slot.* = @as(u64, i) + 1;
}
// Split data into 4 chunks, one per thread
const chunk_size = data.len / 4;
var results: [4]u64 = undefined;
var threads: [4]std.Thread = undefined;
for (0..4) |i| {
const start = i * chunk_size;
const end = if (i == 3) data.len else (i + 1) * chunk_size;
threads[i] = try std.Thread.spawn(
.{},
computeSum,
.{ data[start..end], &results[i] },
);
}
for (&threads) |*t| {
t.join();
}
var grand_total: u64 = 0;
for (results) |r| {
grand_total +%= r;
}
std.debug.print("Sum of 1..1000000 = {d}\n", .{grand_total});
// Expected: 500000500000
}
Each thread writes to its own slot in the results array. Because each thread writes to a DIFFERENT memory location, there's no conflict. This is the safest pattern for parallel computation: partition the work so that threads operate on completely independent data. No sharing, no problems.
NB: this "split and merge" approach is the basis of most practical parallelism. Map-reduce, parallel sorting, image processing -- you divide the data into non-overlapping chunks, process each independently, then combine the results. It's boring and predictable, which is exactly what you want with concurrency. The exciting stuff (shared mutable state, lock-free data structures) is where bugs live.
Data races: why shared mutable state is dangerous
Let's intentionally write broken code to see what happens when two threads modify the same variable simultaneously:
const std = @import("std");
var shared_counter: u64 = 0;
fn incrementMany() void {
for (0..100_000) |_| {
shared_counter += 1;
}
}
pub fn main() !void {
var threads: [8]std.Thread = undefined;
for (&threads) |*t| {
t.* = try std.Thread.spawn(.{}, incrementMany, .{});
}
for (&threads) |*t| {
t.join();
}
std.debug.print("Expected: {d}\n", .{8 * 100_000});
std.debug.print("Actual: {d}\n", .{shared_counter});
}
If you run this, the actual count will almost certainly be LESS than 800,000. On my machine I typically see something like 300,000-600,000. The problem is classic: shared_counter += 1 is NOT an atomic operation. At the machine level, it's at least three steps -- load the value from memory into a register, increment the register, store the register back to memory. When two threads both load the same value (say 42), both increment to 43, and both store 43 -- you've done two increments but only advanced the counter by one. This is a data race, and it's undefined behavior in virtually every language.
The maddening thing about data races is that they don't always produce wrong results. If you compile in debug mode, the operations are slower, less likely to overlap, and the output might look correct. Switch to ReleaseFast, and suddenly the counter is wildly wrong. Or it's correct on your machine but broken on a different CPU with a different cache coherency protocol. This is precisely the kind of bug that passes all your tests and then blows up in production.
Zig is honest about this: it does NOT protect you from data races at compile time. Rust's borrow checker catches these statically. Zig trusts you to get it right. That means you need to understand the tools Zig provides for safe concurrency -- and use them.
std.atomic.Value for lock-free atomic operations
The lightest-weight solution for shared state is atomic operations. An atomic operation is guaranteed by the CPU hardware to complete as a single indivisible step -- no other core can see a half-finished result. Zig provides std.atomic.Value for this:
const std = @import("std");
var counter = std.atomic.Value(u64).init(0);
fn incrementAtomic() void {
for (0..100_000) |_| {
_ = counter.fetchAdd(1, .seq_cst);
}
}
pub fn main() !void {
var threads: [8]std.Thread = undefined;
for (&threads) |*t| {
t.* = try std.Thread.spawn(.{}, incrementAtomic, .{});
}
for (&threads) |*t| {
t.join();
}
std.debug.print("Expected: {d}\n", .{8 * 100_000});
std.debug.print("Actual: {d}\n", .{counter.load(.seq_cst)});
// Always exactly 800000
}
fetchAdd atomically loads the current value, adds 1, stores the result, and returns the old value -- all as one indivisible operation. No other thread can sneak in between the load and store. The .seq_cst parameter is the memory ordering -- we'll discuss that in detail shortly.
std.atomic.Value supports several atomic operations:
const std = @import("std");
pub fn main() void {
var val = std.atomic.Value(u32).init(10);
// Store a value atomically
val.store(42, .seq_cst);
// Load the current value atomically
const current = val.load(.seq_cst);
std.debug.print("Current: {d}\n", .{current});
// Fetch and add: returns old value, stores old + 5
const old = val.fetchAdd(5, .seq_cst);
std.debug.print("fetchAdd(5): old={d}, new={d}\n", .{ old, val.load(.seq_cst) });
// Fetch and subtract
_ = val.fetchSub(3, .seq_cst);
std.debug.print("After fetchSub(3): {d}\n", .{val.load(.seq_cst)});
// Compare and swap: only store if current value matches expected
const result = val.cmpxchgStrong(44, 100, .seq_cst, .seq_cst);
if (result) |old_val| {
std.debug.print("CAS failed: expected 44, found {d}\n", .{old_val});
} else {
std.debug.print("CAS succeeded: value is now 100\n", .{});
}
// Compare and swap with actual current value
_ = val.cmpxchgStrong(val.load(.seq_cst), 999, .seq_cst, .seq_cst);
std.debug.print("Final value: {d}\n", .{val.load(.seq_cst)});
}
Compare-and-swap (CAS) is the fundamental building block of lock-free programming. It says: "If the value is currently X, change it to Y. Otherwise, tell me what it actually is." This single operation is enough to build lock-free stacks, queues, and all sorts of concurrent data structures. It's what mutexes themselves are built on top of, underneath.
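As a small taste of building something on top of CAS, here is a sketch of the classic retry loop: an "atomic max" that only ever raises the stored value. The atomicMax and report helpers are made-up names for illustration. It uses cmpxchgWeak, which is allowed to fail spuriously and is therefore meant to be used inside a loop like this one.

```zig
const std = @import("std");

// Hypothetical helper: raise `target` to `candidate` if candidate is larger.
fn atomicMax(target: *std.atomic.Value(u64), candidate: u64) void {
    var current = target.load(.seq_cst);
    while (candidate > current) {
        // Returns null on success, or the value actually found on failure.
        // On failure we retry against that fresh value.
        current = target.cmpxchgWeak(current, candidate, .seq_cst, .seq_cst) orelse return;
    }
}

var highest = std.atomic.Value(u64).init(0);

fn report(base: u64) void {
    var i: u64 = 0;
    while (i < 1000) : (i += 1) atomicMax(&highest, base + i);
}

pub fn main() !void {
    var threads: [4]std.Thread = undefined;
    for (&threads, 0..) |*t, i| {
        t.* = try std.Thread.spawn(.{}, report, .{@as(u64, i) * 1000});
    }
    for (&threads) |*t| t.join();
    std.debug.print("max = {d}\n", .{highest.load(.seq_cst)});
}
```

The pattern is always the same: read, compute the new value, try to swap, and start over if somebody else got there first.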
Atomic ordering: Relaxed, Acquire, Release, SeqCst
Memory ordering is the tricky part. When you do an atomic operation, the ordering parameter tells the CPU how much it needs to constrain the visibility of surrounding memory operations. This matters because modern CPUs reorder instructions aggressively for performance, and what one core "sees" in memory can lag behind what another core has actually written.
const std = @import("std");
var data: u64 = 0;
var flag = std.atomic.Value(bool).init(false);
fn producer() void {
// Write data FIRST, then set the flag
data = 42;
flag.store(true, .release);
// Release ordering: all writes BEFORE this store
// are guaranteed to be visible to any thread that
// does an acquire load of this same atomic and sees 'true'
}
fn consumer() void {
// Spin until we see the flag
while (!flag.load(.acquire)) {
std.atomic.spinLoopHint();
}
// Acquire ordering: all writes that happened BEFORE
// the corresponding release store are now visible to us
std.debug.print("Data: {d}\n", .{data});
// Guaranteed to print 42, not 0 or garbage
}
pub fn main() !void {
const t1 = try std.Thread.spawn(.{}, producer, .{});
const t2 = try std.Thread.spawn(.{}, consumer, .{});
t1.join();
t2.join();
}
The four orderings from weakest to strongest:
- .monotonic (Zig's name for what C++ and Rust call "relaxed") -- no ordering guarantees at all. Only guarantees atomicity of the operation itself. Useful for simple counters where you don't care about ordering relative to other memory. Fastest.
- .acquire -- on a LOAD: guarantees that all memory reads/writes after this load see everything that was visible before the corresponding release store. Think of it as "acquire the data that was published."
- .release -- on a STORE: guarantees that all memory reads/writes before this store are visible to any thread that does an acquire load and sees this stored value. Think of it as "release (publish) your data."
- .seq_cst (sequentially consistent) -- the strongest ordering. ALL operations appear to happen in a single global order that every thread agrees on. Slowest, but easiest to reason about. When in doubt, use this one.
The acquire/release pair is the workhorse of concurrent programming. The producer writes its data, then does a release store on a flag. The consumer does an acquire load on the same flag. When the consumer sees the flag is set, it's guaranteed to see all the data the producer wrote before setting the flag. This is how you safely publish data from one thread to another without a mutex.
std.atomic.spinLoopHint() is a nice touch -- it tells the CPU "I'm busy-waiting, don't burn full power on this core." On x86 it emits the PAUSE instruction which saves energy and avoids contention on the memory bus. Small thing, but it matters in tight spin loops.
My rule of thumb: use .seq_cst until profiling shows it's a bottleneck (it almost never is). Then carefully think about acquire/release pairs for the hot path. .monotonic (Zig's spelling of relaxed ordering) is only for things like statistics counters where you truly don't care about ordering. Getting memory ordering wrong produces bugs that are nearly impossible to reproduce and debug. Safety first.
std.Thread.Mutex and std.Thread.Condition for synchronization
Atomics are great for simple variables, but for protecting complex data structures you need a mutex (mutual exclusion). A mutex ensures that only one thread at a time can execute a critical section of code:
const std = @import("std");
const SharedState = struct {
mutex: std.Thread.Mutex = .{},
count: u64 = 0,
history: [10]u64 = [_]u64{0} ** 10,
history_len: usize = 0,
fn increment(self: *SharedState) void {
self.mutex.lock();
defer self.mutex.unlock();
self.count += 1;
if (self.history_len < self.history.len) {
self.history[self.history_len] = self.count;
self.history_len += 1;
}
}
fn getCount(self: *SharedState) u64 {
self.mutex.lock();
defer self.mutex.unlock();
return self.count;
}
};
fn worker(state: *SharedState) void {
for (0..10_000) |_| {
state.increment();
}
}
pub fn main() !void {
var state = SharedState{};
var threads: [8]std.Thread = undefined;
for (&threads) |*t| {
t.* = try std.Thread.spawn(.{}, worker, .{&state});
}
for (&threads) |*t| {
t.join();
}
std.debug.print("Final count: {d}\n", .{state.count});
std.debug.print("First 10 increments: ", .{});
for (state.history[0..state.history_len]) |v| {
std.debug.print("{d} ", .{v});
}
std.debug.print("\n", .{});
}
The defer self.mutex.unlock() pattern is essential -- it guarantees the mutex gets unlocked on every exit path from the function, including early returns and error returns. (A panic is different: it aborts the program without running defers, so there is nothing left to unlock.) We talked about defer way back in episode 7 for freeing memory, and the same principle applies: pair every acquire with a release, and use defer to make sure it happens.
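Here is a small sketch of why that matters once a function has more than one exit. The Account type and its withdraw method are made-up names for illustration; the function can leave early with an error, and the deferred unlock still runs.

```zig
const std = @import("std");

// Hypothetical account type for illustration.
const Account = struct {
    mutex: std.Thread.Mutex = .{},
    balance: u64 = 100,

    fn withdraw(self: *Account, amount: u64) error{InsufficientFunds}!void {
        self.mutex.lock();
        defer self.mutex.unlock(); // runs on BOTH exit paths below

        if (amount > self.balance) return error.InsufficientFunds; // early exit
        self.balance -= amount;
    }
};

pub fn main() void {
    var acct = Account{};
    acct.withdraw(500) catch |err| {
        std.debug.print("withdraw failed: {s}\n", .{@errorName(err)});
    };
    // The failed call released the lock, so this one does not deadlock.
    acct.withdraw(30) catch unreachable;
    std.debug.print("balance = {d}\n", .{acct.balance});
}
```

Without the defer you would need an unlock() before every return, and the first one you forget turns into a deadlock on the next call.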
For more complex coordination -- "wait until some condition is true, then proceed" -- you use a Condition variable together with a mutex:
const std = @import("std");
const Signal = struct {
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
ready: bool = false,
payload: u64 = 0,
fn wait(self: *Signal) u64 {
self.mutex.lock();
defer self.mutex.unlock();
while (!self.ready) {
self.cond.wait(&self.mutex);
}
return self.payload;
}
fn send(self: *Signal, value: u64) void {
self.mutex.lock();
defer self.mutex.unlock();
self.payload = value;
self.ready = true;
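// Two threads wait on this condition, so wake them all;
// signal() would wake only one and leave the other asleep forever
self.cond.broadcast();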
}
};
fn waiter(sig: *Signal, id: usize) void {
const value = sig.wait();
std.debug.print("Thread {d} received: {d}\n", .{ id, value });
}
pub fn main() !void {
var sig = Signal{};
const t1 = try std.Thread.spawn(.{}, waiter, .{ &sig, 1 });
const t2 = try std.Thread.spawn(.{}, waiter, .{ &sig, 2 });
// Give threads time to start waiting
std.time.sleep(50 * std.time.ns_per_ms);
std.debug.print("Main: sending signal...\n", .{});
sig.send(42);
t1.join();
t2.join();
}
The while (!self.ready) loop around cond.wait is CRITICAL. Condition variables can have spurious wakeups -- the OS might wake your thread even though nobody called .signal(). Always check your condition in a loop, never assume a single wakeup means your condition is actually met. This is true in every language and every OS. I've seen people skip the loop and spend days debugging intermittent failures that only show up under high load. The while loop costs almost nothing and eliminates an entire class of bugs.
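Also note: .signal() wakes ONE waiting thread. With two waiters blocked on the same condition, as in this example, a single .signal() would leave the other one asleep forever and its join() would never return. Whenever more than one thread may be waiting for the same state change (or you want to broadcast something like a shutdown), use .broadcast() to wake ALL of them.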
Thread-safe data structures with mutex wrappers
A common pattern is wrapping an existing data structure with a mutex to make it safe for concurrent access. Let's build a thread-safe bounded queue:
const std = @import("std");
fn ThreadSafeQueue(comptime T: type, comptime capacity: usize) type {
return struct {
const Self = @This();
buffer: [capacity]T = undefined,
head: usize = 0,
tail: usize = 0,
count: usize = 0,
mutex: std.Thread.Mutex = .{},
not_empty: std.Thread.Condition = .{},
not_full: std.Thread.Condition = .{},
pub fn push(self: *Self, item: T) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.count == capacity) {
self.not_full.wait(&self.mutex);
}
self.buffer[self.tail] = item;
self.tail = (self.tail + 1) % capacity;
self.count += 1;
self.not_empty.signal();
}
pub fn pop(self: *Self) T {
self.mutex.lock();
defer self.mutex.unlock();
while (self.count == 0) {
self.not_empty.wait(&self.mutex);
}
const item = self.buffer[self.head];
self.head = (self.head + 1) % capacity;
self.count -= 1;
self.not_full.signal();
return item;
}
pub fn len(self: *Self) usize {
self.mutex.lock();
defer self.mutex.unlock();
return self.count;
}
};
}
pub fn main() !void {
var queue = ThreadSafeQueue(u64, 32){};
std.debug.print("Pushing 10 items...\n", .{});
for (0..10) |i| {
queue.push(i * 10);
}
std.debug.print("Queue length: {d}\n", .{queue.len()});
std.debug.print("Popping: ", .{});
for (0..10) |_| {
std.debug.print("{d} ", .{queue.pop()});
}
std.debug.print("\n", .{});
}
This uses comptime generics (from episode 14) to make the queue work with any type. The two condition variables (not_empty and not_full) are the classic bounded buffer pattern -- producers wait when the queue is full, consumers wait when it's empty, and each side signals the other when it changes state.
The push function locks the mutex, waits if full (releasing the mutex during the wait so other threads can drain the queue), writes the item, advances the tail pointer, and signals any waiting consumer. Mirror image for pop. This is textbook producer-consumer synchronization and it works perfectly -- but it's also the slowest approach because every single push and pop acquires a lock. For high-throughput scenarios, lock-free queues using CAS operations are faster but significantly more complex to implement correctly.
Practical example: a concurrent counter and a producer-consumer queue
Let's combine everything into a practical example. Multiple producer threads generate work items, multiple consumer threads process them, and an atomic counter tracks progress:
const std = @import("std");
fn ThreadSafeQueue(comptime T: type, comptime capacity: usize) type {
return struct {
const Self = @This();
buffer: [capacity]T = undefined,
head: usize = 0,
tail: usize = 0,
count: usize = 0,
mutex: std.Thread.Mutex = .{},
not_empty: std.Thread.Condition = .{},
not_full: std.Thread.Condition = .{},
pub fn push(self: *Self, item: T) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.count == capacity) {
self.not_full.wait(&self.mutex);
}
self.buffer[self.tail] = item;
self.tail = (self.tail + 1) % capacity;
self.count += 1;
self.not_empty.signal();
}
pub fn tryPop(self: *Self) ?T {
self.mutex.lock();
defer self.mutex.unlock();
if (self.count == 0) return null;
const item = self.buffer[self.head];
self.head = (self.head + 1) % capacity;
self.count -= 1;
self.not_full.signal();
return item;
}
};
}
const WorkItem = struct {
producer_id: usize,
sequence: u64,
value: u64,
};
var items_produced = std.atomic.Value(u64).init(0);
var items_consumed = std.atomic.Value(u64).init(0);
var producers_done = std.atomic.Value(u32).init(0);
fn producer(queue: *ThreadSafeQueue(WorkItem, 64), id: usize) void {
for (0..1000) |seq| {
const item = WorkItem{
.producer_id = id,
.sequence = seq,
.value = @as(u64, id) * 1000 + seq,
};
queue.push(item);
_ = items_produced.fetchAdd(1, .monotonic); // .monotonic is Zig's name for relaxed ordering
}
_ = producers_done.fetchAdd(1, .release);
}
fn consumer(queue: *ThreadSafeQueue(WorkItem, 64), id: usize, num_producers: u32) void {
var local_sum: u64 = 0;
var local_count: u64 = 0;
while (true) {
if (queue.tryPop()) |item| {
local_sum +%= item.value;
local_count += 1;
_ = items_consumed.fetchAdd(1, .monotonic); // .monotonic is Zig's name for relaxed ordering
} else {
// Queue is empty -- check if all producers are done
if (producers_done.load(.acquire) == num_producers) {
// Double-check queue is still empty
if (queue.tryPop()) |late_item| {
local_sum +%= late_item.value;
local_count += 1;
_ = items_consumed.fetchAdd(1, .monotonic); // .monotonic is Zig's name for relaxed ordering
} else {
break;
}
}
std.atomic.spinLoopHint();
}
}
std.debug.print("Consumer {d}: processed {d} items, sum = {d}\n", .{
id, local_count, local_sum,
});
}
pub fn main() !void {
const num_producers = 4;
const num_consumers = 2;
var queue = ThreadSafeQueue(WorkItem, 64){};
var prod_threads: [num_producers]std.Thread = undefined;
var cons_threads: [num_consumers]std.Thread = undefined;
// Start consumers first so they're ready for work
for (0..num_consumers) |i| {
cons_threads[i] = try std.Thread.spawn(.{}, consumer, .{
&queue, i, num_producers,
});
}
// Start producers
for (0..num_producers) |i| {
prod_threads[i] = try std.Thread.spawn(.{}, producer, .{ &queue, i });
}
// Join producers
for (&prod_threads) |*t| t.join();
// Join consumers
for (&cons_threads) |*t| t.join();
std.debug.print("\nTotal produced: {d}\n", .{items_produced.load(.seq_cst)});
std.debug.print("Total consumed: {d}\n", .{items_consumed.load(.seq_cst)});
}
Several things to notice here. The items_produced and items_consumed counters use .monotonic ordering (Zig's name for relaxed) because they're just statistics -- we don't need them synchronized with anything else. But producers_done uses .release on the store side and .acquire on the load side, because when a consumer sees all producers are done, it needs to be sure all the produced items are actually visible in the queue.
The consumer does a double-check after seeing all producers are done: "queue empty AND producers finished? Check the queue ONE more time." This closes a window between the two checks: a consumer can find the queue empty, and before it reads producers_done the last producer pushes its final item and then increments the counter. Without the second tryPop, that consumer would see "all done" and exit with the item still sitting in the queue. This kind of careful protocol design is what separates working concurrent code from code that loses items 0.001% of the time under heavy load.
I used tryPop (non-blocking) instead of blocking pop for the consumers because they need to be able to check the termination condition between pops. With a blocking pop, a consumer could get stuck waiting forever after all producers have finished and the queue is empty. There are more elegant shutdown mechanisms (sentinel values, atomic shutdown flags combined with condition broadcasts), but this simple spin-and-check approach is clear and correct.
Having said that, spinning wastes CPU. In a production system you'd either use a blocking queue with a proper shutdown signal, or use std.Thread.Condition to sleep when the queue is empty and get woken when new items arrive. The thread-safe queue from the previous section already does this with not_empty.wait(). But I wanted to show the explicit atomic coordination pattern because it illustrates how atomics and ordering work in practice.
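For completeness, here is one way the "blocking queue with a proper shutdown signal" idea could look. This is a sketch under my own assumptions, not a canonical design: ClosableQueue, close() and the closed flag are names made up for illustration. The blocking pop returns an optional, and close() uses a broadcast to wake every sleeping consumer.

```zig
const std = @import("std");

// Sketch of a bounded queue with a close() signal. Names are illustrative.
fn ClosableQueue(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();
        buffer: [capacity]T = undefined,
        head: usize = 0,
        tail: usize = 0,
        count: usize = 0,
        closed: bool = false,
        mutex: std.Thread.Mutex = .{},
        not_empty: std.Thread.Condition = .{},
        not_full: std.Thread.Condition = .{},

        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.count == capacity) self.not_full.wait(&self.mutex);
            self.buffer[self.tail] = item;
            self.tail = (self.tail + 1) % capacity;
            self.count += 1;
            self.not_empty.signal();
        }

        // Call once, after every producer has been joined.
        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.closed = true;
            self.not_empty.broadcast(); // wake every sleeping consumer
        }

        // Sleeps while empty; returns null once the queue is closed AND drained.
        pub fn pop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.count == 0) {
                if (self.closed) return null;
                self.not_empty.wait(&self.mutex);
            }
            const item = self.buffer[self.head];
            self.head = (self.head + 1) % capacity;
            self.count -= 1;
            self.not_full.signal();
            return item;
        }
    };
}

const Queue = ClosableQueue(u64, 16);
var consumed = std.atomic.Value(u64).init(0);

fn produce(q: *Queue) void {
    var i: u64 = 0;
    while (i < 500) : (i += 1) q.push(i);
}

fn consume(q: *Queue) void {
    while (q.pop()) |_| {
        _ = consumed.fetchAdd(1, .seq_cst);
    }
}

pub fn main() !void {
    var q = Queue{};
    var cons: [2]std.Thread = undefined;
    var prod: [2]std.Thread = undefined;
    for (&cons) |*t| t.* = try std.Thread.spawn(.{}, consume, .{&q});
    for (&prod) |*t| t.* = try std.Thread.spawn(.{}, produce, .{&q});
    for (&prod) |*t| t.join();
    q.close(); // every item is already in the queue at this point
    for (&cons) |*t| t.join();
    std.debug.print("consumed = {d}\n", .{consumed.load(.seq_cst)});
}
```

The consumers sleep inside pop() instead of spinning, and because "closed" is checked under the same mutex as "count", there is no window in which an item can be missed.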
What we learned
- std.Thread.spawn creates OS threads. Always .join() every thread you spawn to avoid resource leaks.
- Data races happen when multiple threads access the same memory and at least one is writing. The result is undefined behavior -- wrong values, crashes, bugs that only appear in release builds.
- std.atomic.Value provides hardware-level atomic operations: fetchAdd, fetchSub, cmpxchgStrong, store, load. These are indivisible -- no other thread can see a half-finished atomic operation.
- Memory orderings control how surrounding non-atomic memory operations are visible across threads. .seq_cst is safest (total order). .acquire/.release pairs are the workhorse for publishing data between threads. .monotonic (Zig's name for relaxed) is for statistics counters only.
- std.Thread.Mutex provides mutual exclusion for protecting complex data structures. Always use defer self.mutex.unlock() so that early returns and error returns can't leave the mutex locked.
- std.Thread.Condition lets threads sleep until a condition is met. Always check your condition in a while loop (spurious wakeups!). Use .signal() to wake one waiter, .broadcast() to wake all.
- Thread-safe queues combine a mutex with two conditions (not_empty, not_full) for bounded producer-consumer coordination.
- Producer-consumer is the canonical concurrency pattern: producers generate work, consumers process it, a shared queue mediates. Atomic counters track progress with minimal overhead.
Concurrency in Zig is explicit, manual, and powerful. You get the same primitives the OS kernel uses -- threads, mutexes, conditions, atomics -- with no runtime overhead and no hidden magic. The price is that you must think carefully about data races, memory ordering, and shutdown protocols. But that's exactly the tradeoff Zig makes everywhere: no hidden costs, no surprises, full control. The patterns we've built today -- splitting data between threads, atomic counters, mutex-protected structures, producer-consumer queues -- these show up in almost every concurrent system. How those same concurrency patterns interact with Zig's memory model when you start mapping files and sharing buffers between threads gets really interesting ;-)
Exercises
1. Build a thread pool that maintains a fixed number of worker threads (say 4) and a task queue. Implement a submit(task_fn, args) function that queues work and a shutdown() function that stops all workers gracefully. Submit 20 tasks that each sleep for a random duration (10-50ms) and print their task ID. Verify that no more than 4 tasks run concurrently by tracking active task count with an atomic.
2. Implement a read-write lock using std.Thread.Mutex and std.Thread.Condition. Multiple threads should be able to read simultaneously, but writes must be exclusive. Test it with 6 reader threads that repeatedly read a shared [100]u32 array (verifying all elements are equal) and 2 writer threads that periodically set all elements to a new value. The readers should never see a partially-written array.
3. Create two versions of a shared counter: one using std.atomic.Value with .monotonic ordering (Zig's name for relaxed) and one with .seq_cst ordering. Spawn 8 threads that each increment 1,000,000 times. Measure the wall-clock time for both versions using std.time.Timer. Print the timing difference and explain (in a comment) why .monotonic is faster and whether the counter result is still correct with .monotonic for this specific use case.
Thanks, and see you next time!