Channel
Async multi-producer, multi-consumer channels in std.channel.
std.channel provides async multi-producer, multi-consumer channels. Senders
and receivers are owned handles; cloning a handle retains the underlying
channel, and dropping or closing handles updates channel liveness.
bounded<T>(capacity) returns a (Sender<T>, Receiver<T>) pair. Capacity 0
creates a rendezvous channel. Positive capacity allows sends to complete before
a receiver is waiting.
import std.channel;
async fn main() {
let (tx, rx) = channel.bounded<i32>(1);
try await tx.send(42);
let msg = await rx.recv();
match msg {
.Value(v) => {
assert v == 42;
}
.Closed => {
assert false;
}
}
}
Sender.send(value) returns future.Future<void throws(ChannelError)>. The
value is moved into the channel. Receiver.recv() returns
future.Future<Recv<T>>, where Recv<T> is either .Value(T) or .Closed.
import std.channel;
import std.os;
async fn send_value(tx: channel.Sender<i32>, value: i32) throws(channel.ChannelError) -> void {
try await tx.send(value);
}
async fn main() {
let (tx, rx) = channel.bounded<i32>(1);
let sending = send_value(tx.clone(), 21);
let msg = await rx.recv();
try await sending;
match msg {
.Value(value) => _ = os.write("{value}\n"),
.Closed => _ = os.write("closed\n"),
}
}
Payload ownership moves into the channel during send. A received
Recv.Value(value) is owned by the receiver and drops on the receiver side like
any other local value. Buffered payloads that are never received are destroyed
by channel cleanup when the channel becomes unreachable. Since user-defined
drop types are not valid channel payloads in 0.0.0-dev, channel cleanup cannot run a
user destructor on a different worker thread.
Closing a sender prevents future sends through that handle. Buffered values can
still be received before Recv.Closed.
import std.channel;
fn value_or_closed(msg: channel.Recv<i32>) -> i32 {
return match msg {
.Value(v) => v,
.Closed => -1,
};
}
async fn main() {
let (tx, rx) = channel.bounded<i32>(1);
try await tx.send(5);
tx.close();
assert value_or_closed(await rx.recv()) == 5;
assert value_or_closed(await rx.recv()) == -1;
}
Channel payloads cross runtime worker queues, so T must be
thread-send-safe. Scalars, send-safe enums, and plain owned structs whose
fields are send-safe are accepted.
Checked references, raw pointers, str, slices, fixed arrays, futures, scoped
future handles, erased callables, Unique<T>, and user-defined drop types
are rejected in 0.0.0-dev. Direct closure expressions and named functions may
be sent only when their captures are send-safe.
select waits on channel send and receive operations inside an async fn.
import std.channel;
import std.os;
async fn main() {
let (tx, rx) = channel.bounded<i32>(1);
try await tx.send(7);
select {
case msg = rx.recv() {
match msg {
.Value(v) => _ = os.write("recv {v}\n"),
.Closed => _ = os.write("closed\n"),
}
}
case tx.send(9) {
_ = os.write("sent\n");
}
else {
_ = os.write("else\n");
}
}
}
| Item | Shape |
|---|---|
channel.bounded | bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) |
Sender.clone | clone() -> Sender<T> |
Sender.close | close() |
Sender.send | send(value: T) -> future.Future<void throws(ChannelError)> |
Receiver.clone | clone() -> Receiver<T> |
Receiver.close | close() |
Receiver.recv | recv() -> future.Future<Recv<T>> |
ChannelError | Closed |
Recv<T> | Value(T), Closed |