class Queue
A thread-safe queue which allows items to be processed in order.
This implementation uses Thread::Queue internally for thread safety while maintaining compatibility with the fiber scheduler.
It has a compatible interface with class Async::Notification and class Async::Condition, except that it's multi-value.
Signature
- asynchronous
This class is thread-safe.
- public
Since Async v1.
Nested
Definitions
def initialize(parent: nil, delegate: Thread::Queue.new)
Create a new thread-safe queue.
Signature
-
parameter
parent
Interface(:async) | Nil
The parent task to use for async operations.
Implementation
# Create a new queue around a backing store.
#
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations; kept in `@parent`.
# @parameter delegate [Thread::Queue] The underlying queue that actually holds the items; kept in `@delegate`.
def initialize(parent: nil, delegate: Thread::Queue.new)
  # The two assignments are independent of one another.
  @parent = parent
  @delegate = delegate
end
def closed?
Signature
-
returns
Boolean
Whether the queue is closed.
Implementation
# Report whether the underlying queue has been closed.
#
# @returns [Boolean] Whether the queue is closed.
def closed? = @delegate.closed?
def close
Close the queue, causing all waiting tasks to return nil. Any subsequent calls to Async::Queue#enqueue will raise an exception.
Implementation
# Close the queue by closing the underlying delegate.
#
# The result of the delegate's own `close` is returned unchanged.
def close = @delegate.close
def size
Signature
-
returns
Integer
The number of items in the queue.
Implementation
# Count the items currently held by the underlying queue.
#
# @returns [Integer] The number of items in the queue.
def size = @delegate.size
def empty?
Signature
-
returns
Boolean
Whether the queue is empty.
Implementation
# Report whether the underlying queue currently holds no items.
#
# @returns [Boolean] Whether the queue is empty.
def empty? = @delegate.empty?
def waiting_count
Signature
-
returns
Integer
The number of tasks waiting for an item.
Implementation
# Ask the underlying queue how many consumers are blocked on it.
#
# @returns [Integer] The number of tasks waiting for an item, as reported by the delegate's `num_waiting`.
def waiting_count = @delegate.num_waiting
def push(item)
Add an item to the queue.
Implementation
# Append a single item to the underlying queue.
#
# @parameter item [Object] The item to add.
# @raises [ClosedError] If the underlying queue has been closed.
def push(item)
  @delegate.push(item)
rescue ClosedQueueError
  # Re-raise the delegate's closed-queue failure as ClosedError.
  raise(ClosedError, "Cannot enqueue items to a closed queue!")
end
def <<(item)
Compatibility with ::Queue#push.
Implementation
# Shovel-operator form of #push, for compatibility with ::Queue#push.
#
# @parameter item [Object] The item to add; handed straight to #push.
def <<(item)
  push(item)
end
def enqueue(*items)
Add multiple items to the queue.
Implementation
# Append several items to the underlying queue, in argument order.
#
# @parameter items [Array] The items to add, one push per item.
# @raises [ClosedError] If the underlying queue has been closed.
def enqueue(*items)
  items.each do |entry|
    @delegate.push(entry)
  end
rescue ClosedQueueError
  # Re-raise the delegate's closed-queue failure as ClosedError.
  raise ClosedError, "Cannot enqueue items to a closed queue!"
end
def dequeue(timeout: nil)
Remove and return the next item from the queue.
Signature
-
parameter
timeout
Numeric, nil
Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
-
returns
Object, nil
The dequeued item, or nil if timeout expires.
Implementation
# Remove and return the next item by popping the underlying queue.
#
# @parameter timeout [Numeric, nil] Maximum time to wait for an item; forwarded to the delegate's `pop` unchanged.
# @returns [Object, nil] The dequeued item, or nil if timeout expires.
def dequeue(timeout: nil) = @delegate.pop(timeout: timeout)
def pop(timeout: nil)
Compatibility with ::Queue#pop.
Signature
-
parameter
timeout
Numeric, nil
Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
-
returns
Object, nil
The dequeued item, or nil if timeout expires.
Implementation
# Remove and return the next item; provided for compatibility with ::Queue#pop.
#
# @parameter timeout [Numeric, nil] Maximum time to wait for an item; forwarded to the delegate's `pop` unchanged.
# @returns [Object, nil] The dequeued item, or nil if timeout expires.
def pop(timeout: nil) = @delegate.pop(timeout: timeout)
def async(parent: (@parent or Task.current), **options, &block)
- asynchronous
Process each item in the queue.
Signature
- asynchronous
Executes the given block concurrently for each item.
-
parameter
arguments
Array
The arguments to pass to the block.
-
parameter
parent
Interface(:async) | Nil
The parent task to use for async operations.
-
parameter
options
Hash
The options to pass to the task.
-
yields
{|task| ...}
When the system is idle, the block will be executed in a new task.
Implementation
# Drain the queue, starting one task on `parent` per dequeued item.
#
# Looping stops as soon as #dequeue yields a falsy value (nil or false).
#
# @asynchronous Executes the given block concurrently for each item.
# @parameter parent [Interface(:async) | Nil] Receiver of the `async` calls; defaults to `@parent`, falling back to `Task.current`.
# @parameter options [Hash] Extra keyword options passed through to `parent.async`.
# @yields {|task| ...} The block is forwarded to `parent.async` together with the item.
def async(parent: (@parent or Task.current), **options, &block)
  item = dequeue

  while item
    parent.async(item, **options, &block)
    item = dequeue
  end
end
def each
Enumerate each item in the queue.
Implementation
# Yield items to the caller's block one at a time, in dequeue order.
#
# Iteration ends as soon as #dequeue yields a falsy value (nil or false).
#
# @yields {|item| ...} Each dequeued item.
def each
  item = dequeue

  while item
    yield item
    item = dequeue
  end
end
def signal(value = nil)
Signal the queue with a value, the same as #enqueue.
Implementation
# Signal the queue with a value by handing it to #enqueue.
#
# @parameter value [Object, nil] The value to enqueue; nil when omitted.
def signal(value = nil) = enqueue(value)
def wait
Wait for an item to be available, the same as #dequeue.
Implementation
# Wait for an item to be available; simply calls #dequeue with no arguments.
#
# @returns [Object, nil] Whatever #dequeue returns.
def wait = dequeue