added threading

added basic byte stream output
added test loop with only one active layer
disabled all prints
removed asyncio based websocket handling
changed websocket library
Šī revīzija ir iekļauta:
Elektrospy
2019-01-14 16:50:32 +01:00
vecāks ef77cfc0da
revīzija b426b42c77

277
main.py
Parādīt failu

@@ -1,11 +1,19 @@
#!/usr/bin/env python
# needed for websocket events
import asyncio
import websockets
import websocket
try:
import thread
except ImportError:
import _thread as thread
import threading
import time
import json
# needed for effect layer logic
import png
import numpy as np
# output
import os
import sys
class Layer:
@@ -21,32 +29,131 @@ class Layer:
self.png_image_data = self.gray_scale_map_small_direct[2]
# internal layer stuff
self.color_values = 3
self.max_brightness = 255
self.rgb_buffer_length = (self.png_width * self.png_height) * self.color_values
self.layer_buffer_array = bytearray(self.rgb_buffer_length)
self.layer_buffer_current_color = np.zeros(self.color_values, np.uint8)
self.layer_buffer_array = np.zeros(self.rgb_buffer_length, np.uint8)
self.png_image_data_flatten = self.flatten_rgb_array_from_png(self.png_image_data)
# internal event states
self.layer_current_event_value = 0
self.layer_event_triggered = False
self.animation_finished = True
self.animation_update_limit_ms = 25
self.animation_last_update_ms = int(round(time.time() * 1000))
self.animation_current_brightness = self.max_brightness
# just debug stuff
print("created layer for " + self.path)
# print("created layer for " + self.path)
def run(self):
print("next frame")
# print("next frame for " + self.path)
self.render_rgb_layer(
self.layer_buffer_current_color[0],
self.layer_buffer_current_color[1],
self.layer_buffer_current_color[2]
)
def trigger_event(self, event_value):
# print("new event " + self.path + " value: " + str(event_value))
self.layer_event_triggered = True
# get new rgb color for frame, based on event
self.layer_buffer_current_color = self.get_current_frame_rgb_color(event_value)
def get_next_frame(self):
return self.layer_buffer_array
def get_current_frame_rgb_color(self, event_value):
# create black rgb color array
output_rgb_array = np.zeros(self.color_values, np.uint8)
if self.layer_event_triggered:
# if event_value == 0:
# print("Layer off!")
if 0 > event_value < 3:
# static blue light
output_rgb_array[2] = self.max_brightness
elif event_value == 3:
# fade blue light
# reset current brightness of color for new animation
# self.animation_current_brightness = self.max_brightness
output_rgb_array[2] = self.max_brightness
elif 4 > event_value < 6:
# static red light
output_rgb_array[0] = self.max_brightness
elif event_value == 7:
# fade red light
# reset current brightness of color for new animation
# self.animation_current_brightness = self.max_brightness
output_rgb_array[0] = self.max_brightness
# elif event_value != 0 and event_value > 7:
# print("unknown event effect value")
# if not self.animation_finished:
# if int(round(time.time() * 1000) / self.animation_last_update_ms) > self.animation_update_limit_ms:
# if event_value == 3:
# # fade blue light
# output_rgb_array[2] = self.get_new_color_value(
# output_rgb_array[2],
# self.animation_current_brightness
# )
# elif event_value == 7:
# # fade red light
# output_rgb_array[2] = self.get_new_color_value(
# output_rgb_array[0],
# self.animation_current_brightness
# )
# if self.animation_finished:
# if animation is finished, reset current fade brightness
# self.animation_current_brightness = self.max_brightness
# print("animation finished!")
return output_rgb_array
# only used for debbuging
def print_png_values(self):
output_string = ""
# get color values
for x in self.png_image_data:
print(x)
for x in self.png_image_data_flatten:
output_string += str(x)
output_string += ","
# print(output_string)
def make_brightness_image_filter(self):
img = self.png_image_data / 255
intensity = 255
intensity = intensity.astype(float)
for i in range(intensity.shape[2]):
intensity[:, :, i] *= img
intensity.astype(np.uint8)
def get_rgb_layer(self, r=0, g=0, b=0):
def render_rgb_layer(self, r=0, g=0, b=0):
# for testing
b = self.max_brightness
# reset layer buffer, step by number of color values (default 3, rgb)
for i in range(0, self.rgb_buffer_length):
self.layer_buffer_array[i] = 0
# set new rgb values, step by number of color values (default 3, rgb)
for i in range(0, self.rgb_buffer_length, self.color_values):
self.layer_buffer_array[i] = r
self.layer_buffer_array[i+1] = g
self.layer_buffer_array[i+2] = b
# check for every r g b value if current brightness map value is not equal to zero
# if true to mapping, else ignore and keep it at zero (black)
if self.png_image_data_flatten[i] != 0:
self.layer_buffer_array[i] = self.get_new_color_value(r, self.png_image_data_flatten[i])
if self.png_image_data_flatten[i+1] != 0:
self.layer_buffer_array[i+1] = self.get_new_color_value(g, self.png_image_data_flatten[i+1])
if self.png_image_data_flatten[i+2] != 0:
self.layer_buffer_array[i+2] = self.get_new_color_value(b, self.png_image_data_flatten[i+2])
# helper method to calculate new color value
# based on current value and target mapping value
def get_new_color_value(self, current_value, mapping_value):
mapping_value_float = float(mapping_value / self.max_brightness)
new_value = int(round(current_value * mapping_value_float))
# print("new_color: " + str(current_value) + ":" + str(mapping_value) + "=" + str(new_value))
return new_value
# helper method to flatted the png array payload, called only once on init
def flatten_rgb_array_from_png(self, rgb_array):
output_array = np.zeros(self.rgb_buffer_length, np.uint8)
current_value_index = 0
# got over every row of png
for current_row in rgb_array:
# go over value in payload
for current_value in current_row:
# write current value to output array buffer
output_array[current_value_index] = current_value
current_value_index += 1
return output_array
# ------------------------------------
@@ -61,12 +168,12 @@ def parse_json(input_json):
event_bomb_cut()
elif current_event == "beatmapEvent":
event_beat_map(json_content["beatmapEvent"])
else:
print("other event: " + current_event)
# else:
# print("other event: " + current_event)
def event_note_cut(note_cut_object):
print("Note Cut")
# print("Note Cut")
event_note_cut_parse(note_cut_object)
@@ -86,15 +193,18 @@ def event_note_cut_parse(note_cut_object):
def trigger_saber_a(light_value):
print("saber a (red/left): " + str(light_value))
# print("saber a (red/left): " + str(light_value))
pass
def trigger_saber_b(light_value):
print("saber b (blue/right): " + str(light_value))
# print("saber b (blue/right): " + str(light_value))
pass
def event_bomb_cut():
print("Bomb Cut")
# print("Bomb Cut")
pass
def event_beat_map(event_object):
@@ -119,44 +229,127 @@ def event_beat_map_parse(beatmap_event_object):
def trigger_light_small(value):
print("light small " + str(value))
# print("light small " + str(value))
layer_small.trigger_event(value)
def trigger_light_big(value):
print("light big " + str(value))
# print("light big " + str(value))
layer_big.trigger_event(value)
def trigger_light_center(value):
print("light center " + str(value))
# print("light middle " + str(value))
layer_middle.trigger_event(value)
def trigger_light_left(value):
print("light left " + str(value))
# print("light left " + str(value))
layer_left.trigger_event(value)
def trigger_light_right(value):
print("light right " + str(value))
# print("light right " + str(value))
layer_right.trigger_event(value)
async def loop_websocket():
async with websockets.connect('ws://localhost:6557/socket') as websocket:
def on_message(ws, message):
parse_json(message)
def on_error(ws, error):
# print(error)
pass
def on_close(ws):
# print("### websocket closed ###")
pass
def on_open(ws):
def run(*args):
while True:
result = await websocket.recv()
# print(f"< {result}")
parse_json(result)
time.sleep(1)
# print("websocket connected")
thread.start_new_thread(run, ())
# ------------------------------------
# Websocket event part end
# ------------------------------------
buffer_length = 80 * 40 * 3
def get_new_color_value(current_value, mapping_value):
mapping_value_float = float(mapping_value / 255)
new_value = current_value * mapping_value_float
return round(new_value)
def merge_layer_arrays(layer_first, layer_second):
output_array = np.zeros(buffer_length, np.uint8)
for idx in range(0, buffer_length):
output_array[idx] = get_new_color_value(layer_first[idx], layer_second[idx])
return output_array
def get_merged_layers_array():
merged_value = merge_layer_arrays(layer_small.get_next_frame(), layer_middle.get_next_frame())
merged_value = merge_layer_arrays(merged_value, layer_big.get_next_frame())
merged_value = merge_layer_arrays(merged_value, layer_left.get_next_frame())
merged_value = merge_layer_arrays(merged_value, layer_right.get_next_frame())
output_array = merged_value
return output_array
def loop_layers():
# print("start layers loop")
time_ms = 10
t = 0
# main frame render loop
while True:
t += time_ms / 1000.0
# layer_small.run()
# layer_middle.run()
# layer_big.run()
# layer_left.run()
# layer_right.run()
# output_buffer = get_merged_layers_array()
# output_bytes = output_buffer.tobytes()
layer_middle.run()
layer_middle_frame = layer_middle.get_next_frame()
output_bytes = layer_middle_frame.tobytes()
# print(sys.getsizeof(output_bytes))
# print("new frame:")
# print(output_bytes)
os.write(1, output_bytes)
# warte time_ms ms
time.sleep(time_ms * 0.001)
if __name__ == '__main__':
layer_small = Layer('maps/small.png')
# layer_middle = Layer('maps/middle.png')
# layer_big = Layer('maps/big.png')
# layer_left = Layer('maps/left.png')
# layer_right = Layer('maps/right.png')
layer_middle = Layer('maps/middle.png')
layer_big = Layer('maps/big.png')
layer_left = Layer('maps/left.png')
layer_right = Layer('maps/right.png')
layer_small.print_png_values()
# Start websocket logic thread
# websocket.enableTrace(True)
ws = websocket.WebSocketApp('ws://127.0.0.1:6557/socket',
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
wst = threading.Thread(target=ws.run_forever)
wst.daemon = True
wst.start()
# Start websocket logic
asyncio.get_event_loop().run_until_complete(loop_websocket())
# Start layer logic thread
loop_layers()