API Reference
PipeChannel Type
PipeChannels.PipeChannel — Type
PipeChannel{T}
A lock-free single-producer single-consumer channel using a ring buffer.
This implementation uses atomic operations for the head and tail indices, allowing one thread to write and another to read without any locks. This eliminates allocations in the hot path that would otherwise occur with Julia's Channel type.
The API matches Julia's Channel:
- put! blocks when full, throws InvalidStateException when closed
- take! blocks when empty, throws InvalidStateException when closed and empty
- Iteration works with for x in ch syntax
- bind connects a task to the channel for error propagation
Type Parameters
T: Element type stored in the channel
Fields
- buffer::Vector{T}: Pre-allocated storage
- capacity::Int: Maximum number of elements (one slot reserved for full/empty detection)
- head::Threads.Atomic{Int}: Write position (modified only by producer)
- tail::Threads.Atomic{Int}: Read position (modified only by consumer)
Thread Safety
- Exactly ONE producer thread may call put!
- Exactly ONE consumer thread may call take!
- Multiple producers or consumers will cause data races
Examples
ch = PipeChannel{Int}(16)
# Producer thread
put!(ch, 42)
close(ch)
# Consumer thread
value = take!(ch) # Returns 42
take!(ch) # Throws InvalidStateException (closed and empty)
Channel Operations
Base.take! — Method
take!(ch::PipeChannel{T}) -> T
Remove and return an element from the buffer. Blocks if the buffer is empty.
Throws
- InvalidStateException: If the channel is closed and empty
- The bound task's exception if the task failed
Thread Safety
Must only be called from a single consumer thread.
Base.close — Method
close(ch::PipeChannel, excp::Exception=closed_exception())
Close the channel. After closing:
- put! will throw InvalidStateException (or the bound task's exception)
- take! will return remaining elements, then throw InvalidStateException (or the bound task's exception)
If excp is provided, it will be stored and thrown on subsequent operations.
Base.bind — Method
bind(ch::PipeChannel, task::Task)
Bind a task to the channel. When the task terminates:
- The channel is automatically closed
- If the task failed with an exception, that exception will be thrown on subsequent put! or take! operations
This is useful for propagating errors from producer/consumer tasks.
Examples
ch = PipeChannel{Int}(16)
task = @async begin
    for i in 1:10
        put!(ch, i)
    end
    close(ch)
end
bind(ch, task)
# If task fails, the exception propagates to consumers
for x in ch
    println(x)
end
Batch Operations
Batch operations allow transferring multiple items at once, significantly improving throughput by amortizing the cost of atomic operations.
Base.put! — Method
put!(ch::PipeChannel{T}, values::AbstractVector{T}) -> AbstractVector{T}
Add multiple elements to the buffer in a single batch operation. Blocks until all items are written. Returns the input vector.
This is more efficient than calling put! repeatedly because:
- Fewer atomic writes (one per batch of items that fit)
- Reduced per-item overhead
Behavior
- Writes as many items as possible, then blocks until space is available
- Continues until all items are written
- Returns the input vector (for consistency with single-item put!)
Throws
InvalidStateException: If the channel is closed
Thread Safety
Must only be called from a single producer thread.
Examples
ch = PipeChannel{Int}(100)
data = collect(1:50)
put!(ch, data) # Blocks until all 50 items are written
Base.take! — Method
take!(ch::PipeChannel{T}, n::Integer) -> Vector{T}
Remove and return exactly n elements from the buffer in a single batch operation. Blocks until all n items are available.
This is more efficient than calling take! repeatedly because:
- Fewer atomic writes (one per batch of items available)
- Single allocation for the result vector
- Reduced per-item overhead
Behavior
- Reads as many items as available, then blocks until more data arrives
- Continues until exactly n items are read
- Returns a vector of exactly n items
Throws
InvalidStateException: If the channel is closed before n items can be read
Thread Safety
Must only be called from a single consumer thread.
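The blocking and error contract described above can be emulated on Julia's built-in Channel, whose API PipeChannel is documented to match. The helper name take_exactly! is hypothetical, and this sketch only mirrors the observable semantics: it takes one element at a time, so it has none of the batching efficiency of the real method.

```julia
# Hypothetical helper on Base.Channel; it mirrors the documented contract only.
function take_exactly!(ch::Channel{T}, n::Integer) where {T}
    out = Vector{T}(undef, n)  # single allocation for the result
    for i in 1:n
        # Blocks while the channel is empty; throws InvalidStateException
        # once the channel is closed and drained before n items arrived.
        out[i] = take!(ch)
    end
    return out
end

ch = Channel{Int}(8)
for i in 1:5
    put!(ch, i)
end
batch = take_exactly!(ch, 3)  # the first three items, in order
close(ch)
```

After close(ch), two items remain buffered, so a further take_exactly!(ch, 5) drains them and then throws InvalidStateException rather than returning a short vector.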
Examples
ch = PipeChannel{Int}(100)
# ... producer puts data ...
batch = take!(ch, 32) # Blocks until exactly 32 items are available
Base.take! — Method
take!(ch::PipeChannel{T}, output::AbstractVector{T}) -> Int
Remove elements from the buffer into a pre-allocated output vector. Blocks until the entire output buffer is filled. Returns length(output).
This variant avoids allocation by writing into a provided buffer.
Behavior
- Reads as many items as available, then blocks until more data arrives
- Continues until the entire output buffer is filled
- Returns length(output)
Throws
InvalidStateException: If the channel is closed before the buffer can be filled
Thread Safety
Must only be called from a single consumer thread.
Examples
ch = PipeChannel{Int}(100)
buffer = Vector{Int}(undef, 32)
take!(ch, buffer) # Blocks until all 32 slots are filled
Query Functions
Base.isopen — Method
isopen(ch::PipeChannel) -> Bool
Check if the channel is still open for operations.
Base.isready — Method
isready(ch::PipeChannel) -> Bool
Check if data is available to read (i.e., buffer is not empty).
This is the opposite of isempty and matches the Channel API where isready(ch) returns true when take! would not block.
Thread Safety Note
Same as isempty: only guaranteed accurate from the consumer thread. May return false even when data is available if called from the producer thread (false negative is safe - just causes unnecessary waiting).
Base.isempty — Method
isempty(ch::PipeChannel) -> Bool
Check if the buffer is empty.
Thread Safety Note
This function is only guaranteed to be accurate when called from the consumer thread.
The result can be a false positive (reports empty when not empty) if called from the producer thread, because the producer may have advanced head after we read it but before we compare. This is safe - it just means the consumer might unnecessarily wait.
However, if the consumer calls this and it returns false, it is guaranteed that there is data to read, because only the consumer advances tail and only the producer advances head (which can only add more data, not remove it).
Base.isfull — Method
isfull(ch::PipeChannel) -> Bool
Check if the buffer is full.
Thread Safety Note
This function is only guaranteed to be accurate when called from the producer thread.
The result can be a false positive (reports full when not full) if called from the consumer thread, because the consumer may have advanced tail after we read it but before we compare. This is safe - it just means the producer might unnecessarily wait.
However, if the producer calls this and it returns false, it is guaranteed that there is space to write, because only the producer advances head and only the consumer advances tail (which can only create more space, not less).
Base.n_avail — Method
n_avail(ch::PipeChannel) -> Int
Return the number of elements available to read.
Thread Safety Note
This is an approximation that may be slightly stale. The actual count may be higher (if the producer added items after we read head) but never lower (the consumer is the only one who can remove items by advancing tail).
Most accurate when called from the consumer thread.
Base.wait — Method
wait(ch::PipeChannel)
Block until data is available in the buffer or the channel is closed.
Unlike take!, this does not consume the data - it just waits until isready(ch) would return true, or throws if the channel is closed and empty.
Throws
- InvalidStateException: If the channel is closed and empty
- The bound task's exception if the task failed
Thread Safety
Should only be called from the consumer thread.
Iteration
PipeChannel supports Julia's iteration protocol. You can iterate over values until the channel is closed and empty:
ch = PipeChannel{Int}(16)
for i in 1:5
    put!(ch, i)
end
close(ch)
for value in ch
    println(value)
end
# Output: 1, 2, 3, 4, 5
Base.iterate — Method
iterate(ch::PipeChannel{T}, state=nothing)
Iterate over values in the channel until it's closed and empty. Catches InvalidStateException to cleanly end iteration. If a bound task failed, the TaskFailedException is propagated.
Note: The @inline annotation is critical for avoiding heap allocation of the returned (value, nothing) tuple when T is not an isbits type.
Base.eltype — Method
eltype(::Type{PipeChannel{T}}) where T
Returns the element type T of the PipeChannel{T}.
Base.IteratorSize — Method
IteratorSize(::Type{<:PipeChannel})
Returns Base.SizeUnknown() since the number of elements in a PipeChannel cannot be determined in advance (depends on when the channel is closed).