navpoint/navpoint/usb.py
2024-07-28 15:54:02 -07:00

127 lines
3.9 KiB
Python

import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox
from serial import Serial
from pynmeagps import NMEAReader
import serial.tools.list_ports
import serial.serialutil
import navpoint.content
import navpoint.fix_path
import navpoint.intercom
def main(port, intercom):
    """Read NMEA messages from a serial port and publish position fixes.

    Opens ``port`` with a 3-second read timeout, puts ``"Connected"`` on
    ``intercom``, then loops reading parsed messages. For every message
    that exposes ``lat``, ``lon``, ``time`` and ``date`` and whose latitude
    or longitude is truthy, ``navpoint.content.content`` is replaced with a
    KML document for that position and a ``"LocationUpdate"`` is put on
    ``intercom``.

    Args:
        port: Serial device name passed to ``Serial``.
        intercom: Object providing ``put(purpose, content)``; used to
            report connection state and location updates.

    A ``SerialException`` raised while opening or reading the port ends the
    loop: ``"Disconnected"`` is put on ``intercom`` and an error dialog is
    shown. The exception is not re-raised.
    """
    try:
        with Serial(port, timeout=3) as stream:
            reader = NMEAReader(stream)
            intercom.put("Connected", port)
            while True:
                _, parsed_data = reader.read()

                # Guard only the attribute reads. Anything read that does
                # not expose all four attributes is skipped, but an
                # AttributeError raised by the publishing code below is no
                # longer silently swallowed.
                try:
                    lat = parsed_data.lat
                    lon = parsed_data.lon
                    time = parsed_data.time
                    date = parsed_data.date
                except AttributeError:
                    continue

                if not (lat or lon):  # ignore 0, 0 fixes
                    continue

                navpoint.content.content = f"""<?xml version='1.0' encoding='us-ascii'?><kml xmlns="http://www.opengis.net/kml/2.2" xmlns:gx="http://www.google.com/kml/ext/2.2"><Document><Style id="icon"><IconStyle><Icon><href>http://127.0.0.1:8888/marker.png</href></Icon></IconStyle></Style><name>Navpoint</name><open>1</open><Placemark><name>Position</name><styleUrl>#icon</styleUrl><Point><coordinates>{lon},{lat},0</coordinates></Point><description>{date} {time} UTC</description></Placemark></Document></kml>"""
                intercom.put(
                    "LocationUpdate",
                    {
                        "lat": lat,
                        "lon": lon,
                        "time": f"{date} {time} UTC",
                    },
                )
    except serial.serialutil.SerialException as e:
        # Tell the UI first so it can offer the port list again, then
        # surface the reason to the user.
        intercom.put("Disconnected", port)
        messagebox.showerror(
            "Navpoint Error", f"Error reading from port {port}: {str(e)}"
        )
def run(port, intercom):
    """Start ``main(port, intercom)`` on a daemon thread and return at once."""
    worker = threading.Thread(
        target=main,
        args=(port, intercom),
        daemon=True,
    )
    worker.start()
def clear(queue_to_clear):
    """Discard every item currently waiting in ``queue_to_clear``.

    Items are removed without blocking; the function returns as soon as the
    queue reports that it is empty.
    """
    while True:
        try:
            queue_to_clear.get_nowait()
        except queue.Empty:
            return
def update_dropdown(dropdown):
    """Refill ``dropdown`` with the serial ports that are currently listed.

    The first entry is always the ``"[Select port]"`` placeholder, followed
    by one ``"<device>: <description>"`` entry per port. The placeholder is
    left selected.
    """
    entries = ["[Select port]"]
    for port in serial.tools.list_ports.comports():
        entries.append(f"{port.device}: {port.description}")
    dropdown["values"] = entries
    dropdown.current(newindex=0)
def run_ui():
    """Build the Navpoint window and run the Tk main loop.

    The window holds a connection status label, a "last updated" label, a
    read-only port selector and a button that refreshes the port list.
    Choosing a port starts the reader via ``run``; messages delivered to
    ``new_data`` through the ``Intercom`` update the labels and hide or
    re-show the port controls.
    """

    def selected(event):
        # Entries look like "<device>: <description>"; keep the device part.
        device = dropdown.get().split(": ", 1)[0]
        if device.startswith("["):
            # Bracketed placeholder such as "[Select port]": nothing to open.
            return
        intercom.clear()
        run(device, intercom)

    def new_data(purpose, content):
        if purpose == "LocationUpdate":
            updated["text"] = "Last updated: " + content["time"]
            return
        if purpose == "Connected":
            status["text"] = "Connected to " + dropdown.get()
            # Hide the port controls while a connection is active.
            for widget in (dropdown, refresh):
                widget.pack_forget()
            return
        if purpose == "Disconnected":
            status["text"] = "Not connected"
            # Re-list the ports and bring the controls back.
            update_dropdown(dropdown)
            for widget in (dropdown, refresh):
                widget.pack()

    window = tk.Tk(className="navpoint")
    window.title("Navpoint")
    window.resizable(False, False)

    status = ttk.Label(window, text="Not connected")
    status.pack()

    updated = ttk.Label(window, text="Waiting for location")
    updated.pack()

    port_choice = tk.StringVar()
    dropdown = ttk.Combobox(
        window,
        state="readonly",
        textvariable=port_choice,
        width=40,
    )
    update_dropdown(dropdown)
    dropdown.pack()

    refresh = ttk.Button(
        window,
        text="Refresh device list",
        command=lambda: update_dropdown(dropdown),
    )
    refresh.pack()

    dropdown.bind("<<ComboboxSelected>>", selected)
    intercom = navpoint.intercom.Intercom(window, new_data)

    # The window icon is optional: a TclError while setting it is ignored.
    try:
        icon_path = navpoint.fix_path.fix_path("icon.ico")
        window.iconbitmap(icon_path)
    except tk.TclError:
        pass

    window.mainloop()