#!/usr/bin/env python3 import base64 import datetime import io import json import logging import math import os import subprocess import threading import time from collections import OrderedDict from pathlib import Path import PIL.Image import bottle # noinspection PyUnresolvedReferences import bottle.ext.websocket as bottle_ws import geventwebsocket.websocket import numpy as np import urllib.request import serial # noinspection PyUnresolvedReferences from bottle.ext.websocket import GeventWebSocketServer import config import filters logging.basicConfig(filename='pixelserver.log', level=config.LogLevel) ######################################################################## # Utils # ######################################################################## class DataSource: def __init__(self, initial): self.data = initial self.listeners = [] def getData(self) -> "Frame": return self.data def addListener(self, listener): self.listeners.append(listener) return self def pushData(self, data): self.data = data for listener in self.listeners: with listener: listener.notify_all() class WatchDog(threading.Thread): def __init__(self, check, action): super().__init__(daemon=True) self.check = check self.action = action self.running = True def run(self): while running and self.running: if self.check(): logging.error("Watchdog: Executed") self.action() time.sleep(1) def stop(self): self.running = False class LogReader(threading.Thread): def __init__(self, runner): super().__init__(daemon=True) self.runner = runner self.log = "" self.running = True def clear(self): self.log = "" def getLog(self): return self.log def run(self): logging.info("LogReader started") while running and self.running: try: self.log += self.runner.app.stderr.read(1).decode("utf-8") except Exception as e: print(e) logging.error(str(e)) time.sleep(1) logging.info("LogReader closed") def stop(self): self.running = False class Frame: def __init__(self, buffer, channels=3): self.buffer: np.ndarray = buffer self.created = time.time() self.channels = channels def clone(self): f = Frame(self.buffer + 0, self.channels) f.created = self.created return f ######################################################################## # GUI # ######################################################################## if config.UseGui: import pygame class Gui(threading.Thread): def __init__(self, datasource): super().__init__(daemon=True) self.cv = threading.Condition() self.datasource = datasource.addListener(self.cv) def run(self): last_frame = time.time() logging.info("Starting GUI") sf = config.GuiScaleFactor pygame.init() screen = pygame.display.set_mode((sf * config.ScreenX, sf * config.ScreenY)) pygame.display.set_caption("Pixelserver - GUI Vis") while running: for event in pygame.event.get(): pass with self.cv: self.cv.wait() frame = self.datasource.getData() screen.fill((0, 0, 0)) if frame.channels == 3: for x in range(config.ScreenX): for y in range(config.ScreenY): color = (frame.buffer[y, x, 0], frame.buffer[y, x, 1], frame.buffer[y, x, 2]) pygame.draw.rect(screen, color, pygame.Rect(sf * x, sf * y, sf, sf)) elif frame.channels == 4: for x in range(config.ScreenX): for y in range(config.ScreenY): w = frame.buffer[y, x, 3] // 2 color = (frame.buffer[y, x, 0] // 2 + w, frame.buffer[y, x, 1] // 2 + w, frame.buffer[y, x, 2] // 2 + w) pygame.draw.rect(screen, color, pygame.Rect(sf * x, sf * y, sf, sf)) # logging.debug("Time to gui: " + str(time.time() - frame.created)) pygame.display.flip() if time.time() < last_frame + 1 / config.GuiFPS: time.sleep(max(0.01, time.time() - (last_frame + 1 / config.GuiFPS))) # time.sleep(0.01) last_frame = time.time() logging.info("Closing GUI") def join(self, **kwargs): with self.cv: self.cv.notify_all() super().join(**kwargs) ######################################################################## # Serial # ######################################################################## class SerialWriter(threading.Thread): def __init__(self, datasource): super().__init__(daemon=True) self.cv = threading.Condition() self.datasource = datasource.addListener(self.cv) self.gamma_rgbw = 0, 0, 0, 0 self.updateGamma = False def run(self): should_connect = True ser = None logging.info("Starting SerialWriter") while running: try: if should_connect: ser = serial.Serial(config.Serial) should_connect = False logging.info("Serial Opened") with self.cv: self.cv.wait(timeout=1 / 30) frame = self.datasource.getData() data = frame.buffer.reshape((config.ScreenX * config.ScreenY * frame.channels,)).astype(np.uint8).tobytes() if self.updateGamma: r, g, b, w = self.gamma_rgbw apply = lambda x, g: max(0, min(255, int(math.pow(x / 255, g) * 255))) buf = bytearray(4 * 256) for i in range(256): buf[i] = apply(i, r) buf[i + 256] = apply(i, g) buf[i + 256 * 2] = apply(i, b) buf[i + 256 * 3] = apply(i, w) ser.write(b"\x02") ser.write(buf) self.updateGamma = False if frame.channels == 3: ser.write(b"\01") ser.write(data) elif frame.channels == 4: ser.write(b"\03") ser.write(data) logging.debug(f"Time to gui: {time.time() - frame.created}") ser.flush() except Exception as e: if ser is not None: ser.close() ser = None logging.warning(f"Serial was close because: {e}") should_connect = True time.sleep(5) logging.info("Closing SerialWriter") def join(self, **kwargs): with self.cv: self.cv.notify_all() super().join(**kwargs) def setGamma(self, r, g, b, w): with self.cv: self.gamma_rgbw = r, g, b, w self.updateGamma = True self.cv.notify_all() ######################################################################## # App # ######################################################################## class App(threading.Thread): def __init__(self, name, cmd, param, listener, is_persistent, is_white=False, path="."): super().__init__(daemon=True) # start app self.name = name args = cmd + [str(config.ScreenX), str(config.ScreenY), param] self.app = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, cwd=path) self.last_update = time.time() # self.cv = threading.Condition() self.watchdog = WatchDog(lambda: self.isAppTimedOut(), lambda: self.terminateApp()) self.watchdog.start() self.logreader = LogReader(self) self.logreader.start() self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3)))) self.running = True self.listener = listener self.is_persistent = is_persistent self.is_white = is_white def run(self): while running and self.running and self.alive(): oshandle = self.app.stdout.fileno() try: channels = 4 if self.is_white else 3 data = os.read(oshandle, config.ScreenX * config.ScreenY * channels) assert len(data) == config.ScreenX * config.ScreenY * channels buffer = np.frombuffer(data, dtype=np.uint8, count=config.ScreenX * config.ScreenY * channels) buffer = buffer.reshape((config.ScreenY, config.ScreenX, channels)) frame = Frame(buffer, channels=channels) self.last_update = time.time() self.datasource.pushData(frame) except Exception as ex: logging.debug(f"Exception in App.run: {ex}") with self.listener: self.listener.notify_all() self.watchdog.stop() self.logreader.stop() self.watchdog.join() self.logreader.join() self.app.wait() logging.debug("App stopped") def alive(self): return self.app.poll() is None def stop(self): self.running = False self.app.kill() self.app.stdout.close() self.app.stderr.close() self.watchdog.stop() self.logreader.stop() self.app.wait() logging.debug("App stopped") def getLog(self): return self.logreader.getLog() def terminateApp(self): logging.error("Terminate app!") self.stop() def isAppTimedOut(self): return time.time() - self.last_update > config.NoDataTimeout class PixelCanvas(App): # noinspection PyMissingConstructor def __init__(self): threading.Thread.__init__(self, daemon=True) self.name = "pixelcanvas" self.running = True self.is_persistent = True self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3)))) def run(self): while running and self.running: time.sleep(1) def alive(self): return True def stop(self): pass def getLog(self): return "" def terminateApp(self): pass def isAppTimedOut(self): return False ######################################################################## # Main # ######################################################################## class AppRunner(threading.Thread): def __init__(self): super().__init__(daemon=True) self.last_crashlog = "" self.currentApp = -1 self.requestedApp = 0 self.app = None self.cv = threading.Condition() self.param = "" self.datasource = DataSource(Frame(np.zeros((config.ScreenY, config.ScreenX, 3)))) self.serial = SerialWriter(self.datasource) self.serial.start() self.persistent_apps = {} self.filters = OrderedDict() # start persistent apps for i, app in enumerate(config.Apps): if app.persistent: self.startApp(i) if config.UseGui: self.gui = Gui(self.datasource) self.gui.start() def requestApp(self, app, param=""): with self.cv: self.requestedApp = app self.param = param self.cv.notify_all() logging.info(f"Requesting app: {app}") def startApp(self, i, param=""): app = config.Apps[i] if app.name == "pixelcanvas": newapp = PixelCanvas() else: newapp = App(app.name, app.cmd, param, self.cv, is_persistent=app.persistent, is_white=app.white, path=app.path) newapp.datasource.addListener(self.cv) newapp.start() if app.persistent: self.persistent_apps[self.currentApp] = newapp return newapp def updateApp(self): try: if self.app is not None and not self.app.is_persistent: self.app.stop() self.currentApp = self.requestedApp if self.currentApp in self.persistent_apps.keys() and self.persistent_apps[self.currentApp].alive(): self.app = self.persistent_apps[self.currentApp] else: self.app = self.startApp(self.requestedApp, self.param) except FileNotFoundError as e: print(e) def run(self): logging.info("Starting Apprunner") while running: with self.cv: if self.app is None or not self.app.alive(): if self.app is not None: self.last_crashlog = self.app.getLog() self.requestedApp = 0 if self.requestedApp is not None: self.updateApp() self.requestedApp = None if self.app is not None: frame = self.app.datasource.getData().clone() # logging.debug("Runner in time: " + str(time.time() - frame.created)) for _, f in self.filters.items(): frame.buffer = f(frame.buffer) # logging.debug("Runner out time: " + str(time.time() - frame.created)) self.datasource.pushData(frame) self.cv.wait() self.serial.join() if config.UseGui: self.gui.join() logging.info("Close Apprunner") def getLog(self): if self.app is None: return "" return self.app.getLog() def setGamma(self, r, g, b, w): self.serial.setGamma(r, g, b, w) def setFilter(self, name, filter_): self.filters[name] = filter_ def removeFilter(self, name): if name in self.filters.keys(): del self.filters[name] ######################################################################## # Web Api # ######################################################################## @bottle.route('/<:re:.*>', method='OPTIONS') def enable_cors_generic_route(): add_cors_headers() @bottle.hook('after_request') def enable_cors_after_request_hook(): add_cors_headers() def add_cors_headers(): bottle.response.headers['Access-Control-Allow-Origin'] = '*' bottle.response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS' bottle.response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' def startApp(name, param=""): for i, app in enumerate(config.Apps): if app.name == name: runner.requestApp(i, param) return "ok" return "not_found" @bottle.route("/apps/list") def apps_list(): return json.dumps([ { "name": app.name, "guiname": app.guiname, "persistent": app.persistent, } for app in config.Apps ]) @bottle.route("/apps/start/") def apps_start_param(name): return startApp(name) @bottle.post("/apps/start/") def apps_start_post(name): param = bottle.request.forms.get('param') return startApp(name, param) @bottle.route("/apps/start//") def apps_start(name, param): return startApp(name, param) @bottle.route("/apps/log") def apps_log(): return runner.getLog() @bottle.route("/apps/crashlog") def apps_log(): return runner.last_crashlog @bottle.route("/apps/running") def apps_running(): return config.Apps[runner.currentApp].name @bottle.route("/frame") def frame(): data = runner.datasource.getData() return {"data": data.buffer.flatten().tolist(), "channels": data.channels} @bottle.route("/pixel//////") def pixel(x, y, r, g, b, w): if runner.app.name != "pixelcanvas": startApp("pixelcanvas") data = runner.app.datasource.getData().clone() data.created = time.time() data.buffer[y][x][0] = r data.buffer[y][x][1] = g data.buffer[y][x][2] = b if data.channels == 4: data.buffer[y][x][3] = w runner.app.datasource.pushData(data) return "ok" @bottle.get("/frame_ws", apply=[bottle_ws.websocket]) def frame_ws(ws: geventwebsocket.websocket.WebSocket): while not ws.closed: msg = json.loads(ws.receive()) if msg["ty"] == "frame": data = runner.datasource.getData() if msg.get("time", None) == str(data.created): ws.send(json.dumps({ "type": "frame_unchanged", }, separators=(',', ':'))) else: ws.send(json.dumps({ "ty": "frame", "data": data.buffer.flatten().tolist(), "channels": data.channels, "time": str(data.created), }, separators=(',', ':'))) elif msg["ty"] == "pixel": if runner.app.name != "pixelcanvas": startApp("pixelcanvas") x = msg["x"] y = msg["y"] data = runner.app.datasource.getData().clone() data.created = time.time() data.buffer[y, x, 0] = msg["r"] data.buffer[y, x, 1] = msg["g"] data.buffer[y, x, 2] = msg["b"] if data.channels == 4 and "w" in msg: data.buffer[y, x, 3] = msg["w"] runner.app.datasource.pushData(data) elif msg["ty"] == "fill": if runner.app.name != "pixelcanvas": startApp("pixelcanvas") data = runner.app.datasource.getData().clone() data.created = time.time() data.buffer[..., 0] = msg["r"] data.buffer[..., 1] = msg["g"] data.buffer[..., 2] = msg["b"] if data.channels == 4 and "w" in msg: data.buffer[..., 3] = msg["w"] runner.app.datasource.pushData(data) @bottle.route("/save_frame") def save_frame(): screenshot_dir = Path("./screenshots") screenshot_dir.mkdir(parents=True, exist_ok=True) data = runner.datasource.getData() PIL.Image.fromarray(data.buffer.copy().astype("u1")).save(screenshot_dir.joinpath(f"frame-{datetime.datetime.now().astimezone().replace(tzinfo=None).isoformat()}.png")) return "ok" @bottle.post("/load_frame") def load_frame(): src = bottle.request.json["src"] buf = np.array(PIL.Image.open(io.BytesIO(urllib.request.urlopen(src).read())), dtype=np.uint8) if buf.shape == (80, 40, 3) or buf.shape == (80, 40, 4): buf = buf.swapaxes(0, 1) elif buf.shape == (40, 80, 3) or buf.shape == (40, 80, 4): pass else: return 415 if runner.app.name != "pixelcanvas": startApp("pixelcanvas") for _ in range(200): if runner.requestedApp is None: break time.sleep(0.01) else: return 500 runner.app.datasource.pushData(Frame(buf, buf.shape[2])) return "ok" @bottle.route("/gallery_frames") def gallery_frames(): screenshot_dir = Path("./screenshots") if not screenshot_dir.exists(): return [] frames = [] for p in screenshot_dir.glob("frame-*.png"): try: dt = datetime.datetime.fromisoformat(p.stem.removeprefix("frame-")).astimezone().astimezone(datetime.timezone.utc) i = "data:image/png;base64," + base64.b64encode(p.read_bytes()).decode() frames.append({ "dt": dt.isoformat(), "caption": dt.strftime("%d.%m.%Y %H:%M"), "src": i, }) except: continue frames.sort(key=lambda f: f["dt"], reverse=True) return json.dumps(frames) @bottle.route("/") def index(): return bottle.static_file("index.html", root='html') @bottle.route("/setgamma////") def setGamma(r, g, b, w): runner.setGamma(r, g, b, w) return "ok" @bottle.route("/setbrightness/") def setIntensity(i): if not 0 <= i <= 1: return "bad_value" runner.setFilter("0_intensity", filters.MakeBrightnessFilter(i)) return "ok" @bottle.route("/filter/flipx/") def flipx(do): if do == "true": runner.setFilter("1_flipx", filters.FlipXFilter) else: runner.removeFilter("1_flipx") return "ok" @bottle.route("/filter/flipy/") def flipy(do): if do == "true": runner.setFilter("1_flipy", filters.FlipYFilter) else: runner.removeFilter("1_flipy") return "ok" @bottle.route("/filter/img/") def setfilter(name): if name == "none": runner.removeFilter("3_imgfilter") else: runner.setFilter("3_imgfilter", filters.MakeBrightnessImageFilter(name)) return "ok" @bottle.post("/filter/expr/") def filter_expr(): expr = bottle.request.forms.get('expr') if expr == "" or expr == "none": runner.removeFilter("5_brightnessfunction") else: runner.setFilter("5_brightnessfunction", filters.MakeBrightnessExprFilter(expr)) return "ok" # generic rule must be the last @bottle.route("/") def serve_static(filepath): return bottle.static_file(filepath, root='html') def main(): global running, runner ######################################################################## # Startup # ######################################################################## running = True runner = AppRunner() runner.start() # runner.setFilter("5_crazy", MakeBrightnessExprFilter("0.5+0.25*sin(x/3)/x")) bottle.run(host=config.WebHost, port=config.WebPort, server=GeventWebSocketServer) ######################################################################## # Shutdown # ######################################################################## running = False runner.join() if __name__ == '__main__': main()