Skip to content

Commit

Permalink
add Tukey HSD functions and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
phobson committed Mar 25, 2024
1 parent 4831ddb commit 9d45fdf
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 10 deletions.
7 changes: 7 additions & 0 deletions wqio/datacollections.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,13 @@ def f_test(self, **opts):
"""
return self.comparison_stat_allway(stats.f_oneway, statname="f-test", control=None, **opts)

def tukey_hsd(self) -> tuple[pandas.DataFrame, pandas.DataFrame]:
hsd = utils.tukey_hsd(
self.tidy, self.rescol, self.stationcol, self.paramcol, *self.othergroups
)
scores = utils.process_tukey_hsd_scores(hsd, self.stationcol, self.paramcol)
return hsd, scores

def theilslopes(self, logs=False):
raise NotImplementedError

Expand Down
Binary file added wqio/tests/_data/wq.pkl
Binary file not shown.
20 changes: 20 additions & 0 deletions wqio/tests/test_datacollections.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,26 @@ def test_f_test(dc):
pandas.testing.assert_frame_equal(result, expected)


def test_tukey_hsd_smoke_test(dc):
hsd, scores = dc.tukey_hsd()
assert isinstance(hsd, pandas.DataFrame)
assert isinstance(scores, pandas.DataFrame)

assert hsd.index.names == ["loc 1", "loc 2", "param"]
assert hsd.columns.tolist() == [
"HSD Stat",
"p-value",
"CI-Low",
"CI-High",
"is_diff",
"sign_of_diff",
"score",
]

assert scores.index.names == ["param"]
assert scores.columns.tolist() == ["Inflow", "Outflow", "Reference"]


@helpers.seed
def test_theilslopes(dc):
with helpers.raises(NotImplementedError):
Expand Down
113 changes: 103 additions & 10 deletions wqio/tests/utils_tests/test_numutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from collections import namedtuple
from io import StringIO
from textwrap import dedent
from typing import Any, Literal

import numpy
import numpy.testing as nptest
import pandas
import pandas.testing as pdtest
import pytest
import statsmodels.api as sm
from numpy._typing._array_like import NDArray
from pandas import DataFrame
from scipy import stats

from wqio.tests import helpers
Expand Down Expand Up @@ -92,7 +95,7 @@ def test_process_p_vals(fxn, pval, expected, error_to_raise):
(1.01, (None, None), ValueError),
],
)
def test_translate_p_vals(pval, expected, as_emoji, error_to_raise):
def test_translate_p_vals(pval, expected, as_emoji: bool, error_to_raise):
with helpers.raises(error_to_raise):
result = numutils.translate_p_vals(pval, as_emoji=as_emoji)
assert result == expected[as_emoji]
Expand Down Expand Up @@ -125,7 +128,7 @@ def test_anderson_darling():


@pytest.mark.parametrize("which", ["good", "bad"])
def test_processAndersonDarlingResults(which):
def test_processAndersonDarlingResults(which: Literal["good"] | Literal["bad"]):
fieldnames = ["statistic", "critical_values", "significance_level"]
AndersonResult = namedtuple("AndersonResult", fieldnames)
ARs = {
Expand Down Expand Up @@ -215,7 +218,7 @@ def units_norm_data():
return raw, expected


def test_normalize_units(units_norm_data):
def test_normalize_units(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"ug/L": 1e-6, "mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ug/L", "Cadmium, Total": "mg/L"}
Expand All @@ -226,7 +229,7 @@ def test_normalize_units(units_norm_data):
pdtest.assert_frame_equal(result, expected)


def test_normalize_units_bad_targetunits(units_norm_data):
def test_normalize_units_bad_targetunits(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"ug/L": 1e-6, "mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ug/L"}
Expand All @@ -243,7 +246,7 @@ def test_normalize_units_bad_targetunits(units_norm_data):
)


def test_normalize_units_bad_normalization(units_norm_data):
def test_normalize_units_bad_normalization(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ug/L", "Cadmium, Total": "mg/L"}
Expand All @@ -260,7 +263,7 @@ def test_normalize_units_bad_normalization(units_norm_data):
)


def test_normalize_units_bad_conversion(units_norm_data):
def test_normalize_units_bad_conversion(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"ug/L": 1e-6, "mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ng/L", "Cadmium, Total": "mg/L"}
Expand Down Expand Up @@ -292,7 +295,7 @@ def test_test_pH2concentration(pH, expected, error):

@helpers.seed
@pytest.mark.parametrize("error", [None, ValueError])
def test_compute_theilslope_default(error):
def test_compute_theilslope_default(error: types.NoneType | type[ValueError]):
with helpers.raises(error):
y = helpers.getTestROSData()["res"].values
x = numpy.arange(len(y) - 1) if error else None
Expand Down Expand Up @@ -443,7 +446,7 @@ def fit_data():
(None, "junk", ValueError),
],
)
def test_fit_line(fit_data, fitlogs, fitprobs, error):
def test_fit_line(fit_data: dict[str, NDArray[Any]], fitlogs, fitprobs, error):
xy = {
(None, None): (fit_data["zscores"], fit_data["data"]),
("y", None): (fit_data["zscores"], fit_data["data"]),
Expand Down Expand Up @@ -483,13 +486,13 @@ def test_fit_line(fit_data, fitlogs, fitprobs, error):
assert isinstance(res, sm.regression.linear_model.RegressionResultsWrapper)


def test_fit_line_through_origin(fit_data):
def test_fit_line_through_origin(fit_data: dict[str, NDArray[Any]]):
x, y = fit_data["zscores"], fit_data["data"]
x_, y_, res = numutils.fit_line(x, y, through_origin=True)
assert res.params[0] == 0


def test_fit_line_with_xhat(fit_data):
def test_fit_line_with_xhat(fit_data: dict[str, NDArray[Any]]):
x, y = fit_data["zscores"], fit_data["data"]
x_, y_, res = numutils.fit_line(x, y, xhat=[-2, -1, 0, 1, 2])
expected = [-0.566018, 4.774419, 10.114857, 15.455295, 20.795733]
Expand Down Expand Up @@ -799,3 +802,93 @@ def test_remove_outliers():
x = numpy.random.normal(0, 4, size=37)

assert numutils.remove_outliers(x).shape == expected_shape


def test_tukey_hsd_functions():
expected_records = [
{
"chemical_name": "Copper",
"Loc_0": -2.0,
"Loc_1": 6.0,
"Loc_2": -2.0,
"Loc_3": -4.0,
"Loc_4": 3.0,
"Loc_5": -1.0,
"Loc_6": 0.0,
},
{
"chemical_name": "Di(2-ethylhexyl)phthalate",
"Loc_0": 3.0,
"Loc_1": 5.0,
"Loc_2": -2.0,
"Loc_3": -2.0,
"Loc_4": -2.0,
"Loc_5": -1.0,
"Loc_6": -1.0,
},
{
"chemical_name": "Indeno(1,2,3-cd)pyrene",
"Loc_0": 2.0,
"Loc_1": 0.0,
"Loc_2": 6.0,
"Loc_3": -2.0,
"Loc_4": -2.0,
"Loc_5": -4.0,
"Loc_6": 0.0,
},
{
"chemical_name": "Lead",
"Loc_0": 0.0,
"Loc_1": 6.0,
"Loc_2": -2.0,
"Loc_3": -3.0,
"Loc_4": 4.0,
"Loc_5": -3.0,
"Loc_6": -2.0,
},
{
"chemical_name": "Phenanthrene",
"Loc_0": 1.0,
"Loc_1": 0.0,
"Loc_2": 1.0,
"Loc_3": -3.0,
"Loc_4": 0.0,
"Loc_5": 0.0,
"Loc_6": 1.0,
},
{
"chemical_name": "Pyrene",
"Loc_0": 0.0,
"Loc_1": -1.0,
"Loc_2": 3.0,
"Loc_3": -2.0,
"Loc_4": -2.0,
"Loc_5": -2.0,
"Loc_6": 4.0,
},
{
"chemical_name": "Total Suspended Solids",
"Loc_0": -1.0,
"Loc_1": -1.0,
"Loc_2": -1.0,
"Loc_3": -1.0,
"Loc_4": -1.0,
"Loc_5": -1.0,
"Loc_6": 6.0,
},
{
"chemical_name": "Zinc",
"Loc_0": 0.0,
"Loc_1": 1.0,
"Loc_2": -1.0,
"Loc_3": -6.0,
"Loc_4": -1.0,
"Loc_5": 4.0,
"Loc_6": 3.0,
},
]
expected = pandas.DataFrame(expected_records).set_index("chemical_name")
wq = pandas.read_pickle(helpers.test_data_path("wq.pkl"))
hsd = numutils.tukey_hsd(wq, "res", "location", "chemical_name")
result = numutils.process_tukey_hsd_scores(hsd, "location", "chemical_name")
pandas.testing.assert_frame_equal(result, expected, check_names=False)
128 changes: 128 additions & 0 deletions wqio/utils/numutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import statsmodels.api as sm
from probscale.algo import _estimate_from_fit
from scipy import stats
from scipy.stats._hypotests import TukeyHSDResult

from wqio import validate
from wqio.utils import misc
Expand Down Expand Up @@ -786,3 +787,130 @@ def _paired_stat_generator(
stat = statfxn(x, y, **statopts)
row.update({statname: stat[0], "pvalue": stat.pvalue})
yield row


def _tukey_res_to_df(
names: list[str], hsd_res: list[TukeyHSDResult], group_prefix: str
) -> pandas.DataFrame:
"""Converts Scipy's TukeyHSDResult to a dataframe
Parameters
----------
names : list of str
Name of the groups present in the Tukey HSD Results
hsd_res : list of TukeyHSDResult
List of Tukey results to be converted to a dateframe
group_prefix : str (default = "Loc")
Prefix that describes the nature of the groups
Returns
-------
hsd_df : pandas.DataFrame
"""
rows = []
for i, n1 in enumerate(names):
for j, n2 in enumerate(names):
if i != j:
ci_bands = hsd_res.confidence_interval()
row = {
f"{group_prefix} 1": names[n1],
f"{group_prefix} 2": names[n2],
"HSD Stat": hsd_res.statistic[i, j],
"p-value": hsd_res.pvalue[i, j],
"CI-Low": ci_bands.low[i, j],
"CI-High": ci_bands.high[i, j],
}

rows.append(row)

df = pandas.DataFrame(rows).set_index([f"{group_prefix} 1", f"{group_prefix} 2"])
return df


def tukey_hsd(
df: pandas.DataFrame,
rescol: str,
compcol: str,
paramcol: str,
*othergroups: str,
):
"""
Run the Tukey HSD Test on a dataframe based on groupings
Parameters
----------
df : pandas.DataFrame
rescol : str
Name of the column that contains the values of interest
compcol: str
Name of the column that defines the groups to be compared
(i.e., treatment vs control)
paramcol: str
Name of the column that contains the measured parameter
*othergroups : str
Names of any other columsn that need to considered when
defining the groups to be considered.
Returns
-------
hsd_df : pandas.DataFrame
"""
groupcols = [paramcol, *othergroups]
scores = []
for name, g in df.groupby(by=groupcols):
locs = {loc: subg[rescol].values for loc, subg in g.groupby(compcol) if subg.shape[0] > 1}
subset_names = {n: loc for n, loc in enumerate(locs)}
res = stats.tukey_hsd(*[v for v in locs.values()])
df_res = _tukey_res_to_df(subset_names, res, group_prefix=compcol)

keys = {g: n for g, n in zip(groupcols, name)}
scores.append(
df_res.assign(
is_diff=lambda df: df["p-value"].lt(0.05).astype(int),
sign_of_diff=lambda df: numpy.sign(df["HSD Stat"]).astype(int),
score=lambda df: df["is_diff"] * df["sign_of_diff"],
**keys,
).set_index(groupcols, append=True)
)

return pandas.concat(scores, ignore_index=False, axis="index")


def process_tukey_hsd_scores(
hsd_df: pandas.DataFrame, compcol: str, paramcol: str
) -> pandas.DataFrame:
"""
Converts a Tukey HSD Results dataframe into scores that describe
the value of groups' magnitude relative to each other.
Generally speaking:
* -7 to -5 -> significantly lower
* -5 to -3 -> moderately lower
* -3 to -1 -> slightly lower
* -1 to +1 -> neutral
* +1 to +3 -> slightly higher
* +3 to +5 -> moderately higher
* +5 to +7 -> significantly higher
Parameters
----------
hsd_df : pandas.DataFrame
Dataframe dumped by `tukey_hsd`
group_prefix : str (default = "Loc")
Prefix that describes the nature of the groups
Returns
-------
scores : pandas.DataFrame
"""
return (
hsd_df["score"]
.unstack(level=f"{compcol} 2")
.fillna(0)
.groupby(level=paramcol, as_index=False, group_keys=False)
.apply(lambda g: g.sum(axis="columns"))
.unstack(level=f"{compcol} 1")
)

0 comments on commit 9d45fdf

Please sign in to comment.