Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CI: Reliability, eliminate a race condition and some resource leaks #1119

Merged
merged 7 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Unreleased
* :gh:issue:`905` Initial support for templated ``ansible_ssh_args``,
``ansible_ssh_common_args``, and ``ansible_ssh_extra_args`` variables.
NB: play or task scoped variables will probably still fail.
* :gh:issue:`694` CI: Fixed a race condition and some resource leaks causing
some intermittent failures when running the test suite.


v0.3.9 (2024-08-13)
Expand Down
16 changes: 15 additions & 1 deletion mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2542,7 +2542,7 @@ def _signal_child(self, signum):
# because it is setuid, so this is best-effort only.
LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
try:
os.kill(self.proc.pid, signum)
self.proc.send_signal(signum)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
Expand Down Expand Up @@ -2662,6 +2662,17 @@ def poll(self):
"""
raise NotImplementedError()

def send_signal(self, sig):
    """
    Send signal number `sig` to the process whose PID is ``self.pid``,
    using :func:`os.kill`.
    """
    os.kill(self.pid, sig)

def terminate(self):
    """Ask the process to shut down gracefully, by sending it SIGTERM."""
    self.send_signal(signal.SIGTERM)

def kill(self):
    """
    Ask the operating system to forcefully destroy the process, by sending
    it SIGKILL.
    """
    self.send_signal(signal.SIGKILL)


class PopenProcess(Process):
"""
Expand All @@ -2678,6 +2689,9 @@ def __init__(self, proc, stdin, stdout, stderr=None):
def poll(self):
return self.proc.poll()

def send_signal(self, sig):
self.proc.send_signal(sig)


class ModuleForwarder(object):
"""
Expand Down
10 changes: 7 additions & 3 deletions mitogen/unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,23 @@ def on_shutdown(self, broker):
def on_accept_client(self, sock):
sock.setblocking(True)
try:
pid, = struct.unpack('>L', sock.recv(4))
data = sock.recv(4)
pid, = struct.unpack('>L', data)
except (struct.error, socket.error):
LOG.error('listener: failed to read remote identity: %s',
sys.exc_info()[1])
LOG.error('listener: failed to read remote identity, got %d bytes: %s',
len(data), sys.exc_info()[1])
sock.close()
return

context_id = self._router.id_allocator.allocate()
try:
# FIXME #1109 send() returns number of bytes sent, check it
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
except socket.error:
LOG.error('listener: failed to assign identity to PID %d: %s',
pid, sys.exc_info()[1])
sock.close()
return

context = mitogen.parent.Context(self._router, context_id)
Expand Down
9 changes: 6 additions & 3 deletions tests/connection_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import signal
import sys
Expand Down Expand Up @@ -54,7 +55,9 @@ def do_detach(econtext):
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
def test_subprocess_preserved_on_shutdown(self):
c1 = self.router.local()
c1_stream = self.router.stream_by_id(c1.context_id)
pid = c1.call(os.getpid)
self.assertEqual(pid, c1_stream.conn.proc.pid)

l = mitogen.core.Latch()
mitogen.core.listen(c1, 'disconnect', l.put)
Expand All @@ -64,8 +67,8 @@ def test_subprocess_preserved_on_shutdown(self):
self.broker.shutdown()
self.broker.join()

os.kill(pid, 0) # succeeds if process still alive
self.assertIsNone(os.kill(pid, 0)) # succeeds if process still alive

# now clean up
os.kill(pid, signal.SIGTERM)
os.waitpid(pid, 0)
c1_stream.conn.proc.terminate()
c1_stream.conn.proc.proc.wait()
1 change: 1 addition & 0 deletions tests/create_child_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def close_proc(proc):
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
proc.proc.wait()


def wait_read(fp, n):
Expand Down
2 changes: 1 addition & 1 deletion tests/data/importer/six_brokenpkg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@
else:
from . import _six as six
six_py_file = '{0}.py'.format(os.path.splitext(six.__file__)[0])
exec(open(six_py_file, 'rb').read())
with open(six_py_file, 'rb') as f: exec(f.read())
3 changes: 3 additions & 0 deletions tests/id_allocation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ def test_slave_allocates_id(self):
# Subsequent master allocation does not collide
c2 = self.router.local()
self.assertEqual(1002, c2.context_id)

context.shutdown()
c2.shutdown()
20 changes: 9 additions & 11 deletions tests/reaper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@


class ReaperTest(testlib.TestCase):
@mock.patch('os.kill')
def test_calc_delay(self, kill):
def test_calc_delay(self):
broker = mock.Mock()
proc = mock.Mock()
proc.poll.return_value = None
Expand All @@ -24,29 +23,28 @@ def test_calc_delay(self, kill):
self.assertEqual(752, int(1000 * reaper._calc_delay(5)))
self.assertEqual(1294, int(1000 * reaper._calc_delay(6)))

@mock.patch('os.kill')
def test_reap_calls(self, kill):
def test_reap_calls(self):
broker = mock.Mock()
proc = mock.Mock()
proc.poll.return_value = None

reaper = mitogen.parent.Reaper(broker, proc, True, True)

reaper.reap()
self.assertEqual(0, kill.call_count)
self.assertEqual(0, proc.send_signal.call_count)

reaper.reap()
self.assertEqual(1, kill.call_count)
self.assertEqual(1, proc.send_signal.call_count)

reaper.reap()
reaper.reap()
reaper.reap()
self.assertEqual(1, kill.call_count)
self.assertEqual(1, proc.send_signal.call_count)

reaper.reap()
self.assertEqual(2, kill.call_count)
self.assertEqual(2, proc.send_signal.call_count)

self.assertEqual(kill.mock_calls, [
mock.call(proc.pid, signal.SIGTERM),
mock.call(proc.pid, signal.SIGKILL),
self.assertEqual(proc.send_signal.mock_calls, [
mock.call(signal.SIGTERM),
mock.call(signal.SIGKILL),
])
1 change: 1 addition & 0 deletions tests/ssh_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def test_verbose_enabled(self):
self.dockerized_ssh.port,
)
self.assertEqual(name, context.name)
context.shutdown(wait=True)


class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
Expand Down
34 changes: 28 additions & 6 deletions tests/testlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ def data_path(suffix):
return path


def retry(fn, on, max_attempts, delay):
    """Call `fn()` until it succeeds or `max_attempts` calls have failed.

    :param fn: Zero-argument callable to invoke.
    :param on: Exception class (or tuple of classes) that triggers a retry.
        Any other exception propagates immediately.
    :param int max_attempts: Maximum number of calls to make; at least 1.
    :param float delay: Seconds to sleep between a failed attempt and the
        next one.
    :returns: Whatever `fn()` returns on its first successful call.
    :raises ValueError: If `max_attempts` is less than 1.
    :raises on: Re-raised from the final attempt if every attempt failed.
    """
    if max_attempts < 1:
        # With no attempts fn() would never be called and None would be
        # returned, silently skipping whatever fn() is meant to check.
        raise ValueError(
            'max_attempts must be at least 1, got %r' % (max_attempts,)
        )

    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except on:
            # Out of attempts: let the caller see the last failure.
            if attempt == max_attempts:
                raise
            time.sleep(delay)


def threading__thread_is_alive(thread):
"""Return whether the thread is alive (Python version compatibility shim).

