From fffaba38cb4457285d8c3fc49e5762242bb771fe Mon Sep 17 00:00:00 2001 From: Giampaolo Rodola Date: Fri, 20 Dec 2024 16:11:21 +0100 Subject: [PATCH] Use f-strings (#2483) --- .github/workflows/issues.py | 18 +-- docs/conf.py | 4 +- psutil/__init__.py | 43 ++--- psutil/_common.py | 26 +-- psutil/_psaix.py | 13 +- psutil/_psbsd.py | 27 ++-- psutil/_pslinux.py | 201 ++++++++++++------------ psutil/_psposix.py | 3 +- psutil/_pssunos.py | 47 +++--- psutil/_pswindows.py | 31 ++-- psutil/tests/__init__.py | 77 ++++----- psutil/tests/test_aix.py | 4 +- psutil/tests/test_bsd.py | 34 ++-- psutil/tests/test_connections.py | 6 +- psutil/tests/test_linux.py | 54 +++---- psutil/tests/test_misc.py | 20 +-- psutil/tests/test_osx.py | 6 +- psutil/tests/test_posix.py | 10 +- psutil/tests/test_process.py | 26 ++- psutil/tests/test_process_all.py | 4 +- psutil/tests/test_sunos.py | 2 +- psutil/tests/test_system.py | 5 +- psutil/tests/test_windows.py | 25 ++- pyproject.toml | 9 +- scripts/battery.py | 12 +- scripts/ifconfig.py | 10 +- scripts/internal/bench_oneshot.py | 13 +- scripts/internal/bench_oneshot_2.py | 4 +- scripts/internal/check_broken_links.py | 8 +- scripts/internal/clinter.py | 4 +- scripts/internal/download_wheels.py | 8 +- scripts/internal/git_pre_commit.py | 23 ++- scripts/internal/install_pip.py | 4 +- scripts/internal/print_access_denied.py | 2 +- scripts/internal/print_api_speed.py | 2 +- scripts/internal/print_dist.py | 8 +- scripts/internal/print_downloads.py | 24 +-- scripts/internal/print_hashes.py | 7 +- scripts/internal/winmake.py | 16 +- scripts/killall.py | 4 +- scripts/pidof.py | 2 +- scripts/pmap.py | 2 +- scripts/procinfo.py | 16 +- scripts/procsmem.py | 6 +- scripts/sensors.py | 8 +- scripts/top.py | 4 +- scripts/winservices.py | 4 +- 47 files changed, 438 insertions(+), 448 deletions(-) diff --git a/.github/workflows/issues.py b/.github/workflows/issues.py index 5efe6fc32..7fb369eb9 100755 --- a/.github/workflows/issues.py +++ b/.github/workflows/issues.py @@ -194,29 +194,29 @@ def get_issue(): def log(msg): if '\n' in msg or "\r\n" in msg: - print(">>>\n%s\n<<<" % msg, flush=True) + print(f">>>\n{msg}\n<<<", flush=True) else: - print(">>> %s <<<" % msg, flush=True) + print(f">>> {msg} <<<", flush=True) def add_label(issue, label): def should_add(issue, label): if has_label(issue, label): - log("already has label %r" % (label)) + log(f"already has label {label!r}") return False for left, right in ILLOGICAL_PAIRS: if label == left and has_label(issue, right): - log("already has label" % (label)) + log(f"already has label f{label}") return False return not has_label(issue, label) if not should_add(issue, label): - log("should not add label %r" % label) + log(f"should not add label {label!r}") return - log("add label %r" % label) + log(f"add label {label!r}") issue.add_to_labels(label) @@ -329,16 +329,16 @@ def on_new_pr(issue): def main(): issue = get_issue() stype = "PR" if is_pr(issue) else "issue" - log("running issue bot for %s %r" % (stype, issue)) + log(f"running issue bot for {stype} {issue!r}") if is_event_new_issue(): - log("created new issue %s" % issue) + log(f"created new issue {issue}") add_labels_from_text(issue, issue.title) if issue.body: add_labels_from_new_body(issue, issue.body) on_new_issue(issue) elif is_event_new_pr(): - log("created new PR %s" % issue) + log(f"created new PR {issue}") add_labels_from_text(issue, issue.title) if issue.body: add_labels_from_new_body(issue, issue.body) diff --git a/docs/conf.py b/docs/conf.py index 213aadfe0..604eeccb4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -84,7 +84,7 @@ def get_version(): # General information about the project. project = PROJECT_NAME -copyright = '2009-%s, %s' % (THIS_YEAR, AUTHOR) +copyright = f"2009-{THIS_YEAR}, {AUTHOR}" author = AUTHOR # The version info for the project you're documenting, acts as replacement for @@ -267,7 +267,7 @@ def get_version(): # html_search_scorer = 'scorer.js' # Output file base name for HTML help builder. -htmlhelp_basename = '%s-doc' % PROJECT_NAME +htmlhelp_basename = f"{PROJECT_NAME}-doc" # -- Options for LaTeX output --------------------------------------------- diff --git a/psutil/__init__.py b/psutil/__init__.py index c7d377f60..160c813d8 100644 --- a/psutil/__init__.py +++ b/psutil/__init__.py @@ -135,7 +135,8 @@ PROCFS_PATH = "/proc" else: # pragma: no cover - raise NotImplementedError('platform %s is not supported' % sys.platform) + msg = f"platform {sys.platform} is not supported" + raise NotImplementedError(msg) # fmt: off @@ -223,7 +224,7 @@ if int(__version__.replace('.', '')) != getattr( _psplatform.cext, 'version', None ): - msg = "version conflict: %r C extension " % _psplatform.cext.__file__ + msg = f"version conflict: {_psplatform.cext.__file__!r} C extension " msg += "module was built for another version of psutil" if hasattr(_psplatform.cext, 'version'): msg += " (%s instead of %s)" % ( @@ -231,7 +232,7 @@ __version__, ) else: - msg += " (different than %s)" % __version__ + msg += f" (different than {__version__})" msg += "; you may try to 'pip uninstall psutil', manually remove %s" % ( getattr( _psplatform.cext, @@ -323,12 +324,12 @@ def _init(self, pid, _ignore_nsp=False): pid = os.getpid() else: if pid < 0: - msg = "pid must be a positive integer (got %s)" % pid + msg = f"pid must be a positive integer (got {pid})" raise ValueError(msg) try: _psplatform.cext.check_pid_range(pid) except OverflowError: - msg = "process PID out of range (got %s)" % pid + msg = f"process PID out of range (got {pid})" raise NoSuchProcess(pid, msg=msg) self._pid = pid @@ -419,7 +420,7 @@ def __str__(self): return "%s.%s(%s)" % ( self.__class__.__module__, self.__class__.__name__, - ", ".join(["%s=%r" % (k, v) for k, v in info.items()]), + ", ".join([f"{k}={v!r}" for k, v in info.items()]), ) __repr__ = __str__ @@ -553,7 +554,7 @@ def as_dict(self, attrs=None, ad_value=None): valid_names = _as_dict_attrnames if attrs is not None: if not isinstance(attrs, (list, tuple, set, frozenset)): - msg = "invalid attrs type %s" % type(attrs) + msg = f"invalid attrs type {type(attrs)}" raise TypeError(msg) attrs = set(attrs) invalid_names = attrs - valid_names @@ -1046,7 +1047,7 @@ def cpu_percent(self, interval=None): """ blocking = interval is not None and interval > 0.0 if interval is not None and interval < 0: - msg = "interval is not positive (got %r)" % interval + msg = f"interval is not positive (got {interval!r})" raise ValueError(msg) num_cpus = cpu_count() or 1 @@ -1156,9 +1157,9 @@ def memory_percent(self, memtype="rss"): """ valid_types = list(_psplatform.pfullmem._fields) if memtype not in valid_types: - msg = "invalid memtype %r; valid types are %r" % ( - memtype, - tuple(valid_types), + msg = ( + f"invalid memtype {memtype!r}; valid types are" + f" {tuple(valid_types)!r}" ) raise ValueError(msg) fun = ( @@ -1175,7 +1176,7 @@ def memory_percent(self, memtype="rss"): # we should never get here msg = ( "can't calculate process memory percent because total physical" - " system memory is not positive (%r)" % (total_phymem) + f" system memory is not positive ({total_phymem!r})" ) raise ValueError(msg) return (value / float(total_phymem)) * 100 @@ -1438,9 +1439,9 @@ def __getattribute__(self, name): try: return object.__getattribute__(self.__subproc, name) except AttributeError: - msg = "%s instance has no attribute '%s'" % ( - self.__class__.__name__, - name, + msg = ( + f"{self.__class__.__name__} instance has no attribute" + f" '{name}'" ) raise AttributeError(msg) @@ -1524,7 +1525,7 @@ def remove(pid): remove(pid) while _pids_reused: pid = _pids_reused.pop() - debug("refreshing Process instance for reused PID %s" % pid) + debug(f"refreshing Process instance for reused PID {pid}") remove(pid) try: ls = sorted(list(pmap.items()) + list(dict.fromkeys(new_pids).items())) @@ -1596,12 +1597,12 @@ def check_gone(proc, timeout): callback(proc) if timeout is not None and not timeout >= 0: - msg = "timeout must be a positive integer, got %s" % timeout + msg = f"timeout must be a positive integer, got {timeout}" raise ValueError(msg) gone = set() alive = set(procs) if callback is not None and not callable(callback): - msg = "callback %r is not a callable" % callback + msg = f"callback {callback!r} is not a callable" raise TypeError(msg) if timeout is not None: deadline = _timer() + timeout @@ -1801,7 +1802,7 @@ def cpu_percent(interval=None, percpu=False): tid = threading.current_thread().ident blocking = interval is not None and interval > 0.0 if interval is not None and interval < 0: - msg = "interval is not positive (got %r)" % interval + msg = f"interval is not positive (got {interval})" raise ValueError(msg) def calculate(t1, t2): @@ -1861,7 +1862,7 @@ def cpu_times_percent(interval=None, percpu=False): tid = threading.current_thread().ident blocking = interval is not None and interval > 0.0 if interval is not None and interval < 0: - msg = "interval is not positive (got %r)" % interval + msg = f"interval is not positive (got {interval!r})" raise ValueError(msg) def calculate(t1, t2): @@ -2243,7 +2244,7 @@ def net_if_addrs(): # https://github.com/giampaolo/psutil/issues/786 separator = ":" if POSIX else "-" while addr.count(separator) < 5: - addr += "%s00" % separator + addr += f"{separator}00" ret[name].append(_common.snicaddr(fam, addr, mask, broadcast, ptp)) return dict(ret) diff --git a/psutil/_common.py b/psutil/_common.py index a0986ca7b..1e8bb7959 100644 --- a/psutil/_common.py +++ b/psutil/_common.py @@ -289,8 +289,8 @@ def __str__(self): def __repr__(self): # invoked on `repr(Error)` info = self._infodict(("pid", "ppid", "name", "seconds", "msg")) - details = ", ".join(["%s=%r" % (k, v) for k, v in info.items()]) - return "psutil.%s(%s)" % (self.__class__.__name__, details) + details = ", ".join([f"{k}={v!r}" for k, v in info.items()]) + return f"psutil.{self.__class__.__name__}({details})" class NoSuchProcess(Error): @@ -356,7 +356,7 @@ def __init__(self, seconds, pid=None, name=None): self.seconds = seconds self.pid = pid self.name = name - self.msg = "timeout after %s seconds" % seconds + self.msg = f"timeout after {seconds} seconds" def __reduce__(self): return (self.__class__, (self.seconds, self.pid, self.name)) @@ -863,13 +863,12 @@ def hilite(s, color=None, bold=False): # pragma: no cover try: color = colors[color] except KeyError: - raise ValueError( - "invalid color %r; choose between %s" % (list(colors.keys())) - ) + msg = f"invalid color {color!r}; choose amongst {list(colors.keys())}" + raise ValueError(msg) attr.append(color) if bold: attr.append('1') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), s) + return f"\x1b[{';'.join(attr)}m{s}\x1b[0m" def print_color( @@ -894,10 +893,11 @@ def print_color( try: color = colors[color] except KeyError: - raise ValueError( - "invalid color %r; choose between %r" - % (color, list(colors.keys())) + msg = ( + f"invalid color {color!r}; choose between" + f" {list(colors.keys())!r}" ) + raise ValueError(msg) if bold and color <= 7: color += 8 @@ -922,9 +922,9 @@ def debug(msg): if isinstance(msg, Exception): if isinstance(msg, OSError): # ...because str(exc) may contain info about the file name - msg = "ignoring %s" % msg + msg = f"ignoring {msg}" else: - msg = "ignoring %r" % msg + msg = f"ignoring {msg!r}" print( # noqa - "psutil-debug [%s:%s]> %s" % (fname, lineno, msg), file=sys.stderr + f"psutil-debug [{fname}:{lineno}]> {msg}", file=sys.stderr ) diff --git a/psutil/_psaix.py b/psutil/_psaix.py index b774d8d42..2547e194a 100644 --- a/psutil/_psaix.py +++ b/psutil/_psaix.py @@ -146,7 +146,8 @@ def cpu_count_cores(): stdout, stderr = p.communicate() stdout, stderr = (x.decode(sys.stdout.encoding) for x in (stdout, stderr)) if p.returncode != 0: - raise RuntimeError("%r command error\n%s" % (cmd, stderr)) + msg = f"{cmd!r} command error\n{stderr}" + raise RuntimeError(msg) processors = stdout.strip().splitlines() return len(processors) or None @@ -430,7 +431,7 @@ def threads(self): # is no longer there. if not retlist: # will raise NSP if process is gone - os.stat('%s/%s' % (self._procfs_path, self.pid)) + os.stat(f"{self._procfs_path}/{self.pid}") return retlist @wrap_exceptions @@ -443,7 +444,7 @@ def net_connections(self, kind='inet'): # is no longer there. if not ret: # will raise NSP if process is gone - os.stat('%s/%s' % (self._procfs_path, self.pid)) + os.stat(f"{self._procfs_path}/{self.pid}") return ret @wrap_exceptions @@ -489,10 +490,10 @@ def terminal(self): def cwd(self): procfs_path = self._procfs_path try: - result = os.readlink("%s/%s/cwd" % (procfs_path, self.pid)) + result = os.readlink(f"{procfs_path}/{self.pid}/cwd") return result.rstrip('/') except FileNotFoundError: - os.stat("%s/%s" % (procfs_path, self.pid)) # raise NSP or AD + os.stat(f"{procfs_path}/{self.pid}") # raise NSP or AD return "" @wrap_exceptions @@ -539,7 +540,7 @@ def open_files(self): def num_fds(self): if self.pid == 0: # no /proc/0/fd return 0 - return len(os.listdir("%s/%s/fd" % (self._procfs_path, self.pid))) + return len(os.listdir(f"{self._procfs_path}/{self.pid}/fd")) @wrap_exceptions def num_ctx_switches(self): diff --git a/psutil/_psbsd.py b/psutil/_psbsd.py index 0628facbe..6f6eeb35f 100644 --- a/psutil/_psbsd.py +++ b/psutil/_psbsd.py @@ -485,7 +485,7 @@ def sensors_temperatures(): current, high = cext.sensors_cpu_temperature(cpu) if high <= 0: high = None - name = "Core %s" % cpu + name = f"Core {cpu}" ret["coretemp"].append( _common.shwtemp(name, current, high, high) ) @@ -673,7 +673,7 @@ def exe(self): # /proc/0 dir exists but /proc/0/exe doesn't return "" with wrap_exceptions_procfs(self): - return os.readlink("/proc/%s/exe" % self.pid) + return os.readlink(f"/proc/{self.pid}/exe") else: # OpenBSD: exe cannot be determined; references: # https://chromium.googlesource.com/chromium/src/base/+/ @@ -708,7 +708,7 @@ def cmdline(self): else: # XXX: this happens with unicode tests. It means the C # routine is unable to decode invalid unicode chars. - debug("ignoring %r and returning an empty list" % err) + debug(f"ignoring {err!r} and returning an empty list") return [] else: raise @@ -932,12 +932,11 @@ def cpu_affinity_set(self, cpus): # Pre-emptively check if CPUs are valid because the C # function has a weird behavior in case of invalid CPUs, # see: https://github.com/giampaolo/psutil/issues/586 - allcpus = tuple(range(len(per_cpu_times()))) + allcpus = set(range(len(per_cpu_times()))) for cpu in cpus: if cpu not in allcpus: - raise ValueError( - "invalid CPU #%i (choose between %s)" % (cpu, allcpus) - ) + msg = f"invalid CPU {cpu!r} (choose between {allcpus})" + raise ValueError(msg) try: cext.proc_cpu_affinity_set(self.pid, cpus) except OSError as err: @@ -948,10 +947,11 @@ def cpu_affinity_set(self, cpus): if err.errno in {errno.EINVAL, errno.EDEADLK}: for cpu in cpus: if cpu not in allcpus: - raise ValueError( - "invalid CPU #%i (choose between %s)" - % (cpu, allcpus) + msg = ( + f"invalid CPU {cpu!r} (choose between" + f" {allcpus})" ) + raise ValueError(msg) raise @wrap_exceptions @@ -964,9 +964,10 @@ def rlimit(self, resource, limits=None): return cext.proc_getrlimit(self.pid, resource) else: if len(limits) != 2: - raise ValueError( - "second argument must be a (soft, hard) tuple, got %s" - % repr(limits) + msg = ( + "second argument must be a (soft, hard) tuple, got" + f" {limits!r}" ) + raise ValueError(msg) soft, hard = limits return cext.proc_setrlimit(self.pid, resource, soft, hard) diff --git a/psutil/_pslinux.py b/psutil/_pslinux.py index b67964cf6..531503d55 100644 --- a/psutil/_pslinux.py +++ b/psutil/_pslinux.py @@ -73,8 +73,8 @@ POWER_SUPPLY_PATH = "/sys/class/power_supply" -HAS_PROC_SMAPS = os.path.exists('/proc/%s/smaps' % os.getpid()) -HAS_PROC_SMAPS_ROLLUP = os.path.exists('/proc/%s/smaps_rollup' % os.getpid()) +HAS_PROC_SMAPS = os.path.exists(f"/proc/{os.getpid()}/smaps") +HAS_PROC_SMAPS_ROLLUP = os.path.exists(f"/proc/{os.getpid()}/smaps_rollup") HAS_PROC_IO_PRIORITY = hasattr(cext, "proc_ioprio_get") HAS_CPU_AFFINITY = hasattr(cext, "proc_cpu_affinity_get") @@ -242,9 +242,9 @@ def is_storage_device(name): name = name.replace('/', '!') including_virtual = True if including_virtual: - path = "/sys/block/%s" % name + path = f"/sys/block/{name}" else: - path = "/sys/block/%s/device" % name + path = f"/sys/block/{name}/device" return os.access(path, os.F_OK) @@ -257,7 +257,7 @@ def set_scputimes_ntuple(procfs_path): Used by cpu_times() function. """ global scputimes - with open_binary('%s/stat' % procfs_path) as f: + with open_binary(f"{procfs_path}/stat") as f: values = f.readline().split()[1:] fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq', 'softirq'] vlen = len(values) @@ -277,7 +277,7 @@ def set_scputimes_ntuple(procfs_path): set_scputimes_ntuple("/proc") except Exception as err: # noqa: BLE001 # Don't want to crash at import time. - debug("ignoring exception on import: %r" % err) + debug(f"ignoring exception on import: {err!r}") scputimes = namedtuple('scputimes', 'user system idle')(0.0, 0.0, 0.0) @@ -324,13 +324,12 @@ def calculate_avail_vmem(mems): slab_reclaimable = mems[b'SReclaimable:'] except KeyError as err: debug( - "%s is missing from /proc/meminfo; using an approximation for " - "calculating available memory" - % err.args[0] + f"{err.args[0]} is missing from /proc/meminfo; using an" + " approximation for calculating available memory" ) return fallback try: - f = open_binary('%s/zoneinfo' % get_procfs_path()) + f = open_binary(f"{get_procfs_path()}/zoneinfo") except OSError: return fallback # kernel 2.6.13 @@ -360,7 +359,7 @@ def virtual_memory(): """ missing_fields = [] mems = {} - with open_binary('%s/meminfo' % get_procfs_path()) as f: + with open_binary(f"{get_procfs_path()}/meminfo") as f: for line in f: fields = line.split() mems[fields[0]] = int(fields[1]) * 1024 @@ -486,7 +485,7 @@ def virtual_memory(): def swap_memory(): """Return swap memory metrics.""" mems = {} - with open_binary('%s/meminfo' % get_procfs_path()) as f: + with open_binary(f"{get_procfs_path()}/meminfo") as f: for line in f: fields = line.split() mems[fields[0]] = int(fields[1]) * 1024 @@ -506,12 +505,12 @@ def swap_memory(): percent = usage_percent(used, total, round_=1) # get pgin/pgouts try: - f = open_binary("%s/vmstat" % get_procfs_path()) + f = open_binary(f"{get_procfs_path()}/vmstat") except OSError as err: # see https://github.com/giampaolo/psutil/issues/722 msg = ( "'sin' and 'sout' swap memory stats couldn't " - + "be determined and were set to 0 (%s)" % str(err) + f"be determined and were set to 0 ({err})" ) warnings.warn(msg, RuntimeWarning, stacklevel=2) sin = sout = 0 @@ -552,7 +551,7 @@ def cpu_times(): """ procfs_path = get_procfs_path() set_scputimes_ntuple(procfs_path) - with open_binary('%s/stat' % procfs_path) as f: + with open_binary(f"{procfs_path}/stat") as f: values = f.readline().split() fields = values[1 : len(scputimes._fields) + 1] fields = [float(x) / CLOCK_TICKS for x in fields] @@ -566,7 +565,7 @@ def per_cpu_times(): procfs_path = get_procfs_path() set_scputimes_ntuple(procfs_path) cpus = [] - with open_binary('%s/stat' % procfs_path) as f: + with open_binary(f"{procfs_path}/stat") as f: # get rid of the first line which refers to system wide CPU stats f.readline() for line in f: @@ -586,7 +585,7 @@ def cpu_count_logical(): except ValueError: # as a second fallback we try to parse /proc/cpuinfo num = 0 - with open_binary('%s/cpuinfo' % get_procfs_path()) as f: + with open_binary(f"{get_procfs_path()}/cpuinfo") as f: for line in f: if line.lower().startswith(b'processor'): num += 1 @@ -596,7 +595,7 @@ def cpu_count_logical(): # try to parse /proc/stat as a last resort if num == 0: search = re.compile(r'cpu\d') - with open_text('%s/stat' % get_procfs_path()) as f: + with open_text(f"{get_procfs_path()}/stat") as f: for line in f: line = line.split(' ')[0] if search.match(line): @@ -629,7 +628,7 @@ def cpu_count_cores(): # Method #2 mapping = {} current_info = {} - with open_binary('%s/cpuinfo' % get_procfs_path()) as f: + with open_binary(f"{get_procfs_path()}/cpuinfo") as f: for line in f: line = line.strip().lower() if not line: @@ -652,7 +651,7 @@ def cpu_count_cores(): def cpu_stats(): """Return various CPU stats as a named tuple.""" - with open_binary('%s/stat' % get_procfs_path()) as f: + with open_binary(f"{get_procfs_path()}/stat") as f: ctx_switches = None interrupts = None soft_interrupts = None @@ -678,7 +677,7 @@ def cpu_stats(): def _cpu_get_cpuinfo_freq(): """Return current CPU frequency from cpuinfo if available.""" ret = [] - with open_binary('%s/cpuinfo' % get_procfs_path()) as f: + with open_binary(f"{get_procfs_path()}/cpuinfo") as f: for line in f: if line.lower().startswith(b'cpu mhz'): ret.append(float(line.split(b':', 1)[1])) @@ -784,9 +783,9 @@ def __init__(self): def get_proc_inodes(self, pid): inodes = defaultdict(list) - for fd in os.listdir("%s/%s/fd" % (self._procfs_path, pid)): + for fd in os.listdir(f"{self._procfs_path}/{pid}/fd"): try: - inode = readlink("%s/%s/fd/%s" % (self._procfs_path, pid, fd)) + inode = readlink(f"{self._procfs_path}/{pid}/fd/{fd}") except (FileNotFoundError, ProcessLookupError): # ENOENT == file which is gone in the meantime; # os.stat('/proc/%s' % self.pid) will be done later @@ -890,10 +889,11 @@ def process_inet(file, family, type_, inodes, filter_pid=None): line.split()[:10] ) except ValueError: - raise RuntimeError( - "error while parsing %s; malformed line %s %r" - % (file, lineno, line) + msg = ( + f"error while parsing {file}; malformed line" + f" {lineno} {line!r}" ) + raise RuntimeError(msg) if inode in inodes: # # We assume inet sockets are unique, so we error # # out if there are multiple references to the @@ -931,10 +931,10 @@ def process_unix(file, family, inodes, filter_pid=None): if ' ' not in line: # see: https://github.com/giampaolo/psutil/issues/766 continue - raise RuntimeError( - "error while parsing %s; malformed line %r" - % (file, line) + msg = ( + f"error while parsing {file}; malformed line {line!r}" ) + raise RuntimeError(msg) if inode in inodes: # noqa # With UNIX sockets we can have a single inode # referencing many file descriptors. @@ -965,7 +965,7 @@ def retrieve(self, kind, pid=None): inodes = self.get_all_inodes() ret = set() for proto_name, family, type_ in self.tmap[kind]: - path = "%s/net/%s" % (self._procfs_path, proto_name) + path = f"{self._procfs_path}/net/{proto_name}" if family in {socket.AF_INET, socket.AF_INET6}: ls = self.process_inet( path, family, type_, inodes, filter_pid=pid @@ -997,7 +997,7 @@ def net_io_counters(): """Return network I/O statistics for every network interface installed on the system as a dict of raw tuples. """ - with open_text("%s/net/dev" % get_procfs_path()) as f: + with open_text(f"{get_procfs_path()}/net/dev") as f: lines = f.readlines() retdict = {} for line in lines[2:]: @@ -1099,7 +1099,7 @@ def read_procfs(): # See: # https://www.kernel.org/doc/Documentation/iostats.txt # https://www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats - with open_text("%s/diskstats" % get_procfs_path()) as f: + with open_text(f"{get_procfs_path()}/diskstats") as f: lines = f.readlines() for line in lines: fields = line.split() @@ -1122,7 +1122,8 @@ def read_procfs(): reads, rbytes, writes, wbytes = map(int, fields[3:]) rtime = wtime = reads_merged = writes_merged = busy_time = 0 else: - raise ValueError("not sure how to interpret line %r" % line) + msg = f"not sure how to interpret line {line!r}" + raise ValueError(msg) yield (name, reads, writes, rbytes, wbytes, rtime, wtime, reads_merged, writes_merged, busy_time) # fmt: on @@ -1142,16 +1143,16 @@ def read_sysfs(): wtime, reads_merged, writes_merged, busy_time) # fmt: on - if os.path.exists('%s/diskstats' % get_procfs_path()): + if os.path.exists(f"{get_procfs_path()}/diskstats"): gen = read_procfs() elif os.path.exists('/sys/block'): gen = read_sysfs() else: - raise NotImplementedError( - "%s/diskstats nor /sys/block filesystem are available on this " - "system" - % get_procfs_path() + msg = ( + f"{get_procfs_path()}/diskstats nor /sys/block are available on" + " this system" ) + raise NotImplementedError(msg) retdict = {} for entry in gen: @@ -1197,7 +1198,7 @@ def __init__(self): self.minor = os.minor(dev) def ask_proc_partitions(self): - with open_text("%s/partitions" % get_procfs_path()) as f: + with open_text(f"{get_procfs_path()}/partitions") as f: for line in f.readlines()[2:]: fields = line.split() if len(fields) < 4: # just for extra safety @@ -1207,19 +1208,19 @@ def ask_proc_partitions(self): name = fields[3] if major == self.major and minor == self.minor: if name: # just for extra safety - return "/dev/%s" % name + return f"/dev/{name}" def ask_sys_dev_block(self): - path = "/sys/dev/block/%s:%s/uevent" % (self.major, self.minor) + path = f"/sys/dev/block/{self.major}:{self.minor}/uevent" with open_text(path) as f: for line in f: if line.startswith("DEVNAME="): name = line.strip().rpartition("DEVNAME=")[2] if name: # just for extra safety - return "/dev/%s" % name + return f"/dev/{name}" def ask_sys_class_block(self): - needle = "%s:%s" % (self.major, self.minor) + needle = f"{self.major}:{self.minor}" files = glob.iglob("/sys/class/block/*/dev") for file in files: try: @@ -1231,7 +1232,7 @@ def ask_sys_class_block(self): data = f.read().strip() if data == needle: name = os.path.basename(os.path.dirname(file)) - return "/dev/%s" % name + return f"/dev/{name}" def find(self): path = None @@ -1261,7 +1262,7 @@ def disk_partitions(all=False): fstypes = set() procfs_path = get_procfs_path() if not all: - with open_text("%s/filesystems" % procfs_path) as f: + with open_text(f"{procfs_path}/filesystems") as f: for line in f: line = line.strip() if not line.startswith("nodev"): @@ -1276,7 +1277,7 @@ def disk_partitions(all=False): if procfs_path == "/proc" and os.path.isfile('/etc/mtab'): mounts_path = os.path.realpath("/etc/mtab") else: - mounts_path = os.path.realpath("%s/self/mounts" % procfs_path) + mounts_path = os.path.realpath(f"{procfs_path}/self/mounts") retlist = [] partitions = cext.disk_partitions(mounts_path) @@ -1558,14 +1559,15 @@ def users(): def boot_time(): """Return the system boot time expressed in seconds since the epoch.""" global BOOT_TIME - path = '%s/stat' % get_procfs_path() + path = f"{get_procfs_path()}/stat" with open_binary(path) as f: for line in f: if line.startswith(b'btime'): ret = float(line.strip().split()[1]) BOOT_TIME = ret return ret - raise RuntimeError("line 'btime' not found in %s" % path) + msg = f"line 'btime' not found in {path}" + raise RuntimeError(msg) # ===================================================================== @@ -1598,7 +1600,7 @@ def pid_exists(pid): # Note: already checked that this is faster than using a # regular expr. Also (a lot) faster than doing # 'return pid in pids()' - path = "%s/%s/status" % (get_procfs_path(), pid) + path = f"{get_procfs_path()}/{pid}/status" with open_binary(path) as f: for line in f: if line.startswith(b"Tgid:"): @@ -1606,7 +1608,8 @@ def pid_exists(pid): # If tgid and pid are the same then we're # dealing with a process PID. return tgid == pid - raise ValueError("'Tgid' line not found in %s" % path) + msg = f"'Tgid' line not found in {path}" + raise ValueError(msg) except (OSError, ValueError): return pid in pids() @@ -1619,7 +1622,7 @@ def ppid_map(): procfs_path = get_procfs_path() for pid in pids(): try: - with open_binary("%s/%s/stat" % (procfs_path, pid)) as f: + with open_binary(f"{procfs_path}/{pid}/stat") as f: data = f.read() except (FileNotFoundError, ProcessLookupError): # Note: we should be able to access /stat for all processes @@ -1652,9 +1655,7 @@ def wrapper(self, *args, **kwargs): # /proc/PID directory may still exist, but the files within # it may not, indicating the process is gone, see: # https://github.com/giampaolo/psutil/issues/2418 - if not os.path.exists( - "%s/%s/stat" % (self._procfs_path, self.pid) - ): + if not os.path.exists(f"{self._procfs_path}/{self.pid}/stat"): raise NoSuchProcess(self.pid, self._name) raise @@ -1680,7 +1681,7 @@ def _is_zombie(self): # it's empty. Instead of returning a "null" value we'll raise an # exception. try: - data = bcat("%s/%s/stat" % (self._procfs_path, self.pid)) + data = bcat(f"{self._procfs_path}/{self.pid}/stat") except OSError: return False else: @@ -1696,7 +1697,7 @@ def _raise_if_not_alive(self): """Raise NSP if the process disappeared on us.""" # For those C function who do not raise NSP, possibly returning # incorrect or incomplete result. - os.stat('%s/%s' % (self._procfs_path, self.pid)) + os.stat(f"{self._procfs_path}/{self.pid}") @wrap_exceptions @memoize_when_activated @@ -1709,7 +1710,7 @@ def _parse_stat_file(self): The return value is cached in case oneshot() ctx manager is in use. """ - data = bcat("%s/%s/stat" % (self._procfs_path, self.pid)) + data = bcat(f"{self._procfs_path}/{self.pid}/stat") # Process name is between parentheses. It can contain spaces and # other parentheses. This is taken into account by looking for # the first occurrence of "(" and the last occurrence of ")". @@ -1744,13 +1745,13 @@ def _read_status_file(self): The return value is cached in case oneshot() ctx manager is in use. """ - with open_binary("%s/%s/status" % (self._procfs_path, self.pid)) as f: + with open_binary(f"{self._procfs_path}/{self.pid}/status") as f: return f.read() @wrap_exceptions @memoize_when_activated def _read_smaps_file(self): - with open_binary("%s/%s/smaps" % (self._procfs_path, self.pid)) as f: + with open_binary(f"{self._procfs_path}/{self.pid}/smaps") as f: return f.read().strip() def oneshot_enter(self): @@ -1771,19 +1772,19 @@ def name(self): @wrap_exceptions def exe(self): try: - return readlink("%s/%s/exe" % (self._procfs_path, self.pid)) + return readlink(f"{self._procfs_path}/{self.pid}/exe") except (FileNotFoundError, ProcessLookupError): self._raise_if_zombie() # no such file error; might be raised also if the # path actually exists for system processes with # low pids (about 0-20) - if os.path.lexists("%s/%s" % (self._procfs_path, self.pid)): + if os.path.lexists(f"{self._procfs_path}/{self.pid}"): return "" raise @wrap_exceptions def cmdline(self): - with open_text("%s/%s/cmdline" % (self._procfs_path, self.pid)) as f: + with open_text(f"{self._procfs_path}/{self.pid}/cmdline") as f: data = f.read() if not data: # may happen in case of zombie process @@ -1809,7 +1810,7 @@ def cmdline(self): @wrap_exceptions def environ(self): - with open_text("%s/%s/environ" % (self._procfs_path, self.pid)) as f: + with open_text(f"{self._procfs_path}/{self.pid}/environ") as f: data = f.read() return parse_environ_block(data) @@ -1823,11 +1824,11 @@ def terminal(self): return None # May not be available on old kernels. - if os.path.exists('/proc/%s/io' % os.getpid()): + if os.path.exists(f"/proc/{os.getpid()}/io"): @wrap_exceptions def io_counters(self): - fname = "%s/%s/io" % (self._procfs_path, self.pid) + fname = f"{self._procfs_path}/{self.pid}/io" fields = {} with open_binary(fname) as f: for line in f: @@ -1842,7 +1843,8 @@ def io_counters(self): else: fields[name] = int(value) if not fields: - raise RuntimeError("%s file was empty" % fname) + msg = f"{fname} file was empty" + raise RuntimeError(msg) try: return pio( fields[b'syscr'], # read syscalls @@ -1853,10 +1855,11 @@ def io_counters(self): fields[b'wchar'], # write chars ) except KeyError as err: - raise ValueError( - "%r field was not found in %s; found fields are %r" - % (err.args[0], fname, fields) + msg = ( + f"{err.args[0]!r} field was not found in {fname}; found" + f" fields are {fields!r}" ) + raise ValueError(msg) @wrap_exceptions def cpu_times(self): @@ -1901,7 +1904,7 @@ def memory_info(self): # | data | data + stack | drs | DATA | # | dirty | dirty pages (unused in Linux 2.6) | dt | | # ============================================================ - with open_binary("%s/%s/statm" % (self._procfs_path, self.pid)) as f: + with open_binary(f"{self._procfs_path}/{self.pid}/statm") as f: vms, rss, shared, text, lib, data, dirty = ( int(x) * PAGESIZE for x in f.readline().split()[:7] ) @@ -2004,10 +2007,11 @@ def get_blocks(lines, current_block): # see issue #369 continue else: - raise ValueError( - "don't know how to interpret line %r" - % line + msg = ( + "don't know how to interpret line" + f" {line!r}" ) + raise ValueError(msg) yield (current_block.pop(), data) data = self._read_smaps_file() @@ -2055,7 +2059,7 @@ def get_blocks(lines, current_block): @wrap_exceptions def cwd(self): - return readlink("%s/%s/cwd" % (self._procfs_path, self.pid)) + return readlink(f"{self._procfs_path}/{self.pid}/cwd") @wrap_exceptions def num_ctx_switches( @@ -2064,11 +2068,13 @@ def num_ctx_switches( data = self._read_status_file() ctxsw = _ctxsw_re.findall(data) if not ctxsw: - raise NotImplementedError( - "'voluntary_ctxt_switches' and 'nonvoluntary_ctxt_switches'" - "lines were not found in %s/%s/status; the kernel is " - "probably older than 2.6.23" % (self._procfs_path, self.pid) + msg = ( + "'voluntary_ctxt_switches' and" + " 'nonvoluntary_ctxt_switches'lines were not found in" + f" {self._procfs_path}/{self.pid}/status; the kernel is" + " probably older than 2.6.23" ) + raise NotImplementedError(msg) else: return _common.pctxsw(int(ctxsw[0]), int(ctxsw[1])) @@ -2080,16 +2086,12 @@ def num_threads(self, _num_threads_re=re.compile(br'Threads:\t(\d+)')): @wrap_exceptions def threads(self): - thread_ids = os.listdir("%s/%s/task" % (self._procfs_path, self.pid)) + thread_ids = os.listdir(f"{self._procfs_path}/{self.pid}/task") thread_ids.sort() retlist = [] hit_enoent = False for thread_id in thread_ids: - fname = "%s/%s/task/%s/stat" % ( - self._procfs_path, - self.pid, - thread_id, - ) + fname = f"{self._procfs_path}/{self.pid}/task/{thread_id}/stat" try: with open_binary(fname) as f: st = f.read().strip() @@ -2150,15 +2152,17 @@ def cpu_affinity_set(self, cpus): all_cpus = tuple(range(len(per_cpu_times()))) for cpu in cpus: if cpu not in all_cpus: - raise ValueError( - "invalid CPU number %r; choose between %s" - % (cpu, eligible_cpus) + msg = ( + f"invalid CPU {cpu!r}; choose between" + f" {eligible_cpus!r}" ) + raise ValueError(msg) if cpu not in eligible_cpus: - raise ValueError( - "CPU number %r is not eligible; choose " - "between %s" % (cpu, eligible_cpus) + msg = ( + f"CPU number {cpu} is not eligible; choose" + f" between {eligible_cpus}" ) + raise ValueError(msg) raise # only starting from kernel 2.6.13 @@ -2178,7 +2182,8 @@ def ionice_set(self, ioclass, value): IOPriority.IOPRIO_CLASS_IDLE, IOPriority.IOPRIO_CLASS_NONE, }: - raise ValueError("%r ioclass accepts no value" % ioclass) + msg = f"{ioclass!r} ioclass accepts no value" + raise ValueError(msg) if value < 0 or value > 7: msg = "value not in 0-7 range" raise ValueError(msg) @@ -2203,7 +2208,7 @@ def rlimit(self, resource_, limits=None): if len(limits) != 2: msg = ( "second argument must be a (soft, hard) " - + "tuple, got %s" % repr(limits) + f"tuple, got {limits!r}" ) raise ValueError(msg) resource.prlimit(self.pid, resource_, limits) @@ -2224,10 +2229,10 @@ def status(self): @wrap_exceptions def open_files(self): retlist = [] - files = os.listdir("%s/%s/fd" % (self._procfs_path, self.pid)) + files = os.listdir(f"{self._procfs_path}/{self.pid}/fd") hit_enoent = False for fd in files: - file = "%s/%s/fd/%s" % (self._procfs_path, self.pid, fd) + file = f"{self._procfs_path}/{self.pid}/fd/{fd}" try: path = readlink(file) except (FileNotFoundError, ProcessLookupError): @@ -2250,11 +2255,7 @@ def open_files(self): # absolute path though. if path.startswith('/') and isfile_strict(path): # Get file position and flags. - file = "%s/%s/fdinfo/%s" % ( - self._procfs_path, - self.pid, - fd, - ) + file = f"{self._procfs_path}/{self.pid}/fdinfo/{fd}" try: with open_binary(file) as f: pos = int(f.readline().split()[1]) @@ -2281,7 +2282,7 @@ def net_connections(self, kind='inet'): @wrap_exceptions def num_fds(self): - return len(os.listdir("%s/%s/fd" % (self._procfs_path, self.pid))) + return len(os.listdir(f"{self._procfs_path}/{self.pid}/fd")) @wrap_exceptions def ppid(self): diff --git a/psutil/_psposix.py b/psutil/_psposix.py index e07481986..fdfce147f 100644 --- a/psutil/_psposix.py +++ b/psutil/_psposix.py @@ -151,7 +151,8 @@ def sleep(interval): # continue else: # Should never happen. - raise ValueError("unknown process exit status %r" % status) + msg = f"unknown process exit status {status!r}" + raise ValueError(msg) def disk_usage(path): diff --git a/psutil/_pssunos.py b/psutil/_pssunos.py index b741792f3..ac3345b7a 100644 --- a/psutil/_pssunos.py +++ b/psutil/_pssunos.py @@ -143,7 +143,7 @@ def swap_memory(): p = subprocess.Popen( [ '/usr/bin/env', - 'PATH=/usr/sbin:/sbin:%s' % os.environ['PATH'], + f"PATH=/usr/sbin:/sbin:{os.environ['PATH']}", 'swap', '-l', ], @@ -152,7 +152,8 @@ def swap_memory(): stdout, _ = p.communicate() stdout = stdout.decode(sys.stdout.encoding) if p.returncode != 0: - raise RuntimeError("'swap -l' failed (retcode=%s)" % p.returncode) + msg = f"'swap -l' failed (retcode={p.returncode})" + raise RuntimeError(msg) lines = stdout.strip().split('\n')[1:] if not lines: @@ -239,7 +240,7 @@ def disk_partitions(all=False): continue except OSError as err: # https://github.com/giampaolo/psutil/issues/1674 - debug("skipping %r: %s" % (mountpoint, err)) + debug(f"skipping {mountpoint!r}: {err}") continue ntuple = _common.sdiskpart(device, mountpoint, fstype, opts) retlist.append(ntuple) @@ -387,7 +388,7 @@ def _assert_alive(self): """Raise NSP if the process disappeared on us.""" # For those C function who do not raise NSP, possibly returning # incorrect or incomplete result. - os.stat('%s/%s' % (self._procfs_path, self.pid)) + os.stat(f"{self._procfs_path}/{self.pid}") def oneshot_enter(self): self._proc_name_and_args.cache_activate(self) @@ -408,7 +409,7 @@ def _proc_name_and_args(self): @memoize_when_activated def _proc_basic_info(self): if self.pid == 0 and not os.path.exists( - '%s/%s/psinfo' % (self._procfs_path, self.pid) + f"{self._procfs_path}/{self.pid}/psinfo" ): raise AccessDenied(self.pid) ret = cext.proc_basic_info(self.pid, self._procfs_path) @@ -428,9 +429,7 @@ def name(self): @wrap_exceptions def exe(self): try: - return os.readlink( - "%s/%s/path/a.out" % (self._procfs_path, self.pid) - ) + return os.readlink(f"{self._procfs_path}/{self.pid}/path/a.out") except OSError: pass # continue and guess the exe name from the cmdline # Will be guessed later from cmdline but we want to explicitly @@ -527,9 +526,7 @@ def terminal(self): if tty != cext.PRNODEV: for x in (0, 1, 2, 255): try: - return os.readlink( - '%s/%d/path/%d' % (procfs_path, self.pid, x) - ) + return os.readlink(f"{procfs_path}/{self.pid}/path/{x}") except FileNotFoundError: hit_enoent = True continue @@ -544,9 +541,9 @@ def cwd(self): # Reference: http://goo.gl/55XgO procfs_path = self._procfs_path try: - return os.readlink("%s/%s/path/cwd" % (procfs_path, self.pid)) + return os.readlink(f"{procfs_path}/{self.pid}/path/cwd") except FileNotFoundError: - os.stat("%s/%s" % (procfs_path, self.pid)) # raise NSP or AD + os.stat(f"{procfs_path}/{self.pid}") # raise NSP or AD return "" @wrap_exceptions @@ -568,7 +565,7 @@ def status(self): def threads(self): procfs_path = self._procfs_path ret = [] - tids = os.listdir('%s/%d/lwp' % (procfs_path, self.pid)) + tids = os.listdir(f"{procfs_path}/{self.pid}/lwp") hit_enoent = False for tid in tids: tid = int(tid) @@ -603,8 +600,8 @@ def open_files(self): retlist = [] hit_enoent = False procfs_path = self._procfs_path - pathdir = '%s/%d/path' % (procfs_path, self.pid) - for fd in os.listdir('%s/%d/fd' % (procfs_path, self.pid)): + pathdir = f"{procfs_path}/{self.pid}/path" + for fd in os.listdir(f"{procfs_path}/{self.pid}/fd"): path = os.path.join(pathdir, fd) if os.path.islink(path): try: @@ -636,7 +633,8 @@ def _get_unix_sockets(self, pid): raise AccessDenied(self.pid, self._name) if 'no such process' in stderr.lower(): raise NoSuchProcess(self.pid, self._name) - raise RuntimeError("%r command error\n%s" % (cmd, stderr)) + msg = f"{cmd!r} command error\n{stderr}" + raise RuntimeError(msg) lines = stdout.split('\n')[2:] for i, line in enumerate(lines): @@ -662,7 +660,7 @@ def net_connections(self, kind='inet'): # is no longer there. if not ret: # will raise NSP if process is gone - os.stat('%s/%s' % (self._procfs_path, self.pid)) + os.stat(f"{self._procfs_path}/{self.pid}") # UNIX sockets if kind in {'all', 'unix'}: @@ -678,9 +676,8 @@ def net_connections(self, kind='inet'): @wrap_exceptions def memory_maps(self): def toaddr(start, end): - return '%s-%s' % ( - hex(start)[2:].strip('L'), - hex(end)[2:].strip('L'), + return "{}-{}".format( + hex(start)[2:].strip('L'), hex(end)[2:].strip('L') ) procfs_path = self._procfs_path @@ -705,9 +702,7 @@ def toaddr(start, end): addr = toaddr(addr, addrsize) if not name.startswith('['): try: - name = os.readlink( - '%s/%s/path/%s' % (procfs_path, self.pid, name) - ) + name = os.readlink(f"{procfs_path}/{self.pid}/path/{name}") except OSError as err: if err.errno == errno.ENOENT: # sometimes the link may not be resolved by @@ -716,7 +711,7 @@ def toaddr(start, end): # unresolved link path. # This seems an inconsistency with /proc similar # to: http://goo.gl/55XgO - name = '%s/%s/path/%s' % (procfs_path, self.pid, name) + name = f"{procfs_path}/{self.pid}/path/{name}" hit_enoent = True else: raise @@ -727,7 +722,7 @@ def toaddr(start, end): @wrap_exceptions def num_fds(self): - return len(os.listdir("%s/%s/fd" % (self._procfs_path, self.pid))) + return len(os.listdir(f"{self._procfs_path}/{self.pid}/fd")) @wrap_exceptions def num_ctx_switches(self): diff --git a/psutil/_pswindows.py b/psutil/_pswindows.py index e3c7db3ca..7a6558fe1 100644 --- a/psutil/_pswindows.py +++ b/psutil/_pswindows.py @@ -476,14 +476,11 @@ def __init__(self, name, display_name): self._display_name = display_name def __str__(self): - details = "(name=%r, display_name=%r)" % ( - self._name, - self._display_name, - ) - return "%s%s" % (self.__class__.__name__, details) + details = f"(name={self._name!r}, display_name={self._display_name!r})" + return f"{self.__class__.__name__}{details}" def __repr__(self): - return "<%s at %s>" % (self.__str__(), id(self)) + return f"<{self.__str__()} at {id(self)}>" def __eq__(self, other): # Test for equality with another WindosService object based @@ -525,15 +522,15 @@ def _wrap_exceptions(self): except OSError as err: if is_permission_err(err): msg = ( - "service %r is not querable (not enough privileges)" - % self._name + f"service {self._name!r} is not querable (not enough" + " privileges)" ) raise AccessDenied(pid=None, name=self._name, msg=msg) elif err.winerror in { cext.ERROR_INVALID_NAME, cext.ERROR_SERVICE_DOES_NOT_EXIST, }: - msg = "service %r does not exist" % self._name + msg = f"service {self._name!r} does not exist" raise NoSuchProcess(pid=None, name=self._name, msg=msg) else: raise @@ -759,7 +756,7 @@ def exe(self): # 24 = ERROR_TOO_MANY_OPEN_FILES. Not sure why this happens # (perhaps PyPy's JIT delaying garbage collection of files?). if err.errno == 24: - debug("%r translated into AccessDenied" % err) + debug(f"{err!r} translated into AccessDenied") raise AccessDenied(self.pid, self._name) raise else: @@ -1024,7 +1021,8 @@ def ionice_set(self, ioclass, value): IOPriority.IOPRIO_NORMAL, IOPriority.IOPRIO_HIGH, }: - raise ValueError("%s is not a valid priority" % ioclass) + msg = f"{ioclass} is not a valid priority" + raise ValueError(msg) cext.proc_io_priority_set(self.pid, ioclass) @wrap_exceptions @@ -1066,7 +1064,8 @@ def from_bitmask(x): def cpu_affinity_set(self, value): def to_bitmask(ls): if not ls: - raise ValueError("invalid argument %r" % ls) + msg = f"invalid argument {ls!r}" + raise ValueError(msg) out = 0 for b in ls: out |= 2**b @@ -1079,11 +1078,11 @@ def to_bitmask(ls): for cpu in value: if cpu not in allcpus: if not isinstance(cpu, int): - raise TypeError( - "invalid CPU %r; an integer is required" % cpu - ) + msg = f"invalid CPU {cpu!r}; an integer is required" + raise TypeError(msg) else: - raise ValueError("invalid CPU %r" % cpu) + msg = f"invalid CPU {cpu!r}" + raise ValueError(msg) bitmask = to_bitmask(value) cext.proc_cpu_affinity_set(self.pid, bitmask) diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index 13fed1778..c852ead0a 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -174,9 +174,9 @@ def macos_version(): # Disambiguate TESTFN for parallel testing. if os.name == 'java': # Jython disallows @ in module names - TESTFN_PREFIX = '$psutil-%s-' % os.getpid() + TESTFN_PREFIX = f"$psutil-{os.getpid()}-" else: - TESTFN_PREFIX = '@psutil-%s-' % os.getpid() + TESTFN_PREFIX = f"@psutil-{os.getpid()}-" UNICODE_SUFFIX = "-ƒőő" # An invalid unicode string. INVALID_UNICODE_SUFFIX = b"f\xc0\x80".decode('utf8', 'surrogateescape') @@ -289,7 +289,7 @@ def __init__(self): def __repr__(self): name = self.__class__.__name__ - return '<%s running=%s at %#x>' % (name, self._running, id(self)) + return f"<{name} running={self._running} at {id(self):#x}>" def __enter__(self): self.start() @@ -364,8 +364,8 @@ def spawn_testproc(cmd=None, **kwds): safe_rmpath(testfn) pyline = ( "import time;" - + "open(r'%s', 'w').close();" % testfn - + "[time.sleep(0.1) for x in range(100)];" # 10 secs + f"open(r'{testfn}', 'w').close();" + "[time.sleep(0.1) for x in range(100)];" # 10 secs ) cmd = [PYTHON_EXE, "-c", pyline] sproc = subprocess.Popen(cmd, **kwds) @@ -391,16 +391,16 @@ def spawn_children_pair(): tfile = None testfn = get_testfn(dir=os.getcwd()) try: - s = textwrap.dedent("""\ + s = textwrap.dedent(f"""\ import subprocess, os, sys, time s = "import os, time;" - s += "f = open('%s', 'w');" + s += "f = open('{os.path.basename(testfn)}', 'w');" s += "f.write(str(os.getpid()));" s += "f.close();" s += "[time.sleep(0.1) for x in range(100 * 6)];" - p = subprocess.Popen([r'%s', '-c', s]) + p = subprocess.Popen([r'{PYTHON_EXE}', '-c', s]) p.wait() - """ % (os.path.basename(testfn), PYTHON_EXE)) + """) # On Windows if we create a subprocess with CREATE_NO_WINDOW flag # set (which is the default) a "conhost.exe" extra process will be # spawned as a child. We don't want that. @@ -426,7 +426,7 @@ def spawn_zombie(): """ assert psutil.POSIX unix_file = get_testfn() - src = textwrap.dedent("""\ + src = textwrap.dedent(f"""\ import os, sys, time, socket, contextlib child_pid = os.fork() if child_pid > 0: @@ -434,10 +434,10 @@ def spawn_zombie(): else: # this is the zombie process with socket.socket(socket.AF_UNIX) as s: - s.connect('%s') + s.connect('{unix_file}') pid = bytes(str(os.getpid()), 'ascii') s.sendall(pid) - """ % unix_file) + """) tfile = None sock = bind_unix_socket(unix_file) try: @@ -581,7 +581,7 @@ def flush_popen(proc): elif isinstance(p, subprocess.Popen): return term_subprocess_proc(p, wait_timeout) else: - raise TypeError("wrong type %r" % p) + raise TypeError(f"wrong type {p!r}") finally: if isinstance(p, (subprocess.Popen, psutil.Popen)): flush_popen(p) @@ -617,7 +617,7 @@ def reap_children(recursive=False): terminate(p, wait_timeout=None) _, alive = psutil.wait_procs(children, timeout=GLOBAL_TIMEOUT) for p in alive: - warn("couldn't terminate process %r; attempting kill()" % p) + warn(f"couldn't terminate process {p!r}; attempting kill()") terminate(p, sig=signal.SIGKILL) @@ -638,7 +638,7 @@ def kernel_version(): else: break if not s: - raise ValueError("can't parse %r" % uname) + raise ValueError(f"can't parse {uname!r}") minor = 0 micro = 0 nums = s.split('.') @@ -786,7 +786,7 @@ def retry_fun(fun): pass except OSError as _: err = _ - warn("ignoring %s" % (str(err))) + warn(f"ignoring {err}") time.sleep(0.01) raise err @@ -919,11 +919,11 @@ def context(exc, match=None): yield einfo except exc as err: if match and not re.search(match, str(err)): - msg = f'"{match}" does not match "{str(err)}"' + msg = f'"{match}" does not match "{err}"' raise AssertionError(msg) einfo._exc = err else: - raise AssertionError("%r not raised" % exc) + raise AssertionError(f"{exc!r} not raised") return context(exc, match=match) @@ -1031,9 +1031,9 @@ def assertProcessGone(self, proc): except psutil.NoSuchProcess as exc: self._check_proc_exc(proc, exc) else: - msg = "Process.%s() didn't raise NSP and returned %r" % ( - name, - ret, + msg = ( + f"Process.{name}() didn't raise NSP and returned" + f" {ret!r}" ) raise AssertionError(msg) proc.wait(timeout=0) # assert not raise TimeoutExpired @@ -1179,15 +1179,16 @@ def _check_fds(self, fun): after = self._get_num_fds() diff = after - before if diff < 0: - raise self.fail( - "negative diff %r (gc probably collected a " - "resource from a previous test)" % diff + msg = ( + f"negative diff {diff!r} (gc probably collected a" + " resource from a previous test)" ) + raise self.fail(msg) if diff > 0: type_ = "fd" if POSIX else "handle" if diff > 1: type_ += "s" - msg = "%s unclosed %s after calling %r" % (diff, type_, fun) + msg = f"{diff} unclosed {type_} after calling {fun!r}" raise self.fail(msg) def _call_ntimes(self, fun, times): @@ -1291,13 +1292,13 @@ def print_sysinfo(): if psutil.LINUX and shutil.which("lsb_release"): info['OS'] = sh('lsb_release -d -s') elif psutil.OSX: - info['OS'] = 'Darwin %s' % platform.mac_ver()[0] + info['OS'] = f"Darwin {platform.mac_ver()[0]}" elif psutil.WINDOWS: info['OS'] = "Windows " + ' '.join(map(str, platform.win32_ver())) if hasattr(platform, 'win32_edition'): info['OS'] += ", " + platform.win32_edition() else: - info['OS'] = "%s %s" % (platform.system(), platform.version()) + info['OS'] = f"{platform.system()} {platform.version()}" info['arch'] = ', '.join( list(platform.architecture()) + [platform.machine()] ) @@ -1312,7 +1313,7 @@ def print_sysinfo(): ]) info['pip'] = getattr(pip, '__version__', 'not installed') if wheel is not None: - info['pip'] += " (wheel=%s)" % wheel.__version__ + info['pip'] += f" (wheel={wheel.__version__})" # UNIX if psutil.POSIX: @@ -1328,7 +1329,7 @@ def print_sysinfo(): # system info['fs-encoding'] = sys.getfilesystemencoding() lang = locale.getlocale() - info['lang'] = '%s, %s' % (lang[0], lang[1]) + info['lang'] = f"{lang[0]}, {lang[1]}" info['boot-time'] = datetime.datetime.fromtimestamp( psutil.boot_time() ).strftime("%Y-%m-%d %H:%M:%S") @@ -1526,9 +1527,9 @@ def test_class_coverage(cls, test_class, ls): for fun_name, _, _ in ls: meth_name = 'test_' + fun_name if not hasattr(test_class, meth_name): - msg = "%r class should define a '%s' method" % ( - test_class.__class__.__name__, - meth_name, + msg = ( + f"{test_class.__class__.__name__!r} class should define a" + f" {meth_name!r} method" ) raise AttributeError(msg) @@ -1539,7 +1540,7 @@ def test(cls): klass = {x for x in dir(psutil.Process) if x[0] != '_'} leftout = (this | ignored) ^ klass if leftout: - raise ValueError("uncovered Process class names: %r" % leftout) + raise ValueError(f"uncovered Process class names: {leftout!r}") class system_namespace: @@ -1618,7 +1619,7 @@ def retry_on_failure(retries=NO_RETRIES): """ def logfun(exc): - print("%r, retrying" % exc, file=sys.stderr) # NOQA + print(f"{exc!r}, retrying", file=sys.stderr) # NOQA return retry( exception=AssertionError, timeout=None, retries=retries, logfun=logfun @@ -1657,8 +1658,8 @@ def wrapper(*args, **kwargs): if not only_if: raise msg = ( - "%r was skipped because it raised NotImplementedError" - % fun.__name__ + f"{fun.__name__!r} was skipped because it raised" + " NotImplementedError" ) raise pytest.skip(msg) @@ -1802,7 +1803,7 @@ def check_net_address(addr, family): elif family == psutil.AF_LINK: assert re.match(r'([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr else: - raise ValueError("unknown family %r" % family) + raise ValueError(f"unknown family {family!r}") def check_connection_ntuple(conn): @@ -1888,7 +1889,7 @@ def filter_proc_net_connections(cons): for conn in cons: if POSIX and conn.family == socket.AF_UNIX: if MACOS and "/syslog" in conn.raddr: - debug("skipping %s" % str(conn)) + debug(f"skipping {conn}") continue new.append(conn) return new diff --git a/psutil/tests/test_aix.py b/psutil/tests/test_aix.py index 2b0f849be..10934c12d 100755 --- a/psutil/tests/test_aix.py +++ b/psutil/tests/test_aix.py @@ -31,7 +31,7 @@ def test_virtual_memory(self): "available", "mmode", ]: - re_pattern += r"(?P<%s>\S+)\s+" % (field,) + re_pattern += rf"(?P<{field}>\S+)\s+" matchobj = re.search(re_pattern, out) assert matchobj is not None @@ -104,7 +104,7 @@ def test_cpu_stats(self): "S5rd", "sysc", ]: - re_pattern += r"(?P<%s>\S+)\s+" % (field,) + re_pattern += rf"(?P<{field}>\S+)\s+" matchobj = re.search(re_pattern, out) assert matchobj is not None diff --git a/psutil/tests/test_bsd.py b/psutil/tests/test_bsd.py index 8e53f4f3e..2786c3485 100755 --- a/psutil/tests/test_bsd.py +++ b/psutil/tests/test_bsd.py @@ -86,7 +86,7 @@ def tearDownClass(cls): @pytest.mark.skipif(NETBSD, reason="-o lstart doesn't work on NETBSD") def test_process_create_time(self): - output = sh("ps -o lstart -p %s" % self.pid) + output = sh(f"ps -o lstart -p {self.pid}") start_ps = output.replace('STARTED', '').strip() start_psutil = psutil.Process(self.pid).create_time() start_psutil = time.strftime( @@ -98,7 +98,7 @@ def test_disks(self): # test psutil.disk_usage() and psutil.disk_partitions() # against "df -a" def df(path): - out = sh('df -k "%s"' % path).strip() + out = sh(f'df -k "{path}"').strip() lines = out.split('\n') lines.pop(0) line = lines.pop(0) @@ -117,9 +117,9 @@ def df(path): assert usage.total == total # 10 MB tolerance if abs(usage.free - free) > 10 * 1024 * 1024: - raise self.fail("psutil=%s, df=%s" % (usage.free, free)) + raise self.fail(f"psutil={usage.free}, df={free}") if abs(usage.used - used) > 10 * 1024 * 1024: - raise self.fail("psutil=%s, df=%s" % (usage.used, used)) + raise self.fail(f"psutil={usage.used}, df={used}") @pytest.mark.skipif( not shutil.which("sysctl"), reason="sysctl cmd not available" @@ -144,7 +144,7 @@ def test_virtual_memory_total(self): def test_net_if_stats(self): for name, stats in psutil.net_if_stats().items(): try: - out = sh("ifconfig %s" % name) + out = sh(f"ifconfig {name}") except RuntimeError: pass else: @@ -170,7 +170,7 @@ def tearDownClass(cls): @retry_on_failure() def test_memory_maps(self): - out = sh('procstat -v %s' % self.pid) + out = sh(f"procstat -v {self.pid}") maps = psutil.Process(self.pid).memory_maps(grouped=False) lines = out.split('\n')[1:] while lines: @@ -178,23 +178,23 @@ def test_memory_maps(self): fields = line.split() _, start, stop, _perms, res = fields[:5] map = maps.pop() - assert "%s-%s" % (start, stop) == map.addr + assert f"{start}-{stop}" == map.addr assert int(res) == map.rss if not map.path.startswith('['): assert fields[10] == map.path def test_exe(self): - out = sh('procstat -b %s' % self.pid) + out = sh(f"procstat -b {self.pid}") assert psutil.Process(self.pid).exe() == out.split('\n')[1].split()[-1] def test_cmdline(self): - out = sh('procstat -c %s' % self.pid) + out = sh(f"procstat -c {self.pid}") assert ' '.join(psutil.Process(self.pid).cmdline()) == ' '.join( out.split('\n')[1].split()[2:] ) def test_uids_gids(self): - out = sh('procstat -s %s' % self.pid) + out = sh(f"procstat -s {self.pid}") euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8] p = psutil.Process(self.pid) uids = p.uids() @@ -209,7 +209,7 @@ def test_uids_gids(self): @retry_on_failure() def test_ctx_switches(self): tested = [] - out = sh('procstat -r %s' % self.pid) + out = sh(f"procstat -r {self.pid}") p = psutil.Process(self.pid) for line in out.split('\n'): line = line.lower().strip() @@ -229,7 +229,7 @@ def test_ctx_switches(self): @retry_on_failure() def test_cpu_times(self): tested = [] - out = sh('procstat -r %s' % self.pid) + out = sh(f"procstat -r {self.pid}") p = psutil.Process(self.pid) for line in out.split('\n'): line = line.lower().strip() @@ -256,7 +256,7 @@ def parse_swapinfo(): parts = re.split(r'\s+', output) if not parts: - raise ValueError("Can't parse swapinfo: %s" % output) + raise ValueError(f"Can't parse swapinfo: {output}") # the size is in 1k units, so multiply by 1024 total, used, free = (int(p) * 1024 for p in parts[1:4]) @@ -423,7 +423,7 @@ def test_sensors_battery(self): def secs2hours(secs): m, _s = divmod(secs, 60) h, m = divmod(m, 60) - return "%d:%02d" % (h, m) + return f"{int(h)}:{int(m):02}" out = sh("acpiconf -i 0") fields = {x.split('\t')[0]: x.split('\t')[-1] for x in out.split("\n")} @@ -466,7 +466,7 @@ def test_sensors_battery_no_battery(self): def test_sensors_temperatures_against_sysctl(self): num_cpus = psutil.cpu_count(True) for cpu in range(num_cpus): - sensor = "dev.cpu.%s.temperature" % cpu + sensor = f"dev.cpu.{cpu}.temperature" # sysctl returns a string in the format 46.0C try: sysctl_result = int(float(sysctl(sensor)[:-1])) @@ -480,7 +480,7 @@ def test_sensors_temperatures_against_sysctl(self): < 10 ) - sensor = "dev.cpu.%s.coretemp.tjmax" % cpu + sensor = f"dev.cpu.{cpu}.coretemp.tjmax" sysctl_result = int(float(sysctl(sensor)[:-1])) assert ( psutil.sensors_temperatures()["coretemp"][cpu].high @@ -515,7 +515,7 @@ def parse_meminfo(look_for): for line in f: if line.startswith(look_for): return int(line.split()[1]) * 1024 - raise ValueError("can't find %s" % look_for) + raise ValueError(f"can't find {look_for}") # --- virtual mem diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index 47f69fed5..a082f9015 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -522,14 +522,14 @@ def test_multi_sockets_procs(self): for _ in range(times): fname = self.get_testfn() fnames.append(fname) - src = textwrap.dedent("""\ + src = textwrap.dedent(f"""\ import time, os from psutil.tests import create_sockets with create_sockets(): - with open(r'%s', 'w') as f: + with open(r'{fname}', 'w') as f: f.write("hello") [time.sleep(0.1) for x in range(100)] - """ % fname) + """) sproc = self.pyrun(src) pids.append(sproc.pid) diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py index ea96a0a73..945e0f535 100755 --- a/psutil/tests/test_linux.py +++ b/psutil/tests/test_linux.py @@ -110,7 +110,7 @@ def get_ipv6_addresses(ifname): all_fields.append(fields) if len(all_fields) == 0: - raise ValueError("could not find interface %r" % ifname) + raise ValueError(f"could not find interface {ifname!r}") for i in range(len(all_fields)): unformatted = all_fields[i][0] @@ -131,7 +131,7 @@ def get_mac_address(ifname): info = fcntl.ioctl( s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname) ) - return ''.join(['%02x:' % char for char in info[18:24]])[:-1] + return "".join([f"{char:02x}:" for char in info[18:24]])[:-1] def free_swap(): @@ -145,9 +145,7 @@ def free_swap(): _, total, used, free = line.split() nt = collections.namedtuple('free', 'total used free') return nt(int(total), int(used), int(free)) - raise ValueError( - "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines) - ) + raise ValueError(f"can't find 'Swap' in 'free' output:\n{out}") def free_physmem(): @@ -167,9 +165,7 @@ def free_physmem(): 'free', 'total used free shared output' ) return nt(total, used, free, shared, out) - raise ValueError( - "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines) - ) + raise ValueError(f"can't find 'Mem' in 'free' output:\n{out}") def vmstat(stat): @@ -178,7 +174,7 @@ def vmstat(stat): line = line.strip() if stat in line: return int(line.split(' ')[0]) - raise ValueError("can't find %r in 'vmstat' output" % stat) + raise ValueError(f"can't find {stat!r} in 'vmstat' output") def get_free_version_info(): @@ -271,7 +267,7 @@ def test_shared(self): psutil_value = psutil.virtual_memory().shared assert ( abs(free_value - psutil_value) < TOLERANCE_SYS_MEM - ), '%s %s \n%s' % (free_value, psutil_value, free.output) + ), f"{free_value} {psutil_value} \n{free.output}" @retry_on_failure() def test_available(self): @@ -286,7 +282,7 @@ def test_available(self): psutil_value = psutil.virtual_memory().available assert ( abs(free_value - psutil_value) < TOLERANCE_SYS_MEM - ), '%s %s \n%s' % (free_value, psutil_value, out) + ), f"{free_value} {psutil_value} \n{out}" @pytest.mark.skipif(not LINUX, reason="LINUX only") @@ -993,7 +989,7 @@ class TestSystemNetIfStats(PsutilTestCase): def test_against_ifconfig(self): for name, stats in psutil.net_if_stats().items(): try: - out = sh("ifconfig %s" % name) + out = sh(f"ifconfig {name}") except RuntimeError: pass else: @@ -1004,7 +1000,7 @@ def test_against_ifconfig(self): def test_mtu(self): for name, stats in psutil.net_if_stats().items(): - with open("/sys/class/net/%s/mtu" % name) as f: + with open(f"/sys/class/net/{name}/mtu") as f: assert stats.mtu == int(f.read().strip()) @pytest.mark.skipif( @@ -1016,7 +1012,7 @@ def test_flags(self): matches_found = 0 for name, stats in psutil.net_if_stats().items(): try: - out = sh("ifconfig %s" % name) + out = sh(f"ifconfig {name}") except RuntimeError: pass else: @@ -1049,7 +1045,7 @@ class TestSystemNetIOCounters(PsutilTestCase): def test_against_ifconfig(self): def ifconfig(nic): ret = {} - out = sh("ifconfig %s" % nic) + out = sh(f"ifconfig {nic}") ret['packets_recv'] = int( re.findall(r'RX packets[: ](\d+)', out)[0] ) @@ -1133,7 +1129,7 @@ def test_against_df(self): # test psutil.disk_usage() and psutil.disk_partitions() # against "df -a" def df(path): - out = sh('df -P -B 1 "%s"' % path).strip() + out = sh(f'df -P -B 1 "{path}"').strip() lines = out.split('\n') lines.pop(0) line = lines.pop(0) @@ -1337,9 +1333,7 @@ def test_call_methods(self): else: with pytest.raises(FileNotFoundError): finder.ask_proc_partitions() - if os.path.exists( - "/sys/dev/block/%s:%s/uevent" % (self.major, self.minor) - ): + if os.path.exists(f"/sys/dev/block/{self.major}:{self.minor}/uevent"): finder.ask_sys_dev_block() else: with pytest.raises(FileNotFoundError): @@ -1354,9 +1348,7 @@ def test_comparisons(self): a = b = c = None if os.path.exists("/proc/partitions"): a = finder.ask_proc_partitions() - if os.path.exists( - "/sys/dev/block/%s:%s/uevent" % (self.major, self.minor) - ): + if os.path.exists(f"/sys/dev/block/{self.major}:{self.minor}/uevent"): b = finder.ask_sys_class_block() c = finder.ask_sys_dev_block() @@ -1865,7 +1857,7 @@ def test_parse_smaps_mocked(self): Locked: 19 kB VmFlags: rd ex """).encode() - with mock_open_content({"/proc/%s/smaps" % os.getpid(): content}) as m: + with mock_open_content({f"/proc/{os.getpid()}/smaps": content}) as m: p = psutil._pslinux.Process(os.getpid()) uss, pss, swap = p._parse_smaps() assert m.called @@ -2036,7 +2028,7 @@ def test_threads_mocked(self): # condition). threads() is supposed to ignore that instead # of raising NSP. def open_mock_1(name, *args, **kwargs): - if name.startswith('/proc/%s/task' % os.getpid()): + if name.startswith(f"/proc/{os.getpid()}/task"): raise FileNotFoundError else: return orig_open(name, *args, **kwargs) @@ -2050,7 +2042,7 @@ def open_mock_1(name, *args, **kwargs): # ...but if it bumps into something != ENOENT we want an # exception. def open_mock_2(name, *args, **kwargs): - if name.startswith('/proc/%s/task' % os.getpid()): + if name.startswith(f"/proc/{os.getpid()}/task"): raise PermissionError else: return orig_open(name, *args, **kwargs) @@ -2075,7 +2067,7 @@ def test_issue_1014(self): # Emulates a case where smaps file does not exist. In this case # wrap_exception decorator should not raise NoSuchProcess. with mock_open_exception( - '/proc/%s/smaps' % os.getpid(), FileNotFoundError + f"/proc/{os.getpid()}/smaps", FileNotFoundError ) as m: p = psutil.Process() with pytest.raises(FileNotFoundError): @@ -2085,7 +2077,7 @@ def test_issue_1014(self): def test_issue_2418(self): p = psutil.Process() with mock_open_exception( - '/proc/%s/statm' % os.getpid(), FileNotFoundError + f"/proc/{os.getpid()}/statm", FileNotFoundError ): with mock.patch("os.path.exists", return_value=False): with pytest.raises(psutil.NoSuchProcess): @@ -2157,7 +2149,7 @@ def test_stat_file_parsing(self): "7", # delayacct_blkio_ticks ] content = " ".join(args).encode() - with mock_open_content({"/proc/%s/stat" % os.getpid(): content}): + with mock_open_content({f"/proc/{os.getpid()}/stat": content}): p = psutil.Process() assert p.name() == 'cat' assert p.status() == psutil.STATUS_ZOMBIE @@ -2180,7 +2172,7 @@ def test_status_file_parsing(self): Cpus_allowed_list:\t0-7 voluntary_ctxt_switches:\t12 nonvoluntary_ctxt_switches:\t13""").encode() - with mock_open_content({"/proc/%s/status" % os.getpid(): content}): + with mock_open_content({f"/proc/{os.getpid()}/status": content}): p = psutil.Process() assert p.num_ctx_switches().voluntary == 12 assert p.num_ctx_switches().involuntary == 13 @@ -2224,7 +2216,7 @@ def setUpClass(cls): def read_status_file(self, linestart): with psutil._psplatform.open_text( - '/proc/%s/status' % self.proc.pid + f"/proc/{self.proc.pid}/status" ) as f: for line in f: line = line.strip() @@ -2234,7 +2226,7 @@ def read_status_file(self, linestart): return int(value) except ValueError: return value - raise ValueError("can't find %r" % linestart) + raise ValueError(f"can't find {linestart!r}") def test_name(self): value = self.read_status_file("Name:") diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py index 6771dbf98..9d24bb32c 100755 --- a/psutil/tests/test_misc.py +++ b/psutil/tests/test_misc.py @@ -66,8 +66,8 @@ def test_process__repr__(self, func=repr): p = psutil.Process(self.spawn_testproc().pid) r = func(p) assert "psutil.Process" in r - assert "pid=%s" % p.pid in r - assert "name='%s'" % str(p.name()) in r.replace("name=u'", "name='") + assert f"pid={p.pid}" in r + assert f"name='{p.name()}'" in r.replace("name=u'", "name='") assert "status=" in r assert "exitcode=" not in r p.terminate() @@ -83,7 +83,7 @@ def test_process__repr__(self, func=repr): ): p = psutil.Process() r = func(p) - assert "pid=%s" % p.pid in r + assert f"pid={p.pid}" in r assert "status='zombie'" in r assert "name=" not in r with mock.patch.object( @@ -93,7 +93,7 @@ def test_process__repr__(self, func=repr): ): p = psutil.Process() r = func(p) - assert "pid=%s" % p.pid in r + assert f"pid={p.pid}" in r assert "terminated" in r assert "name=" not in r with mock.patch.object( @@ -103,7 +103,7 @@ def test_process__repr__(self, func=repr): ): p = psutil.Process() r = func(p) - assert "pid=%s" % p.pid in r + assert f"pid={p.pid}" in r assert "name=" not in r def test_process__str__(self): @@ -232,7 +232,7 @@ def test__all__(self): fun.__doc__ is not None and 'deprecated' not in fun.__doc__.lower() ): - raise self.fail('%r not in psutil.__all__' % name) + raise self.fail(f"{name!r} not in psutil.__all__") # Import 'star' will break if __all__ is inconsistent, see: # https://github.com/giampaolo/psutil/issues/656 @@ -912,7 +912,7 @@ class TestScripts(PsutilTestCase): @staticmethod def assert_stdout(exe, *args, **kwargs): kwargs.setdefault("env", PYTHON_EXE_ENV) - exe = '%s' % os.path.join(SCRIPTS_DIR, exe) + exe = os.path.join(SCRIPTS_DIR, exe) cmd = [PYTHON_EXE, exe] for arg in args: cmd.append(arg) @@ -941,8 +941,8 @@ def test_coverage(self): if 'test_' + os.path.splitext(name)[0] not in meths: # self.assert_stdout(name) raise self.fail( - 'no test defined for %r script' - % os.path.join(SCRIPTS_DIR, name) + "no test defined for" + f" {os.path.join(SCRIPTS_DIR, name)!r} script" ) @pytest.mark.skipif(not POSIX, reason="POSIX only") @@ -952,7 +952,7 @@ def test_executable(self): if file.endswith('.py'): path = os.path.join(root, file) if not stat.S_IXUSR & os.stat(path)[stat.ST_MODE]: - raise self.fail('%r is not executable' % path) + raise self.fail(f"{path!r} is not executable") def test_disk_usage(self): self.assert_stdout('disk_usage.py') diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py index a70cdf641..682012ecf 100755 --- a/psutil/tests/test_osx.py +++ b/psutil/tests/test_osx.py @@ -62,7 +62,7 @@ def tearDownClass(cls): terminate(cls.pid) def test_process_create_time(self): - output = sh("ps -o lstart -p %s" % self.pid) + output = sh(f"ps -o lstart -p {self.pid}") start_ps = output.replace('STARTED', '').strip() hhmmss = start_ps.split(' ')[-2] year = start_ps.split(' ')[-1] @@ -83,7 +83,7 @@ def test_disks(self): # test psutil.disk_usage() and psutil.disk_partitions() # against "df -a" def df(path): - out = sh('df -k "%s"' % path).strip() + out = sh(f'df -k "{path}"').strip() lines = out.split('\n') lines.pop(0) line = lines.pop(0) @@ -172,7 +172,7 @@ def test_swapmem_sout(self): def test_net_if_stats(self): for name, stats in psutil.net_if_stats().items(): try: - out = sh("ifconfig %s" % name) + out = sh(f"ifconfig {name}") except RuntimeError: pass else: diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py index 6c8ac7f49..93e6df6e3 100755 --- a/psutil/tests/test_posix.py +++ b/psutil/tests/test_posix.py @@ -135,7 +135,7 @@ def ps_vsz(pid): def df(device): try: - out = sh("df -k %s" % device).strip() + out = sh(f"df -k {device}").strip() except RuntimeError as err: if "device busy" in str(err).lower(): raise pytest.skip("df returned EBUSY") @@ -357,8 +357,8 @@ def test_nic_names(self): break else: raise self.fail( - "couldn't find %s nic in 'ifconfig -a' output\n%s" - % (nic, output) + f"couldn't find {nic} nic in 'ifconfig -a'" + f" output\n{output}" ) # @pytest.mark.skipif(CI_TESTING and not psutil.users(), @@ -407,9 +407,7 @@ def test_users_started(self): started = [x.capitalize() for x in started] if not tstamp: - raise pytest.skip( - "cannot interpret tstamp in who output\n%s" % (out) - ) + raise pytest.skip(f"cannot interpret tstamp in who output\n{out}") with self.subTest(psutil=psutil.users(), who=out): for idx, u in enumerate(psutil.users()): diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py index 35432b1db..1b6269a3d 100755 --- a/psutil/tests/test_process.py +++ b/psutil/tests/test_process.py @@ -280,10 +280,10 @@ def test_cpu_times_2(self): # using a tolerance of +/- 0.1 seconds. # It will fail if the difference between the values is > 0.1s. if (max([user_time, utime]) - min([user_time, utime])) > 0.1: - raise self.fail("expected: %s, found: %s" % (utime, user_time)) + raise self.fail(f"expected: {utime}, found: {user_time}") if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: - raise self.fail("expected: %s, found: %s" % (ktime, kernel_time)) + raise self.fail(f"expected: {ktime}, found: {kernel_time}") @pytest.mark.skipif(not HAS_PROC_CPU_NUM, reason="not supported") def test_cpu_num(self): @@ -305,8 +305,8 @@ def test_create_time(self): difference = abs(create_time - now) if difference > 2: raise self.fail( - "expected: %s, found: %s, difference: %s" - % (now, create_time, difference) + f"expected: {now}, found: {create_time}, difference:" + f" {difference}" ) # make sure returned value can be pretty printed with strftime @@ -658,7 +658,7 @@ def test_memory_maps(self): # https://github.com/giampaolo/psutil/issues/759 with open_text('/proc/self/smaps') as f: data = f.read() - if "%s (deleted)" % nt.path not in data: + if f"{nt.path} (deleted)" not in data: raise elif '64' not in os.path.basename(nt.path): # XXX - On Windows we have this strange behavior with @@ -728,7 +728,7 @@ def test_exe(self): # "/usr/local/bin/python" # We do not want to consider this difference in accuracy # an error. - ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) + ver = f"{sys.version_info[0]}.{sys.version_info[1]}" try: assert exe.replace(ver, '') == PYTHON_EXE.replace(ver, '') except AssertionError: @@ -1023,7 +1023,7 @@ def test_cpu_affinity_errs(self): p.cpu_affinity(invalid_cpu) with pytest.raises(ValueError): p.cpu_affinity(range(10000, 11000)) - with pytest.raises(TypeError): + with pytest.raises((TypeError, ValueError)): p.cpu_affinity([0, "1"]) with pytest.raises(ValueError): p.cpu_affinity([0, -1]) @@ -1072,8 +1072,8 @@ def test_open_files(self): # another process cmdline = ( - "import time; f = open(r'%s', 'r'); [time.sleep(0.1) for x in" - " range(100)];" % testfn + f"import time; f = open(r'{testfn}', 'r'); [time.sleep(0.1) for x" + " in range(100)];" ) p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline]) @@ -1102,9 +1102,7 @@ def test_open_files_2(self): ): break else: - raise self.fail( - "no file found; files=%s" % (repr(p.open_files())) - ) + raise self.fail(f"no file found; files={p.open_files()!r}") assert normcase(file.path) == normcase(fileobj.name) if WINDOWS: assert file.fd == -1 @@ -1377,7 +1375,7 @@ def assert_raises_nsp(fun, fun_name): if WINDOWS and fun_name in {'exe', 'name'}: return raise self.fail( - "%r didn't raise NSP and returned %r instead" % (fun, ret) + f"{fun!r} didn't raise NSP and returned {ret!r} instead" ) p = self.spawn_psproc() @@ -1435,7 +1433,7 @@ def test_reused_pid(self): with contextlib.redirect_stderr(io.StringIO()) as f: list(psutil.process_iter()) assert ( - "refreshing Process instance for reused PID %s" % p.pid + f"refreshing Process instance for reused PID {p.pid}" in f.getvalue() ) assert p.pid not in psutil._pmap diff --git a/psutil/tests/test_process_all.py b/psutil/tests/test_process_all.py index 29f3f894e..24229979c 100755 --- a/psutil/tests/test_process_all.py +++ b/psutil/tests/test_process_all.py @@ -142,7 +142,7 @@ def test_all(self): info, ) s += '-' * 70 - s += "\n%s" % traceback.format_exc() + s += f"\n{traceback.format_exc()}" s = "\n".join((" " * 4) + i for i in s.splitlines()) + "\n" failures.append(s) else: @@ -484,7 +484,7 @@ def tearDown(self): def test_it(self): def is_linux_tid(pid): try: - f = open("/proc/%s/status" % pid, "rb") + f = open(f"/proc/{pid}/status", "rb") except FileNotFoundError: return False else: diff --git a/psutil/tests/test_sunos.py b/psutil/tests/test_sunos.py index b9638ec44..b5d9d353b 100755 --- a/psutil/tests/test_sunos.py +++ b/psutil/tests/test_sunos.py @@ -18,7 +18,7 @@ @pytest.mark.skipif(not SUNOS, reason="SUNOS only") class SunOSSpecificTestCase(PsutilTestCase): def test_swap_memory(self): - out = sh('env PATH=/usr/sbin:/sbin:%s swap -l' % os.environ['PATH']) + out = sh(f"env PATH=/usr/sbin:/sbin:{os.environ['PATH']} swap -l") lines = out.strip().split('\n')[1:] if not lines: raise ValueError('no swap device(s) configured') diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py index 1e814b1e8..62d49bf61 100755 --- a/psutil/tests/test_system.py +++ b/psutil/tests/test_system.py @@ -334,11 +334,10 @@ def test_virtual_memory(self): assert isinstance(value, int) if name != 'total': if not value >= 0: - raise self.fail("%r < 0 (%s)" % (name, value)) + raise self.fail(f"{name!r} < 0 ({value})") if value > mem.total: raise self.fail( - "%r > total (total=%s, %s=%s)" - % (name, mem.total, name, value) + f"{name!r} > total (total={mem.total}, {name}={value})" ) def test_swap_memory(self): diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py index 4e5d2484d..667f1d686 100755 --- a/psutil/tests/test_windows.py +++ b/psutil/tests/test_windows.py @@ -65,8 +65,8 @@ def powershell(cmd): if not shutil.which("powershell.exe"): raise pytest.skip("powershell.exe not available") cmdline = ( - 'powershell.exe -ExecutionPolicy Bypass -NoLogo -NonInteractive ' - + '-NoProfile -WindowStyle Hidden -Command "%s"' % cmd + "powershell.exe -ExecutionPolicy Bypass -NoLogo -NonInteractive " + f"-NoProfile -WindowStyle Hidden -Command \"{cmd}\"" # noqa: Q003 ) return sh(cmdline) @@ -77,7 +77,7 @@ def wmic(path, what, converter=int): >>> wmic("Win32_OperatingSystem", "FreePhysicalMemory") 2134124534 """ - out = sh("wmic path %s get %s" % (path, what)).strip() + out = sh(f"wmic path {path} get {what}").strip() data = "".join(out.splitlines()[1:]).strip() # get rid of the header if converter is not None: if "," in what: @@ -142,7 +142,7 @@ def test_nic_names(self): continue if nic not in out: raise self.fail( - "%r nic wasn't found in 'ipconfig /all' output" % nic + f"{nic!r} nic wasn't found in 'ipconfig /all' output" ) def test_total_phymem(self): @@ -222,12 +222,10 @@ def test_disks(self): assert usage.free == wmi_free # 10 MB tolerance if abs(usage.free - wmi_free) > 10 * 1024 * 1024: - raise self.fail( - "psutil=%s, wmi=%s" % (usage.free, wmi_free) - ) + raise self.fail(f"psutil={usage.free}, wmi={wmi_free}") break else: - raise self.fail("can't find partition %s" % repr(ps_part)) + raise self.fail(f"can't find partition {ps_part!r}") @retry_on_failure() def test_disk_usage(self): @@ -262,10 +260,9 @@ def test_net_if_stats(self): for wmi_adapter in wmi_adapters: wmi_names.add(wmi_adapter.Name) wmi_names.add(wmi_adapter.NetConnectionID) - assert ps_names & wmi_names, "no common entries in %s, %s" % ( - ps_names, - wmi_names, - ) + assert ( + ps_names & wmi_names + ), f"no common entries in {ps_names}, {wmi_names}" def test_boot_time(self): wmi_os = wmi.WMI().Win32_OperatingSystem() @@ -606,7 +603,7 @@ def test_username(self): w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] p = psutil.Process(self.pid) domain, _, username = w.GetOwner() - username = "%s\\%s" % (domain, username) + username = f"{domain}\\{username}" assert p.username() == username @retry_on_failure() @@ -627,7 +624,7 @@ def test_memory_vms(self): # returned instead. wmi_usage = int(w.PageFileUsage) if vms not in {wmi_usage, wmi_usage * 1024}: - raise self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) + raise self.fail(f"wmi={wmi_usage}, psutil={vms}") def test_create_time(self): w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] diff --git a/pyproject.toml b/pyproject.toml index b64adb6ff..a0d6df88c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,10 +96,11 @@ ignore = [ # T201 == print(), T203 == pprint() # EM101 == raw-string-in-exception # TRY003 == raise-vanilla-args -".github/workflows/*" = ["T201", "T203"] -"psutil/tests/*" = ["EM101", "TRY003"] -"scripts/*" = ["T201", "T203"] -"scripts/internal/*" = ["EM101", "T201", "T203", "TRY003"] +# EM102 == Exception must not use an f-string literal, assign to variable first +".github/workflows/*" = ["EM102", "T201", "T203"] +"psutil/tests/*" = ["EM101", "EM102", "TRY003"] +"scripts/*" = ["EM102", "T201", "T203"] +"scripts/internal/*" = ["EM101", "EM102", "T201", "T203", "TRY003"] "setup.py" = ["T201", "T203"] [tool.ruff.lint.isort] diff --git a/scripts/battery.py b/scripts/battery.py index d9a783daa..b6d4679ea 100755 --- a/scripts/battery.py +++ b/scripts/battery.py @@ -22,7 +22,7 @@ def secs2hours(secs): mm, ss = divmod(secs, 60) hh, mm = divmod(mm, 60) - return "%d:%02d:%02d" % (hh, mm, ss) + return f"{int(hh)}:{int(mm):02}:{int(ss):02}" def main(): @@ -32,16 +32,16 @@ def main(): if batt is None: return sys.exit("no battery is installed") - print("charge: %s%%" % round(batt.percent, 2)) + print(f"charge: {round(batt.percent, 2)}%") if batt.power_plugged: print( - "status: %s" - % ("charging" if batt.percent < 100 else "fully charged") + "status: " + f" {'charging' if batt.percent < 100 else 'fully charged'}" ) print("plugged in: yes") else: - print("left: %s" % secs2hours(batt.secsleft)) - print("status: %s" % "discharging") + print(f"left: {secs2hours(batt.secsleft)}") + print("status: discharging") print("plugged in: no") diff --git a/scripts/ifconfig.py b/scripts/ifconfig.py index dd7684e87..dfc5f5ae4 100755 --- a/scripts/ifconfig.py +++ b/scripts/ifconfig.py @@ -66,7 +66,7 @@ def main(): stats = psutil.net_if_stats() io_counters = psutil.net_io_counters(pernic=True) for nic, addrs in psutil.net_if_addrs().items(): - print("%s:" % (nic)) + print(f"{nic}:") if nic in stats: st = stats[nic] print(" stats : ", end='') @@ -103,13 +103,13 @@ def main(): ) for addr in addrs: print(" %-4s" % af_map.get(addr.family, addr.family), end="") - print(" address : %s" % addr.address) + print(f" address : {addr.address}") if addr.broadcast: - print(" broadcast : %s" % addr.broadcast) + print(f" broadcast : {addr.broadcast}") if addr.netmask: - print(" netmask : %s" % addr.netmask) + print(f" netmask : {addr.netmask}") if addr.ptp: - print(" p2p : %s" % addr.ptp) + print(f" p2p : {addr.ptp}") print() diff --git a/scripts/internal/bench_oneshot.py b/scripts/internal/bench_oneshot.py index 43c279a30..299f9cea6 100755 --- a/scripts/internal/bench_oneshot.py +++ b/scripts/internal/bench_oneshot.py @@ -126,8 +126,9 @@ def call_oneshot(funs): def main(): print( - "%s methods involved on platform %r (%s iterations, psutil %s):" - % (len(names), sys.platform, ITERATIONS, psutil.__version__) + f"{len(names)} methods involved on platform" + f" {sys.platform!r} ({ITERATIONS} iterations, psutil" + f" {psutil.__version__}):" ) for name in sorted(names): print(" " + name) @@ -136,19 +137,19 @@ def main(): elapsed1 = timeit.timeit( "call_normal(funs)", setup=setup, number=ITERATIONS ) - print("normal: %.3f secs" % elapsed1) + print(f"normal: {elapsed1:.3f} secs") # "one shot" run elapsed2 = timeit.timeit( "call_oneshot(funs)", setup=setup, number=ITERATIONS ) - print("onshot: %.3f secs" % elapsed2) + print(f"onshot: {elapsed2:.3f} secs") # done if elapsed2 < elapsed1: - print("speedup: +%.2fx" % (elapsed1 / elapsed2)) + print(f"speedup: +{elapsed1 / elapsed2:.2f}x") elif elapsed2 > elapsed1: - print("slowdown: -%.2fx" % (elapsed2 / elapsed1)) + print(f"slowdown: -{elapsed2 / elapsed1:.2f}x") else: print("same speed") diff --git a/scripts/internal/bench_oneshot_2.py b/scripts/internal/bench_oneshot_2.py index 41c9cbb89..1076dffc8 100755 --- a/scripts/internal/bench_oneshot_2.py +++ b/scripts/internal/bench_oneshot_2.py @@ -37,8 +37,8 @@ def main(): args = runner.parse_args() if not args.worker: print( - "%s methods involved on platform %r (psutil %s):" - % (len(names), sys.platform, psutil.__version__) + f"{len(names)} methods involved on platform" + f" {sys.platform!r} (psutil {psutil.__version__}):" ) for name in sorted(names): print(" " + name) diff --git a/scripts/internal/check_broken_links.py b/scripts/internal/check_broken_links.py index ed84385ef..408886817 100755 --- a/scripts/internal/check_broken_links.py +++ b/scripts/internal/check_broken_links.py @@ -212,7 +212,7 @@ def parallel_validator(urls): } for fut in concurrent.futures.as_completed(fut_to_url): current += 1 - sys.stdout.write("\r%s / %s" % (current, total)) + sys.stdout.write(f"\r{current} / {total}") sys.stdout.flush() fname, url = fut_to_url[fut] try: @@ -220,7 +220,7 @@ def parallel_validator(urls): except Exception: # noqa: BLE001 fails.append((fname, url)) print() - print("warn: error while validating %s" % url, file=sys.stderr) + print(f"warn: error while validating {url}", file=sys.stderr) traceback.print_exc() else: if not ok: @@ -242,7 +242,7 @@ def main(): for fname in args.files: urls = get_urls(fname) if urls: - print("%4s %s" % (len(urls), fname)) + print(f"{len(urls):4} {fname}") for url in urls: all_urls.append((fname, url)) @@ -254,7 +254,7 @@ def main(): fname, url = fail print("%-30s: %s " % (fname, url)) print('-' * 20) - print("total: %s fails!" % len(fails)) + print(f"total: {len(fails)} fails!") sys.exit(1) diff --git a/scripts/internal/clinter.py b/scripts/internal/clinter.py index 225140018..4f0d99c31 100755 --- a/scripts/internal/clinter.py +++ b/scripts/internal/clinter.py @@ -17,7 +17,7 @@ def warn(path, line, lineno, msg): global warned warned = True - print("%s:%s: %s" % (path, lineno, msg), file=sys.stderr) + print(f"{path}:{lineno}: {msg}", file=sys.stderr) def check_line(path, line, idx, lines): @@ -49,7 +49,7 @@ def check_line(path, line, idx, lines): keywords = ("if", "else", "while", "do", "enum", "for") for kw in keywords: if sls.startswith(kw + '('): - warn(path, line, lineno, "missing space between %r and '('" % kw) + warn(path, line, lineno, f"missing space between {kw!r} and '('") # eof if eof and not line.endswith('\n'): warn(path, line, lineno, "no blank line at EOF") diff --git a/scripts/internal/download_wheels.py b/scripts/internal/download_wheels.py index bd9e74390..73cda80f1 100755 --- a/scripts/internal/download_wheels.py +++ b/scripts/internal/download_wheels.py @@ -34,10 +34,10 @@ def get_artifacts(): - base_url = "https://api.github.com/repos/%s/%s" % (USER, PROJECT) + base_url = f"https://api.github.com/repos/{USER}/{PROJECT}" url = base_url + "/actions/artifacts" res = requests.get( - url=url, headers={"Authorization": "token %s" % TOKEN}, timeout=TIMEOUT + url=url, headers={"Authorization": f"token {TOKEN}"}, timeout=TIMEOUT ) res.raise_for_status() data = json.loads(res.content) @@ -47,7 +47,7 @@ def get_artifacts(): def download_zip(url): print("downloading: " + url) res = requests.get( - url=url, headers={"Authorization": "token %s" % TOKEN}, timeout=TIMEOUT + url=url, headers={"Authorization": f"token {TOKEN}"}, timeout=TIMEOUT ) res.raise_for_status() totbytes = 0 @@ -55,7 +55,7 @@ def download_zip(url): for chunk in res.iter_content(chunk_size=16384): f.write(chunk) totbytes += len(chunk) - print("got %s, size %s)" % (OUTFILE, bytes2human(totbytes))) + print(f"got {OUTFILE}, size {bytes2human(totbytes)})") def run(): diff --git a/scripts/internal/git_pre_commit.py b/scripts/internal/git_pre_commit.py index de4b461e6..10f6368da 100755 --- a/scripts/internal/git_pre_commit.py +++ b/scripts/internal/git_pre_commit.py @@ -45,7 +45,7 @@ def hilite(s, ok=True, bold=False): attr.append('31') if bold: attr.append('1') - return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), s) + return f"\x1b[{';'.join(attr)}m{s}\x1b[0m" def exit(msg): @@ -97,7 +97,7 @@ def git_commit_files(): def black(files): - print("running black (%s)" % len(files)) + print(f"running black ({len(files)})") cmd = [PYTHON, "-m", "black", "--check", "--safe"] + files if subprocess.call(cmd) != 0: return exit( @@ -107,8 +107,15 @@ def black(files): def ruff(files): - print("running ruff (%s)" % len(files)) - cmd = [PYTHON, "-m", "ruff", "check", "--no-cache"] + files + print(f"running ruff ({len(files)})") + cmd = [ + PYTHON, + "-m", + "ruff", + "check", + "--no-cache", + "--output-format=concise", + ] + files if subprocess.call(cmd) != 0: return exit( "Python code didn't pass 'ruff' style check." @@ -117,7 +124,7 @@ def ruff(files): def c_linter(files): - print("running clinter (%s)" % len(files)) + print(f"running clinter ({len(files)})") # XXX: we should escape spaces and possibly other amenities here cmd = [PYTHON, "scripts/internal/clinter.py"] + files if subprocess.call(cmd) != 0: @@ -125,14 +132,14 @@ def c_linter(files): def toml_sort(files): - print("running toml linter (%s)" % len(files)) + print(f"running toml linter ({len(files)})") cmd = ["toml-sort", "--check"] + files if subprocess.call(cmd) != 0: - return sys.exit("%s didn't pass style check" % ' '.join(files)) + return sys.exit(f"{' '.join(files)} didn't pass style check") def rstcheck(files): - print("running rst linter (%s)" % len(files)) + print(f"running rst linter ({len(files)})") cmd = ["rstcheck", "--config=pyproject.toml"] + files if subprocess.call(cmd) != 0: return sys.exit("RST code didn't pass style check") diff --git a/scripts/internal/install_pip.py b/scripts/internal/install_pip.py index 9b1ee7a21..bca5d5fe7 100755 --- a/scripts/internal/install_pip.py +++ b/scripts/internal/install_pip.py @@ -31,7 +31,7 @@ def main(): else None ) with tempfile.NamedTemporaryFile(suffix=".py") as f: - print("downloading %s into %s" % (URL, f.name)) + print(f"downloading {URL} into {f.name}") kwargs = dict(context=ssl_context) if ssl_context else {} req = urlopen(URL, **kwargs) data = req.read() @@ -41,7 +41,7 @@ def main(): f.flush() print("download finished, installing pip") - code = os.system("%s %s --user --upgrade" % (sys.executable, f.name)) + code = os.system(f"{sys.executable} {f.name} --user --upgrade") sys.exit(code) diff --git a/scripts/internal/print_access_denied.py b/scripts/internal/print_access_denied.py index 6bb0cdd08..dbf96f2d8 100755 --- a/scripts/internal/print_access_denied.py +++ b/scripts/internal/print_access_denied.py @@ -78,7 +78,7 @@ def main(): for methname, ads in sorted(d.items(), key=lambda x: (x[1], x[0])): perc = (ads / tot_procs) * 100 outcome = "SUCCESS" if not ads else "ACCESS DENIED" - s = templ % (methname, ads, "%6.1f%%" % perc, outcome) + s = templ % (methname, ads, f"{perc:6.1f}%", outcome) print_color(s, "red" if ads else None) tot_perc = round((tot_ads / tot_calls) * 100, 1) print("-" * 50) diff --git a/scripts/internal/print_api_speed.py b/scripts/internal/print_api_speed.py index 3fecbfbcd..673f51c89 100755 --- a/scripts/internal/print_api_speed.py +++ b/scripts/internal/print_api_speed.py @@ -95,7 +95,7 @@ def print_timings(): i = 0 while timings[:]: title, times, elapsed = timings.pop(0) - s = templ % (title, str(times), "%.5f" % elapsed) + s = templ % (title, str(times), f"{elapsed:.5f}") if i > len(timings) - 5: print_color(s, color="red") else: diff --git a/scripts/internal/print_dist.py b/scripts/internal/print_dist.py index 1fb4b3f3a..5e2a6e598 100755 --- a/scripts/internal/print_dist.py +++ b/scripts/internal/print_dist.py @@ -54,7 +54,7 @@ def platform(self): else: return 'macos' else: - raise ValueError("unknown platform %r" % self.name) + raise ValueError(f"unknown platform {self.name!r}") def arch(self): if self.name.endswith(('x86_64.whl', 'amd64.whl')): @@ -106,14 +106,14 @@ def main(): elif path.endswith(".tar.gz"): pkg = Tarball(path) else: - raise ValueError("invalid package %r" % path) + raise ValueError(f"invalid package {path!r}") groups[pkg.platform()].append(pkg) tot_files = 0 tot_size = 0 templ = "%-120s %7s %8s %7s" for platf, pkgs in groups.items(): - ppn = "%s (%s)" % (platf, len(pkgs)) + ppn = f"{platf} ({len(pkgs)})" s = templ % (ppn, "size", "arch", "pyver") print_color('\n' + s, color=None, bold=True) for pkg in sorted(pkgs, key=lambda x: x.name): @@ -131,7 +131,7 @@ def main(): print_color(s, color='brown') print_color( - "\n\ntotals: files=%s, size=%s" % (tot_files, bytes2human(tot_size)), + f"\n\ntotals: files={tot_files}, size={bytes2human(tot_size)}", bold=True, ) diff --git a/scripts/internal/print_downloads.py b/scripts/internal/print_downloads.py index ae6216907..70afd4b83 100755 --- a/scripts/internal/print_downloads.py +++ b/scripts/internal/print_downloads.py @@ -68,7 +68,7 @@ def query(cmd): def top_packages(): global LAST_UPDATE ret = query( - "pypinfo --all --json --days %s --limit %s '' project" % (DAYS, LIMIT) + f"pypinfo --all --json --days {DAYS} --limit {LIMIT} '' project" ) LAST_UPDATE = ret['last_update'] return [(x['project'], x['download_count']) for x in ret['rows']] @@ -81,7 +81,7 @@ def ranking(): if name == PKGNAME: return i i += 1 - raise ValueError("can't find %s" % PKGNAME) + raise ValueError(f"can't find {PKGNAME}") def downloads(): @@ -89,23 +89,23 @@ def downloads(): for name, downloads in data: if name == PKGNAME: return downloads - raise ValueError("can't find %s" % PKGNAME) + raise ValueError(f"can't find {PKGNAME}") def downloads_pyver(): - return query("pypinfo --json --days %s %s pyversion" % (DAYS, PKGNAME)) + return query(f"pypinfo --json --days {DAYS} {PKGNAME} pyversion") def downloads_by_country(): - return query("pypinfo --json --days %s %s country" % (DAYS, PKGNAME)) + return query(f"pypinfo --json --days {DAYS} {PKGNAME} country") def downloads_by_system(): - return query("pypinfo --json --days %s %s system" % (DAYS, PKGNAME)) + return query(f"pypinfo --json --days {DAYS} {PKGNAME} system") def downloads_by_distro(): - return query("pypinfo --json --days %s %s distro" % (DAYS, PKGNAME)) + return query(f"pypinfo --json --days {DAYS} {PKGNAME} distro") # --- print @@ -116,7 +116,7 @@ def downloads_by_distro(): def print_row(left, right): if isinstance(right, int): - right = f'{right:,}' + right = f"{right:,}" print(templ % (left, right)) @@ -142,9 +142,9 @@ def main(): print("# Download stats") print() - s = "psutil download statistics of the last %s days (last update " % DAYS - s += "*%s*).\n" % LAST_UPDATE - s += "Generated via [pypistats.py](%s) script.\n" % GITHUB_SCRIPT_URL + s = f"psutil download statistics of the last {DAYS} days (last update " + s += f"*{LAST_UPDATE}*).\n" + s += f"Generated via [pypistats.py]({GITHUB_SCRIPT_URL}) script.\n" print(s) data = [ @@ -171,4 +171,4 @@ def main(): try: main() finally: - print("bytes billed: %s" % bytes_billed, file=sys.stderr) + print(f"bytes billed: {bytes_billed}", file=sys.stderr) diff --git a/scripts/internal/print_hashes.py b/scripts/internal/print_hashes.py index 5b9cfb209..b8d8b365d 100755 --- a/scripts/internal/print_hashes.py +++ b/scripts/internal/print_hashes.py @@ -31,12 +31,9 @@ def main(): if os.path.isfile(file): md5 = csum(file, "md5") sha256 = csum(file, "sha256") - print( - "%s\nmd5: %s\nsha256: %s\n" - % (os.path.basename(file), md5, sha256) - ) + print(f"{os.path.basename(file)}\nmd5: {md5}\nsha256: {sha256}\n") else: - print("skipping %r (not a file)" % file) + print(f"skipping {file!r} (not a file)") if __name__ == "__main__": diff --git a/scripts/internal/winmake.py b/scripts/internal/winmake.py index 2e076f0df..5413b827b 100755 --- a/scripts/internal/winmake.py +++ b/scripts/internal/winmake.py @@ -96,7 +96,7 @@ def win_colorprint(s, color=LIGHTBLUE): def sh(cmd, nolog=False): assert isinstance(cmd, list), repr(cmd) if not nolog: - safe_print("cmd: %s" % cmd) + safe_print(f"cmd: {cmd}") p = subprocess.Popen(cmd, env=os.environ, universal_newlines=True) p.communicate() # print stdout/stderr in real time if p.returncode != 0: @@ -120,10 +120,10 @@ def rm(pattern, directory=False): for name in found: path = os.path.join(root, name) if directory: - safe_print("rmdir -f %s" % path) + safe_print(f"rmdir -f {path}") safe_rmtree(path) else: - safe_print("rm %s" % path) + safe_print(f"rm {path}") safe_remove(path) @@ -134,14 +134,14 @@ def safe_remove(path): if err.errno != errno.ENOENT: raise else: - safe_print("rm %s" % path) + safe_print(f"rm {path}") def safe_rmtree(path): existed = os.path.isdir(path) shutil.rmtree(path, ignore_errors=True) if existed and not os.path.isdir(path): - safe_print("rmdir -f %s" % path) + safe_print(f"rmdir -f {path}") def recursive_rm(*patterns): @@ -268,7 +268,7 @@ def uninstall(): if 'psutil' not in line: f.write(line) else: - print("removed line %r from %r" % (line, path)) + print(f"removed line {line!r} from {path!r}") def clean(): @@ -467,7 +467,7 @@ def get_python(path): '312-64', ) for v in vers: - pypath = r'C:\\python%s\python.exe' % v + pypath = rf"C:\\python{v}\python.exe" if path in pypath and os.path.isfile(pypath): return pypath @@ -531,7 +531,7 @@ def main(): PYTHON = get_python(args.python) if not PYTHON: return sys.exit( - "can't find any python installation matching %r" % args.python + f"can't find any python installation matching {args.python!r}" ) os.putenv('PYTHON', PYTHON) win_colorprint("using " + PYTHON) diff --git a/scripts/killall.py b/scripts/killall.py index 592b8d6e3..532e8b15c 100755 --- a/scripts/killall.py +++ b/scripts/killall.py @@ -14,7 +14,7 @@ def main(): if len(sys.argv) != 2: - sys.exit('usage: %s name' % __file__) + sys.exit(f"usage: {__file__} name") else: name = sys.argv[1] @@ -24,7 +24,7 @@ def main(): proc.kill() killed.append(proc.pid) if not killed: - sys.exit('%s: no process found' % name) + sys.exit(f"{name}: no process found") else: sys.exit(0) diff --git a/scripts/pidof.py b/scripts/pidof.py index 7ac8e0323..7c3b93d8a 100755 --- a/scripts/pidof.py +++ b/scripts/pidof.py @@ -32,7 +32,7 @@ def pidof(pgname): def main(): if len(sys.argv) != 2: - sys.exit('usage: %s pgname' % __file__) + sys.exit(f"usage: {__file__} pgname") else: pgname = sys.argv[1] pids = pidof(pgname) diff --git a/scripts/pmap.py b/scripts/pmap.py index 719ce0134..ae633a41b 100755 --- a/scripts/pmap.py +++ b/scripts/pmap.py @@ -61,7 +61,7 @@ def main(): safe_print(line) print("-" * 31) print(templ % ("Total", bytes2human(total_rss), '', '')) - safe_print("PID = %s, name = %s" % (p.pid, p.name())) + safe_print(f"PID = {p.pid}, name = {p.name()}") if __name__ == '__main__': diff --git a/scripts/procinfo.py b/scripts/procinfo.py index 24004a960..963dd77da 100755 --- a/scripts/procinfo.py +++ b/scripts/procinfo.py @@ -130,10 +130,10 @@ def str_ntuple(nt, convert_bytes=False): if nt == ACCESS_DENIED: return "" if not convert_bytes: - return ", ".join(["%s=%s" % (x, getattr(nt, x)) for x in nt._fields]) + return ", ".join([f"{x}={getattr(nt, x)}" for x in nt._fields]) else: return ", ".join( - ["%s=%s" % (x, bytes2human(getattr(nt, x))) for x in nt._fields] + [f"{x}={bytes2human(getattr(nt, x))}" for x in nt._fields] ) @@ -148,7 +148,7 @@ def run(pid, verbose=False): with proc.oneshot(): try: parent = proc.parent() - parent = '(%s)' % parent.name() if parent else '' + parent = f"({parent.name()})" if parent else "" except psutil.Error: parent = '' try: @@ -165,7 +165,7 @@ def run(pid, verbose=False): # here we go print_('pid', pinfo['pid']) print_('name', pinfo['name']) - print_('parent', '%s %s' % (pinfo['ppid'], parent)) + print_('parent', f"{pinfo['ppid']} {parent}") print_('exe', pinfo['exe']) print_('cwd', pinfo['cwd']) print_('cmdline', ' '.join(pinfo['cmdline'])) @@ -207,7 +207,7 @@ def run(pid, verbose=False): else: print_( "ionice", - "class=%s, value=%s" % (str(ionice.ioclass), ionice.value), + f"class={ionice.ioclass}, value={ionice.value}", ) print_('num-threads', pinfo['num_threads']) @@ -261,8 +261,8 @@ def run(pid, verbose=False): rip, rport = conn.raddr line = template % ( type, - "%s:%s" % (lip, lport), - "%s:%s" % (rip, rport), + f"{lip}:{lport}", + f"{rip}:{rport}", conn.status, ) print_('', line) @@ -277,7 +277,7 @@ def run(pid, verbose=False): print_("", "[...]") break print_('', template % thread) - print_('', "total=%s" % len(pinfo['threads'])) + print_('', f"total={len(pinfo['threads'])}") else: print_('threads', '') diff --git a/scripts/procsmem.py b/scripts/procsmem.py index c8eaf3407..17eee8fb0 100755 --- a/scripts/procsmem.py +++ b/scripts/procsmem.py @@ -53,8 +53,8 @@ def convert_bytes(n): for s in reversed(symbols): if n >= prefix[s]: value = float(n) / prefix[s] - return '%.1f%s' % (value, s) - return "%sB" % n + return f"{value:.1f}{s}" + return f"{n}B" def main(): @@ -97,7 +97,7 @@ def main(): print(line) if ad_pids: print( - "warning: access denied for %s pids" % (len(ad_pids)), + f"warning: access denied for {len(ad_pids)} pids", file=sys.stderr, ) diff --git a/scripts/sensors.py b/scripts/sensors.py index 861a47d79..fe1286ba5 100755 --- a/scripts/sensors.py +++ b/scripts/sensors.py @@ -34,7 +34,7 @@ def secs2hours(secs): mm, ss = divmod(secs, 60) hh, mm = divmod(mm, 60) - return "%d:%02d:%02d" % (hh, mm, ss) + return f"{int(hh)}:{int(mm):02}:{int(ss):02}" def main(): @@ -78,7 +78,7 @@ def main(): # Battery. if battery: print("Battery:") - print(" charge: %s%%" % round(battery.percent, 2)) + print(f" charge: {round(battery.percent, 2)}%") if battery.power_plugged: print( " status: %s" @@ -86,8 +86,8 @@ def main(): ) print(" plugged in: yes") else: - print(" left: %s" % secs2hours(battery.secsleft)) - print(" status: %s" % "discharging") + print(f" left: {secs2hours(battery.secsleft)}") + print(" status: discharging") print(" plugged in: no") diff --git a/scripts/top.py b/scripts/top.py index db206e896..03f235ec3 100755 --- a/scripts/top.py +++ b/scripts/top.py @@ -160,9 +160,9 @@ def get_dashes(perc): st = [] for x, y in procs_status.items(): if y: - st.append("%s=%s" % (x, y)) + st.append(f"{x}={y}") st.sort(key=lambda x: x[:3] in {'run', 'sle'}, reverse=1) - printl(" Processes: %s (%s)" % (num_procs, ', '.join(st))) + printl(f" Processes: {num_procs} ({', '.join(st)})") # load average, uptime uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp( psutil.boot_time() diff --git a/scripts/winservices.py b/scripts/winservices.py index 216d0a652..6ff240c1d 100755 --- a/scripts/winservices.py +++ b/scripts/winservices.py @@ -43,7 +43,7 @@ def main(): for service in psutil.win_service_iter(): info = service.as_dict() - print("%r (%r)" % (info['name'], info['display_name'])) + print(f"{info['name']!r} ({info['display_name']!r})") s = "status: %s, start: %s, username: %s, pid: %s" % ( info['status'], info['start_type'], @@ -51,7 +51,7 @@ def main(): info['pid'], ) print(s) - print("binpath: %s" % info['binpath']) + print(f"binpath: {info['binpath']}") print()