v1.8.2 bug fix for reading unmatched events as dataframe
ShawnStrasser committed Aug 29, 2024
1 parent a1f8eb8 commit 80e4282
Showing 8 changed files with 124 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-tests.yml
@@ -60,7 +60,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
- pip install build twine
+ pip install build twine toml
- name: Build package
run: python -m build
7 changes: 6 additions & 1 deletion README.md
@@ -16,7 +16,7 @@ pip install atspm
```
Or pinned to a specific version:
```bash
- pip install atspm==1.7.0
+ pip install atspm==1.8.2
```
`atspm` works on Python 3.10-3.12 and is tested on Ubuntu, Windows, and MacOS.

@@ -130,6 +130,11 @@ Detailed documentation for each measure is coming soon.

## Release Notes

### Version 1.8.2 (August 28, 2024)

#### Bug Fixes / Improvements:
- Fixed issue when passing unmatched events as a dataframe instead of a file path.

### Version 1.8.0 (August 28, 2024)

#### Bug Fixes / Improvements:
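The 1.8.2 note above concerns passing unmatched events between incremental runs as in-memory DataFrames rather than parquet files. Below is a minimal sketch of that usage, mirroring the new test fixture added later in this diff; the input paths are placeholders, and the package's other required parameters (output settings, aggregations, etc.) are omitted.

```python
import glob
import pandas as pd
from atspm import SignalDataProcessor

# Placeholders: supply your own hi-res event chunks and detector configuration.
raw_data_chunks = [pd.read_parquet(p) for p in sorted(glob.glob("chunks/*.parquet"))]
configs = pd.read_parquet("detector_config.parquet")

unmatched_df = None      # carried between runs for timeline, arrival_on_green & yellow_red
sf_unmatched_df = None   # carried between runs for split_failures

for chunk in raw_data_chunks:
    processor = SignalDataProcessor(
        raw_data=chunk,
        detector_config=configs,
        unmatched_event_settings={
            'df_or_path': unmatched_df,               # None on the very first run
            'split_fail_df_or_path': sf_unmatched_df,
            'max_days_old': 14,                       # drop unmatched events older than 14 days
        },
        # ...plus the usual output and aggregation parameters...
    )
    processor.load()
    processor.aggregate()

    # Read the still-unmatched events back out as DataFrames for the next run.
    unmatched_df = processor.conn.sql("select * from unmatched_events").df()
    sf_unmatched_df = processor.conn.sql("select * from sf_unmatched").df()
```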
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "atspm"
- version = "1.8.0"
+ version = "1.8.2"
authors = [
{ name="Shawn Strasser", email="shawn.strasser@odot.oregon.gov" },
]
2 changes: 1 addition & 1 deletion src/atspm/__init__.py
@@ -1,4 +1,4 @@
- __version__ = "1.8.0"
+ __version__ = "1.8.2"

from .sample_data import sample_data
from .signal_data_processor import SignalDataProcessor
2 changes: 1 addition & 1 deletion src/atspm/data_aggregator.py
@@ -68,5 +68,5 @@ def aggregate_data(conn, aggregation_name, to_sql, **kwargs):
return None
except Exception as e:
print('Error when executing query for: ', aggregation_name)
- print(query)
+ #print(query)
raise e
16 changes: 13 additions & 3 deletions src/atspm/data_loader.py
@@ -50,11 +50,19 @@ def load_data(conn,
where_clause = f" WHERE TimeStamp > TIMESTAMP '{min_timestamp}' - INTERVAL '{max_days_old} days'"
# Iterate over the strings/dataframes in unmatched_events dictionary
for key, value in unmatched_events.items():
if isinstance(value, str):
reference = value
else:
# Create a pointer for DuckDB
reference = 'unmatched_df'
unmatched_df = value

# Create view that unions the previous unmatched events with the new ones
if key == 'df_or_path':
load_sql = f"""
CREATE TABLE unmatched_previous AS
- SELECT * FROM {value} {where_clause};
+ SELECT TimeStamp::DATETIME as TimeStamp, DeviceId as DeviceId, EventId::INT16 as EventId, Parameter::INT16 as Parameter
+ FROM {reference} {where_clause};
CREATE VIEW raw_data_all AS
SELECT * FROM raw_data
UNION ALL
@@ -63,11 +71,13 @@
elif key == 'split_fail_df_or_path':
load_sql = f"""
CREATE TABLE sf_unmatched_previous AS
- SELECT * FROM {value} {where_clause}
+ SELECT TimeStamp::DATETIME as TimeStamp, DeviceId as DeviceId, EventId::INT16 as EventId, Detector::INT16 as Detector, Phase::INT16 as Phase
+ FROM {reference} {where_clause}
"""
else:
raise ValueError(f"Unmatched events key '{key}' not recognized.")
v_print(f"Loading unmatched events {value}", verbose, 2)
v_print(f"Loading unmatched events: \n{reference}\n", verbose, 2)
v_print(f'Executing SQL to load unmatched events: \n{load_sql}', verbose, 2)
conn.query(load_sql)

except Exception as e:
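For context on the `reference = 'unmatched_df'` change above: DuckDB's Python client resolves an unknown table name in SQL text against a pandas DataFrame with that name in the calling scope (a replacement scan), so the query can read the passed-in DataFrame directly. A small self-contained sketch of the mechanism, with made-up values:

```python
import duckdb
import pandas as pd

con = duckdb.connect()  # in-memory database

# Previous run's unmatched events, held as a DataFrame instead of a parquet file.
unmatched_df = pd.DataFrame({
    'TimeStamp': pd.to_datetime(['2024-05-13 15:00:02', '2024-05-13 15:00:07']),
    'DeviceId': [101, 101],
    'EventId': [82, 81],
    'Parameter': [5, 5],
})

# The SQL refers to the DataFrame by its Python variable name; DuckDB's
# replacement scan finds it in scope and reads it like a table.
con.sql("""
    CREATE TABLE unmatched_previous AS
    SELECT TimeStamp::DATETIME AS TimeStamp,
           DeviceId AS DeviceId,
           EventId::INT16 AS EventId,
           Parameter::INT16 AS Parameter
    FROM unmatched_df
""")
print(con.sql("SELECT * FROM unmatched_previous").df())
```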
12 changes: 10 additions & 2 deletions src/atspm/signal_data_processor.py
@@ -127,11 +127,14 @@ def __init__(self, **kwargs):
if key == 'max_days_old':
continue
if isinstance(value, str):
- if os.path.exists(value):
+ if os.path.exists(value) and value != '':
self.unmatched_event_settings[key] = f"'{value}'"
else:
v_print(f"Warning! unmatched_events file {value} does not exist at given path yet. This is expected if no unmatched events have been recorded (first run), and a file will be output to that path at end of this run.", self.verbose)
v_print(f"Warning, {key} file '{value}' does not exist or is blank. This is expected only for the first run.", self.verbose)
self.unmatched_found = False
elif value is None:
v_print(f"Warning, {key} file is None. This is expected only for the first run.", self.verbose)
self.unmatched_found = False

# Check if detector_health is in aggregations
if any(d['name'] == 'detector_health' for d in self.aggregations):
@@ -199,6 +202,7 @@ def aggregate(self):

for aggregation in self.aggregations:
start_time = time.time()
v_print(f"\nRunning {aggregation['name']} aggregation...", self.verbose, 2)

#######################
### Detector Health ###
@@ -267,10 +271,14 @@ def aggregate(self):
if self.incremental_run and aggregation['name'] in ['timeline', 'arrival_on_green', 'yellow_red', 'split_failures']:
aggregation['params']['incremental_run'] = True #lets the aggregation know to save unmatched events for next run
if self.unmatched_found:
v_print(f"Incremental run using previous events for {aggregation['name']}", self.verbose, 2)
aggregation['params']['unmatched'] = True #lets the aggregation know to use the unmatched events from previous run
# split_failures uses its own view
if aggregation['name'] != 'split_failures':
aggregation['params']['from_table'] = 'raw_data_all'
else:
v_print(f"First Run For {aggregation['name']}", self.verbose, 2)
aggregation['params']['unmatched'] = False

#######################
### Run Aggregation ###
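Taken together, the two hunks above mean a `df_or_path`-style setting can now be a real file path, a blank or missing path, None, or a DataFrame. The helper below is not part of atspm; it is only a condensed restatement of that branching under the same assumptions:

```python
import os

def interpret_unmatched_setting(value):
    """Hypothetical helper (not in the package): return (reference_for_sql, previous_events_found)."""
    if isinstance(value, str):
        if value != '' and os.path.exists(value):
            return f"'{value}'", True   # quoted file path, spliced into the DuckDB SQL
        return None, False              # blank or missing file: treat as first run
    if value is None:
        return None, False              # no previous events yet: first run
    return value, True                  # otherwise assume a DataFrame of previous unmatched events
```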
98 changes: 91 additions & 7 deletions tests/test_atspm.py
@@ -5,6 +5,18 @@
from src.atspm import SignalDataProcessor
import duckdb
import numpy
import toml
from src.atspm import __version__

def test_version_consistency():
"""Test that the version in __init__.py matches the one in pyproject.toml"""
# Read version from pyproject.toml
with open('pyproject.toml', 'r') as f:
pyproject = toml.load(f)
pyproject_version = pyproject['project']['version']

# Compare versions
assert __version__ == pyproject_version, f"Version mismatch: __init__.py has {__version__}, pyproject.toml has {pyproject_version}"

# Define the parameters for testing
TEST_PARAMS = {
@@ -16,10 +28,6 @@
'output_format': 'parquet',
'output_file_prefix': 'test_',
'remove_incomplete': False,
- #'unmatched_event_settings': {
- # 'df_or_path': 'test_folder/unmatched.parquet', # for timeline, arrival_on_green & yellow_red
- # 'split_fail_df_or_path': 'test_folder/sf_unmatched.parquet', # just for split_failures
- # 'max_days_old': 14}, # remove unmatched events older than 14 days
'to_sql': False,
'verbose': 0,
'aggregations': [
@@ -166,7 +174,7 @@ def test_incremental_aggregation(incremental_processor_output, aggregation):
precalc_file = f"tests/precalculated/{agg_name}.parquet"

for file in output_files:
- assert os.path.exists(file), f"Incremental output file {file} not found"
+ assert os.path.exists(file), f"Incremental output file {file} not found"
assert os.path.exists(precalc_file), f"Precalculated file for {agg_name} not found"

incremental_dfs = [pd.read_parquet(file) for file in output_files]
@@ -176,9 +184,85 @@

# due to how split_failures imputes missing actuations there are some differences in incremental runs
if agg_name == 'split_failures':
- compare_dataframes_with_tolerance(combined_df, precalc_df, tolerance=0.04)
+ compare_dataframes_with_tolerance(combined_df, precalc_df, tolerance=0.04)
else:
- compare_dataframes(combined_df, precalc_df)
+ compare_dataframes(combined_df, precalc_df)

# REPLICATING ODOT'S PRODUCTION ENVIRONMENT
@pytest.fixture(scope="module")
def incremental_processor_output_with_dataframes():
"""Fixture to run the SignalDataProcessor incrementally using dataframes"""
data = duckdb.query("select * from 'tests/hires_test_data.parquet'").df()
configs = duckdb.query("select * from 'tests/configs_test_data.parquet'").df()
unmatched_df = None
sf_unmatched_df = None

chunks = {
'1_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 15:00:00' and timestamp < '2024-05-13 15:15:00'").df(),
'2_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 15:15:00' and timestamp < '2024-05-13 15:30:00'").df(),
'3_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 15:30:00' and timestamp < '2024-05-13 15:45:00'").df(),
'4_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 15:45:00' and timestamp < '2024-05-13 16:00:00'").df(),
'5_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 16:00:00' and timestamp < '2024-05-13 16:15:00'").df(),
'6_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 16:15:00' and timestamp < '2024-05-13 16:30:00'").df(),
'7_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 16:30:00' and timestamp < '2024-05-13 16:45:00'").df(),
'8_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 16:45:00' and timestamp < '2024-05-13 17:00:00'").df(),
'9_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 17:00:00' and timestamp < '2024-05-13 17:15:00'").df(),
'10_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 17:15:00' and timestamp < '2024-05-13 17:30:00'").df(),
'11_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 17:30:00' and timestamp < '2024-05-13 17:45:00'").df(),
'12_chunk': duckdb.sql("select * from data where timestamp >= '2024-05-13 17:45:00' and timestamp < '2024-05-13 18:00:00'").df(),
}

results = {}

for i, chunk in chunks.items():
params = TEST_PARAMS.copy()
params.update({
'raw_data': chunk,
'detector_config': configs,
'unmatched_event_settings': {
'df_or_path': unmatched_df,
'split_fail_df_or_path': sf_unmatched_df,
'max_days_old': 14
},
'verbose': 2,
'aggregations': INCREMENTAL_AGGREGATIONS
})
processor = SignalDataProcessor(**params)
processor.load()
processor.aggregate()

# Store results for each aggregation
for agg in INCREMENTAL_AGGREGATIONS:
agg_name = agg['name']
if agg_name not in results:
results[agg_name] = []
results[agg_name].append(processor.conn.sql(f"select * from {agg_name}").df())

# Update unmatched dataframes for next iteration
unmatched_df = processor.conn.sql("select * from unmatched_events").df()
sf_unmatched_df = processor.conn.sql("select * from sf_unmatched").df()

return results

@pytest.mark.parametrize("aggregation", INCREMENTAL_AGGREGATIONS, ids=lambda x: x['name'])
def test_incremental_aggregation_with_dataframes(incremental_processor_output_with_dataframes, aggregation):
"""Test each aggregation for incremental runs using dataframes"""
agg_name = aggregation['name']
incremental_results = incremental_processor_output_with_dataframes[agg_name]
precalc_file = f"tests/precalculated/{agg_name}.parquet"

assert len(incremental_results) == 12, f"Expected 12 chunks of results for {agg_name}"
assert os.path.exists(precalc_file), f"Precalculated file for {agg_name} not found"

combined_df = pd.concat(incremental_results).drop_duplicates().reset_index(drop=True)
precalc_df = pd.read_parquet(precalc_file)

# due to how split_failures imputes missing actuations there are some differences in incremental runs
if agg_name == 'split_failures':
compare_dataframes_with_tolerance(combined_df, precalc_df, tolerance=0.04)
else:
compare_dataframes(combined_df, precalc_df)


if __name__ == "__main__":
pytest.main([__file__])
