#!/usr/bin/env python3
"""Pixelserver.

Runs pixel "apps" as subprocesses, reads raw frames from their stdout, passes
them through a chain of filters and forwards the result to a serial device
(and, when ``config.UseGui`` is set, to a pygame preview window).  A bottle
HTTP/WebSocket API selects the running app and configures the filters.
"""
import codecs
import json
import logging
import math
import os
import subprocess
import threading
import time
from collections import OrderedDict

import bottle
import numpy as np
import serial
# noinspection PyUnresolvedReferences
import bottle.ext.websocket as bottle_ws
# noinspection PyUnresolvedReferences
from bottle.ext.websocket import GeventWebSocketServer
import geventwebsocket.websocket

import config
import filters

logging.basicConfig(filename='pixelserver.log', level=config.LogLevel)


########################################################################
#                              Utils                                   #
########################################################################
class DataSource:
    """Holds the most recent frame and wakes listeners when it changes."""

    def __init__(self, initial):
        self.data = initial
        # Condition objects that are notified on every pushData().
        self.listeners = []

    def getData(self) -> "Frame":
        """Return the most recently pushed frame."""
        return self.data

    def addListener(self, listener):
        """Register a condition to notify on new data; returns ``self``."""
        self.listeners.append(listener)
        return self

    def pushData(self, data):
        """Store ``data`` as the current frame and notify every listener."""
        self.data = data
        for listener in self.listeners:
            with listener:
                listener.notify_all()


class WatchDog(threading.Thread):
    """Polls ``check`` once per second and calls ``action`` when it is true."""

    def __init__(self, check, action):
        super().__init__(daemon=True)
        self.check = check
        self.action = action
        self.running = True

    def run(self):
        # ``running`` is the module-level flag set by main().
        while running and self.running:
            if self.check():
                logging.error("Watchdog: Executed")
                self.action()
            time.sleep(1)

    def stop(self):
        """Ask the polling loop to exit after its current iteration."""
        self.running = False


class LogReader(threading.Thread):
    """Collects the stderr output of ``runner.app`` into a string."""

    def __init__(self, runner):
        super().__init__(daemon=True)
        self.runner = runner
        self.log = ""
        self.running = True

    def clear(self):
        """Discard everything collected so far."""
        self.log = ""

    def getLog(self):
        """Return the text collected so far."""
        return self.log

    def run(self):
        logging.info("LogReader started")
        # An incremental decoder keeps multi-byte UTF-8 sequences intact even
        # though the pipe is read one byte at a time; decoding each byte on
        # its own would raise for every non-ASCII character.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        while running and self.running:
            try:
                chunk = self.runner.app.stderr.read(1)
                if not chunk:
                    # EOF: stderr was closed, nothing more will ever arrive.
                    # Looping on would spin at full speed on empty reads.
                    break
                self.log += decoder.decode(chunk)
            except Exception as e:
                print(e)
                logging.error(str(e))
                time.sleep(1)
        logging.info("LogReader closed")

    def stop(self):
        """Ask the reader loop to exit."""
        self.running = False


class Frame:
    """A pixel buffer plus its channel count and creation timestamp."""

    def __init__(self, buffer, channels=3):
        self.buffer: np.ndarray = buffer
        self.created = time.time()
        self.channels = channels

    def clone(self):
        """Return a copy with its own buffer and the original timestamp."""
        duplicate = Frame(self.buffer.copy(), self.channels)
        duplicate.created = self.created
        return duplicate


