forked from templela/python-betsy
-
Notifications
You must be signed in to change notification settings - Fork 1
/
test.py
189 lines (145 loc) · 5.34 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/bin/env python3
from PIL import Image,ImageDraw
import json
import io
import os
import random
from numpy import tile
from betsy.protocol import CommandSocket
from itertools import product
import time
from PIL import GifImagePlugin
import datetime
# Opening JSON file
f = open('inventory.json')
inventory = json.load(f)
tile_size = inventory["settings"]["dimensions"]
dims = [ len(inventory["mapping"][0]), len(inventory["mapping"]) ]
sn_to_ip = {}
for x in inventory["inventory"]:
sn_to_ip[ x["serial_number"] ] = x["ipv6_link_local"]
netInt="en10"
# Tile images returns a 2d array of an input image split into tiles of given size.
# crop-false will scale, crop-true will crop.
def tile_img(img, tilesize=18, matrix_dims=[9,6], crop=False):
if crop: # Crop/Scale image to matrix size before anything else:
img = img.crop((0,0,tilesize*matrix_dims[0],tilesize*matrix_dims[1]))
else:
img = img.resize((tilesize*matrix_dims[0],tilesize*matrix_dims[1]))
w, h = img.size
imgs = []
grid = product(range(0, h-h%tilesize, tilesize), range(0, w-w%tilesize, tilesize))
for i, j in grid:
idx_i = int(i/tilesize)
idx_j = int(j/tilesize)
box = (j, i, j+tilesize, i+tilesize)
if len(imgs) < idx_i+1:
imgs.append([])
if len(imgs[idx_i]) < idx_j+1:
imgs[idx_i].append([])
imgs[idx_i][idx_j]=img.crop(box)
return imgs
# Returns a tile-sized image of the serial #
def serial_img(sn, i=255, j=0):
image1 = Image.new("RGB", tile_size, (0, 0, 0))
draw = ImageDraw.Draw(image1)
draw.text([2,6],sn,fill=(i, j, 0), align='CENTER')
return image1
# This sends the image to a given panel.
def send_raw_img(csock, img, destaddr):
# PROTOCOL.md:
# ... each consisting of little endian 16-bit per pixel-channel values (48-bit per pixel) arranged left to right, top to bottom.
# Each buffer is thus 18*18*3*2 or 1944 bytes in size.
# The high order 4 bits of each 16-bit channel value are ignored.
img_byte_arr = img.tobytes("raw", "RGB")
# Here we pad 8bit RGB to 16 bit; note we're losing the top half of the byte so we can't drive full brightness:
img2 = [0]*1944
for x in range(972):
img2[x*2] = img_byte_arr[x]
csock.dpc_data(destaddr,1,bytes(img2))
# Finally upload the frame buffer to LEDs:
csock.dpc_upload(destaddr, 1)
# Simple function to send the Serial number to each panel:
def send_sn_image(inv=inventory):
csock = CommandSocket(netInt)
# Note -- This is a little silly. Should probably just use `for i in inv["inventory"]` and ignore the mapping.
# But this allows us to do the fun gradient.
for i in range(len(inv["mapping"])):
for j in range(len(inv["mapping"][i])):
sn=inv["mapping"][i][j]
img = serial_img("%d"%sn, 255-i*30, 255-j*30)
destaddr = csock.get_ipv6_addr_info(sn_to_ip[sn])
send_raw_img(csock,img, destaddr)
# Sends an image array
def send_images(imgs, inv=inventory):
csock = CommandSocket(netInt)
for i in range(len(inv["mapping"])):
for j in range(len(inv["mapping"][i])):
sn=inv["mapping"][i][j]
img = imgs[i][j]
destaddr = csock.get_ipv6_addr_info(sn_to_ip[sn])
send_raw_img(csock,img, destaddr)
# Simple function to send the Serial number to each panel:
def send_reset(inv=inventory):
csock = CommandSocket(netInt)
for i in range(len(inv["mapping"])):
for j in range(len(inv["mapping"][i])):
sn=inv["mapping"][i][j]
destaddr = csock.get_ipv6_addr_info(sn_to_ip[sn])
csock.send_commands("reset firmware",destaddr)
def handle_gif(imageObject, duration=5):
# TODO: Duration is time to play each gif in seconds, rather than loops
timetot = 0
while timetot < duration:
# Display individual frames from the loaded animated GIF file
for frame in range(0,imageObject.n_frames):
imageObject.seek(frame)
sleep = imageObject.info['duration']/1000
sleep = sleep if sleep > 0 else 0.16
new_im = Image.new("RGB", imageObject.size)
new_im.paste(imageObject)
hl = tile_img(new_im, 18, crop=False) # False = scale.
send_images(hl)
timetot+= sleep
time.sleep(sleep)
def handle_image(imageObject, displaytime=5):
# Potentially uncessary
new_im = Image.new("RGB", imageObject.size)
new_im.paste(imageObject)
# Potentially uncessary
hl = tile_img(new_im, 18, crop=False) # False = scale.
send_images(hl)
time.sleep(displaytime)
### Use to reset on bootup; call just once:
print("Sending Reset")
send_reset()
# time.sleep(3)
# Call this to call the SNs to the screen and ensure they're in the right order
print("Sending Serial Numbers")
send_sn_image()
# Here, draw a picture!
# img = Image.open("nyan2.jpg")
# hl = tile_img(img, 18, crop=False) # False = scale.
#send_images(hl)
# while 1:
# imageObject = Image.open('rainbow.gif')
# handle_gif(imageObject)
# Comment out as you want to configure it.
# subdir = 'test/'
subdir = 'cool/'
# subdir = 'meme/'
pathPrefix = 'image/' + subdir
while 1:
send_reset()
files = os.listdir(pathPrefix)
random.shuffle(files)
for file in files:
if (not file.__contains__("DS_Store")):
imgPath = pathPrefix + file
imageObject = Image.open(imgPath)
if getattr(imageObject, "is_animated", True):
handle_gif(imageObject)
else:
# Comment out for GIF ONLY MODE.
# handle_image(imgPath, 5)
time.sleep(0.001)