Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Aug 5, 2024
1 parent f5b7ee8 commit eeb6c41
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 15 deletions.
43 changes: 30 additions & 13 deletions lib/async/pool/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,18 @@ def release(resource)
retire(resource) unless processed
end

def drain
# Enumerate all existing resources and retire them:
while resource = acquire_existing_resource
retire(resource)
end
end

# Close all resources in the pool.
def close
@available.clear

while pair = @resources.shift
resource, usage = pair

if usage > 0
Console.warn(self, resource: resource, usage: usage) {"Closing resource while still in use!"}
end

resource.close
end
self.drain

@available.clear
@gardener&.stop
end

Expand Down Expand Up @@ -224,6 +222,8 @@ def retire(resource)
def start_gardener
return if @gardener

@gardener = true

Async(transient: true, annotation: "#{self.class} Gardener") do |task|
@gardener = task

Expand Down Expand Up @@ -319,7 +319,7 @@ def available_resource
resource = nil

@guard.acquire do
resource = get_resource
resource = acquire_or_create_resource
end

return resource
Expand All @@ -330,7 +330,24 @@ def available_resource

private

def get_resource
# Acquire an existing resource with zero usage.
# If there are resources that are in use, wait until they are released.
def acquire_existing_resource
while @resources.any?
@resources.each do |resource, usage|
if usage == 0
return resource
end
end

@notification.wait
end

# Only when the pool has been completely drained, return nil:
return nil
end

def acquire_or_create_resource
while resource = @available.last
if usage = @resources[resource] and usage < resource.concurrency
if resource.viable?
Expand Down
20 changes: 18 additions & 2 deletions test/async/pool/controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,27 @@
end

it "warns if closing while a resource is acquired" do
pool.acquire
events = []

expect(Console).to receive(:warn).and_return(nil)
child = Async do |task|
events << :acquire
resource = pool.acquire

sleep 0.1

events << :release
pool.release(resource)
end

sleep 0.1

events << :close
pool.close
events << :closed

child.wait

expect(events).to be == [:acquire, :close, :release, :closed]
end
end

Expand Down

0 comments on commit eeb6c41

Please sign in to comment.