########################################################################
#                               GUI                                    #
########################################################################
if config.UseGui:
    import pygame


    class Gui(threading.Thread):
        """Draws every frame of ``datasource`` into a pygame window."""

        def __init__(self, datasource):
            super().__init__(daemon=True)
            self.cv = threading.Condition()
            self.datasource = datasource.addListener(self.cv)

        def run(self):
            last_frame = time.time()
            logging.info("Starting GUI")
            sf = config.GuiScaleFactor
            pygame.init()
            screen = pygame.display.set_mode((sf * config.ScreenX, sf * config.ScreenY))
            pygame.display.set_caption("Pixelserver - GUI Vis")
            while running:
                # Drain the event queue; the events themselves are ignored.
                for event in pygame.event.get():
                    pass
                with self.cv:
                    self.cv.wait()
                    frame = self.datasource.getData()
                screen.fill((0, 0, 0))
                if frame.channels == 3:
                    for x in range(config.ScreenX):
                        for y in range(config.ScreenY):
                            color = (frame.buffer[y, x, 0],
                                     frame.buffer[y, x, 1],
                                     frame.buffer[y, x, 2])
                            pygame.draw.rect(screen, color,
                                             pygame.Rect(sf * x, sf * y, sf, sf))
                elif frame.channels == 4:
                    for x in range(config.ScreenX):
                        for y in range(config.ScreenY):
                            # Mix half of the white channel into half of each
                            # colour channel to approximate RGBW on screen.
                            w = frame.buffer[y, x, 3] // 2
                            color = (frame.buffer[y, x, 0] // 2 + w,
                                     frame.buffer[y, x, 1] // 2 + w,
                                     frame.buffer[y, x, 2] // 2 + w)
                            pygame.draw.rect(screen, color,
                                             pygame.Rect(sf * x, sf * y, sf, sf))
                pygame.display.flip()
                # Sleep for the rest of the frame period.  The subtraction was
                # previously the wrong way round, giving a negative value that
                # max() always clamped to 0.01.
                remaining = last_frame + 1 / config.GuiFPS - time.time()
                if remaining > 0:
                    time.sleep(max(0.01, remaining))
                last_frame = time.time()
            logging.info("Closing GUI")

        def join(self, timeout=None):
            """Wake the render loop so it can see the stop flag, then join."""
            with self.cv:
                self.cv.notify_all()
            super().join(timeout)


########################################################################
#                              Serial                                  #
########################################################################
class SerialWriter(threading.Thread):
    """Streams frames (and gamma tables) to the device at ``config.Serial``.

    Wire format as written by run(): ``0x01`` + RGB bytes, ``0x03`` + RGBW
    bytes, ``0x02`` + four 256-byte gamma lookup tables (r, g, b, w).
    """

    def __init__(self, datasource):
        super().__init__(daemon=True)
        self.cv = threading.Condition()
        self.datasource = datasource.addListener(self.cv)
        self.gamma_rgbw = 0, 0, 0, 0
        self.updateGamma = False

    @staticmethod
    def _gammaTable(r, g, b, w):
        """Build the 4 * 256 byte lookup table for the given gamma exponents."""
        def level(value, gamma):
            return max(0, min(255, int(math.pow(value / 255, gamma) * 255)))

        table = bytearray(4 * 256)
        for offset, gamma in enumerate((r, g, b, w)):
            for i in range(256):
                table[i + 256 * offset] = level(i, gamma)
        return table

    def run(self):
        should_connect = True
        ser = None
        logging.info("Starting SerialWriter")
        while running:
            try:
                if should_connect:
                    ser = serial.Serial(config.Serial)
                    should_connect = False
                    logging.info("Serial Opened")
                with self.cv:
                    # The timeout makes the current frame get re-sent
                    # periodically even when no new data was pushed.
                    self.cv.wait(timeout=1 / 30)
                    frame = self.datasource.getData()
                pixel_count = config.ScreenX * config.ScreenY * frame.channels
                data = frame.buffer.reshape((pixel_count,)).astype(np.uint8).tobytes()
                if self.updateGamma:
                    ser.write(b"\x02")
                    ser.write(self._gammaTable(*self.gamma_rgbw))
                    self.updateGamma = False
                if frame.channels == 3:
                    ser.write(b"\01")
                    ser.write(data)
                elif frame.channels == 4:
                    ser.write(b"\03")
                    ser.write(data)
                logging.debug(f"Time to gui: {time.time() - frame.created}")
                ser.flush()
            except Exception as e:
                # Any failure drops the connection; reconnect after a pause.
                if ser is not None:
                    ser.close()
                    ser = None
                logging.warning(f"Serial was close because: {e}")
                should_connect = True
                time.sleep(5)
        logging.info("Closing SerialWriter")

    def join(self, timeout=None):
        """Wake the writer loop so it can see the stop flag, then join."""
        with self.cv:
            self.cv.notify_all()
        super().join(timeout)

    def setGamma(self, r, g, b, w):
        """Schedule a new gamma table to be sent before the next frame."""
        with self.cv:
            self.gamma_rgbw = r, g, b, w
            self.updateGamma = True
            self.cv.notify_all()


########################################################################
#                                App                                   #
########################################################################
class App(threading.Thread):
    """Runs one app as a subprocess and publishes the frames it prints.

    The subprocess is started as ``cmd + [ScreenX, ScreenY, param]`` and is
    expected to write raw frames of ``ScreenX * ScreenY * channels`` bytes to
    stdout (4 channels when ``is_white`` is set, otherwise 3).
    """

    def __init__(self, name, cmd, param, listener, is_persistent, is_white=False, path="."):
        super().__init__(daemon=True)
        # start app
        self.name = name
        args = cmd + [str(config.ScreenX), str(config.ScreenY), param]
        self.app = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                    close_fds=True, cwd=path)
        self.last_update = time.time()
        # Terminates the subprocess when it stops producing frames.
        self.watchdog = WatchDog(lambda: self.isAppTimedOut(), lambda: self.terminateApp())
        self.watchdog.start()
        self.logreader = LogReader(self)
        self.logreader.start()
        self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3))))
        self.running = True
        self.listener = listener
        self.is_persistent = is_persistent
        self.is_white = is_white

    @staticmethod
    def _readExact(fd, size):
        """Read exactly ``size`` bytes from ``fd``.

        A single os.read() on a pipe may return fewer bytes than requested;
        treating that as a whole frame would desynchronise the stream.

        Raises:
            EOFError: if the pipe is closed before ``size`` bytes arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = os.read(fd, remaining)
            if not chunk:
                raise EOFError(f"stdout closed after {size - remaining} of {size} bytes")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def run(self):
        while running and self.running and self.alive():
            try:
                # fileno() raises once stop() has closed the pipe, so it has
                # to be inside the try for the cleanup below to still run.
                oshandle = self.app.stdout.fileno()
                channels = 4 if self.is_white else 3
                frame_size = config.ScreenX * config.ScreenY * channels
                data = self._readExact(oshandle, frame_size)
                buffer = np.frombuffer(data, dtype=np.uint8, count=frame_size)
                buffer = buffer.reshape((config.ScreenY, config.ScreenX, channels))
                frame = Frame(buffer, channels=channels)
                self.last_update = time.time()
                self.datasource.pushData(frame)
            except Exception as ex:
                logging.debug(f"Exception in App.run: {ex}")
        # Tell the owner that this app has ended.
        with self.listener:
            self.listener.notify_all()
        self.watchdog.stop()
        self.logreader.stop()
        self.watchdog.join()
        self.logreader.join()
        self.app.wait()
        logging.debug("App stopped")

    def alive(self):
        """Return True while the subprocess has not exited."""
        return self.app.poll() is None

    def stop(self):
        """Kill the subprocess, close its pipes and stop the helper threads."""
        self.running = False
        self.app.kill()
        self.app.stdout.close()
        self.app.stderr.close()
        self.watchdog.stop()
        self.logreader.stop()
        self.app.wait()
        logging.debug("App stopped")

    def getLog(self):
        """Return the stderr output collected so far."""
        return self.logreader.getLog()

    def terminateApp(self):
        """Watchdog action: log and stop the app."""
        logging.error("Terminate app!")
        self.stop()

    def isAppTimedOut(self):
        """Return True when no frame arrived for ``config.NoDataTimeout`` seconds."""
        return time.time() - self.last_update > config.NoDataTimeout


class PixelCanvas(App):
    """In-process app without a subprocess; its frame is edited via the web API."""

    # noinspection PyMissingConstructor
    def __init__(self):
        # App.__init__ is skipped on purpose: it would spawn a subprocess.
        threading.Thread.__init__(self, daemon=True)
        self.name = "pixelcanvas"
        self.running = True
        self.is_persistent = True
        self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3))))

    def run(self):
        # Nothing to produce; just stay alive until asked to stop.
        while running and self.running:
            time.sleep(1)

    def alive(self):
        return True

    def stop(self):
        pass

    def getLog(self):
        return ""

    def terminateApp(self):
        pass

    def isAppTimedOut(self):
        return False


########################################################################
#                              Main                                    #
########################################################################
class AppRunner(threading.Thread):
    """Owns the current app, applies the filters and feeds the outputs."""

    def __init__(self):
        super().__init__(daemon=True)
        self.last_crashlog = ""
        self.currentApp = -1
        self.requestedApp = 0
        self.app = None
        self.cv = threading.Condition()
        self.param = ""
        self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3))))
        self.serial = SerialWriter(self.datasource)
        self.serial.start()
        # Index into config.Apps -> running persistent app.
        self.persistent_apps = {}
        # NOTE(review): filters run in insertion order; the numeric prefixes
        # of the names used by the web API suggest a sorted order may have
        # been intended - confirm.
        self.filters = OrderedDict()
        # start persistent apps
        for i, app in enumerate(config.Apps):
            if app.persistent:
                self.startApp(i)
        if config.UseGui:
            self.gui = Gui(self.datasource)
            self.gui.start()

    def requestApp(self, app, param=""):
        """Ask the runner loop to switch to ``config.Apps[app]``."""
        with self.cv:
            self.requestedApp = app
            self.param = param
            self.cv.notify_all()
        logging.info(f"Requesting app: {app}")

    def startApp(self, i, param=""):
        """Start ``config.Apps[i]`` and return the new app thread."""
        app = config.Apps[i]
        if app.name == "pixelcanvas":
            newapp = PixelCanvas()
        else:
            newapp = App(app.name, app.cmd, param, self.cv,
                         is_persistent=app.persistent, is_white=app.white, path=app.path)
        newapp.datasource.addListener(self.cv)
        newapp.start()
        if app.persistent:
            # Keyed by the app's own index.  Using self.currentApp here put
            # every app started from __init__ under -1, so it was never found
            # again and got started a second time when requested.
            self.persistent_apps[i] = newapp
        return newapp

    def updateApp(self):
        """Switch to the requested app, reusing a live persistent instance."""
        try:
            if self.app is not None and not self.app.is_persistent:
                self.app.stop()
            self.currentApp = self.requestedApp
            cached = self.persistent_apps.get(self.currentApp)
            if cached is not None and cached.alive():
                self.app = cached
            else:
                self.app = self.startApp(self.requestedApp, self.param)
        except FileNotFoundError as e:
            print(e)

    def run(self):
        logging.info("Starting Apprunner")
        while running:
            with self.cv:
                if self.app is None or not self.app.alive():
                    if self.app is not None:
                        self.last_crashlog = self.app.getLog()
                    # Fall back to the first configured app.
                    self.requestedApp = 0
                if self.requestedApp is not None:
                    self.updateApp()
                    self.requestedApp = None
                if self.app is not None:
                    frame = self.app.datasource.getData().clone()
                    # Iterate over a snapshot: web handlers may add or remove
                    # filters while this loop is running.
                    for apply_filter in list(self.filters.values()):
                        frame.buffer = apply_filter(frame.buffer)
                    self.datasource.pushData(frame)
                self.cv.wait()
        self.serial.join()
        if config.UseGui:
            self.gui.join()
        logging.info("Close Apprunner")

    def join(self, timeout=None):
        """Wake the runner loop so it can see the stop flag, then join.

        Without the notification the loop may stay blocked in ``cv.wait()``
        and the join would never return.
        """
        with self.cv:
            self.cv.notify_all()
        super().join(timeout)

    def getLog(self):
        """Return the stderr log of the current app ("" when there is none)."""
        if self.app is None:
            return ""
        return self.app.getLog()

    def setGamma(self, r, g, b, w):
        """Forward new gamma exponents to the serial writer."""
        self.serial.setGamma(r, g, b, w)

    def setFilter(self, name, filter_):
        """Install or replace the filter registered under ``name``."""
        self.filters[name] = filter_

    def removeFilter(self, name):
        """Remove the filter registered under ``name`` if there is one."""
        self.filters.pop(name, None)


########################################################################
#                             Web Api                                  #
########################################################################
@bottle.route('/<:re:.*>', method='OPTIONS')
def enable_cors_generic_route():
    """Answer OPTIONS requests on any path with the CORS headers."""
    add_cors_headers()


@bottle.hook('after_request')
def enable_cors_after_request_hook():
    """Attach the CORS headers to every response."""
    add_cors_headers()


def add_cors_headers():
    """Set the Access-Control-* headers on the current bottle response."""
    bottle.response.headers['Access-Control-Allow-Origin'] = '*'
    bottle.response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
    bottle.response.headers['Access-Control-Allow-Headers'] = \
        'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'


def startApp(name, param=""):
    """Request the app called ``name``; returns "ok" or "not_found"."""
    for i, app in enumerate(config.Apps):
        if app.name == name:
            runner.requestApp(i, param)
            return "ok"
    return "not_found"


@bottle.route("/apps/list")
def apps_list():
    """Return the configured apps as a JSON list."""
    return json.dumps([
        {
            "name": app.name,
            "guiname": app.guiname,
            "persistent": app.persistent,
        } for app in config.Apps
    ])


# The <name>/<param> wildcards bind the handlers' arguments; without them
# bottle would call the handlers with no arguments.
@bottle.route("/apps/start/<name>")
def apps_start_param(name):
    """Start app ``name`` without a parameter."""
    return startApp(name)


@bottle.post("/apps/start/<name>")
def apps_start_post(name):
    """Start app ``name`` with the parameter from the ``param`` form field."""
    # A missing field yields None, which cannot be passed on as an argv entry.
    param = bottle.request.forms.get('param') or ""
    return startApp(name, param)


@bottle.route("/apps/start/<name>/<param>")
def apps_start(name, param):
    """Start app ``name`` with the parameter taken from the URL."""
    return startApp(name, param)


@bottle.route("/apps/log")
def apps_log():
    """Return the stderr log of the running app."""
    return runner.getLog()
@bottle.route("/apps/crashlog")
def apps_crashlog():
    """Return the log that the runner saved from the last app that ended."""
    return runner.last_crashlog


@bottle.route("/apps/running")
def apps_running():
    """Return the name of the app at the runner's current index."""
    return config.Apps[runner.currentApp].name


