Commit 654edb56 authored by alyx's avatar alyx
Browse files

Add kronk.py

parent b47c6c16
Loading
Loading
Loading
Loading

kronk.py

0 → 100644
+243 −0
Original line number Diff line number Diff line
import morbital
import asyncio
import time
import threading
import requests
import json
import datetime
from flask import Flask, Response

# initialize all the variables that can be read/written from anywhere
global space_status, panel_status, last_interaction_time, selector, ppm, temperature, last_sensor_poll, last_space_status_change, clock_force_draw

space_status = "CLOSED"
panel_status = "init"
last_interaction_time = int(time.time())
selector = 0
temperature = 0
ppm = 0
last_sensor_poll = int(time.time())
last_space_status_change = time.time()
clock_force_draw = 0

# flask init bullshit
app = Flask(__name__)


# flask route for status reading
@app.route('/status')
def get_status():
    global space_status, last_space_status_change
    if (space_status == "OPEN"):
        open = True
    else:
        open = False
    apistatus = {"open": open, "lastchange": last_space_status_change}
    return Response(json.dumps(apistatus), mimetype='text/json')


# flask route for remotely setting display state for if someone like, forgets to click out or whatever
@app.route('/remote/<state>', methods=['POST'])
def remote_close(state):
    valid_states = ['INVISIBLE', 'OPEN', 'CLOSED']
    global space_status, panel_status, last_space_status_change, clock_force_draw
    if (state in valid_states):
        space_status = state
        last_space_status_change = time.time()
        clock_force_draw = 1
        return Response("ok", mimetype='text/json')
    else:
        abort(400, message="Invalid state")


def start_flask_server(host='0.0.0.0', port=8080):
    def run_flask():
        app.run(host=host, port=port, debug=False, use_reloader=False)

    flask_thread = threading.Thread(target=run_flask)
    flask_thread.daemon = True
    flask_thread.start()


def airquality_update(panel, ppm):
    # if PPM CO2 is <1000, green light for bottom LED
    if ppm < 1000:
        panel.set_device_led(2, 1)
    # if PPM CO2 is between 1000 and 2000, yellow light
    if ppm > 999 and ppm < 2001:
        panel.set_device_led(2, 3)
    # if PPM CO2 is over 2000, red light
    if ppm > 2000:
        panel.set_device_led(2, 2)


def write_line(panel, line, text):
    panel.set_cursor_position(1, line)
    panel.write_text(text)
    panel.reset_cursor_position()


def full_draw(panel, clear=1):
    global temperature, ppm
    if (clear == 1):
        panel.clear_display()
    panel.write_line(1, f"# /dev/hack is {space_status}")
    panel.write_line(2, f"Temp: {temperature}F, {ppm} PPM CO2")
    set_space_status_led(panel)
    airquality_update(panel, ppm)
    partial_draw_clock(panel)


def set_space_status_led(panel):
    global space_status
    if (space_status == "OPEN"):
        panel.set_device_led(0, 1)
    if (space_status == "INVISIBLE"):
        panel.set_device_led(0, 3)
    if (space_status == "CLOSED"):
        panel.set_device_led(0, 2)


def partial_draw_clock(panel):
    global clock_force_draw
    now = datetime.datetime.now()
    time = now.strftime("%Y-%m-%d %H:%M:%S")
    panel.write_line(7, f"{time}")
    # a force draw variable, in case someone does a remote close,
    # since partial clock draw is run regularly we will do that here
    if (clock_force_draw == 1):
        clock_force_draw = 0
        full_draw(panel)


def draw_space_update_menu(panel, option):
    full_draw(panel, 0)
    panel.write_line(8, f"                        ")
    panel.write_line(8, f"SET SPACE TO {option}?")


def wakeup(panel):
    print("WAKING UP")
    panel.display_backlight_on(0)  # Full brightness
    panel.set_keypad_backlight_brightness(255)


def poll_sensors():
    print("POLLING SENSORS")
    global temperature, ppm, last_sensor_poll
    # awful gross disgusting error handling
    try:
        headers = {'User-Agent': 'devhack kronk/python requests'}
        url = 'https://devhack.net/spaceapi.json'
        sensors = requests.get(url, headers=headers).json()
        temperature = sensors["sensors"]["temperature"][0]["value"]
    except:
        temperature = 0
    last_sensor_poll = int(time.time())


async def main():
    # initialization tasks

    # start flask
    start_flask_server()
    # initialize serial connection
    panel = morbital.MatrixOrbitalPanel("/dev/ttyUSB1")
    await panel.connect()

    await panel.reset_display()
    # clear display, reset cursor, set backlighs on
    panel.clear_display()
    panel.reset_cursor()
    panel.display_backlight_on(0)  # Full brightness
    panel.set_keypad_backlight_brightness(255)

    # start LEDs at RED (space status), OFF (undefined), OFF (air quality)
    panel.set_device_led(0, 2)
    panel.set_device_led(1, 0)
    panel.set_device_led(2, 0)

    # run a panel draw and update the sensors
    full_draw(panel)
    poll_sensors()

    # button press handler
    def on_button_press(char):
        options = ["OPEN", "CLOSED", "INVISIBLE"]
        global selector, space_status, panel_status, last_interaction_time, last_space_status_change
        print(f"Button pressed: {char}")
        print(panel_status)
        # wake up backlights on button press if not already awake
        if (panel_status != "awake" and panel_status != "in_menu"):
            wakeup(panel)
        # if a button is pressed, update the last interaction time
        last_interaction_time = int(time.time())
        # if right or up are pressed, increment through the list
        if char == "B" or char == "C":
            panel_status = "in_menu"
            selector = selector + 1
            # loop back around
            if (selector > 2):
                selector = 0
            draw_space_update_menu(panel, options[selector])
        # if left or down are pressed, decrement through the list
        if char == "H" or char == "D":
            panel_status = "in_menu"
            selector = selector - 1
            # loop back around
            if (selector < 0):
                selector = 2
            draw_space_update_menu(panel, options[selector])
        # if we're in a menu, and the enter key is pressed, set that
        if char == "E" and panel_status == "in_menu":
            # update the space status
            space_status = options[selector]
            print(f"STATUS CHANGED to {options[selector]}")
            # poke the last space status change
            last_space_status_change = time.time()
            # awake, but no longer in menu
            panel_status = "awake"
            # redraw panel
            full_draw(panel)

    def screen_update():
        while True:
            global temperature, ppm, panel_status, last_interaction_time
            # if it's been two minutes since the last sensor poll, do it again
            if (int(time.time()) - last_sensor_poll > 120):
                poll_sensors()
            # do not run a full draw every second if it's in a menu
            if (panel_status == "in_menu"):
                partial_draw_clock(panel)
            else:
                full_draw(panel, 0)
            time.sleep(1)

    def sleep_timeout():
        while True:
            global panel_status, last_interaction_time
            if panel_status != "asleep":
                print("BACKLIGHT TIMEOUT COUNTER:" + str(int(time.time() - last_interaction_time)))
                if (int(time.time()) - last_interaction_time > 3):
                    panel.keypad_backlight_off()
                    panel.display_backlight_off()
                    full_draw(panel)
                    print("SLEEPING BACKLIGHT")
                    panel_status = "asleep"
            time.sleep(1)

    threading.Thread(target=sleep_timeout).start()
    threading.Thread(target=screen_update).start()

    panel.add_button_callback(on_button_press)

    try:
        await panel.listen_for_buttons()
    except KeyboardInterrupt:
        print("stopping...")
    finally:
        panel.close()


if __name__ == "__main__":
    asyncio.run(main())