â€ē Async â€ē Source â€ē Async â€ē Queue

class Queue

A thread-safe queue which allows items to be processed in order.

This implementation uses Thread::Queue internally for thread safety while maintaining compatibility with the fiber scheduler.

It has a compatible interface with class Async::Notification and class Async::Condition, except that it's multi-value.

Signature

asynchronous

This class is thread-safe.

public

Since Async v1.

Nested

Definitions

def initialize(parent: nil, delegate: Thread::Queue.new)

Create a new thread-safe queue.

Signature

parameter parent Interface(:async) | Nil

The parent task to use for async operations.

Implementation

def initialize(parent: nil, delegate: Thread::Queue.new)
	@delegate = delegate
	@parent = parent
end

def closed?

Signature

returns Boolean

Whether the queue is closed.

Implementation

def closed?
	@delegate.closed?
end

def close

Close the queue, causing all waiting tasks to return nil. Any subsequent calls to Async::Queue#enqueue will raise an exception.

Implementation

def close
	@delegate.close
end

def size

Signature

returns Integer

The number of items in the queue.

Implementation

def size
	@delegate.size
end

def empty?

Signature

returns Boolean

Whether the queue is empty.

Implementation

def empty?
	@delegate.empty?
end

def waiting_count

Signature

returns Integer

The number of tasks waiting for an item.

Implementation

def waiting_count
	@delegate.num_waiting
end

def push(item)

Add an item to the queue.

Implementation

def push(item)
	@delegate.push(item)
rescue ClosedQueueError
	raise ClosedError, "Cannot enqueue items to a closed queue!"
end

def <<(item)

Compatibility with ::Queue#push.

Implementation

def <<(item)
	self.push(item)
end

def enqueue(*items)

Add multiple items to the queue.

Implementation

def enqueue(*items)
	items.each {|item| @delegate.push(item)}
rescue ClosedQueueError
	raise ClosedError, "Cannot enqueue items to a closed queue!"
end

def dequeue(timeout: nil)

Remove and return the next item from the queue.

Signature

parameter timeout Numeric, nil

Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.

returns Object, nil

The dequeued item, or nil if timeout expires.

Implementation

def dequeue(timeout: nil)
	@delegate.pop(timeout: timeout)
end

def pop(timeout: nil)

Compatibility with ::Queue#pop.

Signature

parameter timeout Numeric, nil

Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.

returns Object, nil

The dequeued item, or nil if timeout expires.

Implementation

def pop(timeout: nil)
	@delegate.pop(timeout: timeout)
end

def async(parent: (@parent or Task.current), **options, &block)

  • asynchronous

Process each item in the queue.

Signature

asynchronous

Executes the given block concurrently for each item.

parameter arguments Array

The arguments to pass to the block.

parameter parent Interface(:async) | Nil

The parent task to use for async operations.

parameter options Hash

The options to pass to the task.

yields {|task| ...}

When the system is idle, the block will be executed in a new task.

Implementation

def async(parent: (@parent or Task.current), **options, &block)
	while item = self.dequeue
		parent.async(item, **options, &block)
	end
end

def each

Enumerate each item in the queue.

Implementation

def each
	while item = self.dequeue
		yield item
	end
end

def signal(value = nil)

Signal the queue with a value, the same as #enqueue.

Implementation

def signal(value = nil)
	self.enqueue(value)
end

def wait

Wait for an item to be available, the same as #dequeue.

Implementation

def wait
	self.dequeue
end

Discussion