@bottle.route("/frame")
def frame():
    """Return the current output frame as flat pixel data plus channel count."""
    data = runner.datasource.getData()
    return {"data": data.buffer.flatten().tolist(), "channels": data.channels}


def _canvasFrame():
    """Return an editable copy of the current app's frame for drawing.

    Requests the pixelcanvas app first when another app is active.
    NOTE(review): the switch only happens later in the runner thread, so the
    copy taken here still comes from the previously active app - confirm
    whether that is intended.
    """
    if runner.app.name != "pixelcanvas":
        startApp("pixelcanvas")
    data = runner.app.datasource.getData().clone()
    data.created = time.time()
    return data


# The int filters make x/y usable as array indices; unfiltered wildcards
# would arrive as strings.
@bottle.route("/pixel/<x:int>/<y:int>/<r:int>/<g:int>/<b:int>/<w:int>")
def pixel(x, y, r, g, b, w):
    """Set one pixel of the canvas; ``w`` is only used for 4-channel frames."""
    data = _canvasFrame()
    data.buffer[y][x][0] = r
    data.buffer[y][x][1] = g
    data.buffer[y][x][2] = b
    if data.channels == 4:
        data.buffer[y][x][3] = w
    # Push to the app's datasource, as the WebSocket handler does, so the
    # change persists and passes through the runner's filters.  Pushing to
    # runner.datasource made the next edit start from the unmodified frame.
    runner.app.datasource.pushData(data)
    return "ok"


