changed file tree
added pixelserver2 setup config and script
This commit is contained in:
367
beatsaberceiling/beatsaberceiling.py
Normal file
367
beatsaberceiling/beatsaberceiling.py
Normal file
@@ -0,0 +1,367 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import websocket
|
||||
|
||||
try:
|
||||
import thread
|
||||
except ImportError:
|
||||
import _thread as thread
|
||||
import threading
|
||||
from datetime import datetime
|
||||
import time as time_
|
||||
import json
|
||||
import png
|
||||
import numpy as np
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
# get screen size from parameters
|
||||
Nx = 80
|
||||
try:
|
||||
Nx = int(sys.argv[1])
|
||||
except:
|
||||
pass
|
||||
|
||||
Ny = 40
|
||||
try:
|
||||
Ny = int(sys.argv[2])
|
||||
except:
|
||||
pass
|
||||
|
||||
# get input parameter
|
||||
beatsaber_host = '127.0.0.1'
|
||||
try:
|
||||
new_host = str(sys.argv[3])
|
||||
if new_host != "":
|
||||
beatsaber_host = new_host
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
class Layer:
|
||||
def __init__(self, new_path):
|
||||
self.path = new_path
|
||||
# read png file
|
||||
self.png_reader = png.Reader(self.path)
|
||||
# gray_scale_map_small = png_reader.read()
|
||||
self.gray_scale_map_small_direct = self.png_reader.asRGB8()
|
||||
# get png data from png object
|
||||
self.png_width = self.gray_scale_map_small_direct[0]
|
||||
self.png_height = self.gray_scale_map_small_direct[1]
|
||||
self.png_image_data = self.gray_scale_map_small_direct[2]
|
||||
# internal layer stuff
|
||||
self.color_values = 3
|
||||
self.max_brightness = 255
|
||||
self.rgb_buffer_length = (self.png_width * self.png_height) * self.color_values
|
||||
self.layer_buffer_current_color = np.zeros(self.color_values, np.uint8)
|
||||
self.layer_buffer_array = np.zeros(self.rgb_buffer_length, np.uint8)
|
||||
self.png_image_data_flatten = np.array(self.flatten_rgb_array_from_png(self.png_image_data))
|
||||
# internal event states
|
||||
self.layer_current_event_value = 0
|
||||
self.animation_finished = True
|
||||
self.animation_current_brightness = self.max_brightness
|
||||
# just debug stuff
|
||||
# print("created layer for " + self.path)
|
||||
|
||||
def run(self):
|
||||
# print("next frame for " + self.path)
|
||||
self.run_next_fade_color()
|
||||
self.render_rgb_layer(
|
||||
self.layer_buffer_current_color[0],
|
||||
self.layer_buffer_current_color[1],
|
||||
self.layer_buffer_current_color[2]
|
||||
)
|
||||
|
||||
def run_next_fade_color(self):
|
||||
if not self.animation_finished and self.animation_current_brightness > 0:
|
||||
# fade rgb values
|
||||
self.animation_current_brightness -= 30
|
||||
|
||||
# set current layer color brightness to animation brightness
|
||||
for i in range(3):
|
||||
self.layer_buffer_current_color[i] = self.get_new_color_value(
|
||||
self.layer_buffer_current_color[i],
|
||||
self.animation_current_brightness
|
||||
)
|
||||
|
||||
def trigger_event(self, event_value):
|
||||
# print("new event " + self.path + " value: " + str(event_value))
|
||||
# get new rgb color for frame, based on event
|
||||
self.layer_buffer_current_color = self.get_current_frame_rgb_color(event_value)
|
||||
|
||||
def get_next_frame(self):
|
||||
return self.layer_buffer_array
|
||||
|
||||
def get_current_frame_rgb_color(self, event_value):
|
||||
# create black rgb color array
|
||||
output_rgb_array = np.zeros(self.color_values, np.uint8)
|
||||
|
||||
if event_value == 1 or event_value == 2:
|
||||
# static blue light
|
||||
output_rgb_array[2] = self.max_brightness
|
||||
self.animation_finished = True
|
||||
elif event_value == 3:
|
||||
# fade blue light
|
||||
output_rgb_array[2] = self.max_brightness
|
||||
# reset current brightness of color for new animation
|
||||
self.animation_current_brightness = self.max_brightness
|
||||
self.animation_finished = False
|
||||
elif event_value == 5 or event_value == 6:
|
||||
# static red light
|
||||
output_rgb_array[0] = self.max_brightness
|
||||
self.animation_finished = True
|
||||
elif event_value == 7:
|
||||
# fade red light
|
||||
output_rgb_array[0] = self.max_brightness
|
||||
# reset current brightness of color for new animation
|
||||
self.animation_current_brightness = self.max_brightness
|
||||
self.animation_finished = False
|
||||
elif event_value == 8:
|
||||
# idle color
|
||||
output_rgb_array[1] = self.max_brightness
|
||||
self.animation_finished = True
|
||||
|
||||
return output_rgb_array
|
||||
|
||||
def render_rgb_layer(self, r=0, g=0, b=0):
|
||||
def dolayer(color, channel):
|
||||
if color == 0:
|
||||
self.layer_buffer_array[channel::3] *= 0
|
||||
else:
|
||||
im = (color * (self.png_image_data_flatten[channel::3].astype(np.uint16)))
|
||||
self.layer_buffer_array[channel::3] = (im/255).astype(np.uint8)
|
||||
dolayer(r, 0)
|
||||
dolayer(g, 1)
|
||||
dolayer(b, 2)
|
||||
|
||||
# helper method to calculate new color value
|
||||
# based on current value and target mapping value
|
||||
def get_new_color_value(self, current_value, mapping_value):
|
||||
mapping_value_float = float(mapping_value / self.max_brightness)
|
||||
new_value = int(round(current_value * mapping_value_float))
|
||||
# print("new_color: " + str(current_value) + ":" + str(mapping_value) + "=" + str(new_value))
|
||||
return new_value
|
||||
|
||||
# helper method to flatted the png array payload, called only once on init
|
||||
def flatten_rgb_array_from_png(self, rgb_array):
|
||||
output_array = np.zeros(self.rgb_buffer_length, np.uint8)
|
||||
current_value_index = 0
|
||||
# got over every row of png
|
||||
for current_row in rgb_array:
|
||||
# go over value in payload
|
||||
for current_value in current_row:
|
||||
# write current value to output array buffer
|
||||
output_array[current_value_index] = current_value
|
||||
current_value_index += 1
|
||||
return output_array
|
||||
|
||||
|
||||
# ------------------------------------
|
||||
# Websocket event part start
|
||||
# ------------------------------------
|
||||
def parse_json(input_json):
|
||||
json_content = json.loads(input_json)
|
||||
current_event = json_content["event"]
|
||||
if current_event == "noteCut" or current_event == "noteFullyCut":
|
||||
event_note_cut(json_content["noteCut"])
|
||||
elif current_event == "bombCut":
|
||||
event_bomb_cut()
|
||||
elif current_event == "beatmapEvent":
|
||||
event_beat_map(json_content["beatmapEvent"])
|
||||
elif current_event == "hello" or current_event == "menu":
|
||||
event_idle()
|
||||
# else:
|
||||
# print("other event: " + current_event)
|
||||
|
||||
|
||||
def event_note_cut(note_cut_object):
|
||||
# print("Note Cut")
|
||||
event_note_cut_parse(note_cut_object)
|
||||
|
||||
|
||||
def event_note_cut_parse(note_cut_object):
|
||||
saber_type = note_cut_object["saberType"]
|
||||
note_type = note_cut_object["noteType"]
|
||||
light_value = 0
|
||||
if note_type == "NoteA":
|
||||
light_value = 7
|
||||
elif note_type == "NoteB":
|
||||
light_value = 3
|
||||
|
||||
if saber_type == "SaberA":
|
||||
trigger_saber_a(light_value)
|
||||
elif saber_type == "SaberB":
|
||||
trigger_saber_b(light_value)
|
||||
|
||||
|
||||
def trigger_saber_a(light_value):
|
||||
# print("saber a (red/left): " + str(light_value))
|
||||
pass
|
||||
|
||||
|
||||
def trigger_saber_b(light_value):
|
||||
# print("saber b (blue/right): " + str(light_value))
|
||||
pass
|
||||
|
||||
|
||||
def event_bomb_cut():
|
||||
# print("Bomb Cut")
|
||||
pass
|
||||
|
||||
|
||||
def event_beat_map(event_object):
|
||||
# print("Beatmap")
|
||||
event_beat_map_parse(event_object)
|
||||
|
||||
|
||||
def event_beat_map_parse(beatmap_event_object):
|
||||
event_type = beatmap_event_object["type"]
|
||||
event_value = beatmap_event_object["value"]
|
||||
if 0 <= event_type < 5:
|
||||
if event_type == 0:
|
||||
trigger_light_small(event_value)
|
||||
elif event_type == 1:
|
||||
trigger_light_big(event_value)
|
||||
elif event_type == 2:
|
||||
trigger_light_left(event_value)
|
||||
elif event_type == 3:
|
||||
trigger_light_right(event_value)
|
||||
elif event_type == 4:
|
||||
trigger_light_middle(event_value)
|
||||
|
||||
|
||||
def event_idle():
|
||||
# disable small, center, big lights
|
||||
trigger_light_small(0)
|
||||
trigger_light_middle(0)
|
||||
trigger_light_big(0)
|
||||
# set both laser to green (custom value 8)
|
||||
trigger_light_left(8)
|
||||
trigger_light_right(8)
|
||||
|
||||
|
||||
def trigger_light_small(value):
|
||||
# print("light small " + str(value))
|
||||
layer_small.trigger_event(value)
|
||||
|
||||
|
||||
def trigger_light_middle(value):
|
||||
# print("light middle " + str(value))
|
||||
layer_middle.trigger_event(value)
|
||||
|
||||
|
||||
def trigger_light_big(value):
|
||||
# print("light big " + str(value))
|
||||
layer_big.trigger_event(value)
|
||||
|
||||
|
||||
def trigger_light_left(value):
|
||||
# print("light left " + str(value))
|
||||
layer_left.trigger_event(value)
|
||||
|
||||
|
||||
def trigger_light_right(value):
|
||||
# print("light right " + str(value))
|
||||
layer_right.trigger_event(value)
|
||||
|
||||
|
||||
def on_message(ws, message):
|
||||
parse_json(message)
|
||||
|
||||
|
||||
def on_error(ws, error):
|
||||
# print(error)
|
||||
pass
|
||||
|
||||
|
||||
def on_close(ws):
|
||||
# print("### websocket closed ###")
|
||||
pass
|
||||
|
||||
|
||||
def on_open(ws):
|
||||
# print("websocket connected")
|
||||
# thread.start_new_thread(run, ())
|
||||
pass
|
||||
|
||||
|
||||
# ------------------------------------
|
||||
# Websocket event part end
|
||||
# ------------------------------------
|
||||
|
||||
|
||||
def get_higher_color_value(value1, value2):
|
||||
new_value = value2
|
||||
if value1 > value2:
|
||||
new_value = value1
|
||||
return new_value
|
||||
|
||||
|
||||
def merge_layer_arrays(layer_first, layer_second):
|
||||
return np.maximum(layer_first, layer_second)
|
||||
# output_array = np.zeros(buffer_length, np.uint8)
|
||||
# for idx in range(0, buffer_length):
|
||||
# output_array[idx] = get_higher_color_value(layer_first[idx], layer_second[idx])
|
||||
# return output_array
|
||||
|
||||
|
||||
def get_merged_layers_array():
|
||||
merged_value = merge_layer_arrays(layer_small.get_next_frame(), layer_middle.get_next_frame())
|
||||
merged_value = merge_layer_arrays(merged_value, layer_big.get_next_frame())
|
||||
merged_value = merge_layer_arrays(merged_value, layer_left.get_next_frame())
|
||||
merged_value = merge_layer_arrays(merged_value, layer_right.get_next_frame())
|
||||
output_array = merged_value
|
||||
|
||||
return output_array
|
||||
|
||||
|
||||
def millis():
|
||||
dt = datetime.now()
|
||||
cur_millis = dt.microsecond / 1000
|
||||
return int(round((time_.time() * 1000) + int(cur_millis)))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
buffer_length = Nx * Ny * 3
|
||||
# create layer objects for the lights
|
||||
layer_small = Layer('maps/small.png')
|
||||
layer_middle = Layer('maps/middle.png')
|
||||
layer_big = Layer('maps/big.png')
|
||||
layer_left = Layer('maps/left.png')
|
||||
layer_right = Layer('maps/right.png')
|
||||
|
||||
# Start websocket logic thread
|
||||
ws = websocket.WebSocketApp('ws://' + beatsaber_host + ':6557/socket',
|
||||
on_message=on_message,
|
||||
on_error=on_error,
|
||||
on_close=on_close,
|
||||
on_open=on_open)
|
||||
wst = threading.Thread(target=ws.run_forever)
|
||||
wst.daemon = True
|
||||
wst.start()
|
||||
|
||||
# frame refresh time limit, default ~16ms for about ~60fps
|
||||
frame_update_limit_ms = 0.016
|
||||
frame_last_update_ms = time.time()
|
||||
frame_current_millis = time.time()
|
||||
|
||||
# Start layer logic loop
|
||||
while True:
|
||||
frame_current_millis = time.time()
|
||||
if frame_current_millis - frame_last_update_ms >= frame_update_limit_ms:
|
||||
# run layer render logic
|
||||
layer_small.run()
|
||||
layer_middle.run()
|
||||
layer_big.run()
|
||||
layer_left.run()
|
||||
layer_right.run()
|
||||
# merge layers together
|
||||
output_buffer = get_merged_layers_array()
|
||||
# convert frame buffer to byte string and write to stdout
|
||||
os.write(1, output_buffer.tobytes())
|
||||
# refresh last frame update time
|
||||
frame_last_update_ms = time.time()
|
||||
else:
|
||||
time.sleep((frame_last_update_ms+frame_update_limit_ms)-frame_current_millis)
|
||||
|
BIN
beatsaberceiling/maps/LightShowMap.xcf
Normal file
BIN
beatsaberceiling/maps/LightShowMap.xcf
Normal file
Binary file not shown.
BIN
beatsaberceiling/maps/big.png
Normal file
BIN
beatsaberceiling/maps/big.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 170 B |
BIN
beatsaberceiling/maps/left.png
Normal file
BIN
beatsaberceiling/maps/left.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 136 B |
BIN
beatsaberceiling/maps/middle.png
Normal file
BIN
beatsaberceiling/maps/middle.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 165 B |
BIN
beatsaberceiling/maps/right.png
Normal file
BIN
beatsaberceiling/maps/right.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 140 B |
BIN
beatsaberceiling/maps/small.png
Normal file
BIN
beatsaberceiling/maps/small.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 159 B |
7
beatsaberceiling/setup.sh
Normal file
7
beatsaberceiling/setup.sh
Normal file
@@ -0,0 +1,7 @@
|
||||
#!/bin/bash
|
||||
|
||||
cp -f -r beatsaberceiling ../../apps/
|
||||
|
||||
chmod +x ../../apps/beatsaberceiling/beatsaberceiling.py
|
||||
|
||||
cp -f beatsaberconf.py ../../configs/
|
Reference in New Issue
Block a user