Expand Down Expand Up @@ -562,18 +573,24 @@ def wait_for_sshd(self):
wait_for_port(self.get_host(), self.port, pattern='OpenSSH')

def check_processes(self):
args = ['docker', 'exec', self.container_name, 'ps', '-o', 'comm=']
# Get Accounting name (ucomm) & command line (args) of each process
# in the container. No truncation (-ww). No column headers (foo=).
ps_output = subprocess.check_output([
'docker', 'exec', self.container_name,
'ps', '-w', '-w', '-o', 'ucomm=', '-o', 'args=',
])
ps_lines = ps_output.decode().splitlines()
processes = [tuple(line.split(None, 1)) for line in ps_lines]
counts = {}
for comm in subprocess.check_output(args).decode().splitlines():
comm = comm.strip()
counts[comm] = counts.get(comm, 0) + 1
for ucomm, _ in processes:
counts[ucomm] = counts.get(ucomm, 0) + 1

if counts != {'ps': 1, 'sshd': 1}:
assert 0, (
'Docker container %r contained extra running processes '
'after test completed: %r' % (
self.container_name,
counts
processes,
)
)

Expand Down Expand Up @@ -630,7 +647,12 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
cls.dockerized_ssh.check_processes()
retry(
cls.dockerized_ssh.check_processes,
on=AssertionError,
max_attempts=5,
delay=0.1,
)
cls.dockerized_ssh.close()
super(DockerMixin, cls).tearDownClass()

Expand Down
18 changes: 7 additions & 11 deletions tests/unix_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,13 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase):

def test_constructor_basic(self):
listener = self.klass.build_stream(router=self.router)
capture = testlib.LogCapturer()
capture.start()
try:
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
os.unlink(listener.protocol.path)
# ensure we catch 0 byte read error log message
self.broker.shutdown()
self.broker.join()
self.broker_shutdown = True
finally:
capture.stop()
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
os.unlink(listener.protocol.path)

# ensure we catch 0 byte read error log message
self.broker.shutdown()
self.broker.join()
self.broker_shutdown = True


class ClientTest(testlib.TestCase):
Expand Down
Loading