Phalos-Typ-M-ZZA/run.py

197 lines
7.4 KiB
Python

from deutschebahn import DBInfoscreen
from pyfis.xatlabs import xatLabsCheetah
from pyfis.utils import get_vias
from util import timeout, TimeoutError
import hashlib
import json
import random
import re
import time
import traceback
from _confsel import CONFIG
if CONFIG == "PHALOS-TYP-M-ZZA":
from settings_phalos import *
def route_from(route, station):
    """Return the tail of ``route`` beginning at ``station``.

    ``route`` is a sequence of stop names. The result starts at the first
    stop whose name starts with ``station`` and contains every stop after
    it. If no stop matches, an empty list is returned.
    """
    stops = list(route)
    for index, name in enumerate(stops):
        if name.startswith(station):
            # First match found: everything from here on is kept
            return stops[index:]
    return []
@timeout(30)
def get_trains(dbi, station):
    """Fetch the train list for ``station`` from ``dbi``.

    Thin wrapper around ``dbi.get_trains`` so the request runs under the
    ``timeout(30)`` decorator imported from ``util``.
    NOTE(review): presumably this aborts the call after 30 seconds and
    raises ``util.TimeoutError`` - confirm against ``util.timeout``.
    """
    trains = dbi.get_trains(station)
    return trains
def filter_route(route):
    """Return the stops of ``route`` that are not flagged as cancelled.

    A stop is dropped when its ``'isCancelled'`` entry is truthy; stops
    without that key are kept. The input is not modified.
    """
    served = []
    for stop in route:
        if stop.get('isCancelled'):
            continue
        served.append(stop)
    return served
def _terminus_name(route):
    """Return the name of the last stop of ``route`` that is not cancelled.

    Falls back to the last scheduled stop when every stop is flagged as
    cancelled, and to an empty string for an empty route, so that a single
    odd record cannot raise ``IndexError``.
    """
    stops = filter_route(route) or route
    return stops[-1]['name'] if stops else ""


def _is_arrival(route):
    """Return True if ``route`` ends at one of ``ARRIVAL_STATIONS``."""
    terminus = _terminus_name(route)
    return any(terminus.startswith(name) for name in ARRIVAL_STATIONS)


def _delay_text(delay):
    """Return the info_2 text for a rounded delay; callers pass ``delay >= 5``."""
    if CONFIG != "PHALOS-TYP-M-ZZA":
        return ""
    if delay <= 20:
        return DELAY_FORMAT.format(delay)
    if delay <= 30:
        return DELAY_FORMAT.format(30)
    if delay <= 45:
        return DELAY_FORMAT.format(45)
    if delay <= 60:
        return DELAY_ABOVE_FORMAT.format(45)
    return UNSPECIFIC_DELAY_TEXT


def _show_status(display, text):
    """Put ``text`` on info_1, push the display and return its ASCII rendering."""
    display.info_1.set(text)
    print(display.render_ascii())
    display.update()
    return display.render_ascii()


