diff --git a/.github/workflows/flake8.yml b/.github/workflows/flake8.yml index c6625b37..8abacb1b 100644 --- a/.github/workflows/flake8.yml +++ b/.github/workflows/flake8.yml @@ -21,6 +21,6 @@ jobs: - name: flake8 Lint uses: py-actions/flake8@v2 with: - ignore: "E501,W503" + ignore: "E501,W503,E252" exclude: "__init__.py, input/__init__.py" path: "pyerrors" diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 36981809..e4b517bd 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -40,6 +40,7 @@ jobs: pip install pytest-cov pip install pytest-benchmark pip install hypothesis + pip install typing_extensions pip freeze - name: Run tests diff --git a/pyerrors/correlators.py b/pyerrors/correlators.py index 0375155f..09fe9845 100644 --- a/pyerrors/correlators.py +++ b/pyerrors/correlators.py @@ -1,3 +1,4 @@ +from __future__ import annotations import warnings from itertools import permutations import numpy as np @@ -6,9 +7,11 @@ import scipy.linalg from .obs import Obs, reweight, correlate, CObs from .misc import dump_object, _assert_equal_properties -from .fits import least_squares +from .fits import least_squares, Fit_result from .roots import find_root from . import linalg +from numpy import ndarray, ufunc +from typing import Any, Callable, Optional, Union class Corr: @@ -42,7 +45,7 @@ class Corr: __slots__ = ["content", "N", "T", "tag", "prange"] - def __init__(self, data_input, padding=[0, 0], prange=None): + def __init__(self, data_input: Any, padding: list[int]=[0, 0], prange: Optional[list[int]]=None): """ Initialize a Corr object. Parameters @@ -73,7 +76,7 @@ def __init__(self, data_input, padding=[0, 0], prange=None): T = data_input[0, 0].T N = data_input.shape[0] - input_as_list = [] + input_as_list: list[Union[None, ndarray]] = [] for t in range(T): if any([(item.content[t] is None) for item in data_input.flatten()]): if not all([(item.content[t] is None) for item in data_input.flatten()]): @@ -97,7 +100,7 @@ def __init__(self, data_input, padding=[0, 0], prange=None): if all([isinstance(item, (Obs, CObs)) or item is None for item in data_input]): _assert_equal_properties([o for o in data_input if o is not None]) - self.content = [np.asarray([item]) if item is not None else None for item in data_input] + self.content: list[Union[None, ndarray]] = [np.asarray([item]) if item is not None else None for item in data_input] self.N = 1 elif all([isinstance(item, np.ndarray) or item is None for item in data_input]) and any([isinstance(item, np.ndarray) for item in data_input]): self.content = data_input @@ -119,14 +122,15 @@ def __init__(self, data_input, padding=[0, 0], prange=None): self.T = len(self.content) self.prange = prange - def __getitem__(self, idx): + def __getitem__(self, idx: Union[slice, int]) -> Union[CObs, Obs, ndarray, list[ndarray]]: """Return the content of timeslice idx""" - if self.content[idx] is None: + idx_content = self.content[idx] + if idx_content is None: return None - elif len(self.content[idx]) == 1: - return self.content[idx][0] + elif len(idx_content) == 1: + return idx_content[0] else: - return self.content[idx] + return idx_content @property def reweighted(self): @@ -151,7 +155,7 @@ def gamma_method(self, **kwargs): gm = gamma_method - def projected(self, vector_l=None, vector_r=None, normalize=False): + def projected(self, vector_l: Optional[Union[ndarray, list[Optional[ndarray]]]]=None, vector_r: Optional[Union[ndarray, list[Optional[ndarray]]]]=None, normalize: bool=False) -> "Corr": """We need to 
project the Correlator with a Vector to get a single value at each timeslice. The method can use one or two vectors. @@ -163,7 +167,7 @@ def projected(self, vector_l=None, vector_r=None, normalize=False): if vector_l is None: vector_l, vector_r = np.asarray([1.] + (self.N - 1) * [0.]), np.asarray([1.] + (self.N - 1) * [0.]) - elif (vector_r is None): + elif vector_r is None: vector_r = vector_l if isinstance(vector_l, list) and not isinstance(vector_r, list): if len(vector_l) != self.T: @@ -174,7 +178,7 @@ def projected(self, vector_l=None, vector_r=None, normalize=False): raise ValueError("Length of vector list must be equal to T") vector_l = [vector_l] * self.T - if not isinstance(vector_l, list): + if isinstance(vector_l, ndarray) and isinstance(vector_r, ndarray): if not vector_l.shape == vector_r.shape == (self.N,): raise ValueError("Vectors are of wrong shape!") if normalize: @@ -190,7 +194,7 @@ def projected(self, vector_l=None, vector_r=None, normalize=False): newcontent = [None if (_check_for_none(self, self.content[t]) or vector_l[t] is None or vector_r[t] is None) else np.asarray([vector_l[t].T @ self.content[t] @ vector_r[t]]) for t in range(self.T)] return Corr(newcontent) - def item(self, i, j): + def item(self, i: int, j: int) -> "Corr": """Picks the element [i,j] from every matrix and returns a correlator containing one Obs per timeslice. Parameters @@ -205,7 +209,7 @@ def item(self, i, j): newcontent = [None if (item is None) else item[i, j] for item in self.content] return Corr(newcontent) - def plottable(self): + def plottable(self) -> tuple[list, list, list]: """Outputs the correlator in a plotable format. Outputs three lists containing the timeslice index, the value on each @@ -219,7 +223,7 @@ def plottable(self): return x_list, y_list, y_err_list - def symmetric(self): + def symmetric(self) -> "Corr": """ Symmetrize the correlator around x0=0.""" if self.N != 1: raise ValueError('symmetric cannot be safely applied to multi-dimensional correlators.') @@ -236,11 +240,11 @@ def symmetric(self): newcontent.append(None) else: newcontent.append(0.5 * (self.content[t] + self.content[self.T - t])) - if (all([x is None for x in newcontent])): + if all([x is None for x in newcontent]): raise ValueError("Corr could not be symmetrized: No redundant values") return Corr(newcontent, prange=self.prange) - def anti_symmetric(self): + def anti_symmetric(self) -> "Corr": """Anti-symmetrize the correlator around x0=0.""" if self.N != 1: raise TypeError('anti_symmetric cannot be safely applied to multi-dimensional correlators.') @@ -277,11 +281,11 @@ def is_matrix_symmetric(self): return False return True - def trace(self): + def trace(self) -> "Corr": """Calculates the per-timeslice trace of a correlator matrix.""" if self.N == 1: raise ValueError("Only works for correlator matrices.") - newcontent = [] + newcontent: list[Union[None, float]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]): newcontent.append(None) @@ -289,7 +293,7 @@ def trace(self): newcontent.append(np.trace(self.content[t])) return Corr(newcontent) - def matrix_symmetric(self): + def matrix_symmetric(self) -> "Corr": """Symmetrizes the correlator matrices on every timeslice.""" if self.N == 1: raise ValueError("Trying to symmetrize a correlator matrix, that already has N=1.") @@ -299,7 +303,7 @@ def matrix_symmetric(self): transposed = [None if _check_for_none(self, G) else G.T for G in self.content] return 0.5 * (Corr(transposed) + self) - def GEVP(self, t0, ts=None, sort="Eigenvalue", 
vector_obs=False, **kwargs): + def GEVP(self, t0: int, ts: Optional[int]=None, sort: Optional[str]="Eigenvalue", vector_obs: bool=False, **kwargs) -> Union[list[list[Optional[ndarray]]], ndarray, list[Optional[ndarray]]]: r'''Solve the generalized eigenvalue problem on the correlator matrix and returns the corresponding eigenvectors. The eigenvectors are sorted according to the descending eigenvalues, the zeroth eigenvector(s) correspond to the @@ -405,7 +409,7 @@ def _get_mat_at_t(t, vector_obs=vector_obs): else: return reordered_vecs - def Eigenvalue(self, t0, ts=None, state=0, sort="Eigenvalue", **kwargs): + def Eigenvalue(self, t0: int, ts: None=None, state: int=0, sort: str="Eigenvalue", **kwargs) -> "Corr": """Determines the eigenvalue of the GEVP by solving and projecting the correlator Parameters @@ -418,7 +422,7 @@ def Eigenvalue(self, t0, ts=None, state=0, sort="Eigenvalue", **kwargs): vec = self.GEVP(t0, ts=ts, sort=sort, **kwargs)[state] return self.projected(vec) - def Hankel(self, N, periodic=False): + def Hankel(self, N: int, periodic: bool=False) -> "Corr": """Constructs an NxN Hankel matrix C(t) c(t+1) ... c(t+n-1) @@ -459,7 +463,7 @@ def wrap(i): return Corr(new_content) - def roll(self, dt): + def roll(self, dt: int) -> "Corr": """Periodically shift the correlator by dt timeslices Parameters @@ -469,11 +473,11 @@ def roll(self, dt): """ return Corr(list(np.roll(np.array(self.content, dtype=object), dt, axis=0))) - def reverse(self): + def reverse(self) -> "Corr": """Reverse the time ordering of the Corr""" return Corr(self.content[:: -1]) - def thin(self, spacing=2, offset=0): + def thin(self, spacing: int=2, offset: int=0) -> "Corr": """Thin out a correlator to suppress correlations Parameters @@ -483,7 +487,7 @@ def thin(self, spacing=2, offset=0): offset : int Offset the equal spacing """ - new_content = [] + new_content: list[Union[None, list, ndarray]] = [] for t in range(self.T): if (offset + t) % spacing != 0: new_content.append(None) @@ -491,7 +495,7 @@ def thin(self, spacing=2, offset=0): new_content.append(self.content[t]) return Corr(new_content) - def correlate(self, partner): + def correlate(self, partner: Union[Corr, float, Obs]) -> "Corr": """Correlate the correlator with another correlator or Obs Parameters @@ -503,7 +507,7 @@ def correlate(self, partner): """ if self.N != 1: raise ValueError("Only one-dimensional correlators can be safely correlated.") - new_content = [] + new_content: list[Union[None, ndarray]] = [] for x0, t_slice in enumerate(self.content): if _check_for_none(self, t_slice): new_content.append(None) @@ -520,7 +524,7 @@ def correlate(self, partner): return Corr(new_content) - def reweight(self, weight, **kwargs): + def reweight(self, weight: Obs, **kwargs) -> "Corr": """Reweight the correlator. 
Parameters @@ -535,7 +539,7 @@ def reweight(self, weight, **kwargs): """ if self.N != 1: raise Exception("Reweighting only implemented for one-dimensional correlators.") - new_content = [] + new_content: list[Union[None, ndarray]] = [] for t_slice in self.content: if _check_for_none(self, t_slice): new_content.append(None) @@ -543,7 +547,7 @@ def reweight(self, weight, **kwargs): new_content.append(np.array(reweight(weight, t_slice, **kwargs))) return Corr(new_content) - def T_symmetry(self, partner, parity=+1): + def T_symmetry(self, partner: "Corr", parity: int=+1) -> "Corr": """Return the time symmetry average of the correlator and its partner Parameters @@ -573,7 +577,7 @@ def T_symmetry(self, partner, parity=+1): return (self + T_partner) / 2 - def deriv(self, variant="symmetric"): + def deriv(self, variant: Optional[str]="symmetric") -> "Corr": """Return the first derivative of the correlator with respect to x0. Parameters @@ -584,8 +588,8 @@ def deriv(self, variant="symmetric"): """ if self.N != 1: raise ValueError("deriv only implemented for one-dimensional correlators.") + newcontent: list[Union[None, ndarray, Obs]] = [] if variant == "symmetric": - newcontent = [] for t in range(1, self.T - 1): if (self.content[t - 1] is None) or (self.content[t + 1] is None): newcontent.append(None) @@ -595,7 +599,6 @@ def deriv(self, variant="symmetric"): raise ValueError('Derivative is undefined at all timeslices') return Corr(newcontent, padding=[1, 1]) elif variant == "forward": - newcontent = [] for t in range(self.T - 1): if (self.content[t] is None) or (self.content[t + 1] is None): newcontent.append(None) @@ -605,7 +608,6 @@ def deriv(self, variant="symmetric"): raise ValueError("Derivative is undefined at all timeslices") return Corr(newcontent, padding=[0, 1]) elif variant == "backward": - newcontent = [] for t in range(1, self.T): if (self.content[t - 1] is None) or (self.content[t] is None): newcontent.append(None) @@ -615,7 +617,6 @@ def deriv(self, variant="symmetric"): raise ValueError("Derivative is undefined at all timeslices") return Corr(newcontent, padding=[1, 0]) elif variant == "improved": - newcontent = [] for t in range(2, self.T - 2): if (self.content[t - 2] is None) or (self.content[t - 1] is None) or (self.content[t + 1] is None) or (self.content[t + 2] is None): newcontent.append(None) @@ -625,7 +626,6 @@ def deriv(self, variant="symmetric"): raise ValueError('Derivative is undefined at all timeslices') return Corr(newcontent, padding=[2, 2]) elif variant == 'log': - newcontent = [] for t in range(self.T): if (self.content[t] is None) or (self.content[t] <= 0): newcontent.append(None) @@ -638,7 +638,7 @@ def deriv(self, variant="symmetric"): else: raise ValueError("Unknown variant.") - def second_deriv(self, variant="symmetric"): + def second_deriv(self, variant: Optional[str]="symmetric") -> "Corr": r"""Return the second derivative of the correlator with respect to x0. 
Parameters @@ -657,8 +657,8 @@ def second_deriv(self, variant="symmetric"): """ if self.N != 1: raise ValueError("second_deriv only implemented for one-dimensional correlators.") + newcontent: list[Union[None, ndarray, Obs]] = [] if variant == "symmetric": - newcontent = [] for t in range(1, self.T - 1): if (self.content[t - 1] is None) or (self.content[t + 1] is None): newcontent.append(None) @@ -668,7 +668,6 @@ def second_deriv(self, variant="symmetric"): raise ValueError("Derivative is undefined at all timeslices") return Corr(newcontent, padding=[1, 1]) elif variant == "big_symmetric": - newcontent = [] for t in range(2, self.T - 2): if (self.content[t - 2] is None) or (self.content[t + 2] is None): newcontent.append(None) @@ -678,7 +677,6 @@ def second_deriv(self, variant="symmetric"): raise ValueError("Derivative is undefined at all timeslices") return Corr(newcontent, padding=[2, 2]) elif variant == "improved": - newcontent = [] for t in range(2, self.T - 2): if (self.content[t - 2] is None) or (self.content[t - 1] is None) or (self.content[t] is None) or (self.content[t + 1] is None) or (self.content[t + 2] is None): newcontent.append(None) @@ -688,7 +686,6 @@ def second_deriv(self, variant="symmetric"): raise ValueError("Derivative is undefined at all timeslices") return Corr(newcontent, padding=[2, 2]) elif variant == 'log': - newcontent = [] for t in range(self.T): if (self.content[t] is None) or (self.content[t] <= 0): newcontent.append(None) @@ -701,7 +698,7 @@ def second_deriv(self, variant="symmetric"): else: raise ValueError("Unknown variant.") - def m_eff(self, variant='log', guess=1.0): + def m_eff(self, variant: str='log', guess: float=1.0) -> "Corr": """Returns the effective mass of the correlator as correlator object Parameters @@ -785,7 +782,7 @@ def root_function(x, d): else: raise ValueError('Unknown variant.') - def fit(self, function, fitrange=None, silent=False, **kwargs): + def fit(self, function: Callable, fitrange: Optional[Union[str, list[int]]]=None, silent: bool=False, **kwargs) -> Fit_result: r'''Fits function to the data Parameters @@ -819,7 +816,7 @@ def fit(self, function, fitrange=None, silent=False, **kwargs): result = least_squares(xs, ys, function, silent=silent, **kwargs) return result - def plateau(self, plateau_range=None, method="fit", auto_gamma=False): + def plateau(self, plateau_range: Optional[list[int]]=None, method: str="fit", auto_gamma: bool=False) -> Obs: """ Extract a plateau value from a Corr object Parameters @@ -856,7 +853,7 @@ def const_func(a, t): else: raise ValueError("Unsupported plateau method: " + method) - def set_prange(self, prange): + def set_prange(self, prange: list[int]): """Sets the attribute prange of the Corr object.""" if not len(prange) == 2: raise ValueError("prange must be a list or array with two values") @@ -868,7 +865,7 @@ def set_prange(self, prange): self.prange = prange return - def show(self, x_range=None, comp=None, y_range=None, logscale=False, plateau=None, fit_res=None, fit_key=None, ylabel=None, save=None, auto_gamma=False, hide_sigma=None, references=None, title=None): + def show(self, x_range: Optional[list[int]]=None, comp: Optional[Corr]=None, y_range: None=None, logscale: bool=False, plateau: None=None, fit_res: Optional[Fit_result]=None, fit_key: Optional[str]=None, ylabel: None=None, save: None=None, auto_gamma: bool=False, hide_sigma: None=None, references: None=None, title: None=None): """Plots the correlator using the tag of the correlator as label if available. 
Parameters @@ -993,7 +990,7 @@ def show(self, x_range=None, comp=None, y_range=None, logscale=False, plateau=No else: raise TypeError("'save' has to be a string.") - def spaghetti_plot(self, logscale=True): + def spaghetti_plot(self, logscale: bool=True): """Produces a spaghetti plot of the correlator suited to monitor exceptional configurations. Parameters @@ -1022,7 +1019,7 @@ def spaghetti_plot(self, logscale=True): plt.title(name) plt.draw() - def dump(self, filename, datatype="json.gz", **kwargs): + def dump(self, filename: str, datatype: str="json.gz", **kwargs): """Dumps the Corr into a file of chosen type Parameters ---------- @@ -1046,10 +1043,10 @@ def dump(self, filename, datatype="json.gz", **kwargs): else: raise ValueError("Unknown datatype " + str(datatype)) - def print(self, print_range=None): + def print(self, print_range: Optional[list[int]]=None): print(self.__repr__(print_range)) - def __repr__(self, print_range=None): + def __repr__(self, print_range: Optional[list[int]]=None) -> str: if print_range is None: print_range = [0, None] @@ -1074,7 +1071,7 @@ def __repr__(self, print_range=None): content_string += '\n' return content_string - def __str__(self): + def __str__(self) -> str: return self.__repr__() # We define the basic operations, that can be performed with correlators. @@ -1084,18 +1081,18 @@ def __str__(self): __array_priority__ = 10000 - def __eq__(self, y): + def __eq__(self, y: Union[Corr, Obs, int]) -> ndarray: if isinstance(y, Corr): comp = np.asarray(y.content, dtype=object) else: comp = np.asarray(y) return np.asarray(self.content, dtype=object) == comp - def __add__(self, y): + def __add__(self, y: Any) -> "Corr": if isinstance(y, Corr): if ((self.N != y.N) or (self.T != y.T)): raise ValueError("Addition of Corrs with different shape") - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]) or _check_for_none(y, y.content[t]): newcontent.append(None) @@ -1104,7 +1101,7 @@ def __add__(self, y): return Corr(newcontent) elif isinstance(y, (Obs, int, float, CObs, complex)): - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]): newcontent.append(None) @@ -1119,11 +1116,11 @@ def __add__(self, y): else: raise TypeError("Corr + wrong type") - def __mul__(self, y): + def __mul__(self, y: Any) -> "Corr": if isinstance(y, Corr): if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T): raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T") - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]) or _check_for_none(y, y.content[t]): newcontent.append(None) @@ -1147,13 +1144,13 @@ def __mul__(self, y): else: raise TypeError("Corr * wrong type") - def __matmul__(self, y): + def __matmul__(self, y: Union[Corr, ndarray]) -> "Corr": if isinstance(y, np.ndarray): if y.ndim != 2 or y.shape[0] != y.shape[1]: raise ValueError("Can only multiply correlators by square matrices.") if not self.N == y.shape[0]: raise ValueError("matmul: mismatch of matrix dimensions") - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]): newcontent.append(None) @@ -1174,13 +1171,13 @@ def __matmul__(self, y): else: return NotImplemented - def __rmatmul__(self, y): + def __rmatmul__(self, y: ndarray) -> "Corr": if isinstance(y, np.ndarray): if 
y.ndim != 2 or y.shape[0] != y.shape[1]: raise ValueError("Can only multiply correlators by square matrices.") if not self.N == y.shape[0]: raise ValueError("matmul: mismatch of matrix dimensions") - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]): newcontent.append(None) @@ -1190,11 +1187,11 @@ def __rmatmul__(self, y): else: return NotImplemented - def __truediv__(self, y): + def __truediv__(self, y: Union[Corr, float, ndarray, int]) -> "Corr": if isinstance(y, Corr): if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T): raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T") - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]) or _check_for_none(y, y.content[t]): newcontent.append(None) @@ -1229,7 +1226,7 @@ def __truediv__(self, y): elif isinstance(y, (int, float)): if y == 0: raise ValueError('Division by zero will return undefined correlator') - newcontent = [] + newcontent: list[Union[None, ndarray, Obs]] = [] for t in range(self.T): if _check_for_none(self, self.content[t]): newcontent.append(None) @@ -1244,37 +1241,37 @@ def __truediv__(self, y): else: raise TypeError('Corr / wrong type') - def __neg__(self): + def __neg__(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else -1. * item for item in self.content] return Corr(newcontent, prange=self.prange) - def __sub__(self, y): + def __sub__(self, y: Union[Corr, float, ndarray, int]) -> "Corr": return self + (-y) - def __pow__(self, y): + def __pow__(self, y: Union[float, int]) -> "Corr": if isinstance(y, (Obs, int, float, CObs)): newcontent = [None if _check_for_none(self, item) else item**y for item in self.content] return Corr(newcontent, prange=self.prange) else: raise TypeError('Type of exponent not supported') - def __abs__(self): + def __abs__(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else np.abs(item) for item in self.content] return Corr(newcontent, prange=self.prange) # The numpy functions: - def sqrt(self): + def sqrt(self) -> "Corr": return self ** 0.5 - def log(self): + def log(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else np.log(item) for item in self.content] return Corr(newcontent, prange=self.prange) - def exp(self): + def exp(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else np.exp(item) for item in self.content] return Corr(newcontent, prange=self.prange) - def _apply_func_to_corr(self, func): + def _apply_func_to_corr(self, func: Union[Callable, ufunc]) -> "Corr": newcontent = [None if _check_for_none(self, item) else func(item) for item in self.content] for t in range(self.T): if _check_for_none(self, newcontent[t]): @@ -1287,57 +1284,57 @@ def _apply_func_to_corr(self, func): raise ValueError('Operation returns undefined correlator') return Corr(newcontent) - def sin(self): + def sin(self) -> "Corr": return self._apply_func_to_corr(np.sin) - def cos(self): + def cos(self) -> "Corr": return self._apply_func_to_corr(np.cos) - def tan(self): + def tan(self) -> "Corr": return self._apply_func_to_corr(np.tan) - def sinh(self): + def sinh(self) -> "Corr": return self._apply_func_to_corr(np.sinh) - def cosh(self): + def cosh(self) -> "Corr": return self._apply_func_to_corr(np.cosh) - def tanh(self): + def tanh(self) -> "Corr": return self._apply_func_to_corr(np.tanh) - def arcsin(self): + def arcsin(self) -> "Corr": return 
self._apply_func_to_corr(np.arcsin) - def arccos(self): + def arccos(self) -> "Corr": return self._apply_func_to_corr(np.arccos) - def arctan(self): + def arctan(self) -> "Corr": return self._apply_func_to_corr(np.arctan) - def arcsinh(self): + def arcsinh(self) -> "Corr": return self._apply_func_to_corr(np.arcsinh) - def arccosh(self): + def arccosh(self) -> "Corr": return self._apply_func_to_corr(np.arccosh) - def arctanh(self): + def arctanh(self) -> "Corr": return self._apply_func_to_corr(np.arctanh) # Right hand side operations (require tweak in main module to work) def __radd__(self, y): return self + y - def __rsub__(self, y): + def __rsub__(self, y: int) -> "Corr": return -self + y - def __rmul__(self, y): + def __rmul__(self, y: Union[float, int]) -> "Corr": return self * y - def __rtruediv__(self, y): + def __rtruediv__(self, y: int) -> "Corr": return (self / y) ** (-1) @property - def real(self): + def real(self) -> "Corr": def return_real(obs_OR_cobs): if isinstance(obs_OR_cobs.flatten()[0], CObs): return np.vectorize(lambda x: x.real)(obs_OR_cobs) @@ -1347,7 +1344,7 @@ def return_real(obs_OR_cobs): return self._apply_func_to_corr(return_real) @property - def imag(self): + def imag(self) -> "Corr": def return_imag(obs_OR_cobs): if isinstance(obs_OR_cobs.flatten()[0], CObs): return np.vectorize(lambda x: x.imag)(obs_OR_cobs) @@ -1356,7 +1353,7 @@ def return_imag(obs_OR_cobs): return self._apply_func_to_corr(return_imag) - def prune(self, Ntrunc, tproj=3, t0proj=2, basematrix=None): + def prune(self, Ntrunc: int, tproj: int=3, t0proj: int=2, basematrix: None=None) -> "Corr": r''' Project large correlation matrix to lowest states This method can be used to reduce the size of an (N x N) correlation matrix @@ -1414,7 +1411,7 @@ def prune(self, Ntrunc, tproj=3, t0proj=2, basematrix=None): return Corr(newcontent) -def _sort_vectors(vec_set_in, ts): +def _sort_vectors(vec_set_in: list[Optional[ndarray]], ts: int) -> list[Union[None, ndarray, list[ndarray]]]: """Helper function used to find a set of Eigenvectors consistent over all timeslices""" if isinstance(vec_set_in[ts][0][0], Obs): @@ -1446,12 +1443,12 @@ def _sort_vectors(vec_set_in, ts): return sorted_vec_set -def _check_for_none(corr, entry): +def _check_for_none(corr: Corr, entry: Optional[ndarray]) -> bool: """Checks if entry for correlator corr is None""" return len(list(filter(None, np.asarray(entry).flatten()))) < corr.N ** 2 -def _GEVP_solver(Gt, G0, method='eigh', chol_inv=None): +def _GEVP_solver(Gt: Optional[ndarray], G0: ndarray, method: str='eigh', chol_inv: Optional[ndarray]=None) -> ndarray: r"""Helper function for solving the GEVP and sorting the eigenvectors. Solves $G(t)v_i=\lambda_i G(t_0)v_i$ and returns the eigenvectors v_i diff --git a/pyerrors/covobs.py b/pyerrors/covobs.py index 64d9d6ec..2c654ee0 100644 --- a/pyerrors/covobs.py +++ b/pyerrors/covobs.py @@ -1,9 +1,12 @@ +from __future__ import annotations import numpy as np +from numpy import ndarray +from typing import Any, Optional, Union class Covobs: - def __init__(self, mean, cov, name, pos=None, grad=None): + def __init__(self, mean: Optional[Union[float, int]], cov: Any, name: str, pos: Optional[int]=None, grad: Optional[Union[ndarray, list[float]]]=None): """ Initialize Covobs object. 
Parameters @@ -39,12 +42,12 @@ def __init__(self, mean, cov, name, pos=None, grad=None): self._set_grad(grad) self.value = mean - def errsq(self): + def errsq(self) -> float: """ Return the variance (= square of the error) of the Covobs """ return np.dot(np.transpose(self.grad), np.dot(self.cov, self.grad)).item() - def _set_cov(self, cov): + def _set_cov(self, cov: Any): """ Set the covariance matrix of the covobs Parameters @@ -79,7 +82,7 @@ def _set_cov(self, cov): if ev < 0: raise Exception('Covariance matrix is not positive-semidefinite!') - def _set_grad(self, grad): + def _set_grad(self, grad: Union[list[float], ndarray]): """ Set the gradient of the covobs Parameters @@ -96,9 +99,9 @@ def _set_grad(self, grad): raise Exception('Invalid dimension of grad!') @property - def cov(self): + def cov(self) -> ndarray: return self._cov @property - def grad(self): + def grad(self) -> ndarray: return self._grad diff --git a/pyerrors/dirac.py b/pyerrors/dirac.py index 016e4722..9d4244aa 100644 --- a/pyerrors/dirac.py +++ b/pyerrors/dirac.py @@ -1,4 +1,6 @@ +from __future__ import annotations import numpy as np +from numpy import ndarray gammaX = np.array( @@ -22,7 +24,7 @@ dtype=complex) -def epsilon_tensor(i, j, k): +def epsilon_tensor(i: int, j: int, k: int) -> float: """Rank-3 epsilon tensor Based on https://codegolf.stackexchange.com/a/160375 @@ -39,7 +41,7 @@ def epsilon_tensor(i, j, k): return (i - j) * (j - k) * (k - i) / 2 -def epsilon_tensor_rank4(i, j, k, o): +def epsilon_tensor_rank4(i: int, j: int, k: int, o: int) -> float: """Rank-4 epsilon tensor Extension of https://codegolf.stackexchange.com/a/160375 @@ -57,7 +59,7 @@ def epsilon_tensor_rank4(i, j, k, o): return (i - j) * (j - k) * (k - i) * (i - o) * (j - o) * (o - k) / 12 -def Grid_gamma(gamma_tag): +def Grid_gamma(gamma_tag: str) -> ndarray: """Returns gamma matrix in Grid labeling.""" if gamma_tag == 'Identity': g = identity diff --git a/pyerrors/fits.py b/pyerrors/fits.py index 8ed540c5..f1b22e76 100644 --- a/pyerrors/fits.py +++ b/pyerrors/fits.py @@ -1,3 +1,4 @@ +from __future__ import annotations import gc from collections.abc import Sequence import warnings @@ -15,6 +16,8 @@ from numdifftools import Jacobian as num_jacobian from numdifftools import Hessian as num_hessian from .obs import Obs, derived_observable, covariance, cov_Obs, invert_corr_cov_cholesky +from numpy import ndarray +from typing import Any, Callable, Optional, Union class Fit_result(Sequence): @@ -33,13 +36,31 @@ class Fit_result(Sequence): Hotelling t-squared p-value for correlated fits. 
""" - def __init__(self): - self.fit_parameters = None - - def __getitem__(self, idx): + def __init__(self) -> None: + self.fit_parameters: Optional[list] = None + self.fit_function: Optional[Union[Callable, dict[str, Callable]]] = None + self.priors: Optional[Union[list[Obs], dict[int, Obs]]] = None + self.method: Optional[str] = None + self.iterations: Optional[int] = None + self.chisquare: Optional[float] = None + self.odr_chisquare: Optional[float] = None + self.dof: Optional[int] = None + self.p_value: Optional[float] = None + self.message: Optional[str] = None + self.t2_p_value: Optional[float] = None + self.chisquare_by_dof: Optional[float] = None + self.chisquare_by_expected_chisquare: Optional[float] = None + self.residual_variance: Optional[float] = None + self.xplus: Optional[float] = None + + def __getitem__(self, idx: int) -> Obs: + if self.fit_parameters is None: + raise TypeError('No fit parameters available.') return self.fit_parameters[idx] - def __len__(self): + def __len__(self) -> int: + if self.fit_parameters is None: + raise TypeError('No fit parameters available.') return len(self.fit_parameters) def gamma_method(self, **kwargs): @@ -48,29 +69,31 @@ def gamma_method(self, **kwargs): gm = gamma_method - def __str__(self): + def __str__(self) -> str: my_str = 'Goodness of fit:\n' - if hasattr(self, 'chisquare_by_dof'): + if self.chisquare_by_dof is not None: my_str += '\u03C7\u00b2/d.o.f. = ' + f'{self.chisquare_by_dof:2.6f}' + '\n' - elif hasattr(self, 'residual_variance'): + elif self.residual_variance is not None: my_str += 'residual variance = ' + f'{self.residual_variance:2.6f}' + '\n' - if hasattr(self, 'chisquare_by_expected_chisquare'): + if self.chisquare_by_expected_chisquare is not None: my_str += '\u03C7\u00b2/\u03C7\u00b2exp = ' + f'{self.chisquare_by_expected_chisquare:2.6f}' + '\n' - if hasattr(self, 'p_value'): + if self.p_value is not None: my_str += 'p-value = ' + f'{self.p_value:2.4f}' + '\n' - if hasattr(self, 't2_p_value'): + if self.t2_p_value is not None: my_str += 't\u00B2p-value = ' + f'{self.t2_p_value:2.4f}' + '\n' my_str += 'Fit parameters:\n' + if self.fit_parameters is None: + raise TypeError('No fit parameters available.') for i_par, par in enumerate(self.fit_parameters): my_str += str(i_par) + '\t' + ' ' * int(par >= 0) + str(par).rjust(int(par < 0.0)) + '\n' return my_str - def __repr__(self): + def __repr__(self) -> str: m = max(map(len, list(self.__dict__.keys()))) + 1 return '\n'.join([key.rjust(m) + ': ' + repr(value) for key, value in sorted(self.__dict__.items())]) -def least_squares(x, y, func, priors=None, silent=False, **kwargs): +def least_squares(x: Any, y: Union[dict[str, ndarray], list[Obs], ndarray, dict[str, list[Obs]]], func: Union[Callable, dict[str, Callable]], priors: Optional[Union[dict[int, str], list[str], list[Obs], dict[int, Obs]]]=None, silent: bool=False, **kwargs) -> Fit_result: r'''Performs a non-linear fit to y = func(x). ``` @@ -335,9 +358,8 @@ def func_b(a, x): p_f = dp_f = np.array([]) prior_mask = [] loc_priors = [] - - if 'initial_guess' in kwargs: - x0 = kwargs.get('initial_guess') + x0 = kwargs.get('initial_guess') + if x0 is not None: if len(x0) != n_parms: raise ValueError('Initial guess does not have the correct length: %d vs. 
%d' % (len(x0), n_parms)) else: @@ -356,8 +378,8 @@ def chisqfunc_uncorr(p): return anp.sum(general_chisqfunc_uncorr(p, y_f, p_f) ** 2) if kwargs.get('correlated_fit') is True: - if 'inv_chol_cov_matrix' in kwargs: - chol_inv = kwargs.get('inv_chol_cov_matrix') + chol_inv = kwargs.get('inv_chol_cov_matrix') + if chol_inv is not None: if (chol_inv[0].shape[0] != len(dy_f)): raise TypeError('The number of columns of the inverse covariance matrix handed over needs to be equal to the number of y errors.') if (chol_inv[0].shape[0] != chol_inv[0].shape[1]): @@ -386,17 +408,17 @@ def chisqfunc(p): if output.method != 'Levenberg-Marquardt': if output.method == 'migrad': - tolerance = 1e-4 # default value of 1e-1 set by iminuit can be problematic - if 'tol' in kwargs: - tolerance = kwargs.get('tol') + tolerance = kwargs.get('tol') + if tolerance is None: + tolerance = 1e-4 # default value of 1e-1 set by iminuit can be problematic fit_result = iminuit.minimize(chisqfunc_uncorr, x0, tol=tolerance) # Stopping criterion 0.002 * tol * errordef if kwargs.get('correlated_fit') is True: fit_result = iminuit.minimize(chisqfunc, fit_result.x, tol=tolerance) output.iterations = fit_result.nfev else: - tolerance = 1e-12 - if 'tol' in kwargs: - tolerance = kwargs.get('tol') + tolerance = kwargs.get('tol') + if tolerance is None: + tolerance = 1e-12 fit_result = scipy.optimize.minimize(chisqfunc_uncorr, x0, method=kwargs.get('method'), tol=tolerance) if kwargs.get('correlated_fit') is True: fit_result = scipy.optimize.minimize(chisqfunc, fit_result.x, method=kwargs.get('method'), tol=tolerance) @@ -426,8 +448,8 @@ def chisqfunc_residuals(p): if not fit_result.success: raise Exception('The minimization procedure did not converge.') - output.chisquare = chisquare - output.dof = y_all.shape[-1] - n_parms + len(loc_priors) + output.chisquare = float(chisquare) + output.dof = int(y_all.shape[-1] - n_parms + len(loc_priors)) output.p_value = 1 - scipy.stats.chi2.cdf(output.chisquare, output.dof) if output.dof > 0: output.chisquare_by_dof = output.chisquare / output.dof @@ -503,7 +525,7 @@ def chisqfunc_compact(d): return output -def total_least_squares(x, y, func, silent=False, **kwargs): +def total_least_squares(x: list[Obs], y: list[Obs], func: Callable, silent: bool=False, **kwargs) -> Fit_result: r'''Performs a non-linear fit to y = func(x) and returns a list of Obs corresponding to the fit parameters. Parameters @@ -600,8 +622,8 @@ def func(a, x): if np.any(np.asarray(dy_f) <= 0.0): raise Exception('No y errors available, run the gamma method first.') - if 'initial_guess' in kwargs: - x0 = kwargs.get('initial_guess') + x0 = kwargs.get('initial_guess') + if x0 is not None: if len(x0) != n_parms: raise Exception('Initial guess does not have the correct length: %d vs. %d' % (len(x0), n_parms)) else: @@ -707,7 +729,7 @@ def odr_chisquare_compact_y(d): return output -def fit_lin(x, y, **kwargs): +def fit_lin(x: Sequence[Union[Obs, int, float]], y: Sequence[Obs], **kwargs) -> list[Obs]: """Performs a linear fit to y = n + m * x and returns two Obs n, m. Parameters @@ -738,7 +760,7 @@ def f(a, x): raise TypeError('Unsupported types for x') -def qqplot(x, o_y, func, p, title=""): +def qqplot(x: ndarray, o_y: list[Obs], func: Callable, p: list[Obs], title: str=""): """Generates a quantile-quantile plot of the fit result which can be used to check if the residuals of the fit are gaussian distributed. 
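
A note on the pattern introduced in the `least_squares` hunks above: the old `if 'key' in kwargs:` followed by `kwargs.get('key')` is collapsed into a single `kwargs.get('key')` plus a `None` check. One subtle consequence is that an explicit keyword value of `None` is now treated the same as an absent keyword. A minimal sketch of the new behavior (the function name is hypothetical; the fallback value is the one used in the migrad branch):

```python
def pick_tolerance(**kwargs):
    tolerance = kwargs.get('tol')
    if tolerance is None:
        tolerance = 1e-4  # fallback, as in the migrad branch above
    return tolerance

print(pick_tolerance())          # 1e-4
print(pick_tolerance(tol=1e-8))  # 1e-08
print(pick_tolerance(tol=None))  # 1e-4 -- explicit None now falls back to the default
```

This should be harmless here, since `None` is not a meaningful input for `tol`, `initial_guess` or `inv_chol_cov_matrix`.
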
@@ -768,7 +790,7 @@ def qqplot(x, o_y, func, p, title=""): plt.draw() -def residual_plot(x, y, func, fit_res, title=""): +def residual_plot(x: ndarray, y: list[Obs], func: Callable, fit_res: list[Obs], title: str=""): """Generates a plot which compares the fit to the data and displays the corresponding residuals For uncorrelated data the residuals are expected to be distributed ~N(0,1). @@ -805,7 +827,7 @@ def residual_plot(x, y, func, fit_res, title=""): plt.draw() -def error_band(x, func, beta): +def error_band(x: list[int], func: Callable, beta: list[Obs]) -> ndarray: """Calculate the error band for an array of sample values x, for given fit function func with optimized parameters beta. Returns @@ -829,7 +851,7 @@ def error_band(x, func, beta): return err -def ks_test(objects=None): +def ks_test(objects: Optional[list[Fit_result]]=None): """Performs a Kolmogorov–Smirnov test for the p-values of all fit object. Parameters @@ -873,7 +895,7 @@ def ks_test(objects=None): print(scipy.stats.kstest(p_values, 'uniform')) -def _extract_val_and_dval(string): +def _extract_val_and_dval(string: str) -> tuple[float, float]: split_string = string.split('(') if '.' in split_string[0] and '.' not in split_string[1][:-1]: factor = 10 ** -len(split_string[0].partition('.')[2]) @@ -882,11 +904,13 @@ def _extract_val_and_dval(string): return float(split_string[0]), float(split_string[1][:-1]) * factor -def _construct_prior_obs(i_prior, i_n): +def _construct_prior_obs(i_prior: Union[Obs, str], i_n: int) -> Obs: if isinstance(i_prior, Obs): return i_prior elif isinstance(i_prior, str): loc_val, loc_dval = _extract_val_and_dval(i_prior) - return cov_Obs(loc_val, loc_dval ** 2, '#prior' + str(i_n) + f"_{np.random.randint(2147483647):010d}") + prior_obs = cov_Obs(loc_val, loc_dval ** 2, '#prior' + str(i_n) + f"_{np.random.randint(2147483647):010d}") + assert isinstance(prior_obs, Obs) + return prior_obs else: raise TypeError("Prior entries need to be 'Obs' or 'str'.") diff --git a/pyerrors/input/dobs.py b/pyerrors/input/dobs.py index aea9b7a9..719773b6 100644 --- a/pyerrors/input/dobs.py +++ b/pyerrors/input/dobs.py @@ -1,3 +1,4 @@ +from __future__ import annotations from collections import defaultdict import gzip import lxml.etree as et @@ -11,12 +12,15 @@ from ..obs import _merge_idx from ..covobs import Covobs from .. 
import version as pyerrorsversion +from lxml.etree import _Element +from numpy import ndarray +from typing import Any, Optional, Union # Based on https://stackoverflow.com/a/10076823 -def _etree_to_dict(t): +def _etree_to_dict(t: _Element) -> dict: """ Convert the content of an XML file to a python dict""" - d = {t.tag: {} if t.attrib else None} + d: dict = {t.tag: {} if t.attrib else None} children = list(t) if children: dd = defaultdict(list) @@ -38,7 +42,7 @@ def _etree_to_dict(t): return d -def _dict_to_xmlstring(d): +def _dict_to_xmlstring(d: dict[str, Any]) -> str: if isinstance(d, dict): iters = '' for k in d: @@ -66,7 +70,7 @@ def _dict_to_xmlstring(d): return iters -def _dict_to_xmlstring_spaces(d, space=' '): +def _dict_to_xmlstring_spaces(d: dict, space: str=' ') -> str: s = _dict_to_xmlstring(d) o = '' c = 0 @@ -85,7 +89,7 @@ def _dict_to_xmlstring_spaces(d, space=' '): return o -def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None): +def create_pobs_string(obsl: list[Obs], name: str, spec: str='', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, enstag: Optional[str]=None) -> str: """Export a list of Obs or structures containing Obs to an xml string according to the Zeuthen pobs format. @@ -113,7 +117,9 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None): XML formatted string of the input data """ - od = {} + if symbol is None: + symbol = [] + od: dict[str, Any] = {} ename = obsl[0].e_names[0] names = list(obsl[0].deltas.keys()) nr = len(names) @@ -176,7 +182,7 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None): return rs -def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz=True): +def write_pobs(obsl: list[Obs], fname: str, name: str, spec: str='', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, enstag: Optional[str]=None, gz: bool=True): """Export a list of Obs or structures containing Obs to a .xml.gz file according to the Zeuthen pobs format. @@ -206,6 +212,8 @@ def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz ------- None """ + if symbol is None: + symbol = [] pobsstring = create_pobs_string(obsl, name, spec, origin, symbol, enstag) if not fname.endswith('.xml') and not fname.endswith('.gz'): @@ -215,38 +223,39 @@ def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz if not fname.endswith('.gz'): fname += '.gz' - fp = gzip.open(fname, 'wb') - fp.write(pobsstring.encode('utf-8')) + gp = gzip.open(fname, 'wb') + gp.write(pobsstring.encode('utf-8')) + gp.close() else: fp = open(fname, 'w', encoding='utf-8') fp.write(pobsstring) - fp.close() + fp.close() -def _import_data(string): +def _import_data(string: str) -> list[Union[int, float]]: return json.loads("[" + ",".join(string.replace(' +', ' ').split()) + "]") -def _check(condition): +def _check(condition: bool): if not condition: raise Exception("XML file format not supported") class _NoTagInDataError(Exception): """Raised when tag is not in data""" - def __init__(self, tag): + def __init__(self, tag: str): self.tag = tag super().__init__('Tag %s not in data!' 
% (self.tag)) -def _find_tag(dat, tag): +def _find_tag(dat: _Element, tag: str) -> int: for i in range(len(dat)): if dat[i].tag == tag: return i raise _NoTagInDataError(tag) -def _import_array(arr): +def _import_array(arr: _Element) -> Union[list[Union[str, list[int], list[ndarray]]], ndarray]: name = arr[_find_tag(arr, 'id')].text.strip() index = _find_tag(arr, 'layout') try: @@ -284,12 +293,12 @@ def _import_array(arr): _check(False) -def _import_rdata(rd): +def _import_rdata(rd: _Element) -> tuple[list[ndarray], str, list[int]]: name, idx, mask, deltas = _import_array(rd) return deltas, name, idx -def _import_cdata(cd): +def _import_cdata(cd: _Element) -> tuple[str, ndarray, ndarray]: _check(cd[0].tag == "id") _check(cd[1][0].text.strip() == "cov") cov = _import_array(cd[1]) @@ -297,7 +306,7 @@ def _import_cdata(cd): return cd[0].text.strip(), cov, grad -def read_pobs(fname, full_output=False, gz=True, separator_insertion=None): +def read_pobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: None=None) -> Union[dict, list[Obs]]: """Import a list of Obs from an xml.gz file in the Zeuthen pobs format. Tags are not written or recovered automatically. @@ -309,7 +318,7 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None): full_output : bool If True, a dict containing auxiliary information and the data is returned. If False, only the data is returned as list. - separatior_insertion: str or int + separator_insertion: str or int str: replace all occurences of "separator_insertion" within the replica names by "|%s" % (separator_insertion) when constructing the names of the replica. int: Insert the separator "|" at the position given by separator_insertion. @@ -329,8 +338,8 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None): if gz: if not fname.endswith('.gz'): fname += '.gz' - with gzip.open(fname, 'r') as fin: - content = fin.read() + with gzip.open(fname, 'r') as gin: + content = gin.read() else: if fname.endswith('.gz'): warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning) @@ -350,7 +359,7 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None): deltas = [] names = [] - idl = [] + idl: list[list[int]] = [] for i in range(5, len(pobs)): delta, name, idx = _import_rdata(pobs[i]) deltas.append(delta) @@ -397,7 +406,7 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None): # this is based on Mattia Bruno's implementation at https://github.com/mbruno46/pyobs/blob/master/pyobs/IO/xml.py -def import_dobs_string(content, full_output=False, separator_insertion=True): +def import_dobs_string(content: bytes, full_output: bool=False, separator_insertion: bool=True) -> Union[dict, list[Obs]]: """Import a list of Obs from a string in the Zeuthen dobs format. Tags are not written or recovered automatically. @@ -409,7 +418,7 @@ def import_dobs_string(content, full_output=False, separator_insertion=True): full_output : bool If True, a dict containing auxiliary information and the data is returned. If False, only the data is returned as list. - separatior_insertion: str, int or bool + separator_insertion: str, int or bool str: replace all occurences of "separator_insertion" within the replica names by "|%s" % (separator_insertion) when constructing the names of the replica. int: Insert the separator "|" at the position given by separator_insertion. 
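
The `symbol=[]` to `symbol: Optional[list[Union[str, Any]]]=None` changes in `create_pobs_string` and `write_pobs` above (and in `create_dobs_string`/`write_dobs` below) also replace a classic Python pitfall: a mutable default argument is evaluated once, at function definition time, and shared across calls. In these functions the list is only read, so no bug was actually triggered, but the `None` sentinel is the conventional defensive form. A self-contained illustration of the two idioms (function names are invented for the example):

```python
def append_bad(entry, acc=[]):
    # The default list is created once and reused by every call.
    acc.append(entry)
    return acc

def append_good(entry, acc=None):
    # None sentinel: a fresh list is created per call, as in write_pobs.
    if acc is None:
        acc = []
    acc.append(entry)
    return acc

print(append_bad(1), append_bad(2))    # [1, 2] [1, 2]  <- shared state
print(append_good(1), append_good(2))  # [1] [2]
```
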
@@ -571,7 +580,7 @@ def import_dobs_string(content, full_output=False, separator_insertion=True): return res -def read_dobs(fname, full_output=False, gz=True, separator_insertion=True): +def read_dobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: bool=True) -> Union[dict, list[Obs]]: """Import a list of Obs from an xml.gz file in the Zeuthen dobs format. Tags are not written or recovered automatically. @@ -618,7 +627,7 @@ def read_dobs(fname, full_output=False, gz=True, separator_insertion=True): return import_dobs_string(content, full_output, separator_insertion=separator_insertion) -def _dobsdict_to_xmlstring(d): +def _dobsdict_to_xmlstring(d: dict[str, Any]) -> str: if isinstance(d, dict): iters = '' for k in d: @@ -658,7 +667,7 @@ def _dobsdict_to_xmlstring(d): return iters -def _dobsdict_to_xmlstring_spaces(d, space=' '): +def _dobsdict_to_xmlstring_spaces(d: dict, space: str=' ') -> str: s = _dobsdict_to_xmlstring(d) o = '' c = 0 @@ -677,7 +686,7 @@ def _dobsdict_to_xmlstring_spaces(d, space=' '): return o -def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=None, enstags=None): +def create_dobs_string(obsl: list[Obs], name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, who: Optional[str]=None, enstags: Optional[dict]=None) -> str: """Generate the string for the export of a list of Obs or structures containing Obs to a .xml.gz file according to the Zeuthen dobs format. @@ -708,9 +717,11 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N xml_str : str XML string generated from the data """ + if symbol is None: + symbol = [] if enstags is None: enstags = {} - od = {} + od: dict[str, Any] = {} r_names = [] for o in obsl: r_names += [name for name in o.names if name.split('|')[0] in o.mc_names] @@ -810,7 +821,7 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N ed[''].append(ad) pd['edata'].append(ed) - allcov = {} + allcov: dict[str, ndarray] = {} for o in obsl: for cname in o.cov_names: if cname in allcov: @@ -866,7 +877,7 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N return rs -def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=None, enstags=None, gz=True): +def write_dobs(obsl: list[Obs], fname: str, name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, who: Optional[str]=None, enstags: Optional[dict]=None, gz: bool=True): """Export a list of Obs or structures containing Obs to a .xml.gz file according to the Zeuthen dobs format. 
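
On the `fp` to `gp` renames in `write_pobs` above and in `write_dobs`/`dump_to_json` below: giving the gzip branch its own handle and closing it inside the branch keeps each variable at a single static type (`gzip.open(..., 'wb')` yields a binary `GzipFile`, while `open(..., 'w')` yields a text handle), presumably to satisfy the type checker. An equivalent and arguably tidier form would use context managers; this is only a sketch with a hypothetical helper, not part of the patch:

```python
import gzip

def _write_text(fname: str, text: str, gz: bool) -> None:
    if gz:
        with gzip.open(fname, 'wb') as gp:  # binary gzip.GzipFile handle
            gp.write(text.encode('utf-8'))
    else:
        with open(fname, 'w', encoding='utf-8') as fp:  # text handle
            fp.write(text)
```
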
@@ -900,6 +911,8 @@ def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=No ------- None """ + if symbol is None: + symbol = [] if enstags is None: enstags = {} @@ -912,9 +925,10 @@ def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=No if not fname.endswith('.gz'): fname += '.gz' - fp = gzip.open(fname, 'wb') - fp.write(dobsstring.encode('utf-8')) + gp = gzip.open(fname, 'wb') + gp.write(dobsstring.encode('utf-8')) + gp.close() else: fp = open(fname, 'w', encoding='utf-8') fp.write(dobsstring) - fp.close() + fp.close() diff --git a/pyerrors/input/json.py b/pyerrors/input/json.py index ca3fb0d2..e5b56a96 100644 --- a/pyerrors/input/json.py +++ b/pyerrors/input/json.py @@ -1,3 +1,4 @@ +from __future__ import annotations import rapidjson as json import gzip import getpass @@ -12,9 +13,10 @@ from ..correlators import Corr from ..misc import _assert_equal_properties from .. import version as pyerrorsversion +from typing import Any, Union -def create_json_string(ol, description='', indent=1): +def create_json_string(ol: list, description: Union[str, dict]='', indent: int=1) -> str: """Generate the string for the export of a list of Obs or structures containing Obs to a .json(.gz) file @@ -165,7 +167,7 @@ def write_Corr_to_dict(my_corr): if not isinstance(ol, list): ol = [ol] - d = {} + d: dict[str, Any] = {} d['program'] = 'pyerrors %s' % (pyerrorsversion.__version__) d['version'] = '1.1' d['who'] = getpass.getuser() @@ -216,7 +218,7 @@ def _jsonifier(obj): return json.dumps(d, indent=indent, ensure_ascii=False, default=_jsonifier, write_mode=json.WM_COMPACT) -def dump_to_json(ol, fname, description='', indent=1, gz=True): +def dump_to_json(ol: list, fname: str, description: Union[str, dict]='', indent: int=1, gz: bool=True): """Export a list of Obs or structures containing Obs to a .json(.gz) file. Dict keys that are not JSON-serializable such as floats are converted to strings. @@ -250,15 +252,16 @@ def dump_to_json(ol, fname, description='', indent=1, gz=True): if not fname.endswith('.gz'): fname += '.gz' - fp = gzip.open(fname, 'wb') - fp.write(jsonstring.encode('utf-8')) + gp = gzip.open(fname, 'wb') + gp.write(jsonstring.encode('utf-8')) + gp.close() else: fp = open(fname, 'w', encoding='utf-8') fp.write(jsonstring) - fp.close() + fp.close() -def _parse_json_dict(json_dict, verbose=True, full_output=False): +def _parse_json_dict(json_dict: dict[str, Any], verbose: bool=True, full_output: bool=False) -> Any: """Reconstruct a list of Obs or structures containing Obs from a dict that was built out of a json string. @@ -470,7 +473,7 @@ def get_Corr_from_dict(o): return ol -def import_json_string(json_string, verbose=True, full_output=False): +def import_json_string(json_string: str, verbose: bool=True, full_output: bool=False) -> Union[Obs, list[Obs], Corr]: """Reconstruct a list of Obs or structures containing Obs from a json string. The following structures are supported: Obs, list, numpy.ndarray, Corr @@ -500,7 +503,7 @@ def import_json_string(json_string, verbose=True, full_output=False): return _parse_json_dict(json.loads(json_string), verbose, full_output) -def load_json(fname, verbose=True, gz=True, full_output=False): +def load_json(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=False) -> Any: """Import a list of Obs or structures containing Obs from a .json(.gz) file. 
The following structures are supported: Obs, list, numpy.ndarray, Corr @@ -545,7 +548,7 @@ def load_json(fname, verbose=True, gz=True, full_output=False): return _parse_json_dict(d, verbose, full_output) -def _ol_from_dict(ind, reps='DICTOBS'): +def _ol_from_dict(ind: dict, reps: str='DICTOBS') -> tuple[list, dict]: """Convert a dictionary of Obs objects to a list and a dictionary that contains placeholders instead of the Obs objects. @@ -625,7 +628,7 @@ def obslist_replace_obs(li): return ol, nd -def dump_dict_to_json(od, fname, description='', indent=1, reps='DICTOBS', gz=True): +def dump_dict_to_json(od: dict, fname: str, description: Union[str, dict]='', indent: int=1, reps: str='DICTOBS', gz: bool=True): """Export a dict of Obs or structures containing Obs to a .json(.gz) file Parameters @@ -665,7 +668,7 @@ def dump_dict_to_json(od, fname, description='', indent=1, reps='DICTOBS', gz=Tr dump_to_json(ol, fname, description=desc_dict, indent=indent, gz=gz) -def _od_from_list_and_dict(ol, ind, reps='DICTOBS'): +def _od_from_list_and_dict(ol: list, ind: dict, reps: str='DICTOBS') -> dict[str, dict[str, Any]]: """Parse a list of Obs or structures containing Obs and an accompanying dict, where the structures have been replaced by placeholders to a dict that contains the structures. @@ -728,7 +731,7 @@ def list_replace_string(li): return nd -def load_json_dict(fname, verbose=True, gz=True, full_output=False, reps='DICTOBS'): +def load_json_dict(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=False, reps: str='DICTOBS') -> dict: """Import a dict of Obs or structures containing Obs from a .json(.gz) file. The following structures are supported: Obs, list, numpy.ndarray, Corr diff --git a/pyerrors/input/misc.py b/pyerrors/input/misc.py index c62f502c..9aced78e 100644 --- a/pyerrors/input/misc.py +++ b/pyerrors/input/misc.py @@ -1,3 +1,4 @@ +from __future__ import annotations import os import fnmatch import re @@ -8,9 +9,10 @@ from matplotlib import gridspec from ..obs import Obs from ..fits import fit_lin +from typing import Optional -def fit_t0(t2E_dict, fit_range, plot_fit=False, observable='t0'): +def fit_t0(t2E_dict: dict[float, Obs], fit_range: int, plot_fit: Optional[bool]=False, observable: str='t0') -> Obs: """Compute the root of (flow-based) data based on a dictionary that contains the necessary information in key-value pairs a la (flow time: observable at flow time). @@ -97,11 +99,15 @@ def fit_t0(t2E_dict, fit_range, plot_fit=False, observable='t0'): return -fit_result[0] / fit_result[1] -def read_pbp(path, prefix, **kwargs): +def read_pbp(path: str, prefix: str, **kwargs): """Read pbp format from given folder structure. Parameters ---------- + path : str + Directory to read pbp from + prefix : str + Prefix of the files to be read r_start : list list which contains the first config to be read for each replicum r_stop : list diff --git a/pyerrors/input/openQCD.py b/pyerrors/input/openQCD.py index 278977d2..339c8bfd 100644 --- a/pyerrors/input/openQCD.py +++ b/pyerrors/input/openQCD.py @@ -1,3 +1,4 @@ +from __future__ import annotations import os import fnmatch import struct @@ -8,9 +9,11 @@ from ..correlators import Corr from .misc import fit_t0 from .utils import sort_names +from io import BufferedReader +from typing import Optional, Union -def read_rwms(path, prefix, version='2.0', names=None, **kwargs): +def read_rwms(path: str, prefix: str, version: str='2.0', names: Optional[list[str]]=None, **kwargs) -> list[Obs]: """Read rwms format from given folder structure. 
Returns a list of length nrw Parameters @@ -229,7 +232,7 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs): return result -def _extract_flowed_energy_density(path, prefix, dtr_read, xmin, spatial_extent, postfix='ms', **kwargs): +def _extract_flowed_energy_density(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, postfix: str='ms', **kwargs) -> dict[float, Obs]: """Extract a dictionary with the flowed Yang-Mills action density from given .ms.dat files. Returns a dictionary with Obs as values and flow times as keys. @@ -422,7 +425,7 @@ def _extract_flowed_energy_density(path, prefix, dtr_read, xmin, spatial_extent, return E_dict -def extract_t0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfix='ms', c=0.3, **kwargs): +def extract_t0(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, fit_range: int=5, postfix: str='ms', c: Union[float, int]=0.3, **kwargs) -> Obs: """Extract t0/a^2 from given .ms.dat files. Returns t0 as Obs. It is assumed that all boundary effects have @@ -495,7 +498,7 @@ def extract_t0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfi return fit_t0(t2E_dict, fit_range, plot_fit=kwargs.get('plot_fit')) -def extract_w0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfix='ms', c=0.3, **kwargs): +def extract_w0(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, fit_range: int=5, postfix: str='ms', c: Union[float, int]=0.3, **kwargs) -> Obs: """Extract w0/a from given .ms.dat files. Returns w0 as Obs. It is assumed that all boundary effects have @@ -577,7 +580,7 @@ def extract_w0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfi return np.sqrt(fit_t0(tdtt2E_dict, fit_range, plot_fit=kwargs.get('plot_fit'), observable='w0')) -def _parse_array_openQCD2(d, n, size, wa, quadrupel=False): +def _parse_array_openQCD2(d: int, n: tuple[int, int], size: int, wa: Union[tuple[float, float, float, float, float, float, float, float], tuple[float, float]], quadrupel: bool=False) -> list[list[float]]: arr = [] if d == 2: for i in range(n[0]): @@ -596,7 +599,7 @@ def _parse_array_openQCD2(d, n, size, wa, quadrupel=False): return arr -def _find_files(path, prefix, postfix, ext, known_files=[]): +def _find_files(path: str, prefix: str, postfix: str, ext: str, known_files: Union[str, list[str]]=[]) -> list[str]: found = [] files = [] @@ -636,7 +639,7 @@ def _find_files(path, prefix, postfix, ext, known_files=[]): return files -def _read_array_openQCD2(fp): +def _read_array_openQCD2(fp: BufferedReader) -> dict[str, Union[int, tuple[int, int], list[list[float]]]]: t = fp.read(4) d = struct.unpack('i', t)[0] t = fp.read(4 * d) @@ -662,7 +665,7 @@ def _read_array_openQCD2(fp): return {'d': d, 'n': n, 'size': size, 'arr': arr} -def read_qtop(path, prefix, c, dtr_cnfg=1, version="openQCD", **kwargs): +def read_qtop(path: str, prefix: str, c: float, dtr_cnfg: int=1, version: str="openQCD", **kwargs) -> Obs: """Read the topologial charge based on openQCD gradient flow measurements. Parameters @@ -715,7 +718,7 @@ def read_qtop(path, prefix, c, dtr_cnfg=1, version="openQCD", **kwargs): return _read_flow_obs(path, prefix, c, dtr_cnfg=dtr_cnfg, version=version, obspos=0, **kwargs) -def read_gf_coupling(path, prefix, c, dtr_cnfg=1, Zeuthen_flow=True, **kwargs): +def read_gf_coupling(path: str, prefix: str, c: float, dtr_cnfg: int=1, Zeuthen_flow: bool=True, **kwargs) -> Obs: """Read the gradient flow coupling based on sfqcd gradient flow measurements. 
See 1607.06423 for details. Note: The current implementation only works for c=0.3 and T=L. The definition of the coupling in 1607.06423 requires projection to topological charge zero which is not done within this function but has to be performed in a separate step. @@ -787,7 +790,7 @@ def read_gf_coupling(path, prefix, c, dtr_cnfg=1, Zeuthen_flow=True, **kwargs): return t * t * (5 / 3 * plaq - 1 / 12 * C2x1) / normdict[L] -def _read_flow_obs(path, prefix, c, dtr_cnfg=1, version="openQCD", obspos=0, sum_t=True, **kwargs): +def _read_flow_obs(path: str, prefix: str, c: float, dtr_cnfg: int=1, version: str="openQCD", obspos: int=0, sum_t: bool=True, **kwargs) -> Obs: """Read a flow observable based on openQCD gradient flow measurements. Parameters @@ -1059,7 +1062,7 @@ def _read_flow_obs(path, prefix, c, dtr_cnfg=1, version="openQCD", obspos=0, sum return result -def qtop_projection(qtop, target=0): +def qtop_projection(qtop: Obs, target: int=0) -> Obs: """Returns the projection to the topological charge sector defined by target. Parameters @@ -1085,7 +1088,7 @@ def qtop_projection(qtop, target=0): return reto -def read_qtop_sector(path, prefix, c, target=0, **kwargs): +def read_qtop_sector(path: str, prefix: str, c: float, target: int=0, **kwargs) -> Obs: """Constructs reweighting factors to a specified topological sector. Parameters @@ -1143,7 +1146,7 @@ def read_qtop_sector(path, prefix, c, target=0, **kwargs): return qtop_projection(qtop, target=target) -def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs): +def read_ms5_xsf(path: str, prefix: str, qc: str, corr: str, sep: str="r", **kwargs) -> Corr: """ Read data from files in the specified directory with the specified prefix and quark combination extension, and return a `Corr` object containing the data. diff --git a/pyerrors/input/pandas.py b/pyerrors/input/pandas.py index 13482983..f6cdb7bf 100644 --- a/pyerrors/input/pandas.py +++ b/pyerrors/input/pandas.py @@ -1,3 +1,4 @@ +from __future__ import annotations import warnings import gzip import sqlite3 @@ -6,9 +7,11 @@ from ..correlators import Corr from .json import create_json_string, import_json_string import numpy as np +from pandas.core.frame import DataFrame +from pandas.core.series import Series -def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs): +def to_sql(df: DataFrame, table_name: str, db: str, if_exists: str='fail', gz: bool=True, **kwargs): """Write DataFrame including Obs or Corr valued columns to sqlite database. Parameters @@ -34,7 +37,7 @@ def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs): con.close() -def read_sql(sql, db, auto_gamma=False, **kwargs): +def read_sql(sql: str, db: str, auto_gamma: bool=False, **kwargs) -> DataFrame: """Execute SQL query on sqlite database and obtain DataFrame including Obs or Corr valued columns. Parameters @@ -58,7 +61,7 @@ def read_sql(sql, db, auto_gamma=False, **kwargs): return _deserialize_df(extract_df, auto_gamma=auto_gamma) -def dump_df(df, fname, gz=True): +def dump_df(df: DataFrame, fname: str, gz: bool=True): """Exports a pandas DataFrame containing Obs valued columns to a (gzipped) csv file. 
Before making use of pandas to_csv functionality Obs objects are serialized via the standardized @@ -97,7 +100,7 @@ def dump_df(df, fname, gz=True): out.to_csv(fname, index=False) -def load_df(fname, auto_gamma=False, gz=True): +def load_df(fname: str, auto_gamma: bool=False, gz: bool=True) -> DataFrame: """Imports a pandas DataFrame from a csv.(gz) file in which Obs objects are serialized as json strings. Parameters @@ -131,7 +134,7 @@ def load_df(fname, auto_gamma=False, gz=True): return _deserialize_df(re_import, auto_gamma=auto_gamma) -def _serialize_df(df, gz=False): +def _serialize_df(df: DataFrame, gz: bool=False) -> DataFrame: """Serializes all Obs or Corr valued columns into json strings according to the pyerrors json specification. Parameters @@ -152,7 +155,7 @@ def _serialize_df(df, gz=False): return out -def _deserialize_df(df, auto_gamma=False): +def _deserialize_df(df: DataFrame, auto_gamma: bool=False) -> DataFrame: """Deserializes all pyerrors json strings into Obs or Corr objects according to the pyerrors json specification. Parameters @@ -188,7 +191,7 @@ def _deserialize_df(df, auto_gamma=False): return df -def _need_to_serialize(col): +def _need_to_serialize(col: Series) -> bool: serialize = False i = 0 while i < len(col) and col[i] is None: diff --git a/pyerrors/input/sfcf.py b/pyerrors/input/sfcf.py index 0431788a..89cda7e1 100644 --- a/pyerrors/input/sfcf.py +++ b/pyerrors/input/sfcf.py @@ -1,3 +1,4 @@ +from __future__ import annotations import os import fnmatch import re @@ -5,12 +6,14 @@ from ..obs import Obs from .utils import sort_names, check_idl import itertools +from numpy import ndarray +from typing import Any, Union, Optional sep = "/" -def read_sfcf(path, prefix, name, quarks='.*', corr_type="bi", noffset=0, wf=0, wf2=0, version="1.0c", cfg_separator="n", silent=False, **kwargs): +def read_sfcf(path: str, prefix: str, name: str, quarks: str='.*', corr_type: str="bi", noffset: int=0, wf: int=0, wf2: int=0, version: str="1.0c", cfg_separator: str="n", silent: bool=False, **kwargs) -> list[Obs]: """Read sfcf files from given folder structure. Parameters @@ -75,7 +78,7 @@ def read_sfcf(path, prefix, name, quarks='.*', corr_type="bi", noffset=0, wf=0, return ret[name][quarks][str(noffset)][str(wf)][str(wf2)] -def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=['bi'], noffset_list=[0], wf_list=[0], wf2_list=[0], version="1.0c", cfg_separator="n", silent=False, keyed_out=False, **kwargs): +def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: list[str]=['.*'], corr_type_list: list[str]=['bi'], noffset_list: list[int]=[0], wf_list: list[int]=[0], wf2_list: list[int]=[0], version: str="1.0c", cfg_separator: str="n", silent: bool=False, keyed_out: bool=False, **kwargs) -> dict: """Read sfcf files from given folder structure. 
Parameters @@ -162,10 +165,9 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= else: compact = False appended = False - ls = [] - if "replica" in kwargs: - ls = kwargs.get("replica") - else: + ls = kwargs.get("replica") + if ls is None: + ls = [] for (dirpath, dirnames, filenames) in os.walk(path): if not appended: ls.extend(dirnames) @@ -190,13 +192,12 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= if not silent: print('Read', part, 'part of', name_list, 'from', prefix[:-1], ',', replica, 'replica') - if 'names' in kwargs: - new_names = kwargs.get('names') + new_names = kwargs.get('names') + if new_names is not None: if len(new_names) != len(set(new_names)): raise Exception("names are not unique!") if len(new_names) != replica: raise Exception('names should have the length', replica) - else: ens_name = kwargs.get("ens_name") if not appended: @@ -205,14 +206,14 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= new_names = _get_appended_rep_names(ls, prefix, name_list[0], ens_name, rep_sep=(kwargs.get('rep_string', 'r'))) new_names = sort_names(new_names) - idl = [] + idl: list[list[int]] = [] - noffset_list = [str(x) for x in noffset_list] - wf_list = [str(x) for x in wf_list] - wf2_list = [str(x) for x in wf2_list] + noffset_strings: list[str] = [str(x) for x in noffset_list] + wf_strings: list[str] = [str(x) for x in wf_list] + wf2_strings: list[str] = [str(x) for x in wf2_list] # setup dict structures - intern = {} + intern: dict[str, Any] = {} for name, corr_type in zip(name_list, corr_type_list): intern[name] = {} b2b, single = _extract_corr_type(corr_type) @@ -221,26 +222,26 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= intern[name]["spec"] = {} for quarks in quarks_list: intern[name]["spec"][quarks] = {} - for off in noffset_list: + for off in noffset_strings: intern[name]["spec"][quarks][off] = {} - for w in wf_list: + for w in wf_strings: intern[name]["spec"][quarks][off][w] = {} if b2b: - for w2 in wf2_list: + for w2 in wf2_strings: intern[name]["spec"][quarks][off][w][w2] = {} intern[name]["spec"][quarks][off][w][w2]["pattern"] = _make_pattern(version, name, off, w, w2, intern[name]['b2b'], quarks) else: intern[name]["spec"][quarks][off][w]["0"] = {} intern[name]["spec"][quarks][off][w]["0"]["pattern"] = _make_pattern(version, name, off, w, 0, intern[name]['b2b'], quarks) - internal_ret_dict = {} + internal_ret_dict: dict[str, list] = {} needed_keys = [] for name, corr_type in zip(name_list, corr_type_list): b2b, single = _extract_corr_type(corr_type) if b2b: - needed_keys.extend(_lists2key([name], quarks_list, noffset_list, wf_list, wf2_list)) + needed_keys.extend(_lists2key([name], quarks_list, noffset_strings, wf_strings, wf2_strings)) else: - needed_keys.extend(_lists2key([name], quarks_list, noffset_list, wf_list, ["0"])) + needed_keys.extend(_lists2key([name], quarks_list, noffset_strings, wf_strings, ["0"])) for key in needed_keys: internal_ret_dict[key] = [] @@ -286,9 +287,9 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= if version == "0.0" or not compact: file = path + '/' + item + '/' + sub_ls[0] + '/' + name if corr_type_list[name_index] == 'bi': - name_keys = _lists2key(quarks_list, noffset_list, wf_list, ["0"]) + name_keys = _lists2key(quarks_list, noffset_strings, wf_strings, ["0"]) else: - name_keys = _lists2key(quarks_list, noffset_list, wf_list, wf2_list) + name_keys = 
_lists2key(quarks_list, noffset_strings, wf_strings, wf2_strings) for key in name_keys: specs = _key2specs(key) quarks = specs[0] @@ -304,7 +305,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= intern[name]["T"] = T # preparing the datastructure # the correlators get parsed into... - deltas = [] + deltas: list[list] = [] for j in range(intern[name]["T"]): deltas.append([]) internal_ret_dict[sep.join([name, key])] = deltas @@ -325,8 +326,8 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= rep_data.append(file_data) for t in range(intern[name]["T"]): internal_ret_dict[key][t].append([]) - for cfg in range(no_cfg): - internal_ret_dict[key][t][i].append(rep_data[cfg][key][t]) + for cfg_number in range(no_cfg): + internal_ret_dict[key][t][i].append(rep_data[cfg_number][key][t]) else: for key in needed_keys: specs = _key2specs(key) @@ -335,10 +336,9 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= off = specs[2] w = specs[3] w2 = specs[4] - if "files" in kwargs: - if isinstance(kwargs.get("files"), list) and all(isinstance(f, str) for f in kwargs.get("files")): - name_ls = kwargs.get("files") - else: + name_ls = kwargs.get("files") + if name_ls is not None: + if not (isinstance(name_ls, list) and all(isinstance(f, str) for f in name_ls)): raise TypeError("In append mode, files has to be of type list[str]!") else: name_ls = ls @@ -375,7 +375,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= if not silent: print("Done") - result_dict = {} + result_dict: dict[str, Any] = {} if keyed_out: for key in needed_keys: name = _key2specs(key)[0] @@ -388,12 +388,12 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= result_dict[name] = {} for quarks in quarks_list: result_dict[name][quarks] = {} - for off in noffset_list: + for off in noffset_strings: result_dict[name][quarks][off] = {} - for w in wf_list: + for w in wf_strings: result_dict[name][quarks][off][w] = {} if corr_type != 'bi': - for w2 in wf2_list: + for w2 in wf2_strings: key = _specs2key(name, quarks, off, w, w2) result = [] for t in range(intern[name]["T"]): @@ -408,22 +408,22 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list= return result_dict -def _lists2key(*lists): +def _lists2key(*lists) -> list[str]: keys = [] for tup in itertools.product(*lists): keys.append(sep.join(tup)) return keys -def _key2specs(key): +def _key2specs(key: str) -> list[str]: return key.split(sep) -def _specs2key(*specs): +def _specs2key(*specs) -> str: return sep.join(specs) -def _read_o_file(cfg_path, name, needed_keys, intern, version, im): +def _read_o_file(cfg_path: str, name: str, needed_keys: list[str], intern: dict[str, dict], version: str, im: int) -> dict[str, list[float]]: return_vals = {} for key in needed_keys: file = cfg_path + '/' + name @@ -448,7 +448,7 @@ def _read_o_file(cfg_path, name, needed_keys, intern, version, im): return return_vals -def _extract_corr_type(corr_type): +def _extract_corr_type(corr_type: str) -> tuple[bool, bool]: if corr_type == 'bb': b2b = True single = True @@ -461,7 +461,9 @@ def _extract_corr_type(corr_type): return b2b, single -def _find_files(rep_path, prefix, compact, files=[]): +def _find_files(rep_path: str, prefix: str, compact: bool, files: Optional[list]=None) -> list[str]: + if files is None: + files = [] sub_ls = [] if not files == []: files.sort(key=lambda x: int(re.findall(r'\d+', x)[-1])) @@ -488,7 +490,7 
@@ def _find_files(rep_path, prefix, compact, files=[]): return files -def _make_pattern(version, name, noffset, wf, wf2, b2b, quarks): +def _make_pattern(version: str, name: str, noffset: str, wf: str, wf2: Union[str, int], b2b: bool, quarks: str) -> str: if version == "0.0": pattern = "# " + name + " : offset " + str(noffset) + ", wf " + str(wf) if b2b: @@ -502,7 +504,7 @@ def _make_pattern(version, name, noffset, wf, wf2, b2b, quarks): return pattern -def _find_correlator(file_name, version, pattern, b2b, silent=False): +def _find_correlator(file_name: str, version: str, pattern: str, b2b: bool, silent: bool=False) -> tuple[int, int]: T = 0 with open(file_name, "r") as my_file: @@ -516,6 +518,7 @@ def _find_correlator(file_name, version, pattern, b2b, silent=False): else: start_read = content.count('\n', 0, match.start()) + 5 + b2b end_match = re.search(r'\n\s*\n', content[match.start():]) + assert end_match is not None T = content[match.start():].count('\n', 0, end_match.start()) - 4 - b2b if not T > 0: raise ValueError("Correlator with pattern\n" + pattern + "\nis empty!") @@ -528,7 +531,7 @@ def _find_correlator(file_name, version, pattern, b2b, silent=False): return start_read, T -def _read_compact_file(rep_path, cfg_file, intern, needed_keys, im): +def _read_compact_file(rep_path: str, cfg_file: str, intern: dict[str, dict], needed_keys: list[str], im: int) -> dict[str, list[float]]: return_vals = {} with open(rep_path + cfg_file) as fp: lines = fp.readlines() @@ -559,7 +562,7 @@ def _read_compact_file(rep_path, cfg_file, intern, needed_keys, im): return return_vals -def _read_compact_rep(path, rep, sub_ls, intern, needed_keys, im): +def _read_compact_rep(path: str, rep: str, sub_ls: list[str], intern: dict[str, dict], needed_keys: list[str], im: int) -> dict[str, list[ndarray]]: rep_path = path + '/' + rep + '/' no_cfg = len(sub_ls) @@ -581,7 +584,7 @@ def _read_compact_rep(path, rep, sub_ls, intern, needed_keys, im): return return_vals -def _read_chunk(chunk, gauge_line, cfg_sep, start_read, T, corr_line, b2b, pattern, im, single): +def _read_chunk(chunk: list[str], gauge_line: int, cfg_sep: str, start_read: int, T: int, corr_line: int, b2b: bool, pattern: str, im: int, single: bool) -> tuple[int, list[float]]: try: idl = int(chunk[gauge_line].split(cfg_sep)[-1]) except Exception: @@ -598,7 +601,7 @@ def _read_chunk(chunk, gauge_line, cfg_sep, start_read, T, corr_line, b2b, patte return idl, data -def _read_append_rep(filename, pattern, b2b, cfg_separator, im, single): +def _read_append_rep(filename: str, pattern: str, b2b: bool, cfg_separator: str, im: int, single: bool) -> tuple[int, list[int], list[list[float]]]: with open(filename, 'r') as fp: content = fp.readlines() data_starts = [] @@ -638,16 +641,16 @@ def _read_append_rep(filename, pattern, b2b, cfg_separator, im, single): rep_idl.append(idl) rep_data.append(data) - data = [] + final_data: list[list[float]] = [] for t in range(T): - data.append([]) + final_data.append([]) for c in range(len(rep_data)): - data[t].append(rep_data[c][t]) - return T, rep_idl, data + final_data[t].append(rep_data[c][t]) + return T, rep_idl, final_data -def _get_rep_names(ls, ens_name=None, rep_sep='r'): +def _get_rep_names(ls: list[str], ens_name: None=None, rep_sep: str ='r') -> list[str]: new_names = [] for entry in ls: try: @@ -662,7 +665,7 @@ def _get_rep_names(ls, ens_name=None, rep_sep='r'): return new_names -def _get_appended_rep_names(ls, prefix, name, ens_name=None, rep_sep='r'): +def _get_appended_rep_names(ls: list[str], prefix: 
str, name: str, ens_name: None=None, rep_sep: str ='r') -> list[str]: new_names = [] for exc in ls: if not fnmatch.fnmatch(exc, prefix + '*.' + name): diff --git a/pyerrors/input/utils.py b/pyerrors/input/utils.py index eaf41f06..0ab6b324 100644 --- a/pyerrors/input/utils.py +++ b/pyerrors/input/utils.py @@ -1,11 +1,12 @@ """Utilities for the input""" +from __future__ import annotations import re import fnmatch import os -def sort_names(ll): +def sort_names(ll: list[str]) -> list[str]: """Sorts a list of replica names by searching for `r` and `id` in the replicum string. If this search fails, a fallback method is used in which the strings are compared directly and the first differing numeral is used for differentiation. @@ -52,7 +53,7 @@ def sort_names(ll): return ll -def check_idl(idl, che): +def check_idl(idl: list, che: list) -> str: """Checks if a list of configurations is contained in an idl Parameters @@ -82,7 +83,7 @@ def check_idl(idl, che): return miss_str -def check_params(path, param_hash, prefix, param_prefix="parameters_"): +def check_params(path: str, param_hash: str, prefix: str, param_prefix: str ="parameters_") -> dict[str, list]: """ Check if, for sfcf, the parameter hashes at the end of the parameter files are in fact the expected ones. diff --git a/pyerrors/integrate.py b/pyerrors/integrate.py index 3b48f3fb..fcc59ec6 100644 --- a/pyerrors/integrate.py +++ b/pyerrors/integrate.py @@ -1,10 +1,13 @@ +from __future__ import annotations import numpy as np from .obs import derived_observable, Obs from autograd import jacobian from scipy.integrate import quad as squad +from numpy import ndarray +from typing import Callable, Union -def quad(func, p, a, b, **kwargs): +def quad(func: Callable, p: Union[list[Union[float, Obs]], list[float], ndarray], a: Union[Obs, float, int], b: Union[Obs, float, int], **kwargs) -> Union[tuple[Obs, float], tuple[float, float], tuple[Obs, float, dict[str, Union[int, ndarray]]]]: '''Performs a (one-dimensional) numeric integration of f(p, x) from a to b. The integration is performed using scipy.integrate.quad(). diff --git a/pyerrors/linalg.py b/pyerrors/linalg.py index 5a489e26..cb0b3119 100644 --- a/pyerrors/linalg.py +++ b/pyerrors/linalg.py @@ -1,9 +1,12 @@ +from __future__ import annotations import numpy as np import autograd.numpy as anp # Thinly-wrapped numpy from .obs import derived_observable, CObs, Obs, import_jackknife +from numpy import ndarray +from typing import Callable, Union -def matmul(*operands): +def matmul(*operands) -> ndarray: """Matrix multiply all operands. Parameters @@ -45,6 +48,7 @@ def multi_dot_i(operands): Nr = derived_observable(multi_dot_r, extended_operands, array_mode=True) Ni = derived_observable(multi_dot_i, extended_operands, array_mode=True) + assert isinstance(Nr, ndarray) and isinstance(Ni, ndarray) res = np.empty_like(Nr) for (n, m), entry in np.ndenumerate(Nr): res[n, m] = CObs(Nr[n, m], Ni[n, m]) @@ -59,7 +63,7 @@ def multi_dot(operands): return derived_observable(multi_dot, operands, array_mode=True) -def jack_matmul(*operands): +def jack_matmul(*operands) -> ndarray: """Matrix multiply both operands making use of the jackknife approximation. 
Parameters @@ -120,7 +124,7 @@ def _imp_from_jack_c(matrix, name, idl): return _imp_from_jack(r, name, idl) -def einsum(subscripts, *operands): +def einsum(subscripts: str, *operands) -> Union[CObs, Obs, ndarray]: """Wrapper for numpy.einsum Parameters @@ -194,24 +198,24 @@ def _imp_from_jack_c(matrix, name, idl): return result -def inv(x): +def inv(x: ndarray) -> ndarray: """Inverse of Obs or CObs valued matrices.""" return _mat_mat_op(anp.linalg.inv, x) -def cholesky(x): +def cholesky(x: ndarray) -> ndarray: """Cholesky decomposition of Obs valued matrices.""" if any(isinstance(o, CObs) for o in x.ravel()): raise Exception("Cholesky decomposition is not implemented for CObs.") return _mat_mat_op(anp.linalg.cholesky, x) -def det(x): +def det(x: Union[ndarray, int]) -> Obs: """Determinant of Obs valued matrices.""" return _scalar_mat_op(anp.linalg.det, x) -def _scalar_mat_op(op, obs, **kwargs): +def _scalar_mat_op(op: Callable, obs: Union[ndarray, int], **kwargs) -> Obs: """Computes the matrix to scalar operation op to a given matrix of Obs.""" def _mat(x, **kwargs): dim = int(np.sqrt(len(x))) @@ -232,7 +236,7 @@ def _mat(x, **kwargs): return derived_observable(_mat, raveled_obs, **kwargs) -def _mat_mat_op(op, obs, **kwargs): +def _mat_mat_op(op: Callable, obs: ndarray, **kwargs) -> ndarray: """Computes the matrix to matrix operation op to a given matrix of Obs.""" # Use real representation to calculate matrix operations for complex matrices if any(isinstance(o, CObs) for o in obs.ravel()): @@ -258,31 +262,31 @@ def _mat_mat_op(op, obs, **kwargs): return derived_observable(lambda x, **kwargs: op(x), [obs], array_mode=True)[0] -def eigh(obs, **kwargs): +def eigh(obs: ndarray, **kwargs) -> tuple[ndarray, ndarray]: """Computes the eigenvalues and eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh.""" w = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[0], obs) v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs) return w, v -def eig(obs, **kwargs): +def eig(obs: ndarray, **kwargs) -> ndarray: """Computes the eigenvalues of a given matrix of Obs according to np.linalg.eig.""" w = derived_observable(lambda x, **kwargs: anp.real(anp.linalg.eig(x)[0]), obs) return w -def eigv(obs, **kwargs): +def eigv(obs: ndarray, **kwargs) -> ndarray: """Computes the eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh.""" v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs) return v -def pinv(obs, **kwargs): +def pinv(obs: ndarray, **kwargs) -> ndarray: """Computes the Moore-Penrose pseudoinverse of a matrix of Obs.""" return derived_observable(lambda x, **kwargs: anp.linalg.pinv(x), obs) -def svd(obs, **kwargs): +def svd(obs: ndarray, **kwargs) -> tuple[ndarray, ndarray, ndarray]: """Computes the singular value decomposition of a matrix of Obs.""" u = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[0], obs) s = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[1], obs) diff --git a/pyerrors/misc.py b/pyerrors/misc.py index 94a7d4c2..c4baff53 100644 --- a/pyerrors/misc.py +++ b/pyerrors/misc.py @@ -1,3 +1,4 @@ +from __future__ import annotations import platform import numpy as np import scipy @@ -5,8 +6,13 @@ import matplotlib.pyplot as plt import pandas as pd import pickle -from .obs import Obs +from .obs import Obs, CObs from .version import __version__ +from numpy import ndarray +from typing import Union, TYPE_CHECKING + +if TYPE_CHECKING: + from 
.correlators import Corr def print_config(): @@ -54,7 +60,7 @@ def errorbar(x, y, axes=plt, **kwargs): axes.errorbar(val["x"], val["y"], xerr=err["x"], yerr=err["y"], **kwargs) -def dump_object(obj, name, **kwargs): +def dump_object(obj: Corr, name: str, **kwargs): """Dump object into pickle file. Parameters @@ -70,15 +76,18 @@ def dump_object(obj, name, **kwargs): ------- None """ - if 'path' in kwargs: - file_name = kwargs.get('path') + '/' + name + '.p' + path = kwargs.get('path') + if path is not None: + if not isinstance(path, str): + raise Exception("Path has to be a string.") + file_name = path + '/' + name + '.p' else: file_name = name + '.p' with open(file_name, 'wb') as fb: pickle.dump(obj, fb) -def load_object(path): +def load_object(path: str) -> Union[Obs, Corr]: """Load object from pickle file. Parameters @@ -95,7 +104,7 @@ def load_object(path): return pickle.load(file) -def pseudo_Obs(value, dvalue, name, samples=1000): +def pseudo_Obs(value: Union[float, int], dvalue: Union[float, int], name: str, samples: int=1000) -> Obs: """Generate an Obs object with given value, dvalue and name for test purposes Parameters @@ -118,11 +127,11 @@ def pseudo_Obs(value, dvalue, name, samples=1000): return Obs([np.zeros(samples) + value], [name]) else: for _ in range(100): - deltas = [np.random.normal(0.0, dvalue * np.sqrt(samples), samples)] + deltas = np.array([np.random.normal(0.0, dvalue * np.sqrt(samples), samples)]) deltas -= np.mean(deltas) deltas *= dvalue / np.sqrt((np.var(deltas) / samples)) / np.sqrt(1 + 3 / samples) deltas += value - res = Obs(deltas, [name]) + res = Obs(list(deltas), [name]) res.gamma_method(S=2, tau_exp=0) if abs(res.dvalue - dvalue) < 1e-10 * dvalue: break @@ -132,7 +141,7 @@ def pseudo_Obs(value, dvalue, name, samples=1000): return res -def gen_correlated_data(means, cov, name, tau=0.5, samples=1000): +def gen_correlated_data(means: Union[ndarray, list[float]], cov: ndarray, name: str, tau: Union[float, ndarray]=0.5, samples: int=1000) -> list[Obs]: """ Generate observables with given covariance and autocorrelation times. 
Parameters @@ -174,7 +183,7 @@ def gen_correlated_data(means, cov, name, tau=0.5, samples=1000): return [Obs([dat], [name]) for dat in corr_data.T] -def _assert_equal_properties(ol, otype=Obs): +def _assert_equal_properties(ol: Union[list[Obs], list[CObs], ndarray]): otype = type(ol[0]) for o in ol[1:]: if not isinstance(o, otype): diff --git a/pyerrors/mpm.py b/pyerrors/mpm.py index bf782af6..7a85ace3 100644 --- a/pyerrors/mpm.py +++ b/pyerrors/mpm.py @@ -1,10 +1,12 @@ +from __future__ import annotations import numpy as np import scipy.linalg from .obs import Obs from .linalg import svd, eig +from typing import Optional -def matrix_pencil_method(corrs, k=1, p=None, **kwargs): +def matrix_pencil_method(corrs: list[Obs], k: int=1, p: Optional[int]=None, **kwargs) -> list[Obs]: """Matrix pencil method to extract k energy levels from data Implementation of the matrix pencil method based on diff --git a/pyerrors/obs.py b/pyerrors/obs.py index 623c37fd..4ee81d10 100644 --- a/pyerrors/obs.py +++ b/pyerrors/obs.py @@ -1,3 +1,5 @@ +from __future__ import annotations +import sys import warnings import hashlib import pickle @@ -10,6 +12,16 @@ import numdifftools as nd from itertools import groupby from .covobs import Covobs +from numpy import bool, float64, int64, ndarray +from typing import Any, Callable, Optional, Union, Sequence, TYPE_CHECKING + +if sys.version_info >= (3, 10): + from types import NotImplementedType +else: + NotImplementedType = type(NotImplemented) + +if TYPE_CHECKING: + from .correlators import Corr # Improve print output of numpy.ndarrays containing Obs objects. np.set_printoptions(formatter={'object': lambda x: str(x)}) @@ -51,13 +63,13 @@ class Obs: 'idl', 'tag', '_covobs', '__dict__'] S_global = 2.0 - S_dict = {} + S_dict: dict[str, float] = {} tau_exp_global = 0.0 - tau_exp_dict = {} + tau_exp_dict: dict[str, float] = {} N_sigma_global = 1.0 - N_sigma_dict = {} + N_sigma_dict: dict[str, int] = {} - def __init__(self, samples, names, idl=None, **kwargs): + def __init__(self, samples: list[Union[ndarray, list[Any]]], names: list[str], idl: Optional[Union[list[list[int]], list[Union[list[int], range]], list[range]]]=None, **kwargs): """ Initialize Obs object. 
Parameters @@ -70,7 +82,8 @@ def __init__(self, samples, names, idl=None, **kwargs): list of ranges or lists on which the samples are defined """ - if kwargs.get("means") is None and len(samples): + means: Optional[list[float]] = kwargs.get("means") + if means is None and len(samples): if len(samples) != len(names): raise ValueError('Length of samples and names incompatible.') if idl is not None: @@ -85,18 +98,21 @@ def __init__(self, samples, names, idl=None, **kwargs): else: if not isinstance(names[0], str): raise TypeError('All names have to be strings.') + # This check does not work because of nan hacks in the json.gz export + # if not all((isinstance(o, np.ndarray) and o.ndim == 1) for o in samples): + # raise TypeError('All samples have to be 1d numpy arrays.') if min(len(x) for x in samples) <= 4: raise ValueError('Samples have to have at least 5 entries.') - self.names = sorted(names) + self.names: list[str] = sorted(names) self.shape = {} - self.r_values = {} - self.deltas = {} - self._covobs = {} + self.r_values: dict[str, float] = {} + self.deltas: dict[str, ndarray] = {} + self._covobs: dict[str, Covobs] = {} - self._value = 0 - self.N = 0 - self.idl = {} + self._value: float = 0.0 + self.N: int = 0 + self.idl: dict[str, Union[list[int], range]] = {} if idl is not None: for name, idx in sorted(zip(names, idl)): if isinstance(idx, range): @@ -112,13 +128,13 @@ def __init__(self, samples, names, idl=None, **kwargs): else: self.idl[name] = list(idx) else: - raise TypeError('incompatible type for idl[%s].' % (name)) + raise TypeError('incompatible type for idl[%s].' % name) else: for name, sample in sorted(zip(names, samples)): self.idl[name] = range(1, len(sample) + 1) - if kwargs.get("means") is not None: - for name, sample, mean in sorted(zip(names, samples, kwargs.get("means"))): + if means is not None: + for name, sample, mean in sorted(zip(names, samples, means)): self.shape[name] = len(self.idl[name]) self.N += self.shape[name] self.r_values[name] = mean @@ -130,38 +146,38 @@ def __init__(self, samples, names, idl=None, **kwargs): if len(sample) != self.shape[name]: raise ValueError('Incompatible samples and idx for %s: %d vs. 
%d' % (name, len(sample), self.shape[name])) self.r_values[name] = np.mean(sample) - self.deltas[name] = sample - self.r_values[name] + self.deltas[name] = np.asarray(sample) - self.r_values[name] self._value += self.shape[name] * self.r_values[name] self._value /= self.N - self._dvalue = 0.0 - self.ddvalue = 0.0 + self._dvalue: float = 0.0 + self.ddvalue: float = 0.0 self.reweighted = False self.tag = None @property - def value(self): + def value(self) -> Union[float, int64, float64, int]: return self._value @property - def dvalue(self): + def dvalue(self) -> Union[float, float64]: return self._dvalue @property - def e_names(self): + def e_names(self) -> list[str]: return sorted(set([o.split('|')[0] for o in self.names])) @property - def cov_names(self): + def cov_names(self) -> list[str]: return sorted(set([o for o in self.covobs.keys()])) @property - def mc_names(self): + def mc_names(self) -> list[str]: return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names])) @property - def e_content(self): + def e_content(self) -> dict[str, list[str]]: res = {} for e, e_name in enumerate(self.e_names): res[e_name] = sorted(filter(lambda x: x.startswith(e_name + '|'), self.names)) @@ -170,7 +186,7 @@ def e_content(self): return res @property - def covobs(self): + def covobs(self) -> dict[str, Covobs]: return self._covobs def gamma_method(self, **kwargs): @@ -341,7 +357,7 @@ def _compute_drho(i): gm = gamma_method - def _calc_gamma(self, deltas, idx, shape, w_max, fft, gapsize): + def _calc_gamma(self, deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray: """Calculate Gamma_{AA} from the deltas, which are defined on idx. idx is assumed to be a contiguous range (possibly with a stepsize != 1) @@ -377,7 +393,7 @@ def _calc_gamma(self, deltas, idx, shape, w_max, fft, gapsize): return gamma - def details(self, ens_content=True): + def details(self, ens_content: bool=True): """Output detailed properties of the Obs. 
Parameters @@ -425,19 +441,21 @@ def details(self, ens_content=True): my_string = ' ' + "\u00B7 Ensemble '" + key + "' " if len(value) == 1: my_string += f': {self.shape[value[0]]} configurations' - if isinstance(self.idl[value[0]], range): - my_string += f' (from {self.idl[value[0]].start} to {self.idl[value[0]][-1]}' + int(self.idl[value[0]].step != 1) * f' in steps of {self.idl[value[0]].step}' + ')' + my_idl = self.idl[value[0]] + if isinstance(my_idl, range): + my_string += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')' else: - my_string += f' (irregular range from {self.idl[value[0]][0]} to {self.idl[value[0]][-1]})' + my_string += f' (irregular range from {my_idl[0]} to {my_idl[-1]})' else: sublist = [] for v in value: my_substring = ' ' + "\u00B7 Replicum '" + v[len(key) + 1:] + "' " my_substring += f': {self.shape[v]} configurations' - if isinstance(self.idl[v], range): - my_substring += f' (from {self.idl[v].start} to {self.idl[v][-1]}' + int(self.idl[v].step != 1) * f' in steps of {self.idl[v].step}' + ')' + my_idl = self.idl[v] + if isinstance(my_idl, range): + my_substring += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')' else: - my_substring += f' (irregular range from {self.idl[v][0]} to {self.idl[v][-1]})' + my_substring += f' (irregular range from {my_idl[0]} to {my_idl[-1]})' sublist.append(my_substring) my_string += '\n' + '\n'.join(sublist) @@ -446,7 +464,7 @@ def details(self, ens_content=True): my_string_list.append(my_string) print('\n'.join(my_string_list)) - def reweight(self, weight): + def reweight(self, weight: "Obs") -> "Obs": """Reweight the obs with given reweighting factors. Parameters @@ -461,7 +479,7 @@ def reweight(self, weight): """ return reweight(weight, [self])[0] - def is_zero_within_error(self, sigma=1): + def is_zero_within_error(self, sigma: Union[float, int]=1) -> bool: """Checks whether the observable is zero within 'sigma' standard errors. Parameters @@ -473,7 +491,7 @@ def is_zero_within_error(self, sigma=1): """ return self.is_zero() or np.abs(self.value) <= sigma * self._dvalue - def is_zero(self, atol=1e-10): + def is_zero(self, atol: float=1e-10) -> bool: """Checks whether the observable is zero within a given tolerance. Parameters @@ -483,7 +501,7 @@ def is_zero(self, atol=1e-10): """ return np.isclose(0.0, self.value, 1e-14, atol) and all(np.allclose(0.0, delta, 1e-14, atol) for delta in self.deltas.values()) and all(np.allclose(0.0, delta.errsq(), 1e-14, atol) for delta in self.covobs.values()) - def plot_tauint(self, save=None): + def plot_tauint(self, save: None=None): """Plot integrated autocorrelation time for each ensemble. Parameters @@ -523,7 +541,7 @@ def plot_tauint(self, save=None): if save: fig.savefig(save + "_" + str(e)) - def plot_rho(self, save=None): + def plot_rho(self, save: None=None): """Plot the normalized autocorrelation function for each ensemble. 
Parameters @@ -576,7 +594,7 @@ def plot_rep_dist(self): plt.title('Replica distribution' + e_name + ' (mean=0, var=1)') plt.draw() - def plot_history(self, expand=True): + def plot_history(self, expand: bool=True): """Plot derived Monte Carlo history for each ensemble Parameters @@ -608,7 +626,7 @@ def plot_history(self, expand=True): plt.title(e_name + f'\nskew: {skew(y_test):.3f} (p={skewtest(y_test).pvalue:.3f}), kurtosis: {kurtosis(y_test):.3f} (p={kurtosistest(y_test).pvalue:.3f})') plt.draw() - def plot_piechart(self, save=None): + def plot_piechart(self, save: None=None) -> dict[str, float64]: """Plot piechart which shows the fractional contribution of each ensemble to the error and returns a dictionary containing the fractions. @@ -622,7 +640,7 @@ def plot_piechart(self, save=None): if np.isclose(0.0, self._dvalue, atol=1e-15): raise ValueError('Error is 0.0') labels = self.e_names - sizes = [self.e_dvalue[name] ** 2 for name in labels] / self._dvalue ** 2 + sizes = np.array([self.e_dvalue[name] ** 2 for name in labels]) / self._dvalue ** 2 fig1, ax1 = plt.subplots() ax1.pie(sizes, labels=labels, startangle=90, normalize=True) ax1.axis('equal') @@ -632,7 +650,7 @@ def plot_piechart(self, save=None): return dict(zip(labels, sizes)) - def dump(self, filename, datatype="json.gz", description="", **kwargs): + def dump(self, filename: str, datatype: str="json.gz", description: str="", **kwargs): """Dump the Obs to a file 'name' of chosen format. Parameters @@ -647,8 +665,11 @@ def dump(self, filename, datatype="json.gz", description="", **kwargs): path : str specifies a custom path for the file (default '.') """ - if 'path' in kwargs: - file_name = kwargs.get('path') + '/' + filename + path = kwargs.get('path') + if path is not None: + if not isinstance(path, str): + raise TypeError('path has to be a string.') + file_name = path + '/' + filename else: file_name = filename @@ -661,7 +682,7 @@ def dump(self, filename, datatype="json.gz", description="", **kwargs): else: raise TypeError("Unknown datatype " + str(datatype)) - def export_jackknife(self): + def export_jackknife(self) -> ndarray: """Export jackknife samples from the Obs Returns @@ -687,7 +708,7 @@ def export_jackknife(self): tmp_jacks[1:] = (n * mean - full_data) / (n - 1) return tmp_jacks - def export_bootstrap(self, samples=500, random_numbers=None, save_rng=None): + def export_bootstrap(self, samples: int=500, random_numbers: Optional[ndarray]=None, save_rng: None=None) -> ndarray: """Export bootstrap samples from the Obs Parameters @@ -730,16 +751,16 @@ def export_bootstrap(self, samples=500, random_numbers=None, save_rng=None): ret[1:] = proj @ (self.deltas[name] + self.r_values[name]) return ret - def __float__(self): + def __float__(self) -> float: return float(self.value) - def __repr__(self): + def __repr__(self) -> str: return 'Obs[' + str(self) + ']' - def __str__(self): + def __str__(self) -> str: return _format_uncertainty(self.value, self._dvalue) - def __format__(self, format_type): + def __format__(self, format_type: str) -> str: if format_type == "": significance = 2 else: @@ -752,35 +773,36 @@ def __format__(self, format_type): my_str = char + my_str return my_str - def __hash__(self): + def __hash__(self) -> int: hash_tuple = (np.array([self.value]).astype(np.float32).data.tobytes(),) hash_tuple += tuple([o.astype(np.float32).data.tobytes() for o in self.deltas.values()]) hash_tuple += tuple([np.array([o.errsq()]).astype(np.float32).data.tobytes() for o in self.covobs.values()]) hash_tuple += 
tuple([o.encode() for o in self.names]) m = hashlib.md5() - [m.update(o) for o in hash_tuple] + for o in hash_tuple: + m.update(o) return int(m.hexdigest(), 16) & 0xFFFFFFFF # Overload comparisons - def __lt__(self, other): + def __lt__(self, other: Union[Obs, float, float64]) -> bool: return self.value < other - def __le__(self, other): + def __le__(self, other: Union[Obs, float, int]) -> bool: return self.value <= other - def __gt__(self, other): + def __gt__(self, other: Union[Obs, float]) -> bool: return self.value > other - def __ge__(self, other): + def __ge__(self, other: Union[Obs, float, int]) -> bool: return self.value >= other - def __eq__(self, other): + def __eq__(self, other: Optional[Union[Obs, float64, int, float]]) -> bool: if other is None: return False return (self - other).is_zero() # Overload math operations - def __add__(self, y): + def __add__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] + x[1], [self, y], man_grad=[1, 1]) else: @@ -793,10 +815,10 @@ def __add__(self, y): else: return derived_observable(lambda x, **kwargs: x[0] + y, [self], man_grad=[1]) - def __radd__(self, y): + def __radd__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]: return self + y - def __mul__(self, y): + def __mul__(self, y: Any) -> Union[Obs, ndarray, CObs, NotImplementedType]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] * x[1], [self, y], man_grad=[y.value, self.value]) else: @@ -809,10 +831,10 @@ def __mul__(self, y): else: return derived_observable(lambda x, **kwargs: x[0] * y, [self], man_grad=[y]) - def __rmul__(self, y): + def __rmul__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]: return self * y - def __sub__(self, y): + def __sub__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] - x[1], [self, y], man_grad=[1, -1]) else: @@ -823,16 +845,16 @@ def __sub__(self, y): else: return derived_observable(lambda x, **kwargs: x[0] - y, [self], man_grad=[1]) - def __rsub__(self, y): + def __rsub__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]: return -1 * (self - y) - def __pos__(self): + def __pos__(self) -> Obs: return self - def __neg__(self): + def __neg__(self) -> Union[Obs, NotImplementedType, CObs, ndarray]: return -1 * self - def __truediv__(self, y): + def __truediv__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] / x[1], [self, y], man_grad=[1 / y.value, - self.value / y.value ** 2]) else: @@ -843,7 +865,7 @@ def __truediv__(self, y): else: return derived_observable(lambda x, **kwargs: x[0] / y, [self], man_grad=[1 / y]) - def __rtruediv__(self, y): + def __rtruediv__(self, y: Union[float, int]) -> Obs: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] / x[1], [y, self], man_grad=[1 / self.value, - y.value / self.value ** 2]) else: @@ -854,62 +876,62 @@ def __rtruediv__(self, y): else: return derived_observable(lambda x, **kwargs: y / x[0], [self], man_grad=[-y / self.value ** 2]) - def __pow__(self, y): + def __pow__(self, y: Union[Obs, float, int]) -> Obs: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] ** x[1], [self, y], man_grad=[y.value * self.value ** (y.value - 1), self.value ** y.value * 
np.log(self.value)]) else: return derived_observable(lambda x, **kwargs: x[0] ** y, [self], man_grad=[y * self.value ** (y - 1)]) - def __rpow__(self, y): + def __rpow__(self, y: Union[float, int]) -> Obs: return derived_observable(lambda x, **kwargs: y ** x[0], [self], man_grad=[y ** self.value * np.log(y)]) - def __abs__(self): + def __abs__(self) -> Obs: return derived_observable(lambda x: anp.abs(x[0]), [self]) # Overload numpy functions - def sqrt(self): + def sqrt(self) -> Obs: return derived_observable(lambda x, **kwargs: np.sqrt(x[0]), [self], man_grad=[1 / 2 / np.sqrt(self.value)]) - def log(self): + def log(self) -> Obs: return derived_observable(lambda x, **kwargs: np.log(x[0]), [self], man_grad=[1 / self.value]) - def exp(self): + def exp(self) -> Obs: return derived_observable(lambda x, **kwargs: np.exp(x[0]), [self], man_grad=[np.exp(self.value)]) - def sin(self): + def sin(self) -> Obs: return derived_observable(lambda x, **kwargs: np.sin(x[0]), [self], man_grad=[np.cos(self.value)]) - def cos(self): + def cos(self) -> Obs: return derived_observable(lambda x, **kwargs: np.cos(x[0]), [self], man_grad=[-np.sin(self.value)]) - def tan(self): + def tan(self) -> Obs: return derived_observable(lambda x, **kwargs: np.tan(x[0]), [self], man_grad=[1 / np.cos(self.value) ** 2]) - def arcsin(self): + def arcsin(self) -> Obs: return derived_observable(lambda x: anp.arcsin(x[0]), [self]) - def arccos(self): + def arccos(self) -> Obs: return derived_observable(lambda x: anp.arccos(x[0]), [self]) - def arctan(self): + def arctan(self) -> Obs: return derived_observable(lambda x: anp.arctan(x[0]), [self]) - def sinh(self): + def sinh(self) -> Obs: return derived_observable(lambda x, **kwargs: np.sinh(x[0]), [self], man_grad=[np.cosh(self.value)]) - def cosh(self): + def cosh(self) -> Obs: return derived_observable(lambda x, **kwargs: np.cosh(x[0]), [self], man_grad=[np.sinh(self.value)]) - def tanh(self): + def tanh(self) -> Obs: return derived_observable(lambda x, **kwargs: np.tanh(x[0]), [self], man_grad=[1 / np.cosh(self.value) ** 2]) - def arcsinh(self): + def arcsinh(self) -> Obs: return derived_observable(lambda x: anp.arcsinh(x[0]), [self]) - def arccosh(self): + def arccosh(self) -> Obs: return derived_observable(lambda x: anp.arccosh(x[0]), [self]) - def arctanh(self): + def arctanh(self) -> Obs: return derived_observable(lambda x: anp.arctanh(x[0]), [self]) @@ -917,17 +939,17 @@ class CObs: """Class for a complex valued observable.""" __slots__ = ['_real', '_imag', 'tag'] - def __init__(self, real, imag=0.0): + def __init__(self, real: Obs, imag: Union[Obs, float, int]=0.0): self._real = real self._imag = imag self.tag = None @property - def real(self): + def real(self) -> Obs: return self._real @property - def imag(self): + def imag(self) -> Union[Obs, float, int]: return self._imag def gamma_method(self, **kwargs): @@ -937,14 +959,14 @@ def gamma_method(self, **kwargs): if isinstance(self.imag, Obs): self.imag.gamma_method(**kwargs) - def is_zero(self): + def is_zero(self) -> bool: """Checks whether both real and imaginary part are zero within machine precision.""" return self.real == 0.0 and self.imag == 0.0 - def conjugate(self): + def conjugate(self) -> CObs: return CObs(self.real, -self.imag) - def __add__(self, other): + def __add__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return other + self elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -953,10 +975,10 @@ def __add__(self, other): else: return CObs(self.real + other, 
self.imag) - def __radd__(self, y): + def __radd__(self, y: Union[complex, float, Obs, int]) -> "CObs": return self + y - def __sub__(self, other): + def __sub__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return -1 * (other - self) elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -964,14 +986,14 @@ def __sub__(self, other): else: return CObs(self.real - other, self.imag) - def __rsub__(self, other): + def __rsub__(self, other: Union[complex, float, Obs, int]) -> "CObs": return -1 * (self - other) - def __mul__(self, other): + def __mul__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return other * self elif hasattr(other, 'real') and hasattr(other, 'imag'): - if all(isinstance(i, Obs) for i in [self.real, self.imag, other.real, other.imag]): + if isinstance(self.real, Obs) and isinstance(self.imag, Obs) and isinstance(other.real, Obs) and isinstance(other.imag, Obs): return CObs(derived_observable(lambda x, **kwargs: x[0] * x[1] - x[2] * x[3], [self.real, other.real, self.imag, other.imag], man_grad=[other.real.value, self.real.value, -other.imag.value, -self.imag.value]), @@ -986,10 +1008,10 @@ def __mul__(self, other): else: return CObs(self.real * other, self.imag * other) - def __rmul__(self, other): + def __rmul__(self, other: Union[complex, Obs, CObs, float, int]) -> "CObs": return self * other - def __truediv__(self, other): + def __truediv__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return 1 / (other / self) elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -998,32 +1020,35 @@ def __truediv__(self, other): else: return CObs(self.real / other, self.imag / other) - def __rtruediv__(self, other): + def __rtruediv__(self, other: Union[complex, float, Obs, CObs, int]) -> CObs: r = self.real ** 2 + self.imag ** 2 if hasattr(other, 'real') and hasattr(other, 'imag'): return CObs((self.real * other.real + self.imag * other.imag) / r, (self.real * other.imag - self.imag * other.real) / r) else: return CObs(self.real * other / r, -self.imag * other / r) - def __abs__(self): + def __abs__(self) -> Obs: return np.sqrt(self.real**2 + self.imag**2) - def __pos__(self): + def __pos__(self) -> "CObs": return self - def __neg__(self): + def __neg__(self) -> "CObs": return -1 * self - def __eq__(self, other): - return self.real == other.real and self.imag == other.imag + def __eq__(self, other: object) -> bool: + if hasattr(other, 'real') and hasattr(other, 'imag'): + return self.real == other.real and self.imag == other.imag + else: + return False - def __str__(self): + def __str__(self) -> str: return '(' + str(self.real) + int(self.imag >= 0.0) * '+' + str(self.imag) + 'j)' - def __repr__(self): + def __repr__(self) -> str: return 'CObs[' + str(self) + ']' - def __format__(self, format_type): + def __format__(self, format_type: str) -> str: if format_type == "": significance = 2 format_type = "2" @@ -1032,7 +1057,7 @@ def __format__(self, format_type): return f"({self.real:{format_type}}{self.imag:+{significance}}j)" -def gamma_method(x, **kwargs): +def gamma_method(x: Union[Corr, Obs, ndarray, list[Obs]], **kwargs) -> ndarray: """Vectorized version of the gamma_method applicable to lists or arrays of Obs. See docstring of pe.Obs.gamma_method for details. 
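As a minimal usage sketch of the vectorized gamma_method and the CObs arithmetic annotated above (the ensemble name "A" and the sample data are invented for illustration):

import numpy as np
import pyerrors as pe

# Two toy observables on a fictitious ensemble "A" with 500 samples each.
rng = np.random.default_rng(42)
o1 = pe.Obs([rng.normal(1.0, 0.1, 500)], ["A"])
o2 = pe.Obs([rng.normal(0.5, 0.2, 500)], ["A"])

# pe.gm / pe.gamma_method accepts a single Obs, a list or an ndarray of Obs
# and estimates the error of every entry in place.
arr = np.array([o1, o2, o1 + o2])
pe.gm(arr)

# CObs bundles two Obs into one complex-valued observable; the overloaded
# arithmetic mirrors complex numbers and propagates errors through both parts.
c = pe.CObs(o1, o2)
prod = c * c.conjugate()  # |c|^2, with an imaginary part compatible with zero
prod.gamma_method()
print(prod.real)
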
@@ -1043,7 +1068,7 @@ def gamma_method(x, **kwargs): gm = gamma_method -def _format_uncertainty(value, dvalue, significance=2): +def _format_uncertainty(value: Union[float, float64, int], dvalue: Union[float, float64, int], significance: int=2) -> str: """Creates a string of a value and its error in parenthesis notation, e.g., 13.02(45)""" if dvalue == 0.0 or (not np.isfinite(dvalue)): return str(value) @@ -1060,7 +1085,7 @@ def _format_uncertainty(value, dvalue, significance=2): return f"{value:.{max(0, int(significance - fexp - 1))}f}({dvalue:2.{max(0, int(significance - fexp - 1))}f})" -def _expand_deltas(deltas, idx, shape, gapsize): +def _expand_deltas(deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray: """Expand deltas defined on idx to a regular range with spacing gapsize between two configurations and where holes are filled by 0. If idx is of type range, the deltas are not changed if the idx.step == gapsize. @@ -1086,7 +1111,7 @@ def _expand_deltas(deltas, idx, shape, gapsize): return ret -def _merge_idx(idl): +def _merge_idx(idl: list[Union[range, list[int]]]) -> Union[range, list[int]]: """Returns the union of all lists in idl as range or sorted list Parameters @@ -1109,7 +1134,7 @@ def _merge_idx(idl): return idunion -def _intersection_idx(idl): +def _intersection_idx(idl: list[Union[range, list[int]]]) -> Union[range, list[int]]: """Returns the intersection of all lists in idl as range or sorted list Parameters @@ -1135,7 +1160,7 @@ def _intersection_idx(idl): return idinter -def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor): +def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, list[int]], shape: int, new_idx: Union[range, list[int]], scalefactor: Union[float, int]) -> ndarray: """Expand deltas defined on idx to the list of configs that is defined by new_idx. New, empty entries are filled by 0. If idx and new_idx are of type range, the smallest common divisor of the step sizes is used as new step size. @@ -1167,7 +1192,7 @@ def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor): return np.array([ret[new_idx[i] - new_idx[0]] for i in range(len(new_idx))]) * len(new_idx) / len(idx) * scalefactor -def derived_observable(func, data, array_mode=False, **kwargs): +def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwargs) -> Union[Obs, ndarray]: """Construct a derived Obs according to func(data, **kwargs) using automatic differentiation. 
Parameters @@ -1205,7 +1230,7 @@ def derived_observable(func, data, array_mode=False, **kwargs): if isinstance(raveled_data[i], (int, float)): raveled_data[i] = cov_Obs(raveled_data[i], 0.0, "###dummy_covobs###") - allcov = {} + allcov: dict[str, ndarray] = {} for o in raveled_data: for name in o.cov_names: if name in allcov: @@ -1295,8 +1320,8 @@ def __init__(self, N): self.grad = np.zeros((N, 1)) new_covobs_lengths = dict(set([y for x in [[(n, o.covobs[n].N) for n in o.cov_names] for o in raveled_data] for y in x])) - d_extracted = {} - g_extracted = {} + d_extracted: dict[str, list] = {} + g_extracted: dict[str, list] = {} for name in new_sample_names: d_extracted[name] = [] ens_length = len(new_idl_d[name]) @@ -1357,7 +1382,7 @@ def __init__(self, N): return final_result -def _reduce_deltas(deltas, idx_old, idx_new): +def _reduce_deltas(deltas: Union[list[float], ndarray], idx_old: Union[range, list[int]], idx_new: Union[range, list[int], ndarray]) -> Union[list[float], ndarray]: """Extract deltas defined on idx_old on all configs of idx_new. Assumes, that idx_old and idx_new are correctly defined idl, i.e., they @@ -1386,7 +1411,7 @@ def _reduce_deltas(deltas, idx_old, idx_new): return np.array(deltas)[indices] -def reweight(weight, obs, **kwargs): +def reweight(weight: Obs, obs: Union[ndarray, list[Obs]], **kwargs) -> list[Obs]: """Reweight a list of observables. Parameters @@ -1410,8 +1435,8 @@ def reweight(weight, obs, **kwargs): for name in obs[i].names: if not set(obs[i].idl[name]).issubset(weight.idl[name]): raise ValueError('obs[%d] has to be defined on a subset of the configs in weight.idl[%s]!' % (i, name)) - new_samples = [] - w_deltas = {} + new_samples: list = [] + w_deltas: dict[str, ndarray] = {} for name in sorted(obs[i].names): w_deltas[name] = _reduce_deltas(weight.deltas[name], weight.idl[name], obs[i].idl[name]) new_samples.append((w_deltas[name] + weight.r_values[name]) * (obs[i].deltas[name] + obs[i].r_values[name])) @@ -1428,7 +1453,7 @@ def reweight(weight, obs, **kwargs): return result -def correlate(obs_a, obs_b): +def correlate(obs_a: Obs, obs_b: Obs) -> Obs: """Correlate two observables. Parameters @@ -1471,7 +1496,7 @@ def correlate(obs_a, obs_b): return o -def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs): +def covariance(obs: Union[ndarray, list[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray: r'''Calculates the error covariance matrix of a set of observables. WARNING: This function should be used with care, especially for observables with support on multiple @@ -1541,7 +1566,7 @@ def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs): return cov -def invert_corr_cov_cholesky(corr, inverrdiag): +def invert_corr_cov_cholesky(corr: ndarray, inverrdiag: ndarray) -> ndarray: """Constructs a lower triangular matrix `chol` via the Cholesky decomposition of the correlation matrix `corr` and then returns the inverse covariance matrix `chol_inv` as a lower triangular matrix by solving `chol * x = inverrdiag`. @@ -1564,7 +1589,7 @@ def invert_corr_cov_cholesky(corr, inverrdiag): return chol_inv -def sort_corr(corr, kl, yd): +def sort_corr(corr: ndarray, kl: list[str], yd: dict[str, list[Obs]]) -> ndarray: """ Reorders a correlation matrix to match the alphabetical order of its underlying y data. The ordering of the input correlation matrix `corr` is given by the list of keys `kl`. 
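To make the sort_corr signature above concrete, a small sketch (the key names, the data and the pe.obs.sort_corr entry point are illustrative assumptions):

import pyerrors as pe

# A y-data dict with two keys; the correlation matrix is assembled in the
# (non-alphabetical) key order given by kl, so rows/columns are b, b, a.
yd = {
    "b": [pe.pseudo_Obs(2.0, 0.1, "ens"), pe.pseudo_Obs(2.1, 0.1, "ens")],
    "a": [pe.pseudo_Obs(1.0, 0.1, "ens")],
}
kl = ["b", "a"]
y_all = [o for k in kl for o in yd[k]]
pe.gm(y_all)

corr = pe.covariance(y_all, correlation=True)

# sort_corr permutes rows and columns into alphabetical key order, i.e. a, b, b.
corr_sorted = pe.obs.sort_corr(corr, kl, yd)
print(corr_sorted.shape)  # (3, 3)
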
@@ -1627,7 +1652,7 @@ def sort_corr(corr, kl, yd): return corr_sorted -def _smooth_eigenvalues(corr, E): +def _smooth_eigenvalues(corr: ndarray, E: int) -> ndarray: """Eigenvalue smoothing as described in hep-lat/9412087 corr : np.ndarray @@ -1644,7 +1669,7 @@ def _smooth_eigenvalues(corr, E): return vec @ np.diag(vals) @ vec.T -def _covariance_element(obs1, obs2): +def _covariance_element(obs1: Obs, obs2: Obs) -> Union[float, float64]: """Estimates the covariance of two Obs objects, neglecting autocorrelations.""" def calc_gamma(deltas1, deltas2, idx1, idx2, new_idx): @@ -1704,7 +1729,7 @@ def calc_gamma(deltas1, deltas2, idx1, idx2, new_idx): return dvalue -def import_jackknife(jacks, name, idl=None): +def import_jackknife(jacks: ndarray, name: str, idl: Optional[list[Union[list[int], range]]]=None) -> Obs: """Imports jackknife samples and returns an Obs Parameters @@ -1724,7 +1749,7 @@ def import_jackknife(jacks, name, idl=None): return new_obs -def import_bootstrap(boots, name, random_numbers): +def import_bootstrap(boots: ndarray, name: str, random_numbers: ndarray) -> Obs: """Imports bootstrap samples and returns an Obs Parameters @@ -1754,7 +1779,7 @@ def import_bootstrap(boots, name, random_numbers): return ret -def merge_obs(list_of_obs): +def merge_obs(list_of_obs: list[Obs]) -> Obs: """Combine all observables in list_of_obs into one new observable Parameters @@ -1772,11 +1797,11 @@ def merge_obs(list_of_obs): if any([len(o.cov_names) for o in list_of_obs]): raise ValueError('Not possible to merge data that contains covobs!') new_dict = {} - idl_dict = {} + idl_dict: dict[str, Union[range, list[int]]] = {} for o in list_of_obs: new_dict.update({key: o.deltas.get(key, 0) + o.r_values.get(key, 0) for key in set(o.deltas) | set(o.r_values)}) - idl_dict.update({key: o.idl.get(key, 0) for key in set(o.deltas)}) + idl_dict.update({key: o.idl.get(key) for key in set(o.deltas)}) names = sorted(new_dict.keys()) o = Obs([new_dict[name] for name in names], names, idl=[idl_dict[name] for name in names]) @@ -1784,7 +1809,7 @@ def merge_obs(list_of_obs): return o -def cov_Obs(means, cov, name, grad=None): +def cov_Obs(means: Union[int, list[float], float, list[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, list[Obs]]: """Create an Obs based on mean(s) and a covariance matrix Parameters @@ -1827,13 +1852,14 @@ def covobs_to_obs(co): return ol -def _determine_gap(o, e_content, e_name): +def _determine_gap(o: Obs, e_content: dict[str, list[str]], e_name: str) -> Union[int64, int]: gaps = [] for r_name in e_content[e_name]: - if isinstance(o.idl[r_name], range): - gaps.append(o.idl[r_name].step) + my_idl = o.idl[r_name] + if isinstance(my_idl, range): + gaps.append(my_idl.step) else: - gaps.append(np.min(np.diff(o.idl[r_name]))) + gaps.append(np.min(np.diff(my_idl))) gap = min(gaps) if not np.all([gi % gap == 0 for gi in gaps]): @@ -1842,7 +1868,7 @@ def _determine_gap(o, e_content, e_name): return gap -def _check_lists_equal(idl): +def _check_lists_equal(idl: Sequence[Union[list[int], list[Union[int64, int]], range, ndarray]]): ''' Use groupby to efficiently check whether all elements of idl are identical. Returns True if all elements are equal, otherwise False. 
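A quick consistency sketch for the jackknife interface annotated above (ensemble name and data are made up): export_jackknife and import_jackknife should round-trip an Obs, up to the autocorrelation information that jackknife samples cannot carry.

import numpy as np
import pyerrors as pe

rng = np.random.default_rng(7)
obs = pe.Obs([rng.normal(0.7, 0.05, 250)], ["test_ens"])
obs.gamma_method()

# First entry is the mean value, followed by the N jackknife samples.
jacks = obs.export_jackknife()
reconstructed = pe.import_jackknife(jacks, "test_ens")
reconstructed.gamma_method()

print(np.isclose(obs.value, reconstructed.value))  # True
print(obs.dvalue, reconstructed.dvalue)            # should roughly agree for uncorrelated data
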
diff --git a/pyerrors/roots.py b/pyerrors/roots.py index acc5614f..79ac94d1 100644 --- a/pyerrors/roots.py +++ b/pyerrors/roots.py @@ -1,10 +1,12 @@ +from __future__ import annotations import numpy as np import scipy.optimize from autograd import jacobian -from .obs import derived_observable +from .obs import Obs, derived_observable +from typing import Callable, Union -def find_root(d, func, guess=1.0, **kwargs): +def find_root(d: Union[Obs, list[Obs]], func: Callable, guess: float=1.0, **kwargs) -> Obs: r'''Finds the root of the function func(x, d) where d is an `Obs`. Parameters diff --git a/pyerrors/special.py b/pyerrors/special.py index 8b36e056..bb9e82b9 100644 --- a/pyerrors/special.py +++ b/pyerrors/special.py @@ -1,3 +1,4 @@ +from __future__ import annotations import scipy import numpy as np from autograd.extend import primitive, defvjp diff --git a/pyproject.toml b/pyproject.toml index 657ec5bb..de95dd75 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,3 +4,7 @@ build-backend = "setuptools.build_meta" [tool.ruff.lint] ignore = ["F403"] + +[tool.mypy] +warn_unused_configs = true +ignore_missing_imports = true
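With the [tool.mypy] table above in place, the annotations can be checked from the repository root. A minimal sketch using mypy's Python API (assuming mypy is installed and the usual checkout layout):

from mypy import api

# Equivalent to running `mypy pyerrors` on the command line; the
# warn_unused_configs and ignore_missing_imports settings are picked up
# from pyproject.toml automatically.
stdout, stderr, exit_code = api.run(["pyerrors"])
print(stdout, end="")
print("mypy exit status:", exit_code)
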