@bottle.get("/frame_ws", apply=[bottle_ws.websocket])
def frame_ws(ws: geventwebsocket.websocket.WebSocket):
    """Serve frame polling and drawing commands over a WebSocket.

    Messages are JSON objects dispatched on their ``"ty"`` field:
    ``"frame"`` (send the current frame unless ``"time"`` matches it),
    ``"pixel"`` (set one pixel) and ``"fill"`` (set all pixels).
    """
    while not ws.closed:
        raw = ws.receive()
        if raw is None:
            # receive() yields None once the peer has closed the socket;
            # json.loads(None) would raise.
            break
        msg = json.loads(raw)
        if msg["ty"] == "frame":
            data = runner.datasource.getData()
            if msg.get("time", None) == str(data.created):
                # NOTE(review): this reply uses the key "type" while all
                # other messages use "ty" - confirm against the client.
                ws.send(json.dumps({
                    "type": "frame_unchanged",
                }, separators=(',', ':')))
            else:
                ws.send(json.dumps({
                    "ty": "frame",
                    "data": data.buffer.flatten().tolist(),
                    "channels": data.channels,
                    "time": str(data.created),
                }, separators=(',', ':')))
        elif msg["ty"] == "pixel":
            data = _canvasFrame()
            x = msg["x"]
            y = msg["y"]
            data.buffer[y, x, 0] = msg["r"]
            data.buffer[y, x, 1] = msg["g"]
            data.buffer[y, x, 2] = msg["b"]
            if data.channels == 4 and "w" in msg:
                data.buffer[y, x, 3] = msg["w"]
            runner.app.datasource.pushData(data)
        elif msg["ty"] == "fill":
            data = _canvasFrame()
            data.buffer[..., 0] = msg["r"]
            data.buffer[..., 1] = msg["g"]
            data.buffer[..., 2] = msg["b"]
            if data.channels == 4 and "w" in msg:
                data.buffer[..., 3] = msg["w"]
            runner.app.datasource.pushData(data)


