from PIL import Image, ImageDraw, ImageFont from datetime import datetime, timedelta from paho.mqtt import client as mqtt_client from pyhamtools import locator from collections import deque from dxspot_const import * import sys import time import json import random import logging logging.basicConfig( level=logging.INFO, stream=sys.stderr, format='[%(asctime)s] %(levelname)s: %(message)s' ) class EsStatusTracker: def __init__(self, window: int, threshold: int): self.window = window self.threshold = threshold self._match_times = deque() self._status = False def add_match(self, ts): self._match_times.append(ts) def tick(self): now = time.time() while self._match_times and self._match_times[0] < now - self.window: self._match_times.popleft() old_status = self._status self._status = len(self._match_times) >= self.threshold if old_status != self._status: logging.info(f'Band Status Changed: {old_status} -> {self._status}') @property def status(self) -> bool: return self._status ALL_TRACKERS = { '6m': EsStatusTracker(DX_BAND_OPEN_WINDOW, DX_BAND_OPEN_THRESHOLD), '2m': EsStatusTracker(DX_BAND_OPEN_WINDOW, DX_BAND_OPEN_THRESHOLD), } def on_connect(client, userdata, flags, rc): logging.info(f'Connected to MQTT broker: {str(rc)}') for topic in PSKREPORT_TOPICS: client.subscribe(topic) def on_message(client, userdata, msg): try: spot = json.loads(msg.payload.decode()) tracker = ALL_TRACKERS[spot['b']] except: logging.warning('Failed to parse MQTT message, payload busted') return d = locator.calculate_distance(spot['sl'][0:8], spot['rl'][0:8]) if d < DX_DISTANCE_MIN or d > DX_DISTANCE_MAX: return logging.info('Possible Es Spot: ' + f'{spot["sc"]}->{spot["rc"]} ' + f'{int(d)}km {spot["rp"]}dB') tracker.add_match(spot['t']) def update_image(tbl, font_path=IMG_FONT, output=IMG_OUT_PATH): width = 155 margin = 6 height = margin + 22 + 12 * (1 + len(tbl)) + margin img = Image.new("RGB", (width, height), '#000000') draw = ImageDraw.Draw(img) try: font = ImageFont.truetype(font_path, 16) except: font = ImageFont.load_default() # Title & Time draw.text((width // 2, margin), 'VHF Conditions', font=font, fill='white', anchor='mm') time_line = datetime.utcnow().strftime('%Y-%m-%dT%H:%MZ') draw.text((width // 2, margin + 12), time_line, font=font, fill='#ffff00', anchor='mm') draw.line(((width // 6, margin + 22), (width // 6 * 5, margin + 22)), fill = 'grey', width=1) # Table rows y = 30 draw.text((2, y), ' ITEM STATUS ', font=font, fill="yellow") y += 12 for col in tbl: draw.text((2, y), col[0], font=font) draw.text((2+8*8, y), col[1][0], font=font, fill=col[1][1]) y += 12 img.save(output) mqttc = mqtt_client.Client( client_id=f'dxspot-{MY_CALL}-{random.randint(0, 1000)}' ) mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.connect(PSKREPORT_MQTT_SERVER) mqttc.loop_start() try: while True: for tracker in ALL_TRACKERS.values(): tracker.tick() update_image([ ('6m EsCN', ('50MHz ES', '#00ff00') if ALL_TRACKERS['6m'].status else ('Band Closed', '#ff0000')), ('2m EsCN', ('114MHz ES', '#00ff00') if ALL_TRACKERS['2m'].status else ('Band Closed', '#ff0000')), ]) time.sleep(10) except KeyboardInterrupt: mqttc.disconnect()