From 512a54487dd1782fe571e24cdf763a686dca0867 Mon Sep 17 00:00:00 2001 From: LockeShor <75901583+LockeShor@users.noreply.github.com> Date: Mon, 2 Mar 2026 17:20:09 -0500 Subject: [PATCH] eh it works --- README.md | 5 ++ watcher.py | 228 ++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 203 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index f716040..eb93fe0 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,10 @@ On changes, it sends Telegram messages with: - removed apps (`-`) - updated apps (`~`) and field-level diffs +It also listens for Telegram commands from the configured chat: + +- `/random` returns one random app using the same detailed format, including screenshots when available + ## Environment variables - `TELEGRAM_BOT_TOKEN` (required for notifications) @@ -29,6 +33,7 @@ On changes, it sends Telegram messages with: - `REQUEST_TIMEOUT_SECONDS` (default: `30`) - `LOG_LEVEL` (default: `INFO`) - `MAX_SCREENSHOTS_PER_APP` (default: `3`) +- `TELEGRAM_POLL_SECONDS` (default: `10`) ## Build diff --git a/watcher.py b/watcher.py index 5dbd75a..5189b27 100644 --- a/watcher.py +++ b/watcher.py @@ -2,12 +2,13 @@ import hashlib import json import logging import os +import random import re import sys import time from dataclasses import dataclass, asdict from datetime import datetime, timezone -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple from urllib.parse import urljoin, urlparse import requests @@ -26,6 +27,9 @@ USER_AGENT = os.getenv( LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() MAX_MESSAGE_LEN = 3900 MAX_SCREENSHOTS_PER_APP = int(os.getenv("MAX_SCREENSHOTS_PER_APP", "3")) +TELEGRAM_POLL_SECONDS = int(os.getenv("TELEGRAM_POLL_SECONDS", "10")) + +last_telegram_update_id: Optional[int] = None @dataclass @@ -155,7 +159,7 @@ def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None: def format_field_change(label: str, old: str, new: str) -> str: old_clean = old if old else "(empty)" new_clean = new if new else "(empty)" - return f"{label}: '{old_clean}' -> '{new_clean}'" + return f"â€ĸ {label}: '{old_clean}' → '{new_clean}'" def collect_diffs( @@ -189,11 +193,13 @@ def collect_diffs( details.append(format_field_change("summary", old.summary, new.summary)) if not details: - details.append("metadata changed") + details.append("â€ĸ metadata changed") - changed_lines.append(f"~ {new.name} ({new.url})") + changed_lines.append(f"🔄 {new.name}") + changed_lines.append(f" 🔗 {new.url}") for detail in details: - changed_lines.append(f" - {detail}") + changed_lines.append(f" {detail}") + changed_lines.append("") return added_urls, removed_urls, changed_lines, updated_count @@ -208,16 +214,28 @@ def build_summary_message( removed_count = len(removed_urls) header = ( - f"TrueNAS catalog changed at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n" - f"Added: {added_count} | Removed: {removed_count} | Updated: {updated_count}" + "đŸ“Ŗ TrueNAS Catalog Update\n" + "━━━━━━━━━━━━━━━━━━━━━━\n" + f"🕒 {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n" + f"➕ Added: {added_count} | ➖ Removed: {removed_count} | 🔄 Updated: {updated_count}" ) lines: List[str] = [] + if removed_urls: + lines.append("đŸ—‘ī¸ Removed Apps") + lines.append("") for url in removed_urls: app = previous[url] - lines.append(f"- {app.name} ({app.url})") + lines.append(f"➖ {app.name}") + lines.append(f" 🔗 {app.url}") + lines.append("") + if changed_lines: + lines.append("âœī¸ Updated Apps") + lines.append("") lines.extend(changed_lines) + while lines and lines[-1] == "": + lines.pop() return header, lines @@ -310,22 +328,33 @@ def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[ def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str: lines: List[str] = [ - "🆕 New TrueNAS app detected", - f"Detected: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}", - f"Name: {app.name}", - f"URL: {app.url}", + "🆕 New TrueNAS App", + "━━━━━━━━━━━━━━", + "", + f"🕒 Detected: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}", + "", + "â„šī¸ App Details", + f"đŸ“Ļ Name: {app.name}", + f"🔗 URL: {app.url}", ] if app.train: - lines.append(f"Train: {app.train}") + lines.append(f"🚂 Train: {app.train}") if app.added: - lines.append(f"Added date: {app.added}") + lines.append(f"📅 Added: {app.added}") if app.summary: - lines.append(f"Catalog summary: {truncate_text(app.summary, 700)}") + lines.append("") + lines.append("📝 Summary") + lines.append(truncate_text(app.summary, 700)) if page_title: - lines.append(f"Page title: {truncate_text(page_title, 180)}") + lines.append("") + lines.append(f"📄 Page: {truncate_text(page_title, 180)}") if screenshot_count > 0: - lines.append(f"Screenshots: {screenshot_count} attached") + lines.append("") + lines.append(f"đŸ–ŧī¸ Screenshots: {screenshot_count} attached") + + while lines and lines[-1] == "": + lines.pop() message = "\n".join(lines) if len(message) <= MAX_MESSAGE_LEN: @@ -337,6 +366,11 @@ def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_cou return "\n".join(trimmed_lines) +def build_random_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str: + message = build_new_app_message(app, page_title=page_title, screenshot_count=screenshot_count) + return message.replace("🆕 New TrueNAS App", "🎲 Random TrueNAS App", 1) + + def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]: if not lines: return [header] @@ -390,12 +424,130 @@ def send_telegram_photo(session: requests.Session, photo_url: str, caption: str response.raise_for_status() +def get_telegram_updates(session: requests.Session, offset: Optional[int]) -> List[Dict[str, object]]: + if not TELEGRAM_BOT_TOKEN: + return [] + + endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates" + params: Dict[str, object] = { + "timeout": min(max(1, REQUEST_TIMEOUT_SECONDS), 25), + "allowed_updates": json.dumps(["message"]), + } + if offset is not None: + params["offset"] = offset + + response = session.get(endpoint, params=params, timeout=REQUEST_TIMEOUT_SECONDS + 5) + response.raise_for_status() + payload = response.json() + if not payload.get("ok"): + return [] + + result = payload.get("result", []) + return result if isinstance(result, list) else [] + + +def send_random_app_response(session: requests.Session) -> None: + state = load_state(STATE_PATH) + if not state: + html = fetch_catalog_html(session) + state = parse_catalog(html) + + if not state: + send_telegram_message(session, "âš ī¸ Unable to fetch apps right now. Try again in a moment.") + return + + app = random.choice(list(state.values())) + + page_title = "" + screenshot_urls: List[str] = [] + try: + details = fetch_new_app_page_details(session, app.url) + page_title = str(details.get("page_title", "")) + screenshot_data = details.get("screenshot_urls", []) + if isinstance(screenshot_data, list): + screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")] + except requests.RequestException as exc: + logging.warning("Unable to fetch random app page details for %s: %s", app.url, exc) + + send_telegram_message( + session, + build_random_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)), + ) + + for index, screenshot_url in enumerate(screenshot_urls, start=1): + try: + send_telegram_photo( + session, + screenshot_url, + caption=f"đŸ–ŧī¸ {app.name} â€ĸ Screenshot {index}/{len(screenshot_urls)}", + ) + except requests.RequestException as exc: + logging.warning("Failed to send random app screenshot for %s: %s", app.name, exc) + + +def initialize_telegram_offset(session: requests.Session) -> None: + global last_telegram_update_id + if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: + return + + try: + updates = get_telegram_updates(session, offset=None) + except requests.RequestException as exc: + logging.warning("Unable to initialize Telegram updates offset: %s", exc) + return + + if not updates: + return + + update_ids = [item.get("update_id") for item in updates if isinstance(item.get("update_id"), int)] + if update_ids: + last_telegram_update_id = max(update_ids) + + +def poll_telegram_commands(session: requests.Session) -> None: + global last_telegram_update_id + + if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: + return + + offset = last_telegram_update_id + 1 if last_telegram_update_id is not None else None + updates = get_telegram_updates(session, offset=offset) + + for update in updates: + update_id = update.get("update_id") + if isinstance(update_id, int): + last_telegram_update_id = update_id + + message = update.get("message") + if not isinstance(message, dict): + continue + + chat = message.get("chat") + if not isinstance(chat, dict): + continue + chat_id = str(chat.get("id", "")) + if chat_id != TELEGRAM_CHAT_ID: + continue + + text = str(message.get("text", "")).strip() + command = text.split(" ", 1)[0].lower() if text else "" + if command.startswith("/random"): + logging.info("Received /random command from Telegram chat %s", chat_id) + try: + send_random_app_response(session) + except requests.RequestException as exc: + logging.warning("Failed to send /random response: %s", exc) + send_telegram_message(session, "âš ī¸ Failed to fetch a random app right now. Please try again.") + + def send_startup_notification(session: requests.Session) -> None: message = ( - "TrueNAS catalog watcher is running ✅\n" - f"Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n" - f"Catalog: {CATALOG_URL}\n" - f"Interval: {CHECK_INTERVAL_SECONDS}s" + "✅ TrueNAS Catalog Watcher Online\n" + "━━━━━━━━━━━━━━━━━━━━━━━━\n" + "\n" + f"🕒 Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n" + f"🌐 Catalog: {CATALOG_URL}\n" + f"âąī¸ Interval: {CHECK_INTERVAL_SECONDS}s" ) try: send_telegram_message(session, message) @@ -450,7 +602,7 @@ def run_once(session: requests.Session, first_run: bool) -> bool: send_telegram_photo( session, screenshot_url, - caption=f"{app.name} screenshot {index}/{len(screenshot_urls)}", + caption=f"đŸ–ŧī¸ {app.name} â€ĸ Screenshot {index}/{len(screenshot_urls)}", ) except requests.RequestException as exc: logging.warning("Failed to send screenshot for %s: %s", app.name, exc) @@ -475,6 +627,8 @@ def run_once(session: requests.Session, first_run: bool) -> bool: def validate_env() -> None: if CHECK_INTERVAL_SECONDS < 30: raise ValueError("CHECK_INTERVAL_SECONDS must be >= 30") + if TELEGRAM_POLL_SECONDS < 2: + raise ValueError("TELEGRAM_POLL_SECONDS must be >= 2") def main() -> int: @@ -493,18 +647,32 @@ def main() -> int: session = requests.Session() send_startup_notification(session) + initialize_telegram_offset(session) first_loop = True + next_check_at = time.time() while True: - try: - run_once(session, first_loop) - except requests.RequestException as exc: - logging.error("Network error: %s", exc) - except Exception as exc: - logging.exception("Watcher iteration failed: %s", exc) + now = time.time() + if now >= next_check_at: + try: + run_once(session, first_loop) + except requests.RequestException as exc: + logging.error("Network error: %s", exc) + except Exception as exc: + logging.exception("Watcher iteration failed: %s", exc) - first_loop = False - time.sleep(CHECK_INTERVAL_SECONDS) + first_loop = False + next_check_at = now + CHECK_INTERVAL_SECONDS + + try: + poll_telegram_commands(session) + except requests.RequestException as exc: + logging.warning("Telegram polling failed: %s", exc) + except Exception as exc: + logging.exception("Telegram command processing failed: %s", exc) + + sleep_for = min(TELEGRAM_POLL_SECONDS, max(1, int(next_check_at - time.time()))) + time.sleep(sleep_for) if __name__ == "__main__":