from PIL import Image, ImageDraw, ImageFont from datetime import datetime, timedelta from paho.mqtt import client as mqtt_client from pyhamtools import locator from collections import deque from dxspot_const import * import sys import time import json import random import logging logging.basicConfig( level=logging.INFO, stream=sys.stderr, format='[%(asctime)s] %(levelname)s: %(message)s' ) class EsStatusTracker: def __init__(self, window: int, threshold: int): self.window = window self.threshold = threshold self._match_times = deque() self._status = False def add_match(self, ts): self._match_times.append(ts) def tick(self): now = time.time() while self._match_times and self._match_times[0] < now - self.window: self._match_times.popleft() old_status = self._status self._status = len(self._match_times) >= self.threshold if old_status != self._status: logging.info(f'Band Status Changed: {old_status} -> {self._status}') @property def status(self) -> bool: return self._status ALL_TRACKERS = [EsStatusTracker(DX_BAND_OPEN_WINDOW, DX_BAND_OPEN_THRESHOLD)] def on_connect(client, userdata, flags, reason_code, properties): logging.info(f'Connected to MQTT broker: {reason_code}') for topic in PSKREPORT_TOPICS: client.subscribe(topic) def on_message(client, userdata, msg): try: spot = json.loads(msg.payload) tracker = ALL_TRACKERS[0] except: logging.warning('Failed to parse MQTT message, payload busted') return d = locator.calculate_distance(spot['sl'][0:8], spot['rl'][0:8]) if d < DX_DISTANCE_MIN or d > DX_DISTANCE_MAX: return logging.info('Possible Es Spot: ' + f'{spot['sc']}->{spot['rc']} ' + f'{int(d)}km {spot['rp']}dB') tracker.add_match(spot['t']) def update_image(state, font_path=IMG_FONT, output=IMG_OUT_PATH): width = 155 margin = 6 height = margin + 22 + 12 * 2 + margin font_path = "FSEX302.ttf" img = Image.new("RGB", (width, height), '#000000') draw = ImageDraw.Draw(img) try: font = ImageFont.truetype(font_path, 16) except: font = ImageFont.load_default() # Title & Time draw.text((width // 2, margin), 'VHF Conditions', font=font, fill='white', anchor='mm') time_line = datetime.utcnow().strftime('%Y-%m-%dT%H:%MZ') draw.text((width // 2, margin + 12), time_line, font=font, fill='#ffff00', anchor='mm') draw.line(((width // 6, margin + 22), (width // 6 * 5, margin + 22)), fill = 'grey', width=1) # Table rows y = 30 draw.text((2, y), ' ITEM STATUS ', font=font, fill="yellow") draw.text((2, y+12), '6m EsCN ', font=font) if state: draw.text((2+8*8, y+12), '50MHz ES', font=font, fill="#00ff00") else: draw.text((2+8*8, y+12), 'Band Closed', font=font, fill="red") img.save(output) mqttc = mqtt_client.Client( mqtt_client.CallbackAPIVersion.VERSION2, f'dxspot-{MY_CALL}-{random.randint(0, 1000)}' ) mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.connect(PSKREPORT_MQTT_SERVER) mqttc.loop_start() try: while True: for tracker in ALL_TRACKERS: tracker.tick() update_image(tracker.status) time.sleep(10) except KeyboardInterrupt: mqttc.disconnect()