@bottle.route("/")
def index():
    """Serve the web front end."""
    return bottle.static_file("index.html", root='html')


# Float filters: the values are used as exponents in math.pow().
@bottle.route("/setgamma/<r:float>/<g:float>/<b:float>/<w:float>")
def setGamma(r, g, b, w):
    """Set the gamma exponents for the four channels."""
    runner.setGamma(r, g, b, w)
    return "ok"


@bottle.route("/setbrightness/<i:float>")
def setIntensity(i):
    """Install a brightness filter for ``i`` in [0, 1]; else "bad_value"."""
    if not 0 <= i <= 1:
        return "bad_value"
    runner.setFilter("0_intensity", filters.MakeBrightnessFilter(i))
    return "ok"


@bottle.route("/filter/flipx/<do>")
def flipx(do):
    """Enable the flip-x filter when ``do`` is "true", otherwise remove it."""
    if do == "true":
        runner.setFilter("1_flipx", filters.FlipXFilter)
    else:
        runner.removeFilter("1_flipx")
    return "ok"


@bottle.route("/filter/flipy/<do>")
def flipy(do):
    """Enable the flip-y filter when ``do`` is "true", otherwise remove it."""
    if do == "true":
        runner.setFilter("1_flipy", filters.FlipYFilter)
    else:
        runner.removeFilter("1_flipy")
    return "ok"


