Phalos-Typ-M-ZZA/run.py

178 lines
5.6 KiB
Python

from deutschebahn import DBInfoscreen
from pyfis.krone import Krone8200Display
from pyfis.krone.exceptions import CommunicationError
from pyfis.utils import get_vias
from util import timeout, TimeoutError
import hashlib
import json
import time
import traceback
from _confsel import CONFIG
if CONFIG == "PHALOS-TYP-M-ZZA":
from settings_phalos import *
def route_from(route, station):
start_found = False
new_route = []
for stop in route:
if not start_found:
start_found = stop == station
if start_found:
new_route.append(stop)
return new_route
@timeout(30)
def get_trains(dbi, station):
return dbi.get_trains(station)
def update_display(display, dbi):
try:
trains = dbi.calc_real_times(get_trains(dbi, STATION_CODE))
trains.sort(key=dbi.time_sort)
except:
traceback.print_exc()
display.info_1.set(OUT_OF_ORDER_TEXT)
print(display.render_ascii())
display.update()
return display.render_ascii()
if not trains:
display.info_1.set(NO_TRAINS_TEXT)
print(display.render_ascii())
display.update()
return display.render_ascii()
train = trains[0]
parts = train['train'].split()
train_type = parts[0]
if train_type == "RE":
display.info_2.set(NO_STOP_EVERYWHERE_TEXT)
is_arrival = train['destination'] == STATION_NAME
if train['platform'] != PLATFORM:
platform = int("".join(c for c in train['platform'] if c.isdigit()))
display.info_2.set(PLATFORM_CHANGE_FORMAT.format(platform))
if train['scheduledDeparture']:
hour, minute = map(int, train['scheduledDeparture'].split(":"))
display.hour.set(DEPARTURE_HOUR_FORMAT.format(hour))
display.minute.set(DEPARTURE_MIN_FORMAT.format(minute))
delay = dbi.round_delay(train['delayDeparture']) if train['delayDeparture'] else 0
delay_raw = train['delayDeparture'] or 0
elif train['scheduledArrival']:
hour, minute = map(int, train['scheduledArrival'].split(":"))
display.hour.set(ARRIVAL_HOUR_FORMAT.format(hour))
display.minute.set(ARRIVAL_MIN_FORMAT.format(minute))
delay = dbi.round_delay(train['delayArrival']) if train['delayArrival'] else 0
delay_raw = train['delayArrival'] or 0
if train['isCancelled']:
display.info_1.set(CANCELLED_TEXT)
else:
delay_text = ""
if CONFIG == "PHALOS-TYP-M-ZZA":
if 5 <= delay <= 20:
delay_text = DELAY_FORMAT.format(delay)
elif 20 < delay <= 30:
delay_text = DELAY_FORMAT.format(30)
elif 30 < delay <= 45:
delay_text = DELAY_FORMAT.format(45)
elif 45 < delay <= 60:
delay_text = DELAY_ABOVE_FORMAT.format(45)
elif delay > 60:
delay_text = UNSPECIFIC_DELAY_TEXT
display.info_1.set(delay_text)
route = route_from([VIA_MAP.get(stop['name'], stop['name']) for stop in train['route']], STATION_NAME)[:-1]
via_combination = get_vias(route, VIA_WEIGHTS, VIA_1, VIA_2, check_dashes=False, debug=True)
if via_combination:
via_code_1, via_code_2 = via_combination
display.via_1.set(MAP_VIA_1[via_code_1])
display.via_2.set(MAP_VIA_2[via_code_2])
if train['destination'] == STATION_NAME:
display.info_2.set(DO_NOT_BOARD_TEXT)
display.destination.set("von " + DESTINATION_MAP.get(train['route'][0]['name'], train['route'][0]['name']).replace("ß", "ss"))
else:
display.destination.set(DESTINATION_MAP.get(train['destination'], train['destination']).replace("ß", "ss"))
print(display.render_ascii())
return display.render_ascii()
def main():
ctrl = Krone8200Display(SERIAL_PORT, address=(0x41, 0x49), debug=True)
ctrl.port.timeout=5.0
dbi = DBInfoscreen(DBI_HOST)
display = TypMZZA(ctrl)
display.clear()
last_update = 0
last_heartbeat = 0
while True:
now = time.time()
if now - last_heartbeat >= 10:
print("\n" + "=" * 60 + "\n")
print("Sending heartbeat")
ctrl.send_end_comm()
last_heartbeat = now
continue
if now - last_update < 60:
time.sleep(1)
continue
print("\n" + "=" * 60 + "\n")
try:
display_id = update_display(display, dbi)
try:
with open("/tmp/m-zza-status.json", 'r') as f:
status = json.load(f)
except (FileNotFoundError, ValueError):
status = {'displayHash': None}
display_hash = hashlib.sha256(bytes(display_id, 'utf-8')).hexdigest()
update = True
if status['displayHash'] == display_hash:
print("No need to update display.")
update = False
else:
status['displayHash'] = display_hash
if update:
num_tries = 10
for i in range(num_tries):
try:
display.update()
break
except CommunicationError:
print("Communication error! Retrying... {}/{}".format(i+1, num_tries))
time.sleep(5)
with open("/tmp/m-zza-status.json", 'w') as f:
json.dump(status, f)
except:
traceback.print_exc()
last_update = now
display.update()
if __name__ == "__main__":
main()