Commit eaf439a

Add more data
rajsinghtech committed Oct 25, 2024
1 parent ea43f54 commit eaf439a
Showing 1 changed file with 216 additions and 146 deletions.
362 changes: 216 additions & 146 deletions frontend/src/main.py
@@ -8,6 +8,9 @@
 import os # Import os to access environment variables
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
+# Configure Streamlit page
+st.set_page_config(page_title="S3 JSON Data Visualization", layout="wide")
+
 # Load AWS credentials and configuration from environment variables
 AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
 AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
@@ -20,159 +23,226 @@
 # Construct the endpoint URL
 endpoint_url = f"{AWS_BUCKET_PROTOCOL}://{AWS_BUCKET_HOST}:{AWS_BUCKET_PORT}"
 
-# S3 client configuration with adjustments
-s3_client = boto3.client(
-    's3',
-    aws_access_key_id=AWS_ACCESS_KEY_ID,
-    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
-    endpoint_url=endpoint_url,
-    region_name=AWS_BUCKET_REGION,
-    config=Config(signature_version='s3v4', s3={'addressing_style': 'path'})
-)
-
-def list_prefixes(client, bucket_name, prefix='', delimiter='/'):
-    paginator = client.get_paginator('list_objects_v2')
-    prefixes = set()
-    for result in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter=delimiter):
-        for prefix_item in result.get('CommonPrefixes', []):
-            prefixes.add(prefix_item.get('Prefix').rstrip('/'))
-    return sorted(list(prefixes))
-
-def fetch_json_object(client, bucket, key):
-    try:
-        obj = client.get_object(Bucket=bucket, Key=key)
-        json_content = obj['Body'].read().decode('utf-8')
-        return json.loads(json_content)
-    except Exception as e:
-        st.error(f"Error fetching {key}: {e}")
-        return None
-
-bucket_name = AWS_BUCKET_NAME
-
-st.title('S3 JSON Data Visualization')
-
-# Get the list of VINs
-vins = [prefix.split('/')[0] for prefix in list_prefixes(s3_client, bucket_name, prefix='', delimiter='/')]
-
-if not vins:
-    st.error("No VINs found in the bucket. Please check your S3 connection and bucket contents.")
-    st.stop()
-
-selected_vin = st.selectbox('Select VIN', vins)
-
-if selected_vin:
-    # Now, list available years for the selected VIN
-    vin_prefix = f"{selected_vin}/"
-    years = [prefix.split('/')[-1] for prefix in list_prefixes(s3_client, bucket_name, prefix=vin_prefix, delimiter='/')]
-    selected_year = st.selectbox('Select Year', years)
-
-    if selected_year:
-        year_prefix = f"{vin_prefix}{selected_year}/"
-        months = [prefix.split('/')[-1] for prefix in list_prefixes(s3_client, bucket_name, prefix=year_prefix, delimiter='/')]
-        selected_month = st.selectbox('Select Month', months)
-
-        if selected_month:
-            month_prefix = f"{year_prefix}{selected_month}/"
-            days = [prefix.split('/')[-1] for prefix in list_prefixes(s3_client, bucket_name, prefix=month_prefix, delimiter='/')]
-            selected_day = st.selectbox('Select Day', days)
-
-            if selected_day:
-                # Now, we can list the JSON files for this VIN and date
-                day_prefix = f"{month_prefix}{selected_day}/"
-                # List all JSON files under this prefix
-                paginator = s3_client.get_paginator('list_objects_v2')
-                json_files = []
-                for result in paginator.paginate(Bucket=bucket_name, Prefix=day_prefix):
-                    for obj in result.get('Contents', []):
-                        key = obj['Key']
-                        if key.endswith('.json'):
-                            json_files.append(key)
-                st.write(f"Found {len(json_files)} JSON files for VIN {selected_vin} on {selected_year}-{selected_month}-{selected_day}")
-
-                if json_files:
-                    # Add a progress bar
-                    progress_bar = st.progress(0)
-
-                    data_list = []
-                    total_files = len(json_files)
-
-                    # Use ThreadPoolExecutor to fetch JSON files concurrently
-                    with ThreadPoolExecutor(max_workers=30) as executor:
-                        future_to_key = {executor.submit(fetch_json_object, s3_client, bucket_name, key): key for key in json_files}
-                        for idx, future in enumerate(as_completed(future_to_key)):
-                            json_data = future.result()
-                            if json_data:
-                                # Extract 'created_at' and 'data'
-                                created_at = json_data.get('created_at')
-                                if created_at:
-                                    # Convert 'created_at' to datetime using timezone-aware method
-                                    timestamp = created_at.get('seconds', 0) + created_at.get('nanos', 0) / 1e9
-                                    dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
-                                else:
-                                    continue
-                                data_entries = json_data.get('data', [])
-                                for entry in data_entries:
-                                    key_entry = entry.get('key')
-                                    value_dict = entry.get('value', {}).get('Value', {})
-                                    val = None # Initialize val
-                                    if 'StringValue' in value_dict:
-                                        val = value_dict['StringValue']
-                                        data_list.append({'datetime': dt, 'key': key_entry, 'value': val})
-                                    elif 'LocationValue' in value_dict:
-                                        location = value_dict['LocationValue']
-                                        lat = location.get('latitude')
-                                        lon = location.get('longitude')
-                                        data_list.append({'datetime': dt, 'key': f"{key_entry}_lat", 'value': lat})
-                                        data_list.append({'datetime': dt, 'key': f"{key_entry}_lon", 'value': lon})
-                                    elif 'Invalid' in value_dict and value_dict['Invalid']:
-                                        val = None
-                                        data_list.append({'datetime': dt, 'key': key_entry, 'value': val})
-                                    else:
-                                        val = None
-                                        data_list.append({'datetime': dt, 'key': key_entry, 'value': val})
-
-                            # Update progress bar
-                            progress = (idx + 1) / total_files
-                            progress_bar.progress(progress)
-
-                    if data_list:
-                        df = pd.DataFrame(data_list)
-                        st.write("### Raw Dataframe")
-                        st.write(df)
-
-                        # Convert 'key' to string
-                        df['key'] = df['key'].astype(str)
-
-                        # Convert 'value' to numeric if possible
-                        df['value_numeric'] = pd.to_numeric(df['value'], errors='coerce')
-
-                        unique_keys = df['key'].unique()
-                        selected_keys = st.multiselect('Select keys to plot', unique_keys)
-                        if selected_keys:
-                            for key in selected_keys:
-                                df_key = df[df['key'] == key].copy()
-                                df_key.sort_values('datetime', inplace=True)
-
-                                st.write(f"### Data for key {key}")
-                                st.write(df_key)
-
-                                if df_key['value_numeric'].notnull().any():
-                                    df_key.set_index('datetime', inplace=True)
-                                    st.line_chart(df_key['value_numeric'])
-                                else:
-                                    st.write(f"Key {key} has non-numeric values or no valid data to plot.")
-                                    st.write(df_key[['datetime', 'value']])
-                        else:
-                            st.warning("Please select at least one key to plot.")
-                    else:
-                        st.warning("No data available to display.")
-                else:
-                    st.warning("No JSON files found for the selected day.")
-            else:
-                st.warning("Please select a day.")
-        else:
-            st.warning("Please select a month.")
-    else:
-        st.warning("Please select a year.")
-else:
-    st.warning("Please select a VIN.")
+@st.cache_resource
+def get_s3_client():
+    """Initialize and cache the S3 client."""
+    return boto3.client(
+        's3',
+        aws_access_key_id=AWS_ACCESS_KEY_ID,
+        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+        endpoint_url=endpoint_url,
+        region_name=AWS_BUCKET_REGION,
+        config=Config(signature_version='s3v4', s3={'addressing_style': 'path'})
+    )
+
+s3_client = get_s3_client()
+
+@st.cache_data
+def list_prefixes(_client, bucket_name, prefix='', delimiter='/'):
+    """
+    List unique prefixes (e.g., VINs, years, months, days) in the specified S3 bucket.
+    Args:
+        _client: Boto3 S3 client.
+        bucket_name (str): Name of the S3 bucket.
+        prefix (str): Prefix to filter the objects.
+        delimiter (str): Delimiter for grouping keys.
+    Returns:
+        List of sorted unique prefixes.
+    """
+    paginator = _client.get_paginator('list_objects_v2')
+    prefixes = set()
+    try:
+        for result in paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter=delimiter):
+            for prefix_item in result.get('CommonPrefixes', []):
+                prefixes.add(prefix_item.get('Prefix').rstrip('/'))
+    except Exception as e:
+        st.error(f"Error listing prefixes: {e}")
+    return sorted(list(prefixes))
+
+@st.cache_data
+def fetch_all_json_objects(_client, bucket, keys):
+    """
+    Fetch and parse all JSON objects from S3 concurrently.
+    Args:
+        _client: Boto3 S3 client.
+        bucket (str): S3 bucket name.
+        keys (list): List of S3 object keys.
+    Returns:
+        List of parsed JSON objects.
+    """
+    def fetch_json(key):
+        try:
+            obj = _client.get_object(Bucket=bucket, Key=key)
+            json_content = obj['Body'].read().decode('utf-8')
+            return json.loads(json_content)
+        except Exception as e:
+            st.error(f"Error fetching {key}: {e}")
+            return None
+
+    json_data = []
+    with ThreadPoolExecutor(max_workers=20) as executor:
+        futures = {executor.submit(fetch_json, key): key for key in keys}
+        for future in as_completed(futures):
+            data = future.result()
+            if data:
+                json_data.append(data)
+    return json_data
+
+def process_json_data(json_objects):
+    """
+    Process list of JSON objects into a DataFrame.
+    Args:
+        json_objects (list): List of JSON data.
+    Returns:
+        pandas.DataFrame: Processed data.
+    """
+    data_list = []
+    for json_data in json_objects:
+        # Extract 'created_at' and 'data'
+        created_at = json_data.get('created_at')
+        if created_at:
+            # Convert 'created_at' to datetime using timezone-aware method
+            timestamp = created_at.get('seconds', 0) + created_at.get('nanos', 0) / 1e9
+            dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
+        else:
+            continue
+        data_entries = json_data.get('data', [])
+        for entry in data_entries:
+            key_entry = entry.get('key')
+            value_dict = entry.get('value', {}).get('Value', {})
+            if 'StringValue' in value_dict:
+                val = value_dict['StringValue']
+                data_list.append({'datetime': dt, 'key': key_entry, 'value': val})
+            elif 'LocationValue' in value_dict:
+                location = value_dict['LocationValue']
+                lat = location.get('latitude')
+                lon = location.get('longitude')
+                data_list.append({'datetime': dt, 'key': f"{key_entry}_lat", 'value': lat})
+                data_list.append({'datetime': dt, 'key': f"{key_entry}_lon", 'value': lon})
+            elif 'Invalid' in value_dict and value_dict['Invalid']:
+                data_list.append({'datetime': dt, 'key': key_entry, 'value': None})
+            else:
+                data_list.append({'datetime': dt, 'key': key_entry, 'value': None})
+    return pd.DataFrame(data_list)
+
+def main():
+    bucket_name = AWS_BUCKET_NAME
+
+    st.title('S3 JSON Data Visualization')
+
+    # Get the list of VINs
+    prefixes = list_prefixes(s3_client, bucket_name, prefix='', delimiter='/')
+    vins = [vin.split('/')[0] for vin in prefixes] # Extract VINs
+    vins = list(sorted(set(vins))) # Ensure unique and sorted
+    if not vins:
+        st.error("No VINs found in the bucket. Please check your S3 connection and bucket contents.")
+        st.stop()
+
+    selected_vin = st.selectbox('Select VIN', vins)
+
+    if selected_vin:
+        # List available years for the selected VIN
+        vin_prefix = f"{selected_vin}/"
+        years_prefixes = list_prefixes(s3_client, bucket_name, prefix=vin_prefix, delimiter='/')
+        years = [year.split('/')[-1] for year in years_prefixes] # Extract years
+        years = list(sorted(set(years)))
+        if not years:
+            st.warning("No years found for the selected VIN.")
+            st.stop()
+
+        selected_year = st.selectbox('Select Year', years)
+
+        if selected_year:
+            year_prefix = f"{vin_prefix}{selected_year}/"
+            months_prefixes = list_prefixes(s3_client, bucket_name, prefix=year_prefix, delimiter='/')
+            months = [month.split('/')[-1] for month in months_prefixes] # Extract months
+            months = list(sorted(set(months)))
+            if not months:
+                st.warning("No months found for the selected VIN and year.")
+                st.stop()
+
+            selected_month = st.selectbox('Select Month', months)
+
+            if selected_month:
+                month_prefix = f"{year_prefix}{selected_month}/"
+                days_prefixes = list_prefixes(s3_client, bucket_name, prefix=month_prefix, delimiter='/')
+                days = [day.split('/')[-1] for day in days_prefixes] # Extract days
+                days = list(sorted(set(days)))
+                if not days:
+                    st.warning("No days found for the selected VIN, year, and month.")
+                    st.stop()
+
+                selected_day = st.selectbox('Select Day', days)
+
+                if selected_day:
+                    # List all JSON files under the selected day
+                    day_prefix = f"{month_prefix}{selected_day}/"
+                    paginator = s3_client.get_paginator('list_objects_v2')
+                    json_files = []
+                    try:
+                        for result in paginator.paginate(Bucket=bucket_name, Prefix=day_prefix):
+                            for obj in result.get('Contents', []):
+                                key = obj['Key']
+                                if key.endswith('.json'):
+                                    json_files.append(key)
+                    except Exception as e:
+                        st.error(f"Error listing JSON files: {e}")
+                        st.stop()
+
+                    st.write(f"Found {len(json_files)} JSON files for VIN {selected_vin} on {selected_year}-{selected_month}-{selected_day}")
+
+                    if json_files:
+                        with st.spinner('Fetching and processing JSON files...'):
+                            json_objects = fetch_all_json_objects(s3_client, bucket_name, json_files)
+                            df = process_json_data(json_objects)
+
+                        if not df.empty:
+                            st.write("### Raw Dataframe")
+                            st.dataframe(df)
+
+                            # Convert 'key' to string
+                            df['key'] = df['key'].astype(str)
+
+                            # Convert 'value' to numeric if possible
+                            df['value_numeric'] = pd.to_numeric(df['value'], errors='coerce')
+
+                            unique_keys = df['key'].unique()
+                            selected_keys = st.multiselect('Select keys to plot', unique_keys, key='selected_keys')
+
+                            if selected_keys:
+                                for key in selected_keys:
+                                    df_key = df[df['key'] == key].copy()
+                                    df_key.sort_values('datetime', inplace=True)
+
+                                    st.write(f"### Data for key `{key}`")
+                                    st.dataframe(df_key)
+
+                                    if df_key['value_numeric'].notnull().any():
+                                        df_key.set_index('datetime', inplace=True)
+                                        st.line_chart(df_key['value_numeric'], width=800, height=400)
+                                    else:
+                                        st.write(f"Key `{key}` has non-numeric values or no valid data to plot.")
+                                        st.dataframe(df_key[['datetime', 'value']])
+                            else:
+                                st.warning("Please select at least one key to plot.")
+                        else:
+                            st.warning("No data available to display.")
+                    else:
+                        st.warning("No JSON files found for the selected day.")
+                else:
+                    st.warning("Please select a day.")
+            else:
+                st.warning("Please select a month.")
+        else:
+            st.warning("Please select a year.")
+    else:
+        st.warning("Please select a VIN.")
+
+if __name__ == "__main__":
+    main()
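
A note on the caching pattern this commit introduces: @st.cache_resource is Streamlit's cache for shared, unhashable resources (the boto3 client is created once and reused across reruns and sessions), while @st.cache_data memoizes return values by hashing the arguments. The leading underscore in _client matters: Streamlit skips hashing parameters whose names start with an underscore, which is what allows an unhashable client to be passed into a cached function. A minimal self-contained sketch of the same pattern; get_client, expensive_list, and the bucket name are illustrative, not from the commit:

import boto3
import streamlit as st

@st.cache_resource  # one shared client, created once per process
def get_client():
    return boto3.client('s3')  # credentials resolved from the default chain

@st.cache_data  # results memoized; `_client` is excluded from the cache key
def expensive_list(_client, bucket):
    # Only `bucket` is hashed into the cache key; the unhashable client
    # is passed through untouched because its name starts with "_".
    resp = _client.list_objects_v2(Bucket=bucket)
    return [obj['Key'] for obj in resp.get('Contents', [])]

keys = expensive_list(get_client(), 'example-bucket')  # illustrative bucket name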

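One caveat worth flagging in the new code: fetch_all_json_objects calls st.error from inside ThreadPoolExecutor worker threads, and Streamlit calls made off the main script thread generally lack a ScriptRunContext, so those errors may be dropped or surface only as console warnings. A common workaround, sketched below under that assumption (fetch_all is a hypothetical name, not the commit's function), is to collect failures in the workers and report them once back on the main thread:

from concurrent.futures import ThreadPoolExecutor, as_completed
import json

def fetch_all(client, bucket, keys, max_workers=20):
    """Fetch and parse JSON objects concurrently; return (parsed, errors)
    so the caller can report errors from the main thread via st.error."""
    def fetch(key):
        obj = client.get_object(Bucket=bucket, Key=key)
        return json.loads(obj['Body'].read().decode('utf-8'))

    parsed, errors = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch, key): key for key in keys}
        for future in as_completed(futures):
            try:
                parsed.append(future.result())
            except Exception as e:  # collected here, reported by the caller
                errors.append((futures[future], str(e)))
    return parsed, errors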