@bottle.route("/filter/img/<name>")
def setfilter(name):
    """Install the brightness image filter ``name``; "none" removes it."""
    if name == "none":
        runner.removeFilter("3_imgfilter")
    else:
        runner.setFilter("3_imgfilter", filters.MakeBrightnessImageFilter(name))
    return "ok"


@bottle.post("/filter/expr/")
def filter_expr():
    """Install a brightness expression filter from the ``expr`` form field.

    An empty or missing field, or the value "none", removes the filter.
    """
    expr = bottle.request.forms.get('expr')
    if not expr or expr == "none":
        runner.removeFilter("5_brightnessfunction")
    else:
        # NOTE(review): expr comes straight from the request; verify that
        # filters.MakeBrightnessExprFilter cannot execute arbitrary code.
        runner.setFilter("5_brightnessfunction", filters.MakeBrightnessExprFilter(expr))
    return "ok"


# generic rule must be the last
@bottle.route("/<filepath:path>")
def serve_static(filepath):
    """Serve any other path as a static file from the html directory."""
    return bottle.static_file(filepath, root='html')


def main():
    """Start the app runner, serve the web API, then shut the runner down."""
    global running, runner
    ########################################################################
    #                             Startup                                  #
    ########################################################################
    running = True
    runner = AppRunner()
    runner.start()
    try:
        bottle.run(host=config.WebHost, port=config.WebPort, server=GeventWebSocketServer)
    finally:
        ####################################################################
        #                           Shutdown                               #
        ####################################################################
        # Runs even when the server exits with an exception, so the worker
        # threads always see the stop flag.
        running = False
        runner.join()


if __name__ == '__main__':
    main()