Channel

std.channel provides async multi-producer, multi-consumer channels. Senders and receivers are owned handles; cloning a handle retains the underlying channel, and dropping or closing handles updates channel liveness.

bounded<T>(capacity) returns a (Sender<T>, Receiver<T>) pair. Capacity 0 creates a rendezvous channel, in which a send completes only once a receiver takes the value. A positive capacity lets sends complete before any receiver is waiting, until the buffer is full.

import std.channel;

async fn main() {
    let (tx, rx) = channel.bounded<i32>(1);
    try await tx.send(42);

    let msg = await rx.recv();
    match msg {
        .Value(v) => {
            assert v == 42;
        }
        .Closed => {
            assert false;
        }
    }
}
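
The rendezvous case (capacity 0) can be sketched the same way. This is a minimal illustration rather than an official example: it uses only calls that appear on this page, the send_value helper is the one from the Sending And Receiving section, and it assumes that `//` starts a line comment and that a called async fn makes progress while main awaits the receive.

```
import std.channel;

// Helper as defined in the Sending And Receiving section.
async fn send_value(tx: channel.Sender<i32>, value: i32) throws(channel.ChannelError) -> void {
    try await tx.send(value);
}

async fn main() {
    // Capacity 0: nothing is buffered, so the send pairs up with a receive.
    let (tx, rx) = channel.bounded<i32>(0);

    // Start the send, then receive; the send finishes once the value is taken.
    let sending = send_value(tx.clone(), 7);
    let msg = await rx.recv();
    try await sending;

    match msg {
        .Value(v) => {
            assert v == 7;
        }
        .Closed => {
            assert false;
        }
    }
}
```

Awaiting tx.send(7) directly before the receive, as the capacity-1 example does, would presumably never complete here, because no receiver would be waiting to take the value.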

Sending And Receiving

Sender.send(value) returns future.Future<void throws(ChannelError)>. The value is moved into the channel. Receiver.recv() returns future.Future<Recv<T>>, where Recv<T> is either .Value(T) or .Closed.

import std.channel;
import std.os;

async fn send_value(tx: channel.Sender<i32>, value: i32) throws(channel.ChannelError) -> void {
    try await tx.send(value);
}

async fn main() {
    let (tx, rx) = channel.bounded<i32>(1);

    let sending = send_value(tx.clone(), 21);
    let msg = await rx.recv();
    try await sending;

    match msg {
        .Value(value) => _ = os.write("{value}\n"),
        .Closed => _ = os.write("closed\n"),
    }
}
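
Because cloning a handle retains the underlying channel, two sender handles can feed the same receiver. The sketch below is illustrative only: it reuses the value_or_closed helper from the Closing section, assumes `//` starts a line comment, and assumes a capacity of 2 lets two sends complete before anything is received. It checks the sum of the two values rather than their order, since this page does not state a delivery order.

```
import std.channel;

// Unwraps a Recv<i32>, mapping .Closed to -1 (same helper as in the Closing section).
fn value_or_closed(msg: channel.Recv<i32>) -> i32 {
    return match msg {
        .Value(v) => v,
        .Closed => -1,
    };
}

async fn main() {
    // Capacity 2 so both sends can complete before the first receive.
    let (tx, rx) = channel.bounded<i32>(2);
    let tx2 = tx.clone();

    // Both handles send into the same underlying channel.
    try await tx.send(1);
    try await tx2.send(2);

    let first = value_or_closed(await rx.recv());
    let second = value_or_closed(await rx.recv());
    assert first + second == 3;
}
```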

Payload ownership moves into the channel during send. A received Recv.Value(value) is owned by the receiver and drops on the receiver side like any other local value. Buffered payloads that are never received are destroyed by channel cleanup when the channel becomes unreachable. Because user-defined drop types are not valid channel payloads in 0.0.0-dev, channel cleanup never has to run a user destructor on a different worker thread.

Closing

Closing a sender prevents further sends through that handle. Values already buffered can still be received before recv() yields Recv.Closed.

import std.channel;

fn value_or_closed(msg: channel.Recv<i32>) -> i32 {
    return match msg {
        .Value(v) => v,
        .Closed => -1,
    };
}

async fn main() {
    let (tx, rx) = channel.bounded<i32>(1);
    try await tx.send(5);
    tx.close();

    assert value_or_closed(await rx.recv()) == 5;
    assert value_or_closed(await rx.recv()) == -1;
}

Send Safety

Channel payloads cross runtime worker queues, so T must be thread-send-safe. Scalars, send-safe enums, and plain owned structs whose fields are send-safe are accepted.

Checked references, raw pointers, str, slices, fixed arrays, futures, scoped future handles, erased callables, Unique<T>, and user-defined drop types are rejected in 0.0.0-dev. Direct closure expressions and named functions may be sent only when their captures are send-safe.

Select

select waits on channel send and receive operations inside an async fn.

import std.channel;
import std.os;

async fn main() {
    let (tx, rx) = channel.bounded<i32>(1);
    try await tx.send(7);

    select {
    case msg = rx.recv() {
        match msg {
            .Value(v) => _ = os.write("recv {v}\n"),
            .Closed => _ = os.write("closed\n"),
        }
    }
    case tx.send(9) {
        _ = os.write("sent\n");
    }
    else {
        _ = os.write("else\n");
    }
    }
}

API Reference

Item              Shape
channel.bounded   bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>)
Sender.clone      clone() -> Sender<T>
Sender.close      close()
Sender.send       send(value: T) -> future.Future<void throws(ChannelError)>
Receiver.clone    clone() -> Receiver<T>
Receiver.close    close()
Receiver.recv     recv() -> future.Future<Recv<T>>
ChannelError      Closed
Recv<T>           Value(T), Closed