package core:sync/chan
Overview
This package provides both high-level and low-level channel types for thread-safe communication.
While channels are essentially thread-safe queues under the hood, their primary purpose is to facilitate safe communication between multiple readers and multiple writers. Although they can be used like queues, channels are designed with synchronization and concurrent messaging patterns in mind.
Provided types:
Chan
a high level channel
Raw_Chan
a low level channel
Raw_Queue
a low level non-threadsafe queue implementation used internally
Example:
import "core:sync/chan"
import "core:fmt"
import "core:thread"
// The consumer reads from the channel until it's closed.
// Closing the channel acts as a signal to stop.
consumer :: proc(recv_chan: chan.Chan(int, .Recv)) {
for {
value, ok := chan.recv(recv_chan)
if !ok {
break // More idiomatic than return here
}
fmt.println("[CONSUMER] Received:", value)
}
fmt.println("[CONSUMER] Channel closed, stopping.")
}
// The producer sends `count` number of messages.
producer :: proc(send_chan: chan.Chan(int, .Send), count: int) {
for i in 0..<count {
fmt.println("[PRODUCER] Sending:", i)
success := chan.send(send_chan, i)
if !success {
fmt.println("[PRODUCER] Failed to send, channel may be closed.")
return
}
}
// Signal that production is complete by closing the channel.
chan.close(send_chan)
fmt.println("[PRODUCER] Done producing, channel closed.")
}
chan_example :: proc() {
// Create an unbuffered channel for int messages
c, err := chan.create(chan.Chan(int), context.allocator)
assert(err == .None)
defer chan.destroy(c)
// Start the consumer thread
consumer_thread := thread.create_and_start_with_poly_data(chan.as_recv(c), consumer)
defer thread.destroy(consumer_thread)
// Start the producer thread with 5 messages (change count as needed)
producer_thread := thread.create_and_start_with_poly_data2(chan.as_send(c), 5, producer)
defer thread.destroy(producer_thread)
// Wait for both threads to complete
thread.join_multiple(consumer_thread, producer_thread)
}
Index
Constants (0)
This section is empty.
Variables (0)
This section is empty.
Procedures (27)
Procedure Groups (2)
Types
Chan ¶
Chan :: struct {}
A typed wrapper around Raw_Chan
which should be used
preferably.
Note: all procedures accepting Raw_Chan
also accept Chan
.
Inputs
$T
: The type of the messages
Direction
: what Direction
the channel supports
Example:
import "core:sync/chan"
chan_example :: proc() {
// Create an unbuffered channel with messages of type int,
// supporting both sending and receiving.
// Creating unidirectional channels, although possible, is useless.
c, _ := chan.create(chan.Chan(int), context.allocator)
defer chan.destroy(c)
// This channel can now only be used for receiving messages
recv_only_channel: chan.Chan(int, .Recv) = chan.as_recv(c)
// This channel can now only be used for sending messages
send_only_channel: chan.Chan(int, .Send) = chan.as_send(c)
}
Related Procedures With Returns
Direction ¶
Direction :: enum int { Send = -1, Both = 0, Recv = 1, }
Determines what operations Chan
supports.
Raw_Chan ¶
Raw_Chan :: struct { // Shared allocator: runtime.Allocator, allocation_size: int, msg_size: u16, closed: b16, // guarded by `mutex` mutex: sync.Mutex, r_cond: sync.Cond, w_cond: sync.Cond, r_waiting: int, // guarded by `mutex` w_waiting: int, // Buffered queue: ^Raw_Queue, // Unbuffered unbuffered_data: rawptr, }
Raw_Chan
allows for thread-safe communication using fixed-size messages.
This is the low-level implementation of Chan
, which does not include
the concept of Direction.
Example:
import "core:sync/chan"
raw_chan_example :: proc() {
// Create an unbuffered channel with messages of type int,
c, _ := chan.create_raw(size_of(int), align_of(int), context.allocator)
defer chan.destroy(c)
}
Related Procedures With Parameters
- can_recv
- can_send
- cap
- close
- destroy
- is_buffered
- is_closed
- is_unbuffered
- len
- recv_raw
- send_raw
- try_recv_raw
- try_send_raw
Related Procedures With Returns
- create_raw_buffered
- create_raw_unbuffered
- create_raw (procedure groups)
Raw_Queue ¶
Raw_Queue
is a non-thread-safe queue implementation designed to store messages
of fixed size and alignment.
Note: For most use cases, it is recommended to use core:container/queue
instead,
as Raw_Queue
is used internally by Raw_Chan
and may not provide the desired
level of convenience for typical applications.
Related Procedures With Parameters
Constants
This section is empty.
Variables
This section is empty.
Procedures
as_recv ¶
as_recv :: proc "contextless" (c: $C) -> (r: Chan($T, $D=1)) {…}
Creates a version of a channel that can only be used for receiving not sending.
Inputs
c
: The channel
Returns:
An Allocator_Error
Example:
import "core:sync/chan"
as_recv_example :: proc() {
consumer :: proc(c: chan.Chan(int, .Recv)) {
value, ok := chan.recv(c)
// compile-time error:
// chan.send(c, 22)
}
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
chan.send(c, 112)
consumer(chan.as_recv(c))
}
as_send ¶
as_send :: proc "contextless" (c: $C) -> (s: Chan($T, $D=-1)) {…}
Creates a version of a channel that can only be used for sending not receiving.
Inputs
c
: The channel
Returns:
An Allocator_Error
Example:
import "core:sync/chan"
as_send_example :: proc() {
// this procedure takes a channel that can only
// be used for sending not receiving.
producer :: proc(c: chan.Chan(int, .Send)) {
chan.send(c, 112)
// compile-time error:
// value, ok := chan.recv(c)
}
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
producer(chan.as_send(c))
}
can_recv ¶
Returns whether a message is ready to be read, i.e.,
if a call to recv
or recv_raw
would block
Inputs
c
: The channel
Returns
true
if a message can be read, false
otherwise
Example:
import "core:sync/chan"
can_recv_example :: proc() {
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
assert(!chan.can_recv(c), "the cannel is empty")
assert(chan.send(c, 2))
assert(chan.can_recv(c), "there is message to read")
}
can_send ¶
Returns whether a message can be sent without blocking the current thread. Specifically, it checks if the channel is buffered and not full, or if there is already a reader waiting for a message.
Inputs
c
: The channel
Returns
true
if a message can be send, false
otherwise
Example:
import "core:sync/chan"
can_send_example :: proc() {
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
assert(chan.can_send(c), "the channel's buffer is not full")
assert(chan.send(c, 2))
assert(!chan.can_send(c), "the channel's buffer is full")
}
cap ¶
Returns the number of elements the channel could hold.
Note: Unbuffered channels will always return 0
because they cannot hold elements.
Inputs
c
: The channel
Returns: Number of elements
Example:
import "core:sync/chan"
import "core:fmt"
cap_example :: proc() {
c, _ := chan.create(chan.Chan(int), 2, context.allocator)
defer chan.destroy(c)
fmt.println(chan.cap(c))
}
2
close ¶
Closes the channel, preventing new messages from being added.
Inputs
c
: The channel
Returns:
true
if the channel was closed by this operation, false
if it was already closed
Example:
import "core:sync/chan"
close_example :: proc() {
c, _ := chan.create(chan.Chan(int), 2, context.allocator)
defer chan.destroy(c)
// Sending a message to an open channel
assert(chan.send(c, 1), "allowed to send")
// Closing the channel successfully
assert(chan.close(c), "successfully closed")
// Trying to send a message after the channel is closed (should fail)
assert(!chan.send(c, 1), "not allowed to send after close")
// Trying to close the channel again (should fail since it's already closed)
assert(!chan.close(c), "was already closed")
}
create_buffered ¶
create_buffered :: proc($C: typeid/[0][0]$E, #any_int cap: int, allocator: runtime.Allocator) -> (c: $/[0][0]$E, err: runtime.Allocator_Error) {…}
Creates a buffered version of the specified Chan
type.
Allocates Using Provided Allocator
Inputs
$C
: Type of Chan
to create
cap
: The capacity of the channel
allocator
: The allocator to use
Returns:
The initialized Chan
An Allocator_Error
Example:
import "core:sync/chan"
create_buffered_example :: proc() {
c, err := chan.create_buffered(chan.Chan(int), 10, context.allocator)
assert(err == .None)
defer chan.destroy(c)
}
create_raw_buffered ¶
create_raw_buffered :: proc(#any_int msg_size, #any_int msg_alignment: int, #any_int cap: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {…}
Creates a buffered Raw_Chan
for messages of the specified
size and alignment.
Allocates Using Provided Allocator
Inputs
msg_size
: The size of the messages the messages being sent
msg_alignment
: The alignment of the messages being sent
cap
: The capacity of the channel
allocator
: The allocator to use
Returns:
The initialized Raw_Chan
An Allocator_Error
Example:
import "core:sync/chan"
create_raw_unbuffered_example :: proc() {
c, err := chan.create_raw_buffered(size_of(int), align_of(int), 10, context.allocator)
assert(err == .None)
defer chan.destroy(c)
}
create_raw_unbuffered ¶
create_raw_unbuffered :: proc(#any_int msg_size, #any_int msg_alignment: int, allocator: runtime.Allocator) -> (c: ^Raw_Chan, err: runtime.Allocator_Error) {…}
Creates an unbuffered Raw_Chan
for messages of the specified
size and alignment.
Allocates Using Provided Allocator
Inputs
msg_size
: The size of the messages the messages being sent
msg_alignment
: The alignment of the messages being sent
allocator
: The allocator to use
Returns:
The initialized Raw_Chan
An Allocator_Error
Example:
import "core:sync/chan"
create_raw_unbuffered_example :: proc() {
unbuffered, err := chan.create_raw(size_of(int), align_of(int), context.allocator)
assert(err == .None)
defer chan.destroy(unbuffered)
}
create_unbuffered ¶
create_unbuffered :: proc($C: typeid/[0][0]$E, allocator: runtime.Allocator) -> (c: $/[0][0]$E, err: runtime.Allocator_Error) {…}
Creates an unbuffered version of the specified Chan
type.
Allocates Using Provided Allocator
Inputs
$C
: Type of Chan
to create
allocator
: The allocator to use
Returns:
The initialized Chan
An Allocator_Error
Example:
import "core:sync/chan"
create_unbuffered_example :: proc() {
c, err := chan.create_unbuffered(chan.Chan(int), context.allocator)
assert(err == .None)
defer chan.destroy(c)
}
destroy ¶
destroy :: proc(c: ^Raw_Chan) -> (err: runtime.Allocator_Error) {…}
Destroys the Channel.
Inputs
c
: The channel to destroy
Returns:
An Allocator_Error
is_buffered ¶
Checks if the given channel is buffered.
Inputs
c
: The channel
Returns:
true
if the channel is buffered, false
otherwise
Example:
import "core:sync/chan"
is_buffered_example :: proc() {
c, _ := chan.create(chan.Chan(int), 1, context.allocator)
defer chan.destroy(c)
assert(chan.is_buffered(c))
}
is_closed ¶
Returns if the channel is closed or not
Inputs
c
: The channel
Returns:
true
if the channel is closed, false
otherwise
is_unbuffered ¶
Checks if the given channel is unbuffered.
Inputs
c
: The channel
Returns:
true
if the channel is unbuffered, false
otherwise
Example:
import "core:sync/chan"
is_buffered_example :: proc() {
c, _ := chan.create(chan.Chan(int), context.allocator)
defer chan.destroy(c)
assert(chan.is_unbuffered(c))
}
len ¶
Returns the number of elements currently in the channel.
Note: Unbuffered channels will always return 0
because they cannot hold elements.
Inputs
c
: The channel
Returns: Number of elements
Example:
import "core:sync/chan"
import "core:fmt"
len_example :: proc() {
c, _ := chan.create(chan.Chan(int), 2, context.allocator)
defer chan.destroy(c)
fmt.println(chan.len(c))
assert(chan.send(c, 1)) // add an element
fmt.println(chan.len(c))
}
0 1
raw_queue_init ¶
Initializes a Raw_Queue
Inputs
q
: A pointert to the Raw_Queue
to initialize
data
: The pointer to backing slice storing the messages
cap
: The capacity of the queue
size
: The size of a message
Example:
import "core:sync/chan"
raw_queue_init_example :: proc() {
// use a stack allocated array as backing storage
storage: [100]int
rq: chan.Raw_Queue
chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int))
}
raw_queue_pop ¶
Removes and returns the first element of the queue.
Note: The returned element is only guaranteed to be valid until the next
raw_queue_push
operation. Accessing it after that point may result in
undefined behavior.
Inputs
c
: A pointer to the Raw_Queue
.
Returns
A pointer to the first element in the queue, or nil
if the queue is empty.
Example:
import "core:sync/chan"
raw_queue_pop_example :: proc() {
storage: [100]int
rq: chan.Raw_Queue
chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int))
assert(chan.raw_queue_pop(&rq) == nil, "queue was empty")
// add an element to the queue
value := 2
assert(chan.raw_queue_push(&rq, &value), "there was enough space")
assert((cast(^int)chan.raw_queue_pop(&rq))^ == 2, "retrieved the element")
}
raw_queue_push ¶
Add an element to the queue.
Note: The message referenced by data
must match the size
and alignment used when the Raw_Queue
was initialized.
Inputs
q
: A pointert to the Raw_Queue
data
: The pointer to message to add
Returns
true
if the element was added, false
when the queue is already full
Example:
import "core:sync/chan"
raw_queue_push_example :: proc() {
storage: [100]int
rq: chan.Raw_Queue
chan.raw_queue_init(&rq, &storage, cap(storage), size_of(int))
value := 2
assert(chan.raw_queue_push(&rq, &value), "there was enough space")
}
recv ¶
recv :: proc "contextless" (c: $C) -> (data: $T, ok: bool) {…}
Reads a message from the channel, blocking the current thread if:
the channel is unbuffered
the channel's buffer is empty
until the channel is being written to. recv
will return
false
when attempting to receive a message on an already closed channel.
Inputs
c
: The channel
Returns
The message
true
if a message was received, false
when the channel was already closed
Example:
import "core:sync/chan"
recv_example :: proc() {
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
assert(chan.send(c, 2))
value, ok := chan.recv(c)
assert(ok, "the value was received")
// this would block since the channel is now empty
// value, ok = chan.recv(c)
// reading from a closed channel returns false
chan.close(c)
value, ok = chan.recv(c)
assert(!ok, "the channel is closed")
}
recv_raw ¶
Reads a message from the channel, blocking the current thread if:
the channel is unbuffered
the channel's buffer is empty
until the channel is being written to. recv_raw
will return
false
when attempting to receive a message on an already closed channel.
Note: The location pointed to by msg_out
must match the size
and alignment used when the Raw_Chan
was created.
Inputs
c
: The channel
msg_out
: Pointer to where the message should be stored
Returns
true
if a message was received, false
when the channel was already closed
Example:
import "core:sync/chan"
recv_raw_example :: proc() {
c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
value := 2
assert(chan.send_raw(c, &value))
assert(chan.recv_raw(c, &value))
// this would block since the channel is now empty
// assert(chan.recv_raw(c, &value))
// reading from a closed channel returns false
chan.close(c)
assert(! chan.recv_raw(c, &value))
}
select_raw ¶
select_raw :: proc(recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: []rawptr, recv_out: rawptr) -> (select_idx: int, ok: bool) {…}
Attempts to either send or receive messages on the specified channels.
select_raw
first identifies which channels have messages ready to be received
and which are available for sending. It then randomly selects one operation
(either a send or receive) to perform.
Note: Each message in send_msgs
corresponds to the send channel at the same index in sends
.
Inputs
recv
: A slice of channels to read from
sends
: A slice of channels to send messages on
send_msgs
: A slice of messages to send
recv_out
: A pointer to the location where, when receiving, the message should be stored
Returns
Position of the available channel which was used for receiving or sending
true
if sending/receiving was successfull, false
if the channel was closed or no channel was available
Example:
import "core:sync/chan"
import "core:fmt"
select_raw_example :: proc() {
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
// sending value '1' on the channel
value1 := 1
msgs := [?]rawptr{&value1}
send_chans := [?]^chan.Raw_Chan{c}
// for simplicity the same channel used for sending is also used for receiving
receive_chans := [?]^chan.Raw_Chan{c}
// where the value from the read should be stored
received_value: int
idx, ok := chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value)
fmt.println("SELECT: ", idx, ok)
fmt.println("RECEIVED VALUE ", received_value)
idx, ok = chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value)
fmt.println("SELECT: ", idx, ok)
fmt.println("RECEIVED VALUE ", received_value)
// closing of a channel also affects the select operation
chan.close(c)
idx, ok = chan.select_raw(receive_chans[:], send_chans[:], msgs[:], &received_value)
fmt.println("SELECT: ", idx, ok)
}
SELECT: 0 true RECEIVED VALUE 0 SELECT: 0 true RECEIVED VALUE 1 SELECT: 0 false
send ¶
send :: proc "contextless" (c: $C, data: $T) -> (ok: bool) {…}
Sends the specified message, blocking the current thread if:
the channel is unbuffered
the channel's buffer is full
until the channel is being read from. send
will return
false
when attempting to send on an already closed channel.
Inputs
c
: The channel
data
: The message to send
Returns
true
if the message was sent, false
when the channel was already closed
Example:
import "core:sync/chan"
send_example :: proc() {
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
assert(chan.send(c, 2))
// this would block since the channel has a buffersize of 1
// assert(chan.send(c, 2))
// sending on a closed channel returns false
chan.close(c)
assert(! chan.send(c, 2))
}
send_raw ¶
Sends the specified message, blocking the current thread if:
the channel is unbuffered
the channel's buffer is full
until the channel is being read from. send_raw
will return
false
when attempting to send on an already closed channel.
Note: The message referenced by msg_out
must match the size
and alignment used when the Raw_Chan
was created.
Inputs
c
: The channel
msg_out
: Pointer to the data to send
Returns
true
if the message was sent, false
when the channel was already closed
Example:
import "core:sync/chan"
send_raw_example :: proc() {
c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
value := 2
assert(chan.send_raw(c, &value))
// this would block since the channel has a buffersize of 1
// assert(chan.send_raw(c, &value))
// sending on a closed channel returns false
chan.close(c)
assert(! chan.send_raw(c, &value))
}
try_recv ¶
try_recv :: proc "contextless" (c: $C) -> (data: $T, ok: bool) {…}
Tries reading a message from the channel in a non-blocking fashion.
Inputs
c
: The channel
Returns
The message
true
if a message was received, false
when the channel was already closed or no message was available
Example:
import "core:sync/chan"
try_recv_example :: proc() {
c, err := chan.create(chan.Chan(int), context.allocator)
assert(err == .None)
defer chan.destroy(c)
_, ok := chan.try_recv(c)
assert(!ok, "there is not value to read")
}
try_recv_raw ¶
Reads a message from the channel if one is available.
Note: The location pointed to by msg_out
must match the size
and alignment used when the Raw_Chan
was created.
Inputs
c
: The channel
msg_out
: Pointer to where the message should be stored
Returns
true
if a message was received, false
when the channel was already closed or no message was available
Example:
import "core:sync/chan"
try_recv_raw_example :: proc() {
c, err := chan.create_raw(size_of(int), align_of(int), context.allocator)
assert(err == .None)
defer chan.destroy(c)
value: int
assert(!chan.try_recv_raw(c, &value))
}
try_send ¶
try_send :: proc "contextless" (c: $C, data: $T) -> (ok: bool) {…}
Tries sending the specified message which is: blocking: given the channel is unbuffered non-blocking: given the channel is buffered
Inputs
c
: The channel
data
: The message to send
Returns
true
if the message was sent, false
when the channel was
already closed or the channel's buffer was full
Example:
import "core:sync/chan"
try_send_example :: proc() {
c, err := chan.create(chan.Chan(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
assert(chan.try_send(c, 2), "there is enough space")
assert(!chan.try_send(c, 2), "the buffer is already full")
}
try_send_raw ¶
Tries sending the specified message which is: blocking: given the channel is unbuffered non-blocking: given the channel is buffered
Note: The message referenced by msg_out
must match the size
and alignment used when the Raw_Chan
was created.
Inputs
c
: the channel
msg_out
: pointer to the data to send
Returns
true
if the message was sent, false
when the channel was
already closed or the channel's buffer was full
Example:
import "core:sync/chan"
try_send_raw_example :: proc() {
c, err := chan.create_raw(size_of(int), align_of(int), 1, context.allocator)
assert(err == .None)
defer chan.destroy(c)
value := 2
assert(chan.try_send_raw(c, &value), "there is enough space")
assert(!chan.try_send_raw(c, &value), "the buffer is already full")
}
Procedure Groups
create ¶
create :: proc{ create_unbuffered, create_buffered, }
Creates a buffered or unbuffered Chan
instance.
Allocates Using Provided Allocator
Inputs
$C
: Type of Chan
to create
[cap
: The capacity of the channel] omit for creating unbuffered channels
allocator
: The allocator to use
Returns:
The initialized Chan
An Allocator_Error
Example:
import "core:sync/chan"
create_example :: proc() {
unbuffered: chan.Chan(int)
buffered: chan.Chan(int)
err: runtime.Allocator_Error
unbuffered, err = chan.create(chan.Chan(int), context.allocator)
assert(err == .None)
defer chan.destroy(unbuffered)
buffered, err = chan.create(chan.Chan(int), 10, context.allocator)
assert(err == .None)
defer chan.destroy(buffered)
}
create_raw ¶
create_raw :: proc{ create_raw_unbuffered, create_raw_buffered, }
Creates a buffered or unbuffered Raw_Chan
for messages of the specified
size and alignment.
Allocates Using Provided Allocator
Inputs
msg_size
: The size of the messages the messages being sent
msg_alignment
: The alignment of the messages being sent
[cap
: The capacity of the channel] omit for creating unbuffered channels
allocator
: The allocator to use
Returns:
The initialized Raw_Chan
An Allocator_Error
Example:
import "core:sync/chan"
create_raw_example :: proc() {
unbuffered: ^chan.Raw_Chan
buffered: ^chan.Raw_Chan
err: runtime.Allocator_Error
unbuffered, err = chan.create_raw(size_of(int), align_of(int), context.allocator)
assert(err == .None)
defer chan.destroy(unbuffered)
buffered, err = chan.create_raw(size_of(int), align_of(int), 10, context.allocator)
assert(err == .None)
defer chan.destroy(buffered)
}
Source Files
Generation Information
Generated with odin version dev-2025-04 (vendor "odin") Windows_amd64 @ 2025-04-13 21:11:30.261897500 +0000 UTC