Skip to content

Commit

Permalink
Replace 'keepends' with 'drop'
Browse files Browse the repository at this point in the history
Add 'drop' argument in place of 'keepends' for tubes. 'keepends' still
works but raises depreciation warning.
  • Loading branch information
MrQubo committed Oct 3, 2024
1 parent 9f92ed0 commit 83fd8c0
Showing 1 changed file with 97 additions and 47 deletions.
144 changes: 97 additions & 47 deletions pwnlib/tubes/tube.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,44 @@ def __init__(self, timeout = default, level = None, *a, **kw):
self._newline = None
atexit.register(self.close)

def _normalize_drop_keepends(self, drop, keepends, drop_default):
'''
>>> t = tube()
>>> t._normalize_drop_keepends(None, None, True)
True
>>> t._normalize_drop_keepends(None, None, False)
False
>>> t._normalize_drop_keepends(True, None, True)
True
>>> t._normalize_drop_keepends(True, None, False)
True
>>> t._normalize_drop_keepends(None, True, True)
False
>>> t._normalize_drop_keepends(None, True, False)
False
>>> t._normalize_drop_keepends(False, None, True)
False
>>> t._normalize_drop_keepends(False, None, False)
False
>>> t._normalize_drop_keepends(None, False, True)
True
>>> t._normalize_drop_keepends(None, False, False)
True
>>> t._normalize_drop_keepends(True, False, False)
Traceback (most recent call last):
...
pwnlib.exception.PwnlibException: 'drop' and 'keepends' arguments cannot be used together.
'''
if keepends is not None:
self.warn_once("'keepends' argument is deprecated. Use 'drop' instead.")
if drop is None and keepends is None:
return drop_default
elif drop is not None:
if keepends is not None:
self.error("'drop' and 'keepends' arguments cannot be used together.")
return drop
return not keepends

