â€ē Async â€ē Source â€ē Async â€ē PriorityQueue

class PriorityQueue

A queue which allows items to be processed in priority order of consumers.

Unlike a traditional priority queue where items have priorities, this queue assigns priorities to consumers (fibers waiting to dequeue). Higher priority consumers are served first when items become available.

Signature

public

Since Async v2.

Definitions

Waiter

A waiter represents a fiber waiting to dequeue with a given priority.

Implementation

Waiter = Struct.new(:fiber, :priority, :sequence, :condition, :value) do
	include Comparable
	
	def <=>(other)
		# Higher priority comes first, then FIFO for equal priorities:
		if priority == other.priority
			# Use sequence for FIFO behavior (lower sequence = earlier):
			sequence <=> other.sequence
		else
			other.priority <=> priority  # Reverse for max-heap behavior
		end
	end
	
	def signal(value)
		self.value = value
		condition.signal
	end
	
	def wait_for_value(mutex, timeout = nil)
		condition.wait(mutex, timeout)
		return self.value
	end
	
	# Invalidate this waiter, making it unusable and detectable as abandoned.
	def invalidate!
		self.fiber = nil
	end
	
	# Check if this waiter has been invalidated.
	def valid?
		self.fiber&.alive?
	end
end

def initialize(parent: nil)

Create a new priority queue.

Signature

parameter parent Interface(:async) | Nil

The parent task to use for async operations.

Implementation

def initialize(parent: nil)
	@items = []
	@closed = false
	@parent = parent
	@waiting = IO::Event::PriorityHeap.new
	@sequence = 0
	
	@mutex = Mutex.new
end

def close

Close the queue, causing all waiting tasks to return nil. Any subsequent calls to Async::PriorityQueue#enqueue will raise an exception.

Implementation

def close
	@mutex.synchronize do
		@closed = true
		
		# Signal all waiting fibers with nil, skipping dead/invalid ones:
		while waiter = @waiting.pop
			waiter.signal(nil)
		end
	end
end

def closed?

Signature

returns Boolean

Whether the queue is closed.

Implementation

def closed?
	@closed
end

attr :items

Signature

attribute Array

The items in the queue.

def size

Signature

returns Integer

The number of items in the queue.

Implementation

def size
	@items.size
end

def empty?

Signature

returns Boolean

Whether the queue is empty.

Implementation

def empty?
	@items.empty?
end

def waiting_count

Signature

returns Integer

The number of fibers waiting to dequeue.

Implementation

def waiting_count
	@mutex.synchronize do
		@waiting.size
	end
end

alias waiting waiting_count

  • deprecated

Signature

deprecated

Use #waiting_count instead.

def push(item)

Add an item to the queue.

Signature

parameter item Object

The item to add to the queue.

Implementation

def push(item)
	@mutex.synchronize do
		if @closed
			raise ClosedError, "Cannot push items to a closed queue."
		end
		
		@items << item
		
		# Wake up the highest priority waiter if any, skipping dead/invalid waiters:
		while waiter = @waiting.pop
			if waiter.valid?
				value = @items.shift
				waiter.signal(value)
				break
			end
			# Dead/invalid waiter discarded, try next one.
		end
	end
end

def <<(item)

Compatibility with ::Queue#push.

Implementation

def <<(item)
	self.push(item)
end

def enqueue(*items)

Add multiple items to the queue.

Signature

parameter items Array

The items to add to the queue.

Implementation

def enqueue(*items)
	@mutex.synchronize do
		if @closed
			raise ClosedError, "Cannot enqueue items to a closed queue."
		end
		
		@items.concat(items)
		
		# Wake up waiting fibers in priority order, skipping dead/invalid waiters:
		while !@items.empty? && (waiter = @waiting.pop)
			if waiter.valid?
				value = @items.shift
				waiter.signal(value)
			end
			# Dead/invalid waiter discarded, continue to next one.
		end
	end
end

def dequeue(priority: 0, timeout: nil)

Remove and return the next item from the queue.

If the queue is empty, this method will block until an item is available or timeout expires. Fibers are served in priority order, with higher priority fibers receiving items first.

Signature

parameter priority Numeric

The priority of this consumer (higher = served first).

parameter timeout Numeric, nil

Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.

returns Object, nil

The next item in the queue, or nil if timeout expires.

Implementation

def dequeue(priority: 0, timeout: nil)
	@mutex.synchronize do
		# If queue is closed and empty, return nil immediately:
		if @closed && @items.empty?
			return nil
		end
		
		# Fast path: if items available and either no waiters or we have higher priority:
		unless @items.empty?
			head = @waiting.peek
			if head.nil? or priority > head.priority
				return @items.shift
			end
		end
		
		# Handle immediate timeout (non-blocking)
		return nil if timeout == 0
		
		# Need to wait - create our own condition variable and add to waiting queue:
		sequence = @sequence
		@sequence += 1
		
		condition = ConditionVariable.new
		
		begin
			waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
			@waiting.push(waiter)
			
			# Wait for our specific condition variable to be signaled:
			return waiter.wait_for_value(@mutex, timeout)
		ensure
			waiter&.invalidate!
		end
	end
end

def pop(priority: 0, timeout: nil)

Compatibility with ::Queue#pop.

Signature

parameter priority Numeric

The priority of this consumer.

parameter timeout Numeric, nil

Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.

returns Object, nil

The dequeued item, or nil if timeout expires.

Implementation

def pop(priority: 0, timeout: nil)
	self.dequeue(priority: priority, timeout: timeout)
end

def async(priority: 0, parent: (@parent or Task.current), **options, &block)

  • asynchronous

Process each item in the queue.

Signature

asynchronous

Executes the given block concurrently for each item.

parameter priority Numeric

The priority for processing items.

parameter parent Interface(:async) | Nil

The parent task to use for async operations.

parameter options Hash

The options to pass to the task.

yields {|task| ...}

When the system is idle, the block will be executed in a new task.

Implementation

def async(priority: 0, parent: (@parent or Task.current), **options, &block)
	while item = self.dequeue(priority: priority)
		parent.async(item, **options, &block)
	end
end

def each(priority: 0)

Enumerate each item in the queue.

Signature

parameter priority Numeric

The priority for dequeuing items.

Implementation

def each(priority: 0)
	while item = self.dequeue(priority: priority)
		yield item
	end
end

def signal(value = nil)

Signal the queue with a value, the same as #enqueue.

Implementation

def signal(value = nil)
	self.enqueue(value)
end

def wait(priority: 0)

Wait for an item to be available, the same as #dequeue.

Signature

parameter priority Numeric

The priority of this consumer.

Implementation

def wait(priority: 0)
	self.dequeue(priority: priority)
end

Discussion