navpoint/navpoint/usb.py

127 lines
3.9 KiB
Python
Raw Permalink Normal View History

2024-07-22 15:09:15 -07:00
import threading
import queue
import tkinter as tk
from tkinter import ttk
from tkinter import messagebox
from serial import Serial
from pynmeagps import NMEAReader
import serial.tools.list_ports
import serial.serialutil
import navpoint.content
import navpoint.fix_path
2024-07-23 09:34:31 -07:00
import navpoint.intercom
2024-07-22 15:09:15 -07:00
2024-07-23 09:34:31 -07:00
def main(port, intercom):
    """Read NMEA data from a serial port and publish each usable position fix.

    Opens ``port`` with a 3 second read timeout, announces ``"Connected"`` on
    ``intercom`` and then loops forever reading parsed messages.  For every
    message that exposes ``lat``, ``lon``, ``time`` and ``date`` and whose
    position is not 0, 0, the KML document in ``navpoint.content.content`` is
    replaced and a ``"LocationUpdate"`` message is put on ``intercom``.

    Args:
        port: Serial device name passed straight to ``Serial``.
        intercom: Object with a ``put(purpose, content)`` method used to
            report ``"Connected"``, ``"LocationUpdate"`` and
            ``"Disconnected"`` events.

    A ``serial.serialutil.SerialException`` ends the loop: ``"Disconnected"``
    is put on ``intercom`` and an error dialog is shown.  Other exceptions
    propagate to the caller.
    """
    try:
        with Serial(port, timeout=3) as stream:
            reader = NMEAReader(stream)
            intercom.put(
                "Connected",
                port,
            )
            while True:
                _, parsed_data = reader.read()
                # Only the attribute reads are guarded: a message that lacks
                # any of these fields is skipped, while an AttributeError
                # raised later (e.g. while publishing) is no longer hidden.
                try:
                    lat = parsed_data.lat
                    lon = parsed_data.lon
                    time = parsed_data.time
                    date = parsed_data.date
                except AttributeError:
                    continue
                if not (lat or lon):  # ignore 0, 0 fixes
                    continue
                navpoint.content.content = _render_kml(lat, lon, date, time)
                intercom.put(
                    "LocationUpdate",
                    {
                        "lat": lat,
                        "lon": lon,
                        "time": f"{date} {time} UTC",
                    },
                )
    except serial.serialutil.SerialException as e:
        intercom.put(
            "Disconnected",
            port,
        )
        # NOTE(review): run() executes this function on a worker thread, so
        # this dialog is opened off the Tk main thread - confirm that is safe
        # on the supported platforms or route the error through intercom.
        messagebox.showerror(
            "Navpoint Error", f"Error reading from port {port}: {str(e)}"
        )


def _render_kml(lat, lon, date, time):
    """Return the KML document placing the "Position" marker at lon, lat."""
    return f"""<?xml version='1.0' encoding='us-ascii'?><kml xmlns="http://www.opengis.net/kml/2.2" xmlns:gx="http://www.google.com/kml/ext/2.2"><Document><Style id="icon"><IconStyle><Icon><href>http://127.0.0.1:8888/marker.png</href></Icon></IconStyle></Style><name>Navpoint</name><open>1</open><Placemark><name>Position</name><styleUrl>#icon</styleUrl><Point><coordinates>{lon},{lat},0</coordinates></Point><description>{date} {time} UTC</description></Placemark></Document></kml>"""
2024-07-23 09:34:31 -07:00
def run(port, intercom):
    """Start ``main(port, intercom)`` on a background daemon thread.

    The thread is a daemon, so it does not keep the process alive on its own.
    Nothing is returned; the thread object is not kept.
    """
    worker = threading.Thread(
        target=main,
        args=(port, intercom),
        daemon=True,
    )
    worker.start()
2024-07-22 15:09:15 -07:00
def clear(queue_to_clear):
    """Discard every item currently waiting in ``queue_to_clear``.

    Items are removed with non-blocking ``get`` calls until the queue reports
    ``queue.Empty``; the removed items are thrown away.  Returns ``None``.
    """
    while True:
        try:
            queue_to_clear.get(block=False)
        except queue.Empty:
            # Nothing left to drain.
            return
def update_dropdown(dropdown):
    """Refill ``dropdown`` with the serial ports that are currently listed.

    The first entry is always the ``"[Select port]"`` placeholder, followed by
    one ``"<device>: <description>"`` entry per port reported by
    ``serial.tools.list_ports.comports()``.  The placeholder is then selected.
    """
    entries = ["[Select port]"]
    for port in serial.tools.list_ports.comports():
        entries.append(f"{port.device}: {port.description}")
    dropdown["values"] = entries
    # Reset the selection to the placeholder at index 0.
    dropdown.current(newindex=0)
def run_ui():
    """Build the Navpoint window and run the Tk main loop.

    The window shows a connection status label, a "last updated" label, a
    read-only port dropdown and a button that refreshes the port list.
    Choosing a port starts a reader via ``run``; messages delivered through
    the ``Intercom`` callback update the labels and hide or show the port
    controls.  This function returns when ``window.mainloop()`` returns.
    """

    def on_port_selected(event):
        """Start reading from the port chosen in the dropdown."""
        # Entries look like "<device>: <description>"; keep the device part.
        device = dropdown.get().partition(": ")[0]
        if device.startswith("["):
            # Bracketed entries are placeholders, not real ports.
            return
        intercom.clear()
        run(device, intercom)

    def handle_message(purpose, content):
        """Update the widgets for one (purpose, content) message."""
        if purpose == "LocationUpdate":
            updated.config(text="Last updated: " + content["time"])
            return
        if purpose == "Connected":
            status.config(text="Connected to " + dropdown.get())
            # Hide the port controls while a connection is active.
            dropdown.pack_forget()
            refresh.pack_forget()
            return
        if purpose == "Disconnected":
            status.config(text="Not connected")
            # Bring the port controls back with a fresh port list.
            update_dropdown(dropdown)
            dropdown.pack()
            refresh.pack()

    def on_refresh_clicked():
        """Reload the list of ports shown in the dropdown."""
        update_dropdown(dropdown)

    window = tk.Tk(className="navpoint")
    window.title("Navpoint")
    window.resizable(width=False, height=False)

    # Widgets are packed in creation order: status, updated, dropdown, refresh.
    status = ttk.Label(window, text="Not connected")
    status.pack()

    updated = ttk.Label(window, text="Waiting for location")
    updated.pack()

    port_choice = tk.StringVar()
    dropdown = ttk.Combobox(
        window,
        state="readonly",
        textvariable=port_choice,
        width=40,
    )
    update_dropdown(dropdown)
    dropdown.pack()

    refresh = ttk.Button(
        window,
        text="Refresh device list",
        command=on_refresh_clicked,
    )
    refresh.pack()

    dropdown.bind("<<ComboboxSelected>>", on_port_selected)

    # NOTE(review): Intercom presumably invokes handle_message on the Tk
    # thread - confirm in navpoint.intercom.
    intercom = navpoint.intercom.Intercom(window, handle_message)

    # The window icon is optional: a TclError while loading it is ignored.
    try:
        window.iconbitmap(navpoint.fix_path.fix_path("icon.ico"))
    except tk.TclError:
        pass

    window.mainloop()