import hashlib import json import logging import os import re import sys import time from dataclasses import dataclass, asdict from datetime import datetime, timezone from typing import Dict, List, Tuple from urllib.parse import urljoin, urlparse import requests from bs4 import BeautifulSoup CATALOG_URL = os.getenv("CATALOG_URL", "https://apps.truenas.com/catalog") STATE_PATH = os.getenv("STATE_PATH", "/data/catalog_state.json") CHECK_INTERVAL_SECONDS = int(os.getenv("CHECK_INTERVAL_SECONDS", "1800")) TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "") REQUEST_TIMEOUT_SECONDS = int(os.getenv("REQUEST_TIMEOUT_SECONDS", "30")) USER_AGENT = os.getenv( "USER_AGENT", "truenas-catalog-notify/1.0 (+https://apps.truenas.com/catalog)", ) LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() MAX_MESSAGE_LEN = 3900 @dataclass class AppSnapshot: name: str url: str train: str added: str summary: str content_hash: str def configure_logging() -> None: logging.basicConfig( level=getattr(logging, LOG_LEVEL, logging.INFO), format="%(asctime)s %(levelname)s %(message)s", ) def normalize_text(value: str) -> str: return " ".join(value.split()) def compute_hash(parts: List[str]) -> str: digest = hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest() return digest def fetch_catalog_html(session: requests.Session) -> str: response = session.get( CATALOG_URL, timeout=REQUEST_TIMEOUT_SECONDS, headers={"User-Agent": USER_AGENT}, ) response.raise_for_status() return response.text def is_catalog_app_link(href: str) -> bool: if not href: return False parsed = urlparse(href) path = parsed.path.rstrip("/") return path.startswith("/catalog/") and path != "/catalog" def parse_catalog(html: str) -> Dict[str, AppSnapshot]: soup = BeautifulSoup(html, "html.parser") cards_root = soup.find(id="catalog-cards") candidates = cards_root.find_all("a", href=True) if cards_root else soup.find_all("a", href=True) snapshots: Dict[str, AppSnapshot] = {} for anchor in candidates: raw_href = anchor.get("href", "") full_url = urljoin(CATALOG_URL, raw_href) if not is_catalog_app_link(urlparse(full_url).path): continue text = normalize_text(anchor.get_text(" ", strip=True)) if not text: continue name = text.split(" Train:")[0].strip() train = "" added = "" summary = "" if " Train:" in text: remainder = text.split(" Train:", 1)[1].strip() if " Added:" in remainder: train_part, after_added = remainder.split(" Added:", 1) train = train_part.strip() pieces = after_added.strip().split(" ", 1) added = pieces[0].strip() summary = pieces[1].strip() if len(pieces) > 1 else "" else: train = remainder else: summary = text app_hash = compute_hash([name, train, added, summary, full_url]) snapshots[full_url] = AppSnapshot( name=name, url=full_url, train=train, added=added, summary=summary, content_hash=app_hash, ) return snapshots def load_state(path: str) -> Dict[str, AppSnapshot]: if not os.path.exists(path): return {} with open(path, "r", encoding="utf-8") as handle: data = json.load(handle) apps = data.get("apps", {}) loaded: Dict[str, AppSnapshot] = {} for url, value in apps.items(): loaded[url] = AppSnapshot( name=value.get("name", ""), url=value.get("url", url), train=value.get("train", ""), added=value.get("added", ""), summary=value.get("summary", ""), content_hash=value.get("content_hash", ""), ) return loaded def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None: directory = os.path.dirname(path) if directory: os.makedirs(directory, exist_ok=True) payload = { "updated_at": datetime.now(timezone.utc).isoformat(), "apps": {url: asdict(snapshot) for url, snapshot in sorted(apps.items())}, } with open(path, "w", encoding="utf-8") as handle: json.dump(payload, handle, indent=2, ensure_ascii=False) def format_field_change(label: str, old: str, new: str) -> str: old_clean = old if old else "(empty)" new_clean = new if new else "(empty)" return f"{label}: '{old_clean}' -> '{new_clean}'" def collect_diffs( previous: Dict[str, AppSnapshot], current: Dict[str, AppSnapshot], ) -> Tuple[List[str], List[str], List[str], int]: prev_urls = set(previous.keys()) curr_urls = set(current.keys()) added_urls = sorted(curr_urls - prev_urls) removed_urls = sorted(prev_urls - curr_urls) common_urls = sorted(curr_urls & prev_urls) changed_lines: List[str] = [] updated_count = 0 for url in common_urls: old = previous[url] new = current[url] if old.content_hash == new.content_hash: continue updated_count += 1 details: List[str] = [] if old.name != new.name: details.append(format_field_change("name", old.name, new.name)) if old.train != new.train: details.append(format_field_change("train", old.train, new.train)) if old.added != new.added: details.append(format_field_change("added", old.added, new.added)) if old.summary != new.summary: details.append(format_field_change("summary", old.summary, new.summary)) if not details: details.append("metadata changed") changed_lines.append(f"~ {new.name} ({new.url})") for detail in details: changed_lines.append(f" - {detail}") return added_urls, removed_urls, changed_lines, updated_count def build_summary_message( added_count: int, removed_urls: List[str], changed_lines: List[str], updated_count: int, previous: Dict[str, AppSnapshot], ) -> Tuple[str, List[str]]: removed_count = len(removed_urls) header = ( f"TrueNAS catalog changed at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"Added: {added_count} | Removed: {removed_count} | Updated: {updated_count}" ) lines: List[str] = [] for url in removed_urls: app = previous[url] lines.append(f"- {app.name} ({app.url})") lines.extend(changed_lines) return header, lines def truncate_text(value: str, limit: int) -> str: text = normalize_text(value) if len(text) <= limit: return text return f"{text[: max(0, limit - 1)].rstrip()}…" def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[str, object]: response = session.get( app_url, timeout=REQUEST_TIMEOUT_SECONDS, headers={"User-Agent": USER_AGENT}, ) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") page_title = normalize_text(soup.title.get_text(" ", strip=True)) if soup.title else "" description = "" for attrs in ( {"property": "og:description"}, {"name": "description"}, {"name": "twitter:description"}, ): tag = soup.find("meta", attrs=attrs) if tag and tag.get("content"): description = normalize_text(str(tag["content"])) if description: break headings: List[str] = [] for tag in soup.find_all(["h1", "h2", "h3"]): heading = normalize_text(tag.get_text(" ", strip=True)) if not heading: continue if heading not in headings: headings.append(heading) if len(headings) >= 6: break external_links: List[str] = [] seen_links = set() for anchor in soup.find_all("a", href=True): href = str(anchor.get("href", "")).strip() if not href or href.startswith("#"): continue full_href = urljoin(app_url, href) if not full_href.startswith("http"): continue if full_href.startswith(CATALOG_URL): continue if full_href in seen_links: continue seen_links.add(full_href) label = normalize_text(anchor.get_text(" ", strip=True)) if not label: label = full_href label = truncate_text(label, 60) external_links.append(f"{label} -> {full_href}") if len(external_links) >= 5: break detected_fields: List[str] = [] body_text = normalize_text(soup.get_text(" ", strip=True)) label_patterns = { "Version": r"(?:App\s+Version|Version)\s*[:\-]\s*([^\n\r|]{1,80})", "Chart": r"(?:Chart\s+Version|Helm\s+Chart)\s*[:\-]\s*([^\n\r|]{1,80})", "Category": r"Category\s*[:\-]\s*([^\n\r|]{1,80})", "Maintainer": r"Maintainer(?:s)?\s*[:\-]\s*([^\n\r|]{1,120})", "Homepage": r"Homepage\s*[:\-]\s*([^\n\r|]{1,160})", "Source": r"Source\s*[:\-]\s*([^\n\r|]{1,160})", } for label, pattern in label_patterns.items(): match = re.search(pattern, body_text, flags=re.IGNORECASE) if match: value = truncate_text(match.group(1), 120) detected_fields.append(f"{label}: {value}") return { "page_title": page_title, "description": description, "headings": headings, "external_links": external_links, "detected_fields": detected_fields, } def build_new_app_message(session: requests.Session, app: AppSnapshot) -> str: lines: List[str] = [ "🆕 New TrueNAS app detected", f"Detected: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}", f"Name: {app.name}", f"URL: {app.url}", ] if app.train: lines.append(f"Train: {app.train}") if app.added: lines.append(f"Added date: {app.added}") if app.summary: lines.append(f"Catalog summary: {truncate_text(app.summary, 700)}") try: details = fetch_new_app_page_details(session, app.url) except requests.RequestException as exc: logging.warning("Unable to fetch app details for %s: %s", app.url, exc) details = {} page_title = str(details.get("page_title", "")) if details else "" if page_title: lines.append(f"Page title: {truncate_text(page_title, 180)}") description = str(details.get("description", "")) if details else "" if description: lines.append(f"Description: {truncate_text(description, 1000)}") detected_fields = details.get("detected_fields", []) if details else [] if isinstance(detected_fields, list): for field in detected_fields[:6]: lines.append(str(field)) headings = details.get("headings", []) if details else [] if isinstance(headings, list) and headings: lines.append(f"Headings: {truncate_text(' | '.join(headings[:6]), 320)}") external_links = details.get("external_links", []) if details else [] if isinstance(external_links, list) and external_links: lines.append("External links:") for link in external_links[:5]: lines.append(f"- {truncate_text(str(link), 220)}") message = "\n".join(lines) if len(message) <= MAX_MESSAGE_LEN: return message trimmed_lines = [line if len(line) <= 280 else truncate_text(line, 280) for line in lines] while len("\n".join(trimmed_lines)) > MAX_MESSAGE_LEN and len(trimmed_lines) > 8: trimmed_lines.pop() return "\n".join(trimmed_lines) def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]: if not lines: return [header] chunks: List[str] = [] current_chunk = header for line in lines: candidate = f"{current_chunk}\n{line}" if len(candidate) <= max_len: current_chunk = candidate continue chunks.append(current_chunk) current_chunk = f"{header}\n{line}" chunks.append(current_chunk) return chunks def send_telegram_message(session: requests.Session, text: str) -> None: if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: logging.warning("Telegram token/chat id missing; skipping message") return endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": TELEGRAM_CHAT_ID, "text": text, "disable_web_page_preview": True, } response = session.post(endpoint, json=payload, timeout=REQUEST_TIMEOUT_SECONDS) response.raise_for_status() def send_startup_notification(session: requests.Session) -> None: message = ( "TrueNAS catalog watcher is running ✅\n" f"Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n" f"Catalog: {CATALOG_URL}\n" f"Interval: {CHECK_INTERVAL_SECONDS}s" ) try: send_telegram_message(session, message) except requests.RequestException as exc: logging.error("Failed to send startup Telegram message: %s", exc) def run_once(session: requests.Session, first_run: bool) -> bool: previous_state = load_state(STATE_PATH) html = fetch_catalog_html(session) current_state = parse_catalog(html) if not current_state: raise RuntimeError("Parsed zero catalog entries; aborting to avoid overwriting state") if first_run and not previous_state: save_state(STATE_PATH, current_state) logging.info("Initial snapshot saved with %d apps", len(current_state)) return False added_urls, removed_urls, changed_lines, updated_count = collect_diffs(previous_state, current_state) changed = bool(added_urls or removed_urls or changed_lines) if changed: logging.info( "Catalog change detected (added=%d, removed=%d, updated=%d)", len(added_urls), len(removed_urls), updated_count, ) for url in added_urls: app = current_state[url] send_telegram_message(session, build_new_app_message(session, app)) header, summary_lines = build_summary_message( added_count=len(added_urls), removed_urls=removed_urls, changed_lines=changed_lines, updated_count=updated_count, previous=previous_state, ) if summary_lines: for message in split_message(header, summary_lines): send_telegram_message(session, message) else: logging.info("No catalog changes detected") save_state(STATE_PATH, current_state) return changed def validate_env() -> None: if CHECK_INTERVAL_SECONDS < 30: raise ValueError("CHECK_INTERVAL_SECONDS must be >= 30") def main() -> int: configure_logging() try: validate_env() except Exception as exc: logging.error("Invalid environment: %s", exc) return 2 logging.info("Starting TrueNAS catalog watcher") logging.info("Catalog URL: %s", CATALOG_URL) logging.info("State file: %s", STATE_PATH) logging.info("Interval: %ss", CHECK_INTERVAL_SECONDS) session = requests.Session() send_startup_notification(session) first_loop = True while True: try: run_once(session, first_loop) except requests.RequestException as exc: logging.error("Network error: %s", exc) except Exception as exc: logging.exception("Watcher iteration failed: %s", exc) first_loop = False time.sleep(CHECK_INTERVAL_SECONDS) if __name__ == "__main__": sys.exit(main())