-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Visualization (#7) squash commit * issue not yet resolved, but moving to better test env * broken visualizer * time series output present * Histogram resolved * sub-class bokeh_visualizer created, configured through property, ipy notebook test runner created ' * Datagatherer (#22) * #15 inserted real code test * sqllite3 added * #15 datagathererinput * #15 datagathererinput * #18 test updated * #21, will revert post commit * reverted * reverted * #18 unit tested * #18 fixed failing test_read_sql_from_empty_table * #18 fixed test_read_sql_from_populated_table * #18 fixed test_read_sql_from_populated_table
- Loading branch information
Showing
2 changed files
with
201 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,50 +1,102 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import unittest | ||
import sqlite3 | ||
|
||
import pandas as pd | ||
import numpy as np | ||
from octopy_predictor.src.datagatherer import * | ||
|
||
def _create_test_connection(): | ||
conn = sqlite3.connect(':memory:') | ||
|
||
#TODO create test dataframes and corresponding tables in sqliter3 | ||
|
||
conn = 'file::memory:?cache=shared' | ||
|
||
|
||
|
||
class DataGathererTest(unittest.TestCase): | ||
"""Test cases for DataGatherer""" | ||
|
||
def setUp(self): | ||
self.gatherer = DataGatherer() | ||
|
||
def test_determine_resource(self): | ||
self.gatherer = DataGatherer() | ||
|
||
def test_read_sql_from_empty_table(self): | ||
|
||
""" | ||
TBD | ||
Test read_sql() | ||
given a gatherer_input with SQL gatherer values | ||
and empty in-memory database | ||
when read_sql is called | ||
then dataframe should be returned | ||
""" | ||
self.assertTrue(1==1) | ||
c = sqlite3.connect(conn, uri=True) | ||
c.execute('drop table if exists test') | ||
gatherer_input = DataGathererInput(SQL) | ||
gatherer_input.add(QUERY, "SELECT * FROM sqlite_master") | ||
gatherer_input.add(CONNECTION, conn) | ||
gatherer = DataGatherer() | ||
|
||
df = gatherer.read_sql(gatherer_input) | ||
|
||
self.assertIsNotNone(df) | ||
self.assertTrue(df.empty) | ||
|
||
def test_read_sql_from_populated_table(self): | ||
""" | ||
Test read_sql | ||
given a gatherer_input with SQL gatherer values | ||
and empty in-memory database | ||
when read_sql is called | ||
then dataframe should be returned | ||
""" | ||
#%% | ||
expected_df = pd.DataFrame(np.reshape(np.arange(10), (2,5))) | ||
c = sqlite3.connect(conn, uri=True) | ||
expected_df.to_sql('test', con=c, if_exists='replace', index=False) | ||
#%% | ||
|
||
#%% | ||
gatherer_input = DataGathererInput(SQL) | ||
gatherer_input.add(QUERY, "SELECT * FROM test") | ||
gatherer_input.add(CONNECTION, conn) | ||
gatherer = DataGatherer() | ||
|
||
df = gatherer.read_sql(gatherer_input) | ||
#%% | ||
self.assertIsNotNone(df) | ||
self.assertFalse(df.empty, "df is empty") | ||
self.assertEqual(expected_df.shape, df.shape) | ||
|
||
|
||
class DataGathererInputTest(unittest.TestCase): | ||
"""Test cases for DataGathererInput""" | ||
"""Test cases for DataGathererInput""" | ||
|
||
def test_SQL_inputs(self): | ||
""" | ||
given: input is SQL | ||
given: input is SQL | ||
when: DataGathererInput is created | ||
then: all parameters required for SQL datagatherer should be available | ||
""" | ||
conn = _create_test_connection() | ||
|
||
|
||
expected = { | ||
'type': SQL, | ||
CONNECTION: conn | ||
} | ||
|
||
|
||
|
||
input = DataGathererInput(SQL) | ||
input.add(CONNECTION, conn) | ||
|
||
self.assertIsNotNone(input.values) | ||
self.assertEqual(input.values[CONNECTION], expected[CONNECTION], "expected does not match actual") | ||
|
||
if __name__ == '__main__': | ||
|
||
|
||
if __name__ == '__main__': | ||
# unittest.main() | ||
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DataGathererTest) | ||
unittest.TextTestRunner().run(suite) |