@property
def newline(self):
r'''Character sent with methods like sendline() or used for recvline().
Expand Down Expand Up @@ -98,8 +136,8 @@ def recv(self, numb = None, timeout = default):
>>> t.recv() == b'Woohoo'
True
>>> with context.local(log_level='debug'):
... _ = t.recv() # doctest: +ELLIPSIS
[...] Received 0xc bytes:
... _ = t.recv()
[DEBUG] Received 0xc bytes:
b'Hello, world'
"""
numb = self.buffer.get_fill_size(numb)
Expand Down Expand Up @@ -263,7 +301,7 @@ def recvn(self, numb, timeout = default):
>>> t.recv_raw = lambda *a: time.sleep(0.01) or b'a'
>>> t.recvn(10, timeout=0.05)
b''
>>> t.recvn(10, timeout=0.06) # doctest: +ELLIPSIS
>>> t.recvn(10, timeout=0.06)
b'aaaaaa...'
"""
# Keep track of how much data has been received
Expand Down Expand Up @@ -368,8 +406,8 @@ def recvuntil(self, delims, drop=False, timeout=default):

return b''

def recvlines(self, numlines=2**20, keepends=False, timeout=default):
r"""recvlines(numlines, keepends=False, timeout=default) -> list of bytes objects
def recvlines(self, numlines=2**20, drop=None, keepends=None, timeout=default):
r"""recvlines(numlines, drop=True, timeout=default) -> list of bytes objects
Receive up to ``numlines`` lines.
Expand All @@ -381,7 +419,7 @@ def recvlines(self, numlines=2**20, keepends=False, timeout=default):
Arguments:
numlines(int): Maximum number of lines to receive
keepends(bool): Keep newlines at the end of each line (:const:`False`).
drop(bool): Drop newlines at the end of each line (:const:`True`).
timeout(int): Maximum timeout
Raises:
Expand All @@ -400,17 +438,20 @@ def recvlines(self, numlines=2**20, keepends=False, timeout=default):
>>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\n'
>>> t.recvlines(3)
[b'Foo', b'Bar', b'Baz']
>>> t.recvlines(3, True)
>>> t.recvlines(3, drop=False)
[b'Foo\n', b'Bar\n', b'Baz\n']
"""
drop = self._normalize_drop_keepends(drop, keepends, True)
del keepends

lines = []
with self.countdown(timeout):
for _ in range(numlines):
try:
# We must set 'keepends' to True here so that we can
# We must set 'drop' to False here so that we can
# restore the original, unmodified data to the buffer
# in the event of a timeout.
res = self.recvline(keepends=True, timeout=timeout)
res = self.recvline(drop=False, timeout=timeout)
except Exception:
self.unrecv(b''.join(lines))
raise
Expand All @@ -420,13 +461,13 @@ def recvlines(self, numlines=2**20, keepends=False, timeout=default):
else:
break

if not keepends:
lines = [line.rstrip(self.newline) for line in lines]
if drop:
lines = [line[:-len(self.newline)] for line in lines]

return lines

def recvlinesS(self, numlines=2**20, keepends=False, timeout=default):
r"""recvlinesS(numlines, keepends=False, timeout=default) -> str list
def recvlinesS(self, numlines=2**20, drop=None, keepends=None, timeout=default):
r"""recvlinesS(numlines, drop=True, timeout=default) -> str list
This function is identical to :meth:`recvlines`, but decodes
the received bytes into string using :func:`context.encoding`.
Expand All @@ -442,10 +483,10 @@ def recvlinesS(self, numlines=2**20, keepends=False, timeout=default):
>>> t.recvlinesS(3)
['Foo', 'Bar', 'Baz']
"""
return [packing._decode(x) for x in self.recvlines(numlines, keepends, timeout)]
return [packing._decode(x) for x in self.recvlines(numlines, drop=drop, keepends=keepends, timeout=timeout)]

def recvlinesb(self, numlines=2**20, keepends=False, timeout=default):
r"""recvlinesb(numlines, keepends=False, timeout=default) -> bytearray list
def recvlinesb(self, numlines=2**20, drop=None, keepends=None, timeout=default):
r"""recvlinesb(numlines, drop=True, timeout=default) -> bytearray list
This function is identical to :meth:`recvlines`, but returns a bytearray.
Expand All @@ -459,10 +500,10 @@ def recvlinesb(self, numlines=2**20, keepends=False, timeout=default):
>>> t.recvlinesb(3)
[bytearray(b'Foo'), bytearray(b'Bar'), bytearray(b'Baz')]
"""
return [bytearray(x) for x in self.recvlines(numlines, keepends, timeout)]
return [bytearray(x) for x in self.recvlines(numlines, drop=drop, keepends=keepends, timeout=timeout)]

def recvline(self, keepends=True, timeout=default):
r"""recvline(keepends=True, timeout=default) -> bytes
def recvline(self, drop=None, keepends=None, timeout=default):
r"""recvline(drop=False, timeout=default) -> bytes
Receive a single line from the tube.
Expand All @@ -478,7 +519,7 @@ def recvline(self, keepends=True, timeout=default):
all data is buffered and an empty byte string (``b''``) is returned.
Arguments:
keepends(bool): Keep the line ending (:const:`True`).
drop(bool): Drop the line ending (:const:`False`).
timeout(int): Timeout
Raises:
Expand All @@ -501,10 +542,10 @@ def recvline(self, keepends=True, timeout=default):
b'Foo\n'
>>> t.recvline()
b'Bar\r\n'
>>> t.recvline(keepends = False)
>>> t.recvline(drop=True)
b'Baz'
>>> t.newline = b'\r\n'
>>> t.recvline(keepends = False)
>>> t.recvline(drop=True)
b'Foo\nBar'
>>> t = tube()
>>> def _recv_eof(n):
Expand All @@ -518,22 +559,25 @@ def recvline(self, keepends=True, timeout=default):
b'real line\n'
>>> t.recvline()
b'trailing data'
>>> t.recvline() # doctest: +ELLIPSIS
>>> t.recvline()
Traceback (most recent call last):
...
...
EOFError
"""
drop = self._normalize_drop_keepends(drop, keepends, False)
del keepends

try:
return self.recvuntil(self.newline, drop = not keepends, timeout = timeout)
return self.recvuntil(self.newline, drop=drop, timeout=timeout)
except EOFError:
if not context.throw_eof_on_incomplete_line and self.buffer.size > 0:
if context.throw_eof_on_incomplete_line is None:
self.warn_once('EOFError during recvline. Returning buffered data without trailing newline.')
return self.buffer.get()
raise

def recvline_pred(self, pred, keepends=False, timeout=default):
r"""recvline_pred(pred, keepends=False) -> bytes
def recvline_pred(self, pred, drop=None, keepends=None, timeout=default):
r"""recvline_pred(pred, drop=True, timeout=default) -> bytes
Receive data until ``pred(line)`` returns a truthy value.
Drop all other data.
Expand All @@ -544,25 +588,28 @@ def recvline_pred(self, pred, keepends=False, timeout=default):
Arguments:
pred(callable): Function to call. Returns the line for which
this function returns :const:`True`.
drop(bool): Drop the line ending (:const:`True`).
Examples:
>>> t = tube()
>>> t.recv_raw = lambda n: b"Foo\nBar\nBaz\n"
>>> t.recvline_pred(lambda line: line == b"Bar\n")
b'Bar'
>>> t.recvline_pred(lambda line: line == b"Bar\n", keepends=True)
>>> t.recvline_pred(lambda line: line == b"Bar\n", drop=False)
b'Bar\n'
>>> t.recvline_pred(lambda line: line == b'Nope!', timeout=0.1)
b''
"""
drop = self._normalize_drop_keepends(drop, keepends, True)
del keepends

tmpbuf = Buffer()
line = b''
with self.countdown(timeout):
while self.countdown_active():
try:
line = self.recvline(keepends=True)
line = self.recvline(drop=False)
except Exception:
self.buffer.unget(tmpbuf)
raise
Expand All @@ -572,22 +619,23 @@ def recvline_pred(self, pred, keepends=False, timeout=default):
return b''

if pred(line):
if not keepends:
if drop:
line = line[:-len(self.newline)]
return line
else:
tmpbuf.add(line)

return b''

def recvline_contains(self, items, keepends = False, timeout = default):
r"""
def recvline_contains(self, items, drop=None, keepends=None, timeout=default):
r"""recvline_contains(items, drop=True, timeout=default) -> bytes
Receive lines until one line is found which contains at least
one of `items`.
Arguments:
items(str,tuple): List of strings to search for, or a single string.
keepends(bool): Return lines with newlines if :const:`True`
drop(bool): Drop the line ending (:const:`True`).
timeout(int): Timeout, in seconds
Examples:
Expand All @@ -613,10 +661,10 @@ def recvline_contains(self, items, keepends = False, timeout = default):
def pred(line):
return any(d in line for d in items)

return self.recvline_pred(pred, keepends, timeout)
return self.recvline_pred(pred, drop=drop, keepends=keepends, timeout=timeout)

def recvline_startswith(self, delims, keepends=False, timeout=default):
r"""recvline_startswith(delims, keepends=False, timeout=default) -> bytes
def recvline_startswith(self, delims, drop=None, keepends=None, timeout=default):
r"""recvline_startswith(delims, drop=True, timeout=default) -> bytes
Keep receiving lines until one is found that starts with one of
`delims`. Returns the last line received.
Expand All @@ -626,7 +674,7 @@ def recvline_startswith(self, delims, keepends=False, timeout=default):
Arguments:
delims(str,tuple): List of strings to search for, or string of single characters
keepends(bool): Return lines with newlines if :const:`True`
drop(bool): Drop the line ending (:const:`True`).
timeout(int): Timeout, in seconds
Returns:
Expand All @@ -638,7 +686,7 @@ def recvline_startswith(self, delims, keepends=False, timeout=default):
>>> t.recv_raw = lambda n: b"Hello\nWorld\nXylophone\n"
>>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'))
b'World'
>>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'), True)
>>> t.recvline_startswith((b'W',b'X',b'Y',b'Z'), drop=False)
b'Xylophone\n'
>>> t.recvline_startswith(b'Wo')
b'World'
Expand All @@ -649,11 +697,12 @@ def recvline_startswith(self, delims, keepends=False, timeout=default):
delims = tuple(map(packing._need_bytes, delims))

return self.recvline_pred(lambda line: any(map(line.startswith, delims)),
drop=drop,
keepends=keepends,
timeout=timeout)

def recvline_endswith(self, delims, keepends=False, timeout=default):
r"""recvline_endswith(delims, keepends=False, timeout=default) -> bytes
def recvline_endswith(self, delims, drop=None, keepends=None, timeout=default):
r"""recvline_endswith(delims, drop=True, timeout=default) -> bytes
Keep receiving lines until one is found that ends with one of
`delims`. Returns the last line received.
Expand All @@ -669,7 +718,7 @@ def recvline_endswith(self, delims, keepends=False, timeout=default):
>>> t.recv_raw = lambda n: b'Foo\nBar\nBaz\nKaboodle\n'
>>> t.recvline_endswith(b'r')
b'Bar'
>>> t.recvline_endswith((b'a',b'b',b'c',b'd',b'e'), True)
>>> t.recvline_endswith((b'a',b'b',b'c',b'd',b'e'), drop=False)
b'Kaboodle\n'
>>> t.recvline_endswith(b'oodle')
b'Kaboodle'
Expand All @@ -681,6 +730,7 @@ def recvline_endswith(self, delims, keepends=False, timeout=default):
delims = tuple(packing._need_bytes(delim) + self.newline for delim in delims)

return self.recvline_pred(lambda line: any(map(line.endswith, delims)),
drop=drop,
keepends=keepends,
timeout=timeout)

Expand Down Expand Up @@ -724,8 +774,8 @@ def recvregex(self, regex, exact=False, timeout=default, capture=False):
else:
return self.recvpred(pred, timeout = timeout)

def recvline_regex(self, regex, exact=False, keepends=False, timeout=default):
"""recvline_regex(regex, exact=False, keepends=False, timeout=default) -> bytes
def recvline_regex(self, regex, exact=False, drop=None, keepends=None, timeout=default):
"""recvline_regex(regex, exact=False, drop=True, timeout=default) -> bytes
Wrapper around :func:`recvline_pred`, which will return when a regex
matches a line.
Expand All @@ -746,7 +796,7 @@ def recvline_regex(self, regex, exact=False, keepends=False, timeout=default):
else:
pred = regex.search

return self.recvline_pred(pred, keepends = keepends, timeout = timeout)
return self.recvline_pred(pred, drop=drop, keepends=keepends, timeout=timeout)

def recvrepeat(self, timeout=default):
"""recvrepeat(timeout=default) -> bytes
Expand Down Expand Up @@ -1062,7 +1112,7 @@ def clean_and_log(self, timeout = 0.05):
>>> t.connected_raw = lambda d: True
>>> t.fileno = lambda: 1234
>>> with context.local(log_level='info'):
... data = t.clean_and_log() #doctest: +ELLIPSIS
... data = t.clean_and_log()
[DEBUG] Received 0xb bytes:
b'hooray_data'
>>> data
Expand Down Expand Up @@ -1313,7 +1363,7 @@ def shutdown(self, direction = "send"):
send
send
send
>>> t.shutdown('bad_value') #doctest: +ELLIPSIS
>>> t.shutdown('bad_value')
Traceback (most recent call last):
...
KeyError: "direction must be in ['in', 'out', 'read', 'recv', 'send', 'write']"
Expand Down Expand Up @@ -1347,7 +1397,7 @@ def connected(self, direction = 'any'):
send
send
send
>>> t.connected('bad_value') #doctest: +ELLIPSIS
>>> t.connected('bad_value')
Traceback (most recent call last):
...
KeyError: "direction must be in ['any', 'in', 'out', 'read', 'recv', 'send', 'write']"
Expand Down

0 comments on commit 83fd8c0

Please sign in to comment.