Skip to content

Commit

Permalink
Refactoring python client to dynamically show sensors
Browse files Browse the repository at this point in the history
  • Loading branch information
francescogabbrielli committed Nov 21, 2018
1 parent cceda08 commit 46718f2
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 159 deletions.
128 changes: 39 additions & 89 deletions python-streaming-client/play_stream.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,60 @@
from io import BytesIO
import matplotlib.pyplot as plt
import matplotlib.image as img
import matplotlib.figure as f
from stream_client import StreamClient
from time import time, sleep
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from Tkinter import *
from threading import Thread, Lock
from stream_data import StreamBuffer

# Create a window with figure (remove this part and the display thread if don't need showing)
# https://stackoverflow.com/questions/34764535/why-cant-matplotlib-plot-in-a-different-thread
window=Tk()
fig = f.Figure(figsize=(8,8))
ax = fig.add_subplot(2,1,1)
ax2 = fig.add_subplot(2,1,2)
canvas = FigureCanvasTkAgg(fig, master=window)
canvas.draw()
canvas.get_tk_widget().pack(side=TOP, fill=BOTH, expand=1)
canvas._tkcanvas.pack(side=TOP, fill=BOTH, expand=1)
imageShown = None
line1 = None
line2 = None
line3 = None
frame = None
ylim_changed = False
xlim_changed = False
max_x = 0
min_x = 0
max_y = 1
min_y = 0
lock = Lock()
from stream_client import StreamClient, StreamBuffer, StreamDisplay

DATA_LEN = 100
timestamps = range(0,DATA_LEN)
currentIndex = 0
ax2.set_ylim(-15,15)



def show(buffer):
"""
Show matplotlib image on global imageShown
:param data: the jpeg data
"""
global imageShown, line1, line2, line3
try :
im = img.imread(BytesIO(buffer.image), format="jpg")
imageShown.set_data(im)
line1.set_ydata(buffer.data[0])
line2.set_ydata(buffer.data[1])
line3.set_ydata(buffer.data[2])
canvas.draw()
plt.pause(0.01)
except Exception as e:
print e


# Create stream client
s = StreamClient("192.168.1.3", 8080)
sc = StreamClient("192.168.1.2", 8080)
# Stream Buffer of DATA_LEN readings
sb = StreamBuffer(DATA_LEN)


#target of display thread
def display():
global imageShown, line1, line2, line3
imageShown = ax.imshow(img.imread("img.jpg", format="jpg"))
line1, = ax2.plot([0]*DATA_LEN, 'r-')
line2, = ax2.plot([0]*DATA_LEN, 'b-')
line3, = ax2.plot([0]*DATA_LEN, 'g-')
sleep(0.1)
while True:
show(sb.getCurrent())
# do another task with the data
sb.swap()
sleep(0.01)
# Display data/image
sd = StreamDisplay(sb)

shown = False

def update_data(lines):
global shown, currentIndex
for line in lines:
if len(line)<2:
break
values = line.split(",")[1:]
#print line
if ord(values[0][0]) > 64:
sensors = sb.set_headers(values)
sd.show(sensors)
shown = True
continue
sb.update_data(currentIndex, values)
currentIndex += 1
if currentIndex == DATA_LEN:
currentIndex = 0
return currentIndex


def streaming_callback(timestamp, type, data):
global currentIndex, frame
global shown
if type=="image":
print timestamp
sb.updateImage(data)
#print timestamp
sb.update_image(data)
if not shown:
print "SHOW IMAGES ONLY"
sd.show()
shown = True
elif type:
data = data.rstrip()
try:
print timestamp, data.rstrip()
readings = data.rstrip().split(",")
timestamps[currentIndex] = int(readings[0])
sb.updateData(currentIndex, readings[1:4])
currentIndex += 1
if currentIndex == DATA_LEN:
currentIndex = 0
#print timestamp, data
#readings = data.rstrip().split(",")
update_data(data.split("\n"))
except Exception as e:
print (e)
print ("DATA=[%s]" % data.rstrip())
print ("DATA=[%s]" % data)


# start client and show stream
s.get(streaming_callback)

# wait for client to buffer some data
sleep(1)
sc.get(streaming_callback)

# start display thread
Thread(name="display", target=display).start()

# wait
window.mainloop()
sd.wait()
print 'Done'
184 changes: 181 additions & 3 deletions python-streaming-client/stream_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@
from base64 import b64encode
from threading import Lock,Thread
import re
from time import time
from time import time, sleep
from copy import copy
from io import BytesIO
import matplotlib.image as img
import matplotlib.figure as fig
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from Tkinter import *
import traceback


class StreamClient:
"""
A sample Stream Client to access the SensorLogger app streaming
A sample streaming client for accessing the SensorLogger app
multipart streaming
"""

