Skip to content

Commit

Permalink
add Tukey HSD functions and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
phobson committed Mar 25, 2024
1 parent 4831ddb commit fec02f8
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 10 deletions.
Binary file added wqio/tests/_data/wq.pkl
Binary file not shown.
113 changes: 103 additions & 10 deletions wqio/tests/utils_tests/test_numutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from collections import namedtuple
from io import StringIO
from textwrap import dedent
from typing import Any, Literal

import numpy
import numpy.testing as nptest
import pandas
import pandas.testing as pdtest
import pytest
import statsmodels.api as sm
from numpy._typing._array_like import NDArray
from pandas import DataFrame
from scipy import stats

from wqio.tests import helpers
Expand Down Expand Up @@ -92,7 +95,7 @@ def test_process_p_vals(fxn, pval, expected, error_to_raise):
(1.01, (None, None), ValueError),
],
)
def test_translate_p_vals(pval, expected, as_emoji, error_to_raise):
def test_translate_p_vals(pval, expected, as_emoji: bool, error_to_raise):
with helpers.raises(error_to_raise):
result = numutils.translate_p_vals(pval, as_emoji=as_emoji)
assert result == expected[as_emoji]
Expand Down Expand Up @@ -125,7 +128,7 @@ def test_anderson_darling():


@pytest.mark.parametrize("which", ["good", "bad"])
def test_processAndersonDarlingResults(which):
def test_processAndersonDarlingResults(which: Literal["good"] | Literal["bad"]):
fieldnames = ["statistic", "critical_values", "significance_level"]
AndersonResult = namedtuple("AndersonResult", fieldnames)
ARs = {
Expand Down Expand Up @@ -215,7 +218,7 @@ def units_norm_data():
return raw, expected


def test_normalize_units(units_norm_data):
def test_normalize_units(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"ug/L": 1e-6, "mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ug/L", "Cadmium, Total": "mg/L"}
Expand All @@ -226,7 +229,7 @@ def test_normalize_units(units_norm_data):
pdtest.assert_frame_equal(result, expected)


def test_normalize_units_bad_targetunits(units_norm_data):
def test_normalize_units_bad_targetunits(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"ug/L": 1e-6, "mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ug/L"}
Expand All @@ -243,7 +246,7 @@ def test_normalize_units_bad_targetunits(units_norm_data):
)


def test_normalize_units_bad_normalization(units_norm_data):
def test_normalize_units_bad_normalization(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ug/L", "Cadmium, Total": "mg/L"}
Expand All @@ -260,7 +263,7 @@ def test_normalize_units_bad_normalization(units_norm_data):
)


def test_normalize_units_bad_conversion(units_norm_data):
def test_normalize_units_bad_conversion(units_norm_data: tuple[DataFrame, DataFrame]):
unitsmap = {"ug/L": 1e-6, "mg/L": 1e-3, "g/L": 1e0}

targetunits = {"Lead, Total": "ng/L", "Cadmium, Total": "mg/L"}
Expand Down Expand Up @@ -292,7 +295,7 @@ def test_test_pH2concentration(pH, expected, error):

