API Reference

PipeChannel Type

PipeChannels.PipeChannel (Type)
PipeChannel{T}

A lock-free single-producer single-consumer channel using a ring buffer.

This implementation uses atomic operations for the head and tail indices, allowing one thread to write and another to read without any locks. This eliminates allocations in the hot path that would otherwise occur with Julia's Channel type.

The API matches Julia's Channel:

  • put! blocks when full, throws InvalidStateException when closed
  • take! blocks when empty, throws InvalidStateException when closed and empty
  • Iteration works with for x in ch syntax
  • bind connects a task to the channel for error propagation

Type Parameters

  • T: Element type stored in the channel

Fields

  • buffer::Vector{T}: Pre-allocated storage
  • capacity::Int: Maximum number of elements (one slot reserved for full/empty detection)
  • head::Threads.Atomic{Int}: Write position (modified only by producer)
  • tail::Threads.Atomic{Int}: Read position (modified only by consumer)
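The field layout above can be illustrated with a minimal, non-blocking sketch. This is not the package's implementation: `SketchRing`, `try_put!`, and `try_take!` are hypothetical names, the indices are assumed to be 0-based, and blocking, closing, and error handling are left out. It only shows how a producer-owned `head`, a consumer-owned `tail`, and one reserved slot are enough to tell "full" from "empty".

```julia
# Hypothetical sketch type, not the package's actual implementation.
struct SketchRing{T}
    buffer::Vector{T}
    slots::Int                  # length(buffer); one slot always stays unused
    head::Threads.Atomic{Int}   # next write index (0-based), producer-owned
    tail::Threads.Atomic{Int}   # next read index (0-based), consumer-owned
end

SketchRing{T}(slots::Integer) where {T} =
    SketchRing{T}(Vector{T}(undef, slots), slots,
                  Threads.Atomic{Int}(0), Threads.Atomic{Int}(0))

# Producer side: returns false instead of blocking when the ring is full.
function try_put!(r::SketchRing{T}, v::T) where {T}
    h = r.head[]
    next = (h + 1) % r.slots
    next == r.tail[] && return false   # writing would make head catch up with tail
    r.buffer[h + 1] = v                # store the element first...
    r.head[] = next                    # ...then publish it with one atomic write
    return true
end

# Consumer side: returns nothing instead of blocking when the ring is empty.
function try_take!(r::SketchRing)
    t = r.tail[]
    t == r.head[] && return nothing    # head == tail means empty
    v = r.buffer[t + 1]
    r.tail[] = (t + 1) % r.slots       # release the slot with one atomic write
    return v
end

ring = SketchRing{Int}(4)              # 4 slots, so at most 3 items in flight
```

Each side writes only its own index and merely reads the other one, which is why no lock is needed as long as there is exactly one producer and one consumer.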

Thread Safety

  • Exactly ONE producer thread may call put!
  • Exactly ONE consumer thread may call take!
  • Multiple producers or consumers will cause data races

Examples

ch = PipeChannel{Int}(16)

# Producer thread
put!(ch, 42)
close(ch)

# Consumer thread
value = take!(ch)  # Returns 42
take!(ch)  # Throws InvalidStateException (closed and empty)

Channel Operations

Base.put! (Method)
put!(ch::PipeChannel{T}, value::T)

Add an element to the buffer. Blocks if the buffer is full.

Throws

  • InvalidStateException: If the channel is closed
  • The bound task's exception if the task failed

Thread Safety

Must only be called from a single producer thread.

Base.take! (Method)
take!(ch::PipeChannel{T}) -> T

Remove and return an element from the buffer. Blocks if the buffer is empty.

Throws

  • InvalidStateException: If the channel is closed and empty
  • The bound task's exception if the task failed

Thread Safety

Must only be called from a single consumer thread.

Base.close (Method)
close(ch::PipeChannel, excp::Exception=closed_exception())

Close the channel. After closing:

  • put! will throw InvalidStateException (or the bound task's exception)
  • take! will return remaining elements, then throw InvalidStateException (or the bound task's exception)

If excp is provided, it will be stored and thrown on subsequent operations.
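The closing semantics can be tried out on a plain `Base.Channel`, which this API is described as matching. The snippet below is a sketch that uses `Channel{Int}(4)` as a stand-in; that a `PipeChannel{Int}` behaves identically is assumed from the description above, not verified here.

```julia
ch = Channel{Int}(4)          # Base.Channel used as a stand-in
put!(ch, 7)
close(ch, ErrorException("upstream shut down"))

remaining = take!(ch)         # the buffered item is still delivered

take_err = try
    take!(ch)                 # closed and empty: the stored exception is thrown
catch e
    e
end

put_err = try
    put!(ch, 8)               # writing to a closed channel throws as well
catch e
    e
end
```

Both `take_err` and `put_err` hold the exception passed to `close`, not the default `InvalidStateException`.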

Base.bind (Method)
bind(ch::PipeChannel, task::Task)

Bind a task to the channel. When the task terminates:

  • The channel is automatically closed
  • If the task failed with an exception, that exception will be thrown on subsequent put! or take! operations

This is useful for propagating errors from producer/consumer tasks.

Examples

ch = PipeChannel{Int}(16)
task = @async begin
    for i in 1:10
        put!(ch, i)
    end
    close(ch)
end
bind(ch, task)

# If the task fails, its exception propagates to the consumer
for x in ch
    println(x)
end
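
The failure path can be sketched the same way. The snippet below again uses `Base.Channel` as a stand-in (an assumption: `PipeChannel` is documented to mirror `bind`, but that is not verified here) and a producer task that crashes before it can close the channel.

```julia
ch = Channel{Int}(4)              # Base.Channel used as a stand-in
producer = @async begin
    put!(ch, 1)
    error("producer failed")      # simulate a crash before close(ch)
end
bind(ch, producer)

first_item = take!(ch)            # the item written before the crash

failure = try
    take!(ch)                     # channel was closed by the failed task
catch e
    e
end
```

With `Base.Channel`, `failure` is a `TaskFailedException` that wraps the producer task, so the consumer sees why the stream ended instead of a generic closed-channel error.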

Batch Operations

Batch operations allow transferring multiple items at once, significantly improving throughput by amortizing the cost of atomic operations.

Base.put! (Method)
put!(ch::PipeChannel{T}, values::AbstractVector{T}) -> AbstractVector{T}

Add multiple elements to the buffer in a single batch operation. Blocks until all items are written. Returns the input vector.

This is more efficient than calling put! repeatedly because:

  • Fewer atomic writes (one per batch of items that fit)
  • Reduced per-item overhead

Behavior

  • Writes as many items as possible, then blocks until space is available
  • Continues until all items are written
  • Returns the input vector (for consistency with single-item put!)
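
The "write as many as fit, then wait" behavior can be sketched as a single-threaded helper. `write_chunk!` is a hypothetical name and not part of the package; it assumes 0-based ring indices with one reserved slot, and it leaves out the blocking and the atomic stores. The point is that each chunk needs only one index update, however many items it contains.

```julia
# Hypothetical helper: copy as many of values[offset+1:end] as currently fit.
# head and tail are 0-based ring indices; one slot always stays unused.
function write_chunk!(buffer::Vector{T}, head::Int, tail::Int,
                      values::AbstractVector{T}, offset::Int) where {T}
    slots = length(buffer)
    free = mod(tail - head - 1, slots)        # writable slots right now
    n = min(free, length(values) - offset)
    for i in 1:n
        buffer[mod(head + i - 1, slots) + 1] = values[offset + i]
    end
    return n, mod(head + n, slots)            # one index update per chunk
end

ring = zeros(Int, 4)                          # 4 slots, at most 3 items in flight
data = [10, 20, 30, 40, 50]

# First chunk: the ring is empty (head == tail == 0).
written, head = write_chunk!(ring, 0, 0, data, 0)

# Suppose the consumer has since read two items (tail == 2); write the rest.
written2, head2 = write_chunk!(ring, head, 2, data, written)
```

A blocking batch `put!` would repeat this step, waiting between chunks, until `offset` reaches `length(values)`.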

Throws

  • InvalidStateException: If the channel is closed

Thread Safety

Must only be called from a single producer thread.

Examples

ch = PipeChannel{Int}(100)
data = collect(1:50)
put!(ch, data)  # Blocks until all 50 items are written

Base.take! (Method)
take!(ch::PipeChannel{T}, n::Integer) -> Vector{T}

Remove and return exactly n elements from the buffer in a single batch operation. Blocks until all n items are available.

This is more efficient than calling take! repeatedly because:

  • Fewer atomic writes (one per batch of items available)
  • Single allocation for the result vector
  • Reduced per-item overhead

Behavior

  • Reads as many items as available, then blocks until more data arrives
  • Continues until exactly n items are read
  • Returns a vector of exactly n items

Throws

  • InvalidStateException: If the channel is closed before n items can be read

Thread Safety

Must only be called from a single consumer thread.

Examples

ch = PipeChannel{Int}(100)
# ... producer puts data ...
batch = take!(ch, 32)  # Blocks until exactly 32 items are available

Base.take! (Method)
take!(ch::PipeChannel{T}, output::AbstractVector{T}) -> Int

Remove elements from the buffer into a pre-allocated output vector. Blocks until the entire output buffer is filled. Returns length(output).

This variant avoids allocation by writing into a provided buffer.

Behavior

  • Reads as many items as available, then blocks until more data arrives
  • Continues until the entire output buffer is filled
  • Returns length(output)
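
The read side can be sketched in the same single-threaded style. `read_chunk!` is a hypothetical helper, not the package's code; it assumes 0-based indices with one reserved slot and omits blocking and atomics. It copies whatever is currently available into the caller's buffer without allocating.

```julia
# Hypothetical helper: fill output[offset+1:end] with as many items as are available.
# head and tail are 0-based ring indices; one slot always stays unused.
function read_chunk!(output::AbstractVector{T}, offset::Int,
                     buffer::Vector{T}, head::Int, tail::Int) where {T}
    slots = length(buffer)
    avail = mod(head - tail, slots)           # readable items right now
    n = min(avail, length(output) - offset)
    for i in 1:n
        output[offset + i] = buffer[mod(tail + i - 1, slots) + 1]
    end
    return n, mod(tail + n, slots)            # one index update per chunk
end

ring = [50, 20, 30, 40]                       # holds 30, 40, 50 starting at tail == 2
out = Vector{Int}(undef, 2)
nread, tail_after = read_chunk!(out, 0, ring, 1, 2)
```

A blocking `take!(ch, output)` would repeat this step, advancing `offset` by the returned count, until the whole output buffer is filled.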

Throws

  • InvalidStateException: If the channel is closed before the buffer can be filled

Thread Safety

Must only be called from a single consumer thread.

Examples

ch = PipeChannel{Int}(100)
buffer = Vector{Int}(undef, 32)
take!(ch, buffer)  # Blocks until all 32 slots are filled

Query Functions

Base.isopen (Method)
isopen(ch::PipeChannel) -> Bool

Check if the channel is still open for operations.

Base.isready (Method)
isready(ch::PipeChannel) -> Bool

Check if data is available to read (i.e., buffer is not empty).

This is the opposite of isempty and matches the Channel API where isready(ch) returns true when take! would not block.

Thread Safety Note

Same as isempty: only guaranteed accurate from the consumer thread. The result may be false even when data is available, because the producer may advance head while the check runs. This false negative is safe: it only causes unnecessary waiting.

Base.isempty (Method)
isempty(ch::PipeChannel) -> Bool

Check if the buffer is empty.

Thread Safety Note

This function is only guaranteed to be accurate when called from the consumer thread.

The result can be a false positive (reports empty when the buffer is not empty) because the producer may advance head after head has been read but before the comparison is made. This is safe: at worst the consumer waits unnecessarily.

However, if the consumer calls this and it returns false, it is guaranteed that there is data to read, because only the consumer advances tail and only the producer advances head (which can only add more data, not remove it).

Base.isfull (Method)
isfull(ch::PipeChannel) -> Bool

Check if the buffer is full.

Thread Safety Note

This function is only guaranteed to be accurate when called from the producer thread.

The result can be a false positive (reports full when the buffer is not full) because the consumer may advance tail after tail has been read but before the comparison is made. This is safe: at worst the producer waits unnecessarily.

However, if the producer calls this and it returns false, it is guaranteed that there is space to write, because only the producer advances head and only the consumer advances tail (which can only create more space, not less).
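
The one-sided guarantees of isempty and isfull can be checked with plain index arithmetic. The sketch below assumes the predicates reduce to comparisons of 0-based `head` and `tail` indices with one reserved slot; `snapshot_isempty` and `snapshot_isfull` are hypothetical names, and the "stale" values simply model an index that the other thread advanced after it was read.

```julia
snapshot_isempty(head, tail) = head == tail
snapshot_isfull(head, tail, slots) = mod(head + 1, slots) == tail

slots = 4

# Consumer's view: it owns its tail and read head a moment ago.
consumer_tail = 2
stale_head = 2                  # value of head when the consumer read it
actual_head = 3                 # the producer has since written one item

empty_seen = snapshot_isempty(stale_head, consumer_tail)     # what the consumer sees
empty_real = snapshot_isempty(actual_head, consumer_tail)    # the real state

# Producer's view: it owns its head and read tail a moment ago.
producer_head = 1
stale_tail = 2                  # value of tail when the producer read it
actual_tail = 3                 # the consumer has since removed one item

full_seen = snapshot_isfull(producer_head, stale_tail, slots)
full_real = snapshot_isfull(producer_head, actual_tail, slots)
```

In both cases the stale answer errs on the side of waiting: the other thread can only move its index in the direction that adds data (for the consumer) or frees space (for the producer).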

Base.n_avail (Method)
n_avail(ch::PipeChannel) -> Int

Return the number of elements available to read.

Thread Safety Note

This is an approximation that may be slightly stale. When called from the consumer thread, the actual count may be higher (if the producer added items after head was read) but never lower (only the consumer can remove items, by advancing tail).

Most accurate when called from the consumer thread.
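
As a rough illustration, the count can be computed from the two indices with modular arithmetic. `sketch_n_avail` is a hypothetical function that assumes 0-based indices and one reserved slot; it is not taken from the package source.

```julia
# Number of readable items between tail (read position) and head (write position).
sketch_n_avail(head, tail, slots) = mod(head - tail, slots)

slots = 8
no_wrap = sketch_n_avail(3, 1, slots)     # head ahead of tail
wrapped = sketch_n_avail(1, 6, slots)     # head has wrapped around the end

# A stale head (read before the producer advanced it) can only undercount.
tail_now = 1
stale_count = sketch_n_avail(3, tail_now, slots)
actual_count = sketch_n_avail(5, tail_now, slots)
```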

Base.wait (Method)
wait(ch::PipeChannel)

Block until data is available in the buffer or the channel is closed.

Unlike take!, this does not consume the data: it waits until isready(ch) would return true, and throws if the channel is closed and empty.

Throws

  • InvalidStateException: If the channel is closed and empty
  • The bound task's exception if the task failed

Thread Safety

Should only be called from the consumer thread.
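
The difference between wait and take! can be seen on a plain `Base.Channel`, used here as a stand-in for `PipeChannel` on the assumption that the two behave alike as described above.

```julia
ch = Channel{Int}(4)          # Base.Channel used as a stand-in
put!(ch, 5)

wait(ch)                      # returns at once: data is already buffered
still_there = isready(ch)     # wait did not consume the item
value = take!(ch)

close(ch)
wait_err = try
    wait(ch)                  # closed and empty: throws instead of blocking
catch e
    e
end
```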


Iteration

PipeChannel supports Julia's iteration protocol. You can iterate over values until the channel is closed and empty:

ch = PipeChannel{Int}(16)
for i in 1:5
    put!(ch, i)
end
close(ch)

for value in ch
    println(value)
end
# Prints 1 through 5, one value per line

Base.iterate (Method)
iterate(ch::PipeChannel{T}, state=nothing)

Iterate over values in the channel until it's closed and empty. Catches InvalidStateException to cleanly end iteration. If a bound task failed, the TaskFailedException is propagated.

Note: The @inline annotation is critical for avoiding heap allocation of the returned (value, nothing) tuple when T is not an isbits type.

Base.eltype (Method)
eltype(::Type{PipeChannel{T}}) where T

Returns the element type T of the PipeChannel{T}.

Base.IteratorSize (Method)
IteratorSize(::Type{<:PipeChannel})

Returns Base.SizeUnknown() since the number of elements in a PipeChannel cannot be determined in advance (depends on when the channel is closed).
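
The three methods above fit together as in the following sketch. `DrainQueue` is a hypothetical stand-in that wraps a vector and behaves like an already closed channel; it is not part of the package. It shows the stateless `(value, nothing)` return shape, the `eltype` declaration, and why `SizeUnknown` is the appropriate trait for a consuming iterator.

```julia
# Hypothetical consuming iterator, standing in for a closed channel with buffered items.
struct DrainQueue{T}
    items::Vector{T}
end

Base.eltype(::Type{DrainQueue{T}}) where {T} = T
Base.IteratorSize(::Type{<:DrainQueue}) = Base.SizeUnknown()

# The iteration state is always `nothing`: progress lives in the queue itself.
@inline function Base.iterate(q::DrainQueue, state=nothing)
    isempty(q.items) && return nothing        # "closed and empty" ends the loop
    return (popfirst!(q.items), nothing)
end

q = DrainQueue([1, 2, 3])
drained = collect(q)          # collect grows its result, since the size is unknown
```

Because iteration consumes the items, a second pass over `q` yields nothing, which matches how a channel behaves once drained.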