CHUNK_SIZE = 32768
Expand Down Expand Up @@ -55,25 +64,194 @@ def read(self):
restart = StreamClient.BUFFER_SIZE-StreamClient.CHUNK_SIZE
d = self.r.read(StreamClient.CHUNK_SIZE)
while d:
#print "---------------------------", len(d)
#print d
self.buffer = self.buffer[StreamClient.CHUNK_SIZE:]+d
self.last_update = time()
if start<0:
start = self.buffer.find(self.boundary, restart)
end = self.buffer.find(self.boundary, start+self.blen)
while start<end:
#print (start, end, self.buffer[end+self.blen:end+100])
all = self.buffer[start+self.blen:end]
headers_len = all.find("\r\n\r\n") + 4
headers = all[:headers_len]
#print headers
try:
content_type = self.re_type.match(headers).group(1)
timestamp = self.re_timestamp.match(headers).group(1)
#print "FOUND: "+content_type, timestamp
l = end - start - headers_len - 4
self.callback(timestamp, content_type, all[headers_len:headers_len+l+1])
except Exception as e:
print e
print ("ERROR CONTENT TYPE: %d" % headers_len)
traceback.print_exc()
#print ("ERROR CONTENT TYPE: %s" % headers)
timestamp = -1
#try:
# l = int(self.re_length.match(headers).group(1))
#finally:
# print (all[:min(128,end)])
# print "--------------------------------------------------"
start = end
end = self.buffer.find(self.boundary, start+self.blen)
d = self.r.read(StreamClient.CHUNK_SIZE)
start -= StreamClient.CHUNK_SIZE
restart = max(0, start, restart-StreamClient.CHUNK_SIZE)


class Buffer:

def __init__(self, image, data):
self.image = image
self.data = data

def __copy__(self):
return Buffer(self.image, [d[:] for d in self.data])


class StreamBuffer:

N_BUFFERS = 5

def __init__(self, len):
self.buffers = None
self.currentIndex = 0
self.current = None
self.len = len
self.lock = Lock()
self.re = re.compile(r"([a-zA-Z _\-]+[^XYZ]) ?([XYZ])?")
self.init_buffers(0)

def init_buffers(self, dimension):
self.lock.acquire()
data = [[0] * self.len for i in range(0, dimension)]
self.buffers = [Buffer(None, data) for i in range(0, StreamBuffer.N_BUFFERS)]
self.lock.release()

def update_image(self, image):
self.lock.acquire()
try:
buf = self.buffers[self.currentIndex]
buf.image = image
finally:
self.lock.release()

def update_data(self, i, values):
self.lock.acquire()
try:
buf = self.buffers[self.currentIndex]
for j,v in enumerate(values):
buf.data[j][i] = float(v)
finally:
self.lock.release()

def set_headers(self, values):
old = ""
dim = 0
sensors = []

for v in values:
m = self.re.match(v)
name = m.group(1)
if name != old:
if old != "":
sensors.append({"name": old, "dimension": dim})
dim = 1
old = name
else:
dim += 1

sensors.append({"name": name, "dimension": dim})
self.init_buffers(len(values))
return sensors

def swap(self):
self.lock.acquire()
current = self.buffers[self.currentIndex]
self.currentIndex = (self.currentIndex + 1) % StreamBuffer.N_BUFFERS
self.buffers[self.currentIndex] = copy(current)
self.lock.release()
return current


class SensorDisplay:

def __init__(self, name, dimension, length, axis, labels=("X", "Y", "Z", "par1", "par2"), styles=("r-","g-","b-","y-","m-")):
self.sensor = name
self.dimension = dimension
self.labels = labels
self.lines = []
axis.set_ylim(-15, 15)
axis.set_title(name)
for i in range(0, dimension):
line, = axis.plot([0] * length, styles[i])
if dimension>1:
line.set_label(labels[i])
self.lines.append(line)
axis.legend()

def set_data(self, data):
for i,line in enumerate(self.lines):
line.set_ydata(data[i])


class StreamDisplay:
"""
Create a window with figure (remove this part and the display thread if don't need showing)
https://stackoverflow.com/questions/34764535/why-cant-matplotlib-plot-in-a-different-thread
"""

def __init__(self, buffer):

self.buffer = buffer
self.window = Tk()
self.figure = fig.Figure(figsize=(8, 10))
self.charts = []

def draw(self):
buffer = self.buffer.swap()
try:
if buffer.image is not None:
im = img.imread(BytesIO(buffer.image), format=self.img_format)
self.image.set_data(im)
if len(buffer.data):
curr = 0
for chart in self.charts:
chart.set_data(buffer.data[curr:curr+chart.dimension])
curr += chart.dimension
self.canvas.draw()
plt.pause(0.01)
except Exception as e:
print e
#traceback.print_stack()

def wait(self):
self.window.mainloop()

def show(self, sensors=[], img_format="jpg"):
# init axes
self.sensors = sensors
self.img_format = img_format
self.image = self.figure.add_subplot(len(sensors)+1,1,1).imshow(img.imread("img.jpg", format=img_format))
axes = []
for i, s in enumerate(sensors):
ax = self.figure.add_subplot(len(sensors)+1, 1, i+2, label=s["name"])
self.charts.append(SensorDisplay(s["name"], s["dimension"], self.buffer.len, ax))

# init canvas
canvas = FigureCanvasTkAgg(self.figure, master=self.window)
canvas.draw()
canvas.get_tk_widget().pack(side=TOP, fill=BOTH, expand=1)
canvas._tkcanvas.pack(side=TOP, fill=BOTH, expand=1)
self.canvas = canvas

# start display thread
Thread(name="display", target=self.thread, args=(self, 0.01)).start()

def thread(self, display, delay):
""" target of display thread """
while True:
sleep(delay)
display.draw()
# do another task with the data
Loading

0 comments on commit 46718f2

Please sign in to comment.