def update_display(display, dbi):
    """Fill ``display`` with the next train for ``STATION_CODE``.

    The train list is fetched through ``dbi``, sorted, filtered by the
    platform include/exclude settings and stripped of trains that end at one
    of ``ARRIVAL_STATIONS``. The first remaining train is then written to the
    display fields (time, info_1, info_2, vias, destination).

    If fetching or filtering fails, ``OUT_OF_ORDER_TEXT`` is shown; if no
    train is left, ``NO_TRAINS_TEXT`` is shown. On those two paths this
    function calls ``display.update()`` itself. On the normal path it only
    sets the fields and leaves ``display.update()`` to the caller.

    Returns the result of ``display.render_ascii()``.
    """
    display.clear()

    try:
        trains = dbi.calc_real_times(get_trains(dbi, STATION_CODE))
        trains.sort(key=dbi.time_sort)
        if PLATFORMS_INCLUDE:
            trains = [t for t in trains if t['platform'] in PLATFORMS_INCLUDE]
        elif PLATFORMS_EXCLUDE:
            trains = [t for t in trains if t['platform'] not in PLATFORMS_EXCLUDE]
        # Remove arrivals
        trains = [t for t in trains if not _is_arrival(t['route'])]
    except Exception:
        # Best effort: any data/network error results in the out-of-order
        # text, but KeyboardInterrupt/SystemExit are no longer swallowed.
        # NOTE(review): assumes util.TimeoutError derives from Exception.
        traceback.print_exc()
        return _show_status(display, OUT_OF_ORDER_TEXT)

    if not trains:
        return _show_status(display, NO_TRAINS_TEXT)

    train = trains[0]
    route = train['route']

    parts = train['train'].split()
    train_type = parts[0] if parts else ""
    operator = train_type
    if train_type == "Bus":
        display.info_2.set(BUS_REPLACEMENT_TEXT)
    if len(parts) > 1 and not parts[1][0].isdigit():
        # Train number starts with a letter: its letter prefix is the type
        train_type = parts[1].rstrip("0123456789")

    # The list above was already filtered with the same predicate, so this
    # is False here; it is kept so the arrival branches below keep working
    # if that filter is ever removed.
    is_arrival = _is_arrival(route)

    # Check for multiple trains coupled together with different destinations
    terminus = _terminus_name(route)
    is_multi_train = any(
        other.get('scheduledDeparture') == train.get('scheduledDeparture')
        and other.get('platform') == train.get('platform')
        and _terminus_name(other['route']) != terminus
        for other in trains[1:]
    )

    # NOTE: disabled platform-change announcement, kept for reference
    #if train['platform'] != PLATFORM:
    #    platform = int("".join(c for c in train['platform'] if c.isdigit()))
    #    display.info_2.set(PLATFORM_CHANGE_FORMAT.format(platform))

    # Default so the delay check below works even if the record has neither
    # a scheduled departure nor a scheduled arrival.
    delay = 0
    if train['scheduledDeparture']:
        hour, minute = map(int, train['scheduledDeparture'].split(":"))
        display.hour.set(DEPARTURE_HOUR_FORMAT.format(hour))
        display.minute.set(DEPARTURE_MIN_FORMAT.format(minute))
        if train['delayDeparture']:
            delay = dbi.round_delay(train['delayDeparture'])
    elif train['scheduledArrival']:
        hour, minute = map(int, train['scheduledArrival'].split(":"))
        display.hour.set(ARRIVAL_HOUR_FORMAT.format(hour))
        display.minute.set(ARRIVAL_MIN_FORMAT.format(minute))
        if train['delayArrival']:
            delay = dbi.round_delay(train['delayArrival'])

    if is_arrival:
        display.info_2.set(DO_NOT_BOARD_TEXT)

    if operator == "BRB":
        # Special case for BRB because it's on info_2
        display.info_2.set("BRB Bayerische Regiobahn")
    else:
        display.info_1.set(TRAIN_TYPE_MAP.get(operator, ""))

    if any(stop.get('isCancelled') for stop in route):
        # Some stop on the way is cancelled
        if train_type == "RE":
            display.info_1.set("RE hält nicht überall")
        elif train_type in ("BRB", "RB", "ALX", "WB"):
            display.info_1.set("RB hält nicht überall")

    if train['isCancelled']:
        display.info_2.set(CANCELLED_TEXT)
    elif delay >= 5:
        display.info_2.set(_delay_text(delay))
    elif train_type == "S":
        # If no delay: pseudo-random train length indicator for S-Bahn.
        # A private generator seeded with the train number gives the same
        # value as seeding the module RNG, without touching global state.
        rng = random.Random(train['trainNumber'])
        display.info_2.set(MAP_INFO_2[rng.randint(36, 45)])
    elif train_type == "ICE":
        # If no delay: pseudo-random train length indicator for ICE
        rng = random.Random(train['trainNumber'])
        ice_coaches = rng.choice([5, 6, 7, 18, 19, 20, 21, 22, 23, 24, 25, 31, 32, 34])
        display.info_2.set(MAP_INFO_2[ice_coaches])
    elif is_multi_train:
        display.info_2.set("2 Züge im Gleis Zugbeschriftung beachten")

    # Vias: served stops from this station onwards, without the final stop
    stop_names = [VIA_MAP.get(stop['name'], stop['name']) for stop in filter_route(route)]
    via_route = route_from(stop_names, STATION_NAME)[:-1]
    via_combination = get_vias(via_route, VIA_WEIGHTS, VIA_1, VIA_2, check_dashes=False, debug=True)
    if via_combination:
        via_code_1, via_code_2 = via_combination
        display.via_1.set(MAP_VIA_1[via_code_1])
        display.via_2.set(MAP_VIA_2[via_code_2])

    if is_arrival:
        origin = route[0]['name']
        text = "von " + ARRIVAL_MAP.get(origin, origin).replace("ß", "ss")
        # Remove a trailing "Hbf" as a suffix. rstrip("Hbf") would strip any
        # trailing 'H', 'b' or 'f' characters ("Düsseldorf" -> "Düsseldor").
        if text.endswith("Hbf"):
            text = text[:-len("Hbf")]
        display.destination.set(text.strip())
    else:
        # Check if the last stop matches the destination.
        # For some international trains, the route stops at the border,
        # but the destination is correct.
        # This is different from cancelled stops, which do show up in the
        # route, but with the "isCancelled" flag set.
        # Route entries are dicts, so the stop's name must be compared;
        # comparing the dict itself with the string is always unequal.
        if not route or route[-1]['name'] != train['destination']:
            # Incomplete route, show actual destination
            dest = train['destination']
        else:
            # Normal route: last stop that is actually served
            dest = terminus
        dest = re.sub(r"\(.*\)$", "", dest).strip()  # Filter out suffixes in parentheses
        text = DESTINATION_MAP.get(dest, dest).replace("ß", "ss")
        if train_type == "S":
            text = "".join(train['train'].split()) + " " + text
        display.destination.set(text)

    print(display.render_ascii())
    return display.render_ascii()
def main():
    """Render the current train once and push it to the split-flap display.

    Connects to the controller at ``CHEETAH_HOST``, obtains its split-flap
    display, fills it via ``update_display`` using a ``DBInfoscreen`` client
    for ``DBI_HOST`` and then calls ``display.update()``. Errors are printed
    with a traceback instead of being raised.
    """
    ctrl = xatLabsCheetah(CHEETAH_HOST)
    display = ctrl.get_splitflap_display()
    dbi = DBInfoscreen(DBI_HOST)

    display.clear()
    try:
        update_display(display, dbi)
        display.update()
    except Exception:
        # Top-level boundary: report the failure, but let KeyboardInterrupt
        # and SystemExit propagate (a bare except would swallow them).
        traceback.print_exc()
# Run a single display refresh when executed as a script (not on import).
if __name__ == "__main__":
    main()