pixelserver2/main.py
2024-11-10 02:27:36 +01:00

532 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
import json
import logging
import math
import os
import subprocess
import threading
import time
from collections import OrderedDict
import bottle
import numpy as np
import serial
import config
import filters
logging.basicConfig(filename='pixelserver.log', level=config.LogLevel)
########################################################################
# Utils #
########################################################################
class DataSource:
def __init__(self, initial):
self.data = initial
self.listeners = []
def getData(self):
return self.data
def addListener(self, listener):
self.listeners.append(listener)
return self
def pushData(self, data):
self.data = data
for listener in self.listeners:
with listener:
listener.notify_all()
class WatchDog(threading.Thread):
def __init__(self, check, action):
super().__init__(daemon=True)
self.check = check
self.action = action
self.running = True
def run(self):
while running and self.running:
if self.check():
logging.error("Watchdog: Executed")
self.action()
time.sleep(1)
def stop(self):
self.running = False
class LogReader(threading.Thread):
def __init__(self, runner):
super().__init__(daemon=True)
self.runner = runner
self.log = ""
self.running = True
def clear(self):
self.log = ""
def getLog(self):
return self.log
def run(self):
logging.info("LogReader started")
while running and self.running:
try:
self.log += self.runner.app.stderr.read(1).decode("utf-8")
except Exception as e:
print(e)
logging.error(str(e))
time.sleep(1)
logging.info("LogReader closed")
def stop(self):
self.running = False
class Frame:
def __init__(self, buffer, channels=3):
self.buffer = buffer
self.created = time.time()
self.channels = channels
def clone(self):
f = Frame(self.buffer + 0, self.channels)
f.created = self.created
return f
########################################################################
# GUI #
########################################################################
if config.UseGui:
import pygame
class Gui(threading.Thread):
def __init__(self, datasource):
super().__init__(daemon=True)
self.cv = threading.Condition()
self.datasource = datasource.addListener(self.cv)
def run(self):
last_frame = time.time()
logging.info("Starting GUI")
sf = config.GuiScaleFactor
pygame.init()
screen = pygame.display.set_mode((sf * config.ScreenX, sf * config.ScreenY))
pygame.display.set_caption("Pixelserver - GUI Vis")
while running:
for event in pygame.event.get():
pass
with self.cv:
self.cv.wait()
frame = self.datasource.getData()
screen.fill((0, 0, 0))
if frame.channels == 3:
for x in range(config.ScreenX):
for y in range(config.ScreenY):
color = (frame.buffer[y, x, 0], frame.buffer[y, x, 1], frame.buffer[y, x, 2])
pygame.draw.rect(screen, color, pygame.Rect(sf * x, sf * y, sf, sf))
elif frame.channels == 4:
for x in range(config.ScreenX):
for y in range(config.ScreenY):
w = frame.buffer[y, x, 3] // 2
color = (frame.buffer[y, x, 0] // 2 + w, frame.buffer[y, x, 1] // 2 + w, frame.buffer[y, x, 2] // 2 + w)
pygame.draw.rect(screen, color, pygame.Rect(sf * x, sf * y, sf, sf))
# logging.debug("Time to gui: " + str(time.time() - frame.created))
pygame.display.flip()
if time.time() < last_frame + 1 / config.GuiFPS:
time.sleep(max(0.01, time.time() - (last_frame + 1 / config.GuiFPS)))
# time.sleep(0.01)
last_frame = time.time()
logging.info("Closing GUI")
def join(self, **kwargs):
with self.cv:
self.cv.notify_all()
super().join(**kwargs)
########################################################################
# Serial #
########################################################################
class SerialWriter(threading.Thread):
def __init__(self, datasource):
super().__init__(daemon=True)
self.cv = threading.Condition()
self.datasource = datasource.addListener(self.cv)
self.gamma_rgbw = 0, 0, 0, 0
self.updateGamma = False
def run(self):
should_connect = True
ser = None
logging.info("Starting SerialWriter")
while running:
try:
if should_connect:
ser = serial.Serial(config.Serial)
should_connect = False
logging.info("Serial Opened")
with self.cv:
self.cv.wait(timeout=1 / 30)
frame = self.datasource.getData()
data = frame.buffer.reshape((config.ScreenX * config.ScreenY * frame.channels,)).astype(np.uint8).tobytes()
if self.updateGamma:
r, g, b, w = self.gamma_rgbw
apply = lambda x, g: max(0, min(255, int(math.pow(x / 255, g) * 255)))
buf = bytearray(4 * 256)
for i in range(256):
buf[i] = apply(i, r)
buf[i + 256] = apply(i, g)
buf[i + 256 * 2] = apply(i, b)
buf[i + 256 * 3] = apply(i, w)
ser.write(b"\x02")
ser.write(buf)
self.updateGamma = False
if frame.channels == 3:
ser.write(b"\01")
ser.write(data)
elif frame.channels == 4:
ser.write(b"\03")
ser.write(data)
logging.debug(f"Time to gui: {time.time() - frame.created}")
ser.flush()
except Exception as e:
if ser is not None:
ser.close()
ser = None
logging.warning(f"Serial was close because: {e}")
should_connect = True
time.sleep(5)
logging.info("Closing SerialWriter")
def join(self, **kwargs):
with self.cv:
self.cv.notify_all()
super().join(**kwargs)
def setGamma(self, r, g, b, w):
with self.cv:
self.gamma_rgbw = r, g, b, w
self.updateGamma = True
self.cv.notify_all()
########################################################################
# App #
########################################################################
class App(threading.Thread):
def __init__(self, cmd, param, listener, is_persistent, is_white=False, path="."):
super().__init__(daemon=True)
# start app
args = cmd + [str(config.ScreenX), str(config.ScreenY), param]
self.app = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, cwd=path)
self.last_update = time.time()
self.cv = threading.Condition()
self.watchdog = WatchDog(lambda: self.isAppTimedOut(), lambda: self.terminateApp())
self.watchdog.start()
self.logreader = LogReader(self)
self.logreader.start()
self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3))))
self.running = True
self.listener = listener
self.is_persistent = is_persistent
self.is_white = is_white
def run(self):
while running and self.running and self.alive():
oshandle = self.app.stdout.fileno()
try:
channels = 4 if self.is_white else 3
data = os.read(oshandle, config.ScreenX * config.ScreenY * channels)
assert len(data) == config.ScreenX * config.ScreenY * channels
buffer = np.frombuffer(data, dtype=np.uint8, count=config.ScreenX * config.ScreenY * channels)
buffer = buffer.reshape((config.ScreenY, config.ScreenX, channels))
frame = Frame(buffer, channels=channels)
self.last_update = time.time()
self.datasource.pushData(frame)
except:
logging.debug("Exception in App.run")
with self.listener:
self.listener.notify_all()
self.watchdog.stop()
self.logreader.stop()
self.watchdog.join()
self.logreader.join()
self.app.wait()
logging.debug("App stopped")
def alive(self):
return self.app.poll() is None
def stop(self):
self.running = False
self.app.kill()
self.app.stdout.close()
self.app.stderr.close()
self.watchdog.stop()
self.logreader.stop()
self.app.wait()
logging.debug("App stopped")
def getLog(self):
return self.logreader.getLog()
def terminateApp(self):
logging.error("Terminate app!")
self.stop()
def isAppTimedOut(self):
return time.time() - self.last_update > config.NoDataTimeout
########################################################################
# Main #
########################################################################
class AppRunner(threading.Thread):
def __init__(self):
super().__init__(daemon=True)
self.last_crashlog = ""
self.currentApp = -1
self.requestedApp = 0
self.app = None
self.cv = threading.Condition()
self.param = ""
self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3))))
self.serial = SerialWriter(self.datasource)
self.serial.start()
self.persistent_apps = {}
self.filters = OrderedDict()
# start persistent apps
for i, app in enumerate(config.Apps):
if app.persistent:
self.startApp(i)
if config.UseGui:
self.gui = Gui(self.datasource)
self.gui.start()
def requestApp(self, app, param=""):
with self.cv:
self.requestedApp = app
self.param = param
self.cv.notify_all()
logging.info(f"Requesting app: {app}")
def startApp(self, i, param=""):
app = config.Apps[i]
newapp = App(app.cmd, param, self.cv, is_persistent=app.persistent, is_white=app.white, path=app.path)
newapp.datasource.addListener(self.cv)
newapp.start()
if app.persistent:
self.persistent_apps[self.currentApp] = newapp
return newapp
def updateApp(self):
try:
if self.app is not None and not self.app.is_persistent:
self.app.stop()
self.currentApp = self.requestedApp
if self.currentApp in self.persistent_apps.keys() and self.persistent_apps[self.currentApp].alive():
self.app = self.persistent_apps[self.currentApp]
else:
self.app = self.startApp(self.requestedApp, self.param)
except FileNotFoundError as e:
print(e)
def run(self):
logging.info("Starting Apprunner")
while running:
with self.cv:
if self.app is None or not self.app.alive():
if self.app is not None:
self.last_crashlog = self.app.getLog()
self.requestedApp = 0
if self.requestedApp is not None:
self.updateApp()
self.requestedApp = None
if self.app is not None:
frame = self.app.datasource.getData().clone()
# logging.debug("Runner in time: " + str(time.time() - frame.created))
for _, f in self.filters.items():
frame.buffer = f(frame.buffer)
# logging.debug("Runner out time: " + str(time.time() - frame.created))
self.datasource.pushData(frame)
self.cv.wait()
self.serial.join()
if config.UseGui:
self.gui.join()
logging.info("Close Apprunner")
def getLog(self):
if self.app is None:
return ""
return self.app.getLog()
def setGamma(self, r, g, b, w):
self.serial.setGamma(r, g, b, w)
def setFilter(self, name, filter_):
self.filters[name] = filter_
def removeFilter(self, name):
if name in self.filters.keys():
del self.filters[name]
########################################################################
# Web Api #
########################################################################
@bottle.route('/<:re:.*>', method='OPTIONS')
def enable_cors_generic_route():
add_cors_headers()
@bottle.hook('after_request')
def enable_cors_after_request_hook():
add_cors_headers()
def add_cors_headers():
bottle.response.headers['Access-Control-Allow-Origin'] = '*'
bottle.response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
bottle.response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
def startApp(name, param=""):
for i, app in enumerate(config.Apps):
if app.name == name:
runner.requestApp(i, param)
return "ok"
return "not_found"
@bottle.route("/apps/list")
def apps_list():
return json.dumps([
{
"name": app.name,
"guiname": app.guiname,
"persistent": app.persistent,
} for app in config.Apps
])
@bottle.route("/apps/start/<name>")
def apps_start_param(name):
return startApp(name)
@bottle.post("/apps/start/<name>")
def apps_start_post(name):
param = bottle.request.forms.get('param')
return startApp(name, param)
@bottle.route("/apps/start/<name>/<param>")
def apps_start(name, param):
return startApp(name, param)
@bottle.route("/apps/log")
def apps_log():
return runner.getLog()
@bottle.route("/apps/crashlog")
def apps_log():
return runner.last_crashlog
@bottle.route("/apps/running")
def apps_running():
return config.Apps[runner.currentApp].name
@bottle.route("/")
def index():
return bottle.static_file("index.html", root='html')
@bottle.route("/<filepath:path>")
def serve_static(filepath):
return bottle.static_file(filepath, root='html')
@bottle.route("/setgamma/<r>/<g>/<b>/<w>")
def setGamma(r, g, b, w):
runner.setGamma(float(r), float(g), float(b), float(w))
return "ok"
@bottle.route("/setbrightness/<i>")
def setIntensity(i):
i = float(i)
if not 0 <= i <= 1:
return "bad_value"
runner.setFilter("0_intensity", filters.MakeBrightnessFilter(i))
return "ok"
@bottle.route("/filter/flipx/<do>")
def flipx(do):
if do == "true":
runner.setFilter("1_flipx", filters.FlipXFilter)
else:
runner.removeFilter("1_flipx")
return "ok"
@bottle.route("/filter/flipy/<do>")
def flipy(do):
if do == "true":
runner.setFilter("1_flipy", filters.FlipYFilter)
else:
runner.removeFilter("1_flipy")
return "ok"
@bottle.route("/filter/img/<name>")
def setfilter(name):
if name == "none":
runner.removeFilter("3_imgfilter")
else:
runner.setFilter("3_imgfilter", filters.MakeBrightnessImageFilter(name))
return "ok"
@bottle.post("/filter/expr/")
def filter_expr():
expr = bottle.request.forms.get('expr')
if expr == "" or expr == "none":
runner.removeFilter("5_brightnessfunction")
else:
runner.setFilter("5_brightnessfunction", filters.MakeBrightnessExprFilter(expr))
return "ok"
if __name__ == '__main__':
########################################################################
# Startup #
########################################################################
running = True
runner = AppRunner()
runner.start()
# runner.setFilter("5_crazy", MakeBrightnessExprFilter("0.5+0.25*sin(x/3)/x"))
bottle.run(host=config.WebHost, port=config.WebPort)
########################################################################
# Shutdown #
########################################################################
running = False
runner.join()