Learn Zig Series (#42) - Key-Value Store: TCP Server
Project B: Key-Value Store (3/4)
What will I learn
- You will learn designing a simple binary protocol for get/put/delete commands;
- You will learn request and response framing with length-prefixed messages;
- You will learn applying the TCP accept loop from episode 21 to the KV store;
- You will learn dispatching parsed commands to the in-memory store;
- You will learn concurrent client handling with a per-connection thread model;
- You will learn error responses for missing keys and malformed requests;
- You will learn graceful shutdown with connection draining;
- You will learn testing by spawning the server and verifying round-trip operations.
Requirements
- A working modern computer running macOS, Windows or Ubuntu;
- An installed Zig 0.14+ distribution (download from ziglang.org);
- The ambition to learn Zig programming.
Difficulty
- Intermediate
Curriculum (of the Learn Zig Series):
- Zig Programming Tutorial - ep001 - Intro
- Learn Zig Series (#2) - Hello Zig, Variables and Types
- Learn Zig Series (#3) - Functions and Control Flow
- Learn Zig Series (#4) - Error Handling (Zig's Best Feature)
- Learn Zig Series (#5) - Arrays, Slices, and Strings
- Learn Zig Series (#6) - Structs, Enums, and Tagged Unions
- Learn Zig Series (#7) - Memory Management and Allocators
- Learn Zig Series (#8) - Pointers and Memory Layout
- Learn Zig Series (#9) - Comptime (Zig's Superpower)
- Learn Zig Series (#10) - Project Structure, Modules, and File I/O
- Learn Zig Series (#11) - Mini Project: Building a Step Sequencer
- Learn Zig Series (#12) - Testing and Test-Driven Development
- Learn Zig Series (#13) - Interfaces via Type Erasure
- Learn Zig Series (#14) - Generics with Comptime Parameters
- Learn Zig Series (#15) - The Build System (build.zig)
- Learn Zig Series (#16) - Sentinel-Terminated Types and C Strings
- Learn Zig Series (#17) - Packed Structs and Bit Manipulation
- Learn Zig Series (#18) - Async Concepts and Event Loops
- Learn Zig Series (#18b) - Addendum: Async Returns in Zig 0.16
- Learn Zig Series (#19) - SIMD with @Vector
- Learn Zig Series (#20) - Working with JSON
- Learn Zig Series (#21) - Networking and TCP Sockets
- Learn Zig Series (#22) - Hash Maps and Data Structures
- Learn Zig Series (#23) - Iterators and Lazy Evaluation
- Learn Zig Series (#24) - Logging, Formatting, and Debug Output
- Learn Zig Series (#25) - Mini Project: HTTP Status Checker
- Learn Zig Series (#26) - Writing a Custom Allocator
- Learn Zig Series (#27) - C Interop: Calling C from Zig
- Learn Zig Series (#28) - C Interop: Exposing Zig to C
- Learn Zig Series (#29) - Inline Assembly and Low-Level Control
- Learn Zig Series (#30) - Thread Safety and Atomics
- Learn Zig Series (#31) - Memory-Mapped I/O and Files
- Learn Zig Series (#32) - Compile-Time Reflection with @typeInfo
- Learn Zig Series (#33) - Building a State Machine with Tagged Unions
- Learn Zig Series (#34) - Performance Profiling and Optimization
- Learn Zig Series (#35) - Cross-Compilation and Target Triples
- Learn Zig Series (#36) - Mini Project: CLI Task Runner
- Learn Zig Series (#37) - Markdown to HTML: Tokenizer and Lexer
- Learn Zig Series (#38) - Markdown to HTML: Parser and AST
- Learn Zig Series (#39) - Markdown to HTML: Renderer and CLI
- Learn Zig Series (#40) - Key-Value Store: In-Memory Store
- Learn Zig Series (#41) - Key-Value Store: Write-Ahead Log
- Learn Zig Series (#42) - Key-Value Store: TCP Server (this post)
Learn Zig Series (#42) - Key-Value Store: TCP Server
Here we go -- part three of the key-value store project! In episode 40 we built the in-memory store (get, put, delete, TTL, snapshots). In episode 41 we made it durable with a write-ahead log so data survives crashes. But right now our store is still a library -- the only way to use it is by importing it directly into your own Zig program. That's fine for some use cases, but most real-world key-value stores are network services. Redis runs on port 6379. Memcached on port 11211. Your application connects over TCP and sends commands. That's what we're building today.
We're going to put a TCP server in front of our PersistentKvStore, design a simple binary protocol for commands, handle multiple clients concurrently with threads, and make sure the whole thing shuts down cleanly when you hit Ctrl+C. This is where episode 21 (TCP sockets) and episode 30 (threads and atomics) come back in a big way ;-)
The protocol: keeping it simple
Before we write any server code, we need to decide how clients talk to the server. There are two broad choices: text protocols (like HTTP, Redis's RESP, or Memcached's ASCII protocol) and binary protocols (like Memcached's binary protocol, gRPC, or what we used for the WAL entries in episode 41).
We already have experience with binary framing from the WAL, so let's stick with binary. It's more compact on the wire, easier to parse (no delimiter scanning), and faster to process. The downside is you can't just telnet in and type commands -- but we'll build a proper client in the next episode anyway.
Here's the protocol layout. Every message (both requests and responses) starts with the same 9-byte header:
+--------+--------+-----------+
| type | status | body_len |
| 1 byte | 1 byte | 4 bytes |
+--------+--------+-----------+
| key_len | (for req) |
| 4 bytes | |
+------------------+-----------+
Actually, let me define the full request and response formats separately, because they're slighly different:
Request format:
+------+--------+--------+------+-------+
| cmd | key_len| val_len| key | value |
| 1 b | 4 b LE | 4 b LE | var | var |
+------+--------+--------+------+-------+
- cmd:
0x01= PUT,0x02= GET,0x03= DELETE,0x04= PING - key_len: little-endian u32, length of key bytes
- val_len: little-endian u32, length of value bytes (0 for GET/DELETE/PING)
- key: raw bytes (empty for PING)
- value: raw bytes (only present for PUT)
Response format:
+--------+--------+-------+
| status | val_len| value |
| 1 byte | 4 b LE | var |
+--------+--------+-------+
- status:
0x00= OK,0x01= NOT_FOUND,0x02= ERROR - val_len: little-endian u32, length of the response value (0 for simple OK/error)
- value: raw bytes (the value for GET responses, error message for ERROR)
The PING command is there for health checks -- the client sends PING, the server responds OK with an empty body. Every network service needs this; it's how load balancers and monitoring systems verify the service is alive without doing real work.
Let's define these in code:
const std = @import("std");
pub const Command = enum(u8) {
put = 0x01,
get = 0x02,
delete = 0x03,
ping = 0x04,
};
pub const Status = enum(u8) {
ok = 0x00,
not_found = 0x01,
err = 0x02,
};
pub const REQUEST_HEADER_SIZE = 1 + 4 + 4; // cmd + key_len + val_len
pub const RESPONSE_HEADER_SIZE = 1 + 4; // status + val_len
pub const MAX_KEY_SIZE = 1024; // 1 KB max key
pub const MAX_VALUE_SIZE = 1024 * 1024; // 1 MB max value
Why separate limits for keys and values? Keys are identifiers -- they should be short. If someone is using 1 MB keys, they're doing something wrong. Values can be larger, but we still cap them at 1 MB to prevent a single rogue request from eating all our memory. Redis defaults to 512 MB max value size, but we're a simple tutorial store, not Redis ;-)
Parsing requests
The request parser reads from a stream (any type that implements reader/writer), validates the header, and returns a parsed request struct:
pub const Request = struct {
cmd: Command,
key: []const u8,
value: []const u8,
};
pub fn readRequest(reader: anytype, allocator: std.mem.Allocator) !?Request {
// Read the 9-byte header
var header: [REQUEST_HEADER_SIZE]u8 = undefined;
const n = reader.readAll(&header) catch return null;
if (n < REQUEST_HEADER_SIZE) return null;
// Parse command byte
const cmd: Command = std.meta.intToEnum(Command, header[0]) catch {
return error.InvalidCommand;
};
// Parse lengths
const key_len = std.mem.readInt(u32, header[1..5], .little);
const val_len = std.mem.readInt(u32, header[5..9], .little);
// Validate sizes
if (key_len > MAX_KEY_SIZE) return error.KeyTooLarge;
if (val_len > MAX_VALUE_SIZE) return error.ValueTooLarge;
// For PING, no key or value expected
if (cmd == .ping) {
return .{ .cmd = .ping, .key = &.{}, .value = &.{} };
}
// Read key bytes
const key = try allocator.alloc(u8, key_len);
errdefer allocator.free(key);
const key_read = reader.readAll(key) catch {
allocator.free(key);
return null;
};
if (key_read < key_len) {
allocator.free(key);
return null;
}
// Read value bytes (if any)
var value: []u8 = &.{};
if (val_len > 0) {
value = try allocator.alloc(u8, val_len);
errdefer allocator.free(value);
const val_read = reader.readAll(value) catch {
allocator.free(value);
allocator.free(key);
return null;
};
if (val_read < val_len) {
allocator.free(value);
allocator.free(key);
return null;
}
}
return .{
.cmd = cmd,
.key = key,
.value = value,
};
}
The function returns ?Request (optional) rather than !Request for the "client disconnected" case. When the client closes the connection, readAll returns fewer bytes than requested (or zero), and we return null to signal "no more requests". Actual protocol errors (unknown command, oversized key) return proper errors. This distinction matters in the accept loop -- null means clean disconnect, error means something went wrong.
Notice the errdefer pattern on the allocations. If we successfully allocate the key buffer but then fail to read the value, we need to free the key. Zig's errdefer handles this automatically -- it runs the deferred statement only if the scope exits via an error. We covered this back in episode 4 but it's worth pointing out every time, because it's one of those things that prevents memory leaks in error paths that would be extremely easy to miss in C.
Writing responses
The response writer is simpler -- just a status byte, a value length, and optionally the value data:
pub fn writeResponse(writer: anytype, status: Status, value: []const u8) !void {
var header: [RESPONSE_HEADER_SIZE]u8 = undefined;
header[0] = @intFromEnum(status);
std.mem.writeInt(u32, header[1..5], @intCast(value.len), .little);
// Write header + value
try writer.writeAll(&header);
if (value.len > 0) {
try writer.writeAll(value);
}
}
Short and sweet. The header is 5 bytes (status + u32 length), followed by the value payload. For OK responses to PUT/DELETE, the value is empty. For GET responses, it's the stored value. For ERROR responses, it's a human-readable error message.
The TCP server struct
Now for the server itself. It needs to manage a listening socket, a reference to the persistent KV store, and a way to track active connections for graceful shutdown:
pub const Server = struct {
listener: std.net.Server,
store: *PersistentKvStore,
allocator: std.mem.Allocator,
active_connections: std.atomic.Value(u32),
should_stop: std.atomic.Value(bool),
pub fn init(
allocator: std.mem.Allocator,
store: *PersistentKvStore,
address: []const u8,
port: u16,
) !Server {
const addr = try std.net.Address.parseIp(address, port);
const listener = try addr.listen(.{
.reuse_address = true,
});
return .{
.listener = listener,
.store = store,
.allocator = allocator,
.active_connections = std.atomic.Value(u32).init(0),
.should_stop = std.atomic.Value(bool).init(false),
};
}
pub fn deinit(self: *Server) void {
self.listener.deinit();
}
};
The active_connections counter and should_stop flag are both atomics. We need atomics here because the accept loop and the connection handler threads will be reading and writing these values concurrently. We covered this in episode 30 -- atomics give us safe concurrent access without needing a mutex for simple counters and flags.
The reuse_address option on the listener is important. Without it, if you stop the server and restart it quickly, the OS might refuse to bind to the same port because the old socket is still in TIME_WAIT state. With reuse_address = true, we tell the OS "I know what I'm doing, let me rebind immediately." Every production server sets this.
The accept loop
The accept loop is the heart of any TCP server. It sits in a loop, waits for incoming connections, and spawns a thread for each one:
pub fn run(self: *Server) !void {
std.log.info("KV server listening on port {d}", .{
self.listener.listen_address.getPort(),
});
while (!self.should_stop.load(.acquire)) {
// Accept with a timeout so we can check should_stop periodically
const connection = self.listener.accept() catch |err| {
if (self.should_stop.load(.acquire)) break;
std.log.err("accept error: {}", .{err});
continue;
};
// Increment connection counter
_ = self.active_connections.fetchAdd(1, .release);
// Spawn a handler thread for this connection
const thread = std.Thread.spawn(.{}, handleConnection, .{
self, connection,
}) catch |err| {
std.log.err("failed to spawn handler thread: {}", .{err});
connection.stream.close();
_ = self.active_connections.fetchSub(1, .release);
continue;
};
thread.detach();
}
}
A few things worth noticeing here. First, we call thread.detach() after spawning. A detached thread runs independently -- we don't need to join it later, and it cleans up its own resources when it exits. This is the right model for a server where connections come and go and we don't need to collect their return values.
Second, if Thread.spawn fails (maybe the OS hit its thread limit), we close the connection and decrement the counter. The client will see a reset connection, which is unfortunate but better than leaking resources. In a production server you'd want to log a loud warning here because hitting the thread limit means you're under heavy load (or have a connection leak).
Having said that, the thread-per-connection model has a well-known scalability ceiling. Every thread costs memory (for its stack, typically 2-8 MB) and OS resources. If you have 10,000 concurrent connections, that's 10,000 threads and potentially 20-80 GB of stack space. Production servers use I/O multiplexing (epoll on Linux, kqueue on macOS) to handle thousands of connections with just a handful of threads. Zig has std.io.poll for that, but thread-per-connection is simpler to understand and perfectly fine for hundreds of concurrent clients -- which is more than our little KV store will ever see.
The connection handler
Each connection gets its own handler running in its own thread. The handler reads requests in a loop, dispatches them to the store, and writes responses:
fn handleConnection(self: *Server, connection: std.net.Server.Connection) void {
defer {
connection.stream.close();
_ = self.active_connections.fetchSub(1, .release);
}
const reader = connection.stream.reader();
const writer = connection.stream.writer();
while (!self.should_stop.load(.acquire)) {
const maybe_request = readRequest(reader, self.allocator) catch |err| {
// Protocol error -- send error response and close
const msg = switch (err) {
error.InvalidCommand => "unknown command",
error.KeyTooLarge => "key exceeds maximum size",
error.ValueTooLarge => "value exceeds maximum size",
else => "internal error",
};
writeResponse(writer, .err, msg) catch {};
return;
};
// null means client disconnected cleanly
const request = maybe_request orelse return;
defer self.allocator.free(request.key);
defer if (request.value.len > 0) self.allocator.free(request.value);
// Dispatch the command
self.dispatchCommand(writer, request) catch |err| {
std.log.err("dispatch error: {}", .{err});
return;
};
}
}
The handler is clean because we separated parsing (readRequest) from dispatching. The defer block at the top ensures we always close the connection and decrement the counter, regardless of how the handler exits -- clean disconnect, protocol error, write failure, whatever. This is the kind of resource management that Zig's defer makes practically effortless.
Protocol errors get an error response sent back to the client before we close the connection. This is better than silently dropping them, because the client gets a reason for the failure. The catch {} on the writeResponse is intentional -- if we can't even send the error message (the connection is already broken), there's nothing useful we can do about it.
Dispatching commands to the store
The dispatch function is where the actual key-value operations happen. It takes a parsed request, calls the right method on the store, and sends back the response:
fn dispatchCommand(self: *Server, writer: anytype, request: Request) !void {
switch (request.cmd) {
.put => {
self.store.put(request.key, request.value) catch |err| {
const msg = std.fmt.allocPrint(
self.allocator,
"put failed: {}",
.{err},
) catch "put failed";
defer if (@TypeOf(msg) == []u8) self.allocator.free(msg);
try writeResponse(writer, .err, msg);
return;
};
try writeResponse(writer, .ok, &.{});
},
.get => {
if (self.store.get(request.key)) |value| {
try writeResponse(writer, .ok, value);
} else {
try writeResponse(writer, .not_found, &.{});
}
},
.delete => {
const deleted = self.store.delete(request.key) catch |err| {
const msg = std.fmt.allocPrint(
self.allocator,
"delete failed: {}",
.{err},
) catch "delete failed";
defer if (@TypeOf(msg) == []u8) self.allocator.free(msg);
try writeResponse(writer, .err, msg);
return;
};
if (deleted) {
try writeResponse(writer, .ok, &.{});
} else {
try writeResponse(writer, .not_found, &.{});
}
},
.ping => {
try writeResponse(writer, .ok, "PONG");
},
}
}
This is straight forward -- PUT stores the value and responds OK, GET returns the value or NOT_FOUND, DELETE removes the key (or responds NOT_FOUND if it didn't exist), PING responds with "PONG". The error handling for PUT and DELETE is a bit verbose because the WAL write can fail (disk full, I/O error), and we want to report that back to the client rather than just crashing.
One thing to note: the get method returns a reference to the value in the hash map, not a copy. This means the response writer is sending bytes directly from the store's internal memory. That's efficient (no allocation needed for the response) but it means the store's mutex must be held during the write, or else another thread could modify the value while we're sending it. Our current PersistentKvStore doesn't have a mutex because in episodes 40 and 41 it was single-threaded. We need to fix that.
Adding thread safety to the store
Our PersistentKvStore from the previous episodes was designed for single-threaded use. Now that multiple client threads are calling put, get, and delete concurrently, we need a mutex. Without it, two threads could modify the hash map at the same time, corrupting its internal state. This is the exact scenario we discussed in episode 30:
pub const ThreadSafeKvStore = struct {
inner: PersistentKvStore,
mutex: std.Thread.Mutex,
pub fn init(allocator: std.mem.Allocator, wal_path: []const u8) !ThreadSafeKvStore {
return .{
.inner = try PersistentKvStore.open(allocator, wal_path),
.mutex = .{},
};
}
pub fn deinit(self: *ThreadSafeKvStore) void {
self.inner.close();
}
pub fn put(self: *ThreadSafeKvStore, key: []const u8, value: []const u8) !void {
self.mutex.lock();
defer self.mutex.unlock();
try self.inner.put(key, value);
}
pub fn get(self: *ThreadSafeKvStore, key: []const u8) ?[]const u8 {
self.mutex.lock();
defer self.mutex.unlock();
return self.inner.get(key);
}
pub fn delete(self: *ThreadSafeKvStore, key: []const u8) !bool {
self.mutex.lock();
defer self.mutex.unlock();
return self.inner.delete(key);
}
};
This is the simplest possible thread-safety wrapper: a single mutex that protects all operations. Lock, do the thing, unlock. It's correct but not optimal -- a reader-writer lock (std.Thread.RwLock) would allow multiple concurrent readers while only blocking for writes. For our tutorial store though, the single mutex is fine. The bottleneck will be the network, not the lock contention.
The defer self.mutex.unlock() pattern guarantees the mutex gets released even if the operation returns an error. If we forgot to unlock on one error path, we'd deadlock the entire server -- every subsequent request would block forever waiting for the lock. This is exactly the kind of bug that's trivially preventable with defer and practically guaranteed to happen eventually without it.
Graceful shutdown
Ctrl+C sends SIGINT to the process. By default, the OS just kills it -- which means connections get dropped mid-request, the WAL might have a partial write at the end, and clients get confusing "connection reset" errors. A proper server handles SIGINT gracefully: stop accepting new connections, wait for active requests to finish, then exit cleanly.
In Zig, we can set up a signal handler using std.posix.sigaction:
var global_server: ?*Server = null;
fn signalHandler(_: c_int) callconv(.C) void {
if (global_server) |server| {
server.should_stop.store(true, .release);
// Close the listener to unblock accept()
server.listener.deinit();
}
}
pub fn installSignalHandler(server: *Server) !void {
global_server = server;
var sa = std.posix.Sigaction{
.handler = .{ .handler = signalHandler },
.mask = std.posix.empty_sigset,
.flags = 0,
};
try std.posix.sigaction(std.posix.SIG.INT, &sa, null);
try std.posix.sigaction(std.posix.SIG.TERM, &sa, null);
}
The signal handler sets the should_stop flag and closes the listener socket. Closing the listener causes the pending accept() call in the run loop to return an error, which breaks out of the loop. The handler threads check should_stop at the top of their read loop, so they'll exit after finishing their current request.
The global variable is unfortunate but unavoidable -- signal handlers have a C function signature (fn(c_int) callconv(.C) void) and can't capture any context. This is a POSIX limitation, not a Zig one. In real production code you'd use something like signalfd on Linux to avoid global state entirely, but that's platform-specific and more complex than we need here.
After the run loop exits, we wait for active connections to drain:
pub fn waitForDrain(self: *Server, timeout_ms: u64) void {
const start = std.time.milliTimestamp();
while (self.active_connections.load(.acquire) > 0) {
const elapsed = @as(u64, @intCast(std.time.milliTimestamp() - start));
if (elapsed >= timeout_ms) {
std.log.warn(
"shutdown timeout: {d} connections still active",
.{self.active_connections.load(.acquire)},
);
return;
}
std.time.sleep(10 * std.time.ns_per_ms); // check every 10ms
}
std.log.info("all connections drained", .{});
}
This polls the connection counter every 10 milliseconds. If all connections close within the timeout, great. If not, we log a warning and exit anyway -- a stuck connection shouldn't prevent the server from ever shutting down. The timeout is configurable; 5 seconds is a reasonable default.
Putting it all together: main.zig
Here's the main function that ties everything together:
const std = @import("std");
const ThreadSafeKvStore = @import("thread_safe_kv.zig").ThreadSafeKvStore;
const Server = @import("server.zig").Server;
const installSignalHandler = @import("server.zig").installSignalHandler;
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer {
const check = gpa.deinit();
if (check == .leak) {
std.log.err("memory leak detected!", .{});
}
}
const allocator = gpa.allocator();
// Parse command-line args (or use defaults)
const args = try std.process.argsAlloc(allocator);
defer std.process.argsFree(allocator, args);
const port: u16 = if (args.len > 1)
std.fmt.parseInt(u16, args[1], 10) catch 7878
else
7878;
const wal_path = if (args.len > 2) args[2] else "kv-store.wal";
// Open the persistent store (replays WAL on startup)
var store = try ThreadSafeKvStore.init(allocator, wal_path);
defer store.deinit();
// Start the TCP server
var server = try Server.init(allocator, &store, "0.0.0.0", port);
defer server.deinit();
try installSignalHandler(&server);
std.log.info("KV store ready. WAL: {s}, Port: {d}", .{ wal_path, port });
// Run the accept loop (blocks until shutdown signal)
server.run() catch |err| {
if (server.should_stop.load(.acquire)) {
// Expected -- shutdown signal triggered the error
} else {
return err;
}
};
// Wait for active connections to finish
std.log.info("shutting down, waiting for connections...", .{});
server.waitForDrain(5000);
std.log.info("server stopped", .{});
}
The startup sequence is: allocator, parse args, open store (which replays the WAL), start server, install signal handler, run. The shutdown sequence (triggered by Ctrl+C) is: stop accepting, drain connections, deinit server, deinit store (which closes the WAL), check for memory leaks.
Default port is 7878 (like Rust's book uses for their web server tutorial). You can change it via the command line: ./kv-server 6379 mydata.wal. The WAL path is also configurable because you might want multiple instances with separate data files.
Testing the server
Testing a network server is trickier than testing a library. You need to start the server, connect a client, send requests, verify responses, and then shut everything down. Here's how:
const std = @import("std");
const Server = @import("server.zig").Server;
const ThreadSafeKvStore = @import("thread_safe_kv.zig").ThreadSafeKvStore;
const proto = @import("protocol.zig");
fn startTestServer(
allocator: std.mem.Allocator,
store: *ThreadSafeKvStore,
) !struct { server: Server, thread: std.Thread } {
var server = try Server.init(allocator, store, "127.0.0.1", 0);
// Port 0 lets the OS assign a random available port
const thread = try std.Thread.spawn(.{}, Server.run, .{&server});
// Give the server a moment to start accepting
std.time.sleep(50 * std.time.ns_per_ms);
return .{ .server = server, .thread = thread };
}
fn connectClient(port: u16) !std.net.Stream {
const addr = try std.net.Address.parseIp("127.0.0.1", port);
return try std.net.tcpConnectToAddress(addr);
}
fn sendPut(stream: std.net.Stream, key: []const u8, value: []const u8) !proto.Status {
const writer = stream.writer();
var header: [proto.REQUEST_HEADER_SIZE]u8 = undefined;
header[0] = @intFromEnum(proto.Command.put);
std.mem.writeInt(u32, header[1..5], @intCast(key.len), .little);
std.mem.writeInt(u32, header[5..9], @intCast(value.len), .little);
try writer.writeAll(&header);
try writer.writeAll(key);
try writer.writeAll(value);
// Read response
var resp_header: [proto.RESPONSE_HEADER_SIZE]u8 = undefined;
_ = try stream.reader().readAll(&resp_header);
return std.meta.intToEnum(proto.Status, resp_header[0]) catch .err;
}
fn sendGet(
stream: std.net.Stream,
key: []const u8,
allocator: std.mem.Allocator,
) !struct { status: proto.Status, value: []const u8 } {
const writer = stream.writer();
var header: [proto.REQUEST_HEADER_SIZE]u8 = undefined;
header[0] = @intFromEnum(proto.Command.get);
std.mem.writeInt(u32, header[1..5], @intCast(key.len), .little);
std.mem.writeInt(u32, header[5..9], 0, .little);
try writer.writeAll(&header);
try writer.writeAll(key);
// Read response header
var resp_header: [proto.RESPONSE_HEADER_SIZE]u8 = undefined;
_ = try stream.reader().readAll(&resp_header);
const status = std.meta.intToEnum(proto.Status, resp_header[0]) catch .err;
const val_len = std.mem.readInt(u32, resp_header[1..5], .little);
// Read response value
var value: []u8 = &.{};
if (val_len > 0) {
value = try allocator.alloc(u8, val_len);
_ = try stream.reader().readAll(value);
}
return .{ .status = status, .value = value };
}
test "server PUT and GET round-trip" {
const allocator = std.testing.allocator;
const wal_path = "test_server.wal";
defer std.fs.cwd().deleteFile(wal_path) catch {};
var store = try ThreadSafeKvStore.init(allocator, wal_path);
defer store.deinit();
var srv = try startTestServer(allocator, &store);
defer {
srv.server.should_stop.store(true, .release);
srv.server.listener.deinit();
srv.thread.join();
}
const port = srv.server.listener.listen_address.getPort();
const stream = try connectClient(port);
defer stream.close();
// PUT a value
const put_status = try sendPut(stream, "greeting", "hello world");
try std.testing.expectEqual(proto.Status.ok, put_status);
// GET it back
const result = try sendGet(stream, "greeting", allocator);
defer if (result.value.len > 0) allocator.free(result.value);
try std.testing.expectEqual(proto.Status.ok, result.status);
try std.testing.expectEqualStrings("hello world", result.value);
}
test "server GET missing key returns NOT_FOUND" {
const allocator = std.testing.allocator;
const wal_path = "test_server_missing.wal";
defer std.fs.cwd().deleteFile(wal_path) catch {};
var store = try ThreadSafeKvStore.init(allocator, wal_path);
defer store.deinit();
var srv = try startTestServer(allocator, &store);
defer {
srv.server.should_stop.store(true, .release);
srv.server.listener.deinit();
srv.thread.join();
}
const port = srv.server.listener.listen_address.getPort();
const stream = try connectClient(port);
defer stream.close();
const result = try sendGet(stream, "nonexistent", allocator);
defer if (result.value.len > 0) allocator.free(result.value);
try std.testing.expectEqual(proto.Status.not_found, result.status);
}
test "server handles multiple clients" {
const allocator = std.testing.allocator;
const wal_path = "test_server_multi.wal";
defer std.fs.cwd().deleteFile(wal_path) catch {};
var store = try ThreadSafeKvStore.init(allocator, wal_path);
defer store.deinit();
var srv = try startTestServer(allocator, &store);
defer {
srv.server.should_stop.store(true, .release);
srv.server.listener.deinit();
srv.thread.join();
}
const port = srv.server.listener.listen_address.getPort();
// Client 1 writes
const stream1 = try connectClient(port);
defer stream1.close();
const s1 = try sendPut(stream1, "from_client_1", "hello");
try std.testing.expectEqual(proto.Status.ok, s1);
// Client 2 reads what client 1 wrote
const stream2 = try connectClient(port);
defer stream2.close();
const result = try sendGet(stream2, "from_client_1", allocator);
defer if (result.value.len > 0) allocator.free(result.value);
try std.testing.expectEqual(proto.Status.ok, result.status);
try std.testing.expectEqualStrings("hello", result.value);
}
The key trick is port 0. When you bind to port 0, the OS assigns a random available port. This is essential for tests -- if you hardcode a port, the test fails when something else is already using it, or when two test runs happen in parallell. Port 0 plus listen_address.getPort() to discover the assigned port is the standard pattern.
The startTestServer function spawns the server on a background thread and gives it 50ms to start accepting. In the defer block, we set should_stop, close the listener to unblock accept, and join the thread. This ensures the server is fully shut down before the test deallocates the store.
The multi-client test verifies that two separate TCP connections can interact with the same store -- client 1 writes, client 2 reads. This works because both connections dispatch to the same ThreadSafeKvStore instance, and the mutex ensures consistency.
Updated project structure
The KV store project now has these files:
kv-store/
src/
kv_store.zig -- in-memory store (episode 40)
wal.zig -- write-ahead log (episode 41)
persistent_kv.zig -- PersistentKvStore wrapping store + WAL
thread_safe_kv.zig -- ThreadSafeKvStore mutex wrapper
protocol.zig -- binary protocol: Command, Status, parse/write
server.zig -- TCP server: accept loop, handlers, signal, drain
main.zig -- entry point with CLI args
kv_store_test.zig -- in-memory store tests
wal_test.zig -- WAL + persistence tests
server_test.zig -- server integration tests
build.zig
Each episode added one or two files. The in-memory store is the foundation, the WAL sits on top of it, the thread-safe wrapper sits on top of that, and the server is the outermost layer. If you wanted to use this as a library without the server, you'd just import persistent_kv.zig or thread_safe_kv.zig directly. Layered design -- each layer is independently useful.
You can test it manually with nc (netcat) or by writing a quick script that constructs binary packets. But writing raw binary by hand gets old fast, which is exactly why the next episode will build a proper client library. With a client, you'll be able to write client.put("key", "value") and have it handle all the protocol serialization under the hood.
Wat we geleerd hebben
- Designing a binary protocol with a fixed-size header (command byte + two length fields) followed by variable-length key and value data -- compact, fast to parse, and unambiguous
- Separating request parsing from command dispatch so the handler logic stays clean and each part can be tested independently
- Using the thread-per-connection model for concurrent client handling -- each accepted connection gets its own thread with an independent read loop
- Wrapping the PersistentKvStore in a mutex for thread safety, because multiple handler threads dispatch commands to the same store concurrently
- Setting up POSIX signal handlers for graceful shutdown -- catching SIGINT/SIGTERM, setting a stop flag, closing the listener to unblock accept, and draining active connections with a timeout
- Using port 0 in tests so the OS assigns a random port, avoiding conflicts between test runs or with other services
- Why atomics are the right tool for the connection counter and stop flag (simple values read/written from multiple threads) while a mutex is right for the store (complex multi-step operations that must be atomic)
- The tradeoffs of thread-per-connection vs I/O multiplexing -- threads are simple and fine for hundreds of connections, but don't scale to tens of thousands
We now have a real network service. Programs on the same machine (or across the network) can connect, store data, and retrieve it. The data is durable thanks to the WAL, thread-safe thanks to the mutex wrapper, and the server shuts down cleanly on Ctrl+C. What's missing is a convenient way to talk to it -- right now you'd have to hand-craft binary packets. The next step is building a client library that handles the protocol details, plus some benchmarks to see how fast this thing actually is.
Bedankt en tot de volgende keer!