Commit f1748cf4 authored by alyx's avatar alyx
Browse files

Merge pull request 'refactor' (#1) from nepeat/kronk:refactor into main

parents c0fb827e 8dab3289
Loading
Loading
Loading
Loading

.envrc

0 → 100644
+1 −0
Original line number Diff line number Diff line
layout python3
+2 −0
Original line number Diff line number Diff line
@@ -241,3 +241,5 @@ cython_debug/
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

morbital.py
.direnv/
 No newline at end of file
+10 −0
Original line number Diff line number Diff line
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
  # Ruff version.
  rev: v0.11.5
  hooks:
    # Run the linter.
    - id: ruff
      args: [ --fix ]
    # Run the formatter.
    - id: ruff-format
 No newline at end of file

Makefile

0 → 100644
+3 −0
Original line number Diff line number Diff line
requirements:
	wget -O morbital.py https://git.en0.io/alyx/morbital/-/raw/main/morbital.py
	pip install -r requirements.txt
 No newline at end of file
+211 −213
Original line number Diff line number Diff line
@@ -5,239 +5,237 @@ import threading
import requests
import json
import datetime
from flask import Flask, Response
from flask import Flask, Response, abort

# initialize all the variables that can be read/written from anywhere
global space_status, panel_status, last_interaction_time, selector, ppm, temperature, last_sensor_poll, last_space_status_change, clock_force_draw

space_status = "CLOSED"
panel_status = "init"
last_interaction_time = int(time.time())
selector = 0
temperature = 0
ppm = 0
last_sensor_poll = int(time.time())
last_space_status_change = time.time()
clock_force_draw = 0
class Kronk:
    def __init__(self):
        self.ppm = 0
        self.panel_status = "init"
        self.panel = morbital.MatrixOrbitalPanel("/dev/ttyUSB1")
        self.space_status = "CLOSED"
        self.last_space_status_change = time.time()

# flask init bullshit
app = Flask(__name__)
        self.last_interaction_time = int(time.time())
        self.selector = 0
        self.temperature = 0
        self.last_sensor_poll = int(time.time())
        self.clock_force_draw = 0

        self.tasks = []
        self.running = True

# flask route for status reading
@app.route('/status')
def get_status():
    global space_status, last_space_status_change
    if (space_status == "OPEN"):
        open = True
    else:
        open = False
    apistatus = {"open": open, "lastchange": last_space_status_change}
    return Response(json.dumps(apistatus), mimetype='application/json')


# flask route for remotely setting display state for if someone like, forgets to click out or whatever
@app.route('/remote/<state>', methods=['POST'])
def remote_close(state):
    valid_states = ['INVISIBLE', 'OPEN', 'CLOSED']
    global space_status, panel_status, last_space_status_change, clock_force_draw
    if (state in valid_states):
        space_status = state
        last_space_status_change = time.time()
        clock_force_draw = 1
        return Response("ok", mimetype='application/json')
    else:
        abort(400, message="Invalid state")


def start_flask_server(host='0.0.0.0', port=8080):
    def run_flask():
        app.run(host=host, port=port, debug=False, use_reloader=False)

    flask_thread = threading.Thread(target=run_flask)
    flask_thread.daemon = True
    flask_thread.start()


def airquality_update(panel, ppm):
    def airquality_update(self):
        # if PPM CO2 is <1000, green light for bottom LED
    if ppm < 1000:
        panel.set_device_led(2, 1)
        if self.ppm < 1000:
            self.panel.set_device_led(2, 1)
        # if PPM CO2 is between 1000 and 2000, yellow light
    if ppm > 999 and ppm < 2001:
        panel.set_device_led(2, 3)
        if self.ppm > 999 and self.ppm < 2001:
            self.panel.set_device_led(2, 3)
        # if PPM CO2 is over 2000, red light
    if ppm > 2000:
        panel.set_device_led(2, 2)


def write_line(panel, line, text):
    panel.set_cursor_position(1, line)
    panel.write_text(text)
    panel.reset_cursor_position()
        if self.ppm > 2000:
            self.panel.set_device_led(2, 2)

    def write_line(self, line, text):
        self.panel.set_cursor_position(1, line)
        self.panel.write_text(text)
        self.panel.reset_cursor_position()

def full_draw(panel, clear=1):
    global temperature, ppm
    if (clear == 1):
        panel.clear_display()
    panel.write_line(1, f"# /dev/hack is {space_status}")
    panel.write_line(2, f"Temp: {temperature}F, {ppm} PPM CO2")
    set_space_status_led(panel)
    airquality_update(panel, ppm)
    partial_draw_clock(panel)


def set_space_status_led(panel):
    global space_status
    if (space_status == "OPEN"):
        panel.set_device_led(0, 1)
    if (space_status == "INVISIBLE"):
        panel.set_device_led(0, 3)
    if (space_status == "CLOSED"):
        panel.set_device_led(0, 2)
    def poll_sensors(self):
        print("POLLING SENSORS")

        # awful gross disgusting error handling
        try:
            headers = {"User-Agent": "devhack kronk/python requests"}
            url = "https://devhack.net/spaceapi.json"
            sensors = requests.get(url, headers=headers).json()
            self.temperature = sensors["sensors"]["temperature"][0]["value"]
        except (IndexError, requests.exceptions.RequestException) as e:
            print(f"error polling: {e}")
            self.temperature = 0
        self.last_sensor_poll = int(time.time())

    def draw_space_update_menu(self, option):
        self.full_draw(0)
        self.panel.write_line(8, "                        ")
        self.panel.write_line(8, f"SET SPACE TO {option}?")

    def set_space_status_led(self):
        if self.space_status == "OPEN":
            self.panel.set_device_led(0, 1)
        if self.space_status == "INVISIBLE":
            self.panel.set_device_led(0, 3)
        if self.space_status == "CLOSED":
            self.panel.set_device_led(0, 2)

    def wakeup(self):
        print("WAKING UP")
        self.panel.display_backlight_on(0)  # Full brightness
        self.panel.set_keypad_backlight_brightness(255)

def partial_draw_clock(panel):
    global clock_force_draw
    def partial_draw_clock(self):
        now = datetime.datetime.now()
        time = now.strftime("%Y-%m-%d %H:%M:%S")
    panel.write_line(7, f"{time}")
        self.panel.write_line(7, f"{time}")
        # a force draw variable, in case someone does a remote close,
        # since partial clock draw is run regularly we will do that here
    if (clock_force_draw == 1):
        clock_force_draw = 0
        full_draw(panel)


def draw_space_update_menu(panel, option):
    full_draw(panel, 0)
    panel.write_line(8, f"                        ")
    panel.write_line(8, f"SET SPACE TO {option}?")


def wakeup(panel):
    print("WAKING UP")
    panel.display_backlight_on(0)  # Full brightness
    panel.set_keypad_backlight_brightness(255)


def poll_sensors():
    print("POLLING SENSORS")
    global temperature, ppm, last_sensor_poll
    # awful gross disgusting error handling
    try:
        headers = {'User-Agent': 'devhack kronk/python requests'}
        url = 'https://devhack.net/spaceapi.json'
        sensors = requests.get(url, headers=headers).json()
        temperature = sensors["sensors"]["temperature"][0]["value"]
    except:
        temperature = 0
    last_sensor_poll = int(time.time())


async def main():
    # initialization tasks

    # start flask
    start_flask_server()
    # initialize serial connection
    panel = morbital.MatrixOrbitalPanel("/dev/ttyUSB1")
    await panel.connect()

    await panel.reset_display()
    # clear display, reset cursor, set backlighs on
    panel.clear_display()
    panel.reset_cursor()
    panel.display_backlight_on(0)  # Full brightness
    panel.set_keypad_backlight_brightness(255)

    # start LEDs at RED (space status), OFF (undefined), OFF (air quality)
    panel.set_device_led(0, 2)
    panel.set_device_led(1, 0)
    panel.set_device_led(2, 0)

    # run a panel draw and update the sensors
    full_draw(panel)
    poll_sensors()

    # button press handler
    def on_button_press(char):
        if self.clock_force_draw == 1:
            self.clock_force_draw = 0
            self.full_draw()

    def full_draw(self, clear=1):
        if clear == 1:
            self.panel.clear_display()
        self.panel.write_line(1, f"# /dev/hack is {self.space_status}")
        self.panel.write_line(2, f"Temp: {self.temperature}F, {self.ppm} PPM CO2")
        self.set_space_status_led()
        self.airquality_update()
        self.partial_draw_clock()

    def on_button_press(self, char):
        options = ["OPEN", "CLOSED", "INVISIBLE"]
        global selector, space_status, panel_status, last_interaction_time, last_space_status_change
        print(f"Button pressed: {char}")
        print(panel_status)
        print(self.panel_status)
        # wake up backlights on button press if not already awake
        if (panel_status != "awake" and panel_status != "in_menu"):
            wakeup(panel)
        if self.panel_status != "awake" and self.panel_status != "in_menu":
            self.wakeup()
        # if a button is pressed, update the last interaction time
        last_interaction_time = int(time.time())
        self.last_interaction_time = int(time.time())
        # if right or up are pressed, increment through the list
        if char == "B" or char == "C":
            panel_status = "in_menu"
            selector = selector + 1
            self.panel_status = "in_menu"
            self.selector = self.selector + 1
            # loop back around
            if (selector > 2):
                selector = 0
            draw_space_update_menu(panel, options[selector])
            if self.selector > 2:
                self.selector = 0
            self.draw_space_update_menu(options[self.selector])
        # if left or down are pressed, decrement through the list
        if char == "H" or char == "D":
            panel_status = "in_menu"
            selector = selector - 1
            self.panel_status = "in_menu"
            self.selector = self.selector - 1
            # loop back around
            if (selector < 0):
                selector = 2
            draw_space_update_menu(panel, options[selector])
            if self.selector < 0:
                self.selector = 2
            self.draw_space_update_menu(options[self.selector])
        # if we're in a menu, and the enter key is pressed, set that
        if char == "E" and panel_status == "in_menu":
        if char == "E" and self.panel_status == "in_menu":
            # update the space status
            space_status = options[selector]
            print(f"STATUS CHANGED to {options[selector]}")
            self.space_status = options[self.selector]
            print(f"STATUS CHANGED to {options[self.selector]}")
            # poke the last space status change
            last_space_status_change = time.time()
            self.last_space_status_change = time.time()
            # awake, but no longer in menu
            panel_status = "awake"
            self.panel_status = "awake"
            # redraw panel
            full_draw(panel)
            self.full_draw()

    def screen_update():
        while True:
            global temperature, ppm, panel_status, last_interaction_time
    async def screen_update(self):
        while self.running:
            # if it's been two minutes since the last sensor poll, do it again
            if (int(time.time()) - last_sensor_poll > 120):
                poll_sensors()
            if int(time.time()) - self.last_sensor_poll > 120:
                self.poll_sensors()
            # do not run a full draw every second if it's in a menu
            if (panel_status == "in_menu"):
                partial_draw_clock(panel)
            if self.panel_status == "in_menu":
                self.partial_draw_clock()
            else:
                full_draw(panel, 0)
            time.sleep(1)

    def sleep_timeout():
        while True:
            global panel_status, last_interaction_time
            if panel_status != "asleep":
                print("BACKLIGHT TIMEOUT COUNTER:" + str(int(time.time() - last_interaction_time)))
                if (int(time.time()) - last_interaction_time > 3):
                    panel.keypad_backlight_off()
                    panel.display_backlight_off()
                    full_draw(panel)
                self.full_draw(0)
            await asyncio.sleep(1)

    async def sleep_timeout(self):
        while self.running:
            if self.panel_status != "asleep":
                print(
                    "BACKLIGHT TIMEOUT COUNTER:"
                    + str(int(time.time() - self.last_interaction_time))
                )
                if int(time.time()) - self.last_interaction_time > 3:
                    self.panel.keypad_backlight_off()
                    self.panel.display_backlight_off()
                    self.full_draw()
                    print("SLEEPING BACKLIGHT")
                    panel_status = "asleep"
            time.sleep(1)
                    self.panel_status = "asleep"
            await asyncio.sleep(1)

    async def main(self):
        # start flask
        self.start_flask_server()

        # initialize serial connection
        await self.panel.connect()

        await self.panel.reset_display()
        # clear display, reset cursor, set backlighs on
        self.panel.clear_display()
        self.panel.reset_cursor()
        self.panel.display_backlight_on(0)  # Full brightness
        self.panel.set_keypad_backlight_brightness(255)

        # start LEDs at RED (space status), OFF (undefined), OFF (air quality)
        self.panel.set_device_led(0, 2)
        self.panel.set_device_led(1, 0)
        self.panel.set_device_led(2, 0)

        # run a panel draw and update the sensors
        self.full_draw()
        self.poll_sensors()

    threading.Thread(target=sleep_timeout).start()
    threading.Thread(target=screen_update).start()
        self.tasks.append(asyncio.create_task(self.sleep_timeout()))
        self.tasks.append(asyncio.create_task(self.screen_update()))

    panel.add_button_callback(on_button_press)
        self.panel.add_button_callback(self.on_button_press)

        try:
        await panel.listen_for_buttons()
            await self.panel.listen_for_buttons()
        except KeyboardInterrupt:
            print("stopping...")
            self.running = False
            await asyncio.gather(*self.tasks)  # wait for all tasks to complete
        finally:
        panel.close()
            self.panel.close()

    # flask route for status reading
    def get_status(self):
        if self.space_status == "OPEN":
            open = True
        else:
            open = False

        apistatus = {
            "open": open,
            "lastchange": self.last_space_status_change,
        }

        return Response(json.dumps(apistatus), mimetype="application/json")

    # flask route for remotely setting display state for if someone like, forgets to click out or whatever
    def remote_close(self, new_state: str):
        valid_states = ["INVISIBLE", "OPEN", "CLOSED"]

        if new_state in valid_states:
            self.space_status = new_state
            self.last_space_status_change = time.time()
            self.clock_force_draw = 1
            return Response("ok", mimetype="application/json")
        else:
            abort(400, message="Invalid state")

    def start_flask_server(self, host="0.0.0.0", port=8080):
        flask_app = Flask(__name__)

        # setup the routes
        flask_app.add_url_rule("/status", view_func=self.get_status, methods=["GET"])
        flask_app.add_url_rule(
            "/remote/<new_state>", view_func=self.remote_close, methods=["POST"]
        )

        def run_flask():
            flask_app.run(host=host, port=port, debug=False, use_reloader=False)

        flask_thread = threading.Thread(target=run_flask)
        flask_thread.daemon = True
        flask_thread.start()


if __name__ == "__main__":
    asyncio.run(main())
    kronk = Kronk()
    asyncio.run(kronk.main())
Loading