@helpers.seed
@pytest.mark.parametrize("error", [None, ValueError])
def test_compute_theilslope_default(error):
def test_compute_theilslope_default(error: types.NoneType | type[ValueError]):
with helpers.raises(error):
y = helpers.getTestROSData()["res"].values
x = numpy.arange(len(y) - 1) if error else None
Expand Down Expand Up @@ -443,7 +446,7 @@ def fit_data():
(None, "junk", ValueError),
],
)
def test_fit_line(fit_data, fitlogs, fitprobs, error):
def test_fit_line(fit_data: dict[str, NDArray[Any]], fitlogs, fitprobs, error):
xy = {
(None, None): (fit_data["zscores"], fit_data["data"]),
("y", None): (fit_data["zscores"], fit_data["data"]),
Expand Down Expand Up @@ -483,13 +486,13 @@ def test_fit_line(fit_data, fitlogs, fitprobs, error):
assert isinstance(res, sm.regression.linear_model.RegressionResultsWrapper)


def test_fit_line_through_origin(fit_data):
def test_fit_line_through_origin(fit_data: dict[str, NDArray[Any]]):
x, y = fit_data["zscores"], fit_data["data"]
x_, y_, res = numutils.fit_line(x, y, through_origin=True)
assert res.params[0] == 0


def test_fit_line_with_xhat(fit_data):
def test_fit_line_with_xhat(fit_data: dict[str, NDArray[Any]]):
x, y = fit_data["zscores"], fit_data["data"]
x_, y_, res = numutils.fit_line(x, y, xhat=[-2, -1, 0, 1, 2])
expected = [-0.566018, 4.774419, 10.114857, 15.455295, 20.795733]
Expand Down Expand Up @@ -799,3 +802,93 @@ def test_remove_outliers():
x = numpy.random.normal(0, 4, size=37)

assert numutils.remove_outliers(x).shape == expected_shape


def test_tukey_hsd_functions():
expected_records = [
{
"chemical_name": "Copper",
"Loc_0": -2.0,
"Loc_1": 6.0,
"Loc_2": -2.0,
"Loc_3": -4.0,
"Loc_4": 3.0,
"Loc_5": -1.0,
"Loc_6": 0.0,
},
{
"chemical_name": "Di(2-ethylhexyl)phthalate",
"Loc_0": 3.0,
"Loc_1": 5.0,
"Loc_2": -2.0,
"Loc_3": -2.0,
"Loc_4": -2.0,
"Loc_5": -1.0,
"Loc_6": -1.0,
},
{
"chemical_name": "Indeno(1,2,3-cd)pyrene",
"Loc_0": 2.0,
"Loc_1": 0.0,
"Loc_2": 6.0,
"Loc_3": -2.0,
"Loc_4": -2.0,
"Loc_5": -4.0,
"Loc_6": 0.0,
},
{
"chemical_name": "Lead",
"Loc_0": 0.0,
"Loc_1": 6.0,
"Loc_2": -2.0,
"Loc_3": -3.0,
"Loc_4": 4.0,
"Loc_5": -3.0,
"Loc_6": -2.0,
},
{
"chemical_name": "Phenanthrene",
"Loc_0": 1.0,
"Loc_1": 0.0,
"Loc_2": 1.0,
"Loc_3": -3.0,
"Loc_4": 0.0,
"Loc_5": 0.0,
"Loc_6": 1.0,
},
{
"chemical_name": "Pyrene",
"Loc_0": 0.0,
"Loc_1": -1.0,
"Loc_2": 3.0,
"Loc_3": -2.0,
"Loc_4": -2.0,
"Loc_5": -2.0,
"Loc_6": 4.0,
},
{
"chemical_name": "Total Suspended Solids",
"Loc_0": -1.0,
"Loc_1": -1.0,
"Loc_2": -1.0,
"Loc_3": -1.0,
"Loc_4": -1.0,
"Loc_5": -1.0,
"Loc_6": 6.0,
},
{
"chemical_name": "Zinc",
"Loc_0": 0.0,
"Loc_1": 1.0,
"Loc_2": -1.0,
"Loc_3": -6.0,
"Loc_4": -1.0,
"Loc_5": 4.0,
"Loc_6": 3.0,
},
]
expected = pandas.DataFrame(expected_records).set_index("chemical_name")
wq = pandas.read_pickle(helpers.test_data_path("wq.pkl"))
hsd = numutils.tukey_hsd(wq, "res", "location", "chemical_name")
result = numutils.process_tukey_hsd_scores(hsd)
pandas.testing.assert_frame_equal(result, expected, check_names=False)
54 changes: 54 additions & 0 deletions wqio/utils/numutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,3 +786,57 @@ def _paired_stat_generator(
stat = statfxn(x, y, **statopts)
row.update({statname: stat[0], "pvalue": stat.pvalue})
yield row


def _tukey_res_to_df(names, hsd_res):
rows = []
for i, n1 in enumerate(names):
for j, n2 in enumerate(names):
if i != j:
ci_bands = hsd_res.confidence_interval()
row = {
"Loc 1": names[n1],
"Loc 2": names[n2],
"HSD Stat": hsd_res.statistic[i, j],
"p-value": hsd_res.pvalue[i, j],
"CI-Low": ci_bands.low[i, j],
"CI-High": ci_bands.high[i, j],
}

rows.append(row)

df = pandas.DataFrame(rows).set_index(["Loc 1", "Loc 2"])
return df


def tukey_hsd(df, rescol, compcol, paramcol, *othergroups):
groupcols = [paramcol, *othergroups]
scores = []
for name, g in df.groupby(by=groupcols):
locs = {loc: subg[rescol].values for loc, subg in g.groupby(compcol) if subg.shape[0] > 1}
subset_names = {n: loc for n, loc in enumerate(locs)}
res = stats.tukey_hsd(*[v for v in locs.values()])
df_res = _tukey_res_to_df(subset_names, res)

keys = {g: n for g, n in zip(groupcols, name)}
scores.append(
df_res.assign(
is_diff=lambda df: df["p-value"].lt(0.05).astype(int),
sign_of_diff=lambda df: numpy.sign(df["HSD Stat"]).astype(int),
score=lambda df: df["is_diff"] * df["sign_of_diff"],
**keys,
).set_index(groupcols, append=True)
) # ["score"].unstack(level="Loc 2").fillna(0).sum(axis="columns")

return pandas.concat(scores, ignore_index=False, axis="index")


def process_tukey_hsd_scores(hsd_df):
return (
hsd_df["score"]
.unstack(level="Loc 2")
.fillna(0)
.groupby(level="chemical_name", as_index=False, group_keys=False)
.apply(lambda g: g.sum(axis="columns"))
.unstack(level="Loc 1")
)

0 comments on commit fec02f8

Please sign in to comment.