Files
truenas-catalog-notify/watcher.py
LockeShor 4398abd9e6 initial
2026-03-01 00:38:00 -05:00

327 lines
9.7 KiB
Python

import hashlib
import json
import logging
import os
import sys
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Dict, List, Tuple
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
CATALOG_URL = os.getenv("CATALOG_URL", "https://apps.truenas.com/catalog")
STATE_PATH = os.getenv("STATE_PATH", "/data/catalog_state.json")
CHECK_INTERVAL_SECONDS = int(os.getenv("CHECK_INTERVAL_SECONDS", "1800"))
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
REQUEST_TIMEOUT_SECONDS = int(os.getenv("REQUEST_TIMEOUT_SECONDS", "30"))
USER_AGENT = os.getenv(
"USER_AGENT",
"truenas-catalog-notify/1.0 (+https://apps.truenas.com/catalog)",
)
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
MAX_MESSAGE_LEN = 3900
@dataclass
class AppSnapshot:
name: str
url: str
train: str
added: str
summary: str
content_hash: str
def configure_logging() -> None:
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s %(levelname)s %(message)s",
)
def normalize_text(value: str) -> str:
return " ".join(value.split())
def compute_hash(parts: List[str]) -> str:
digest = hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()
return digest
def fetch_catalog_html(session: requests.Session) -> str:
response = session.get(
CATALOG_URL,
timeout=REQUEST_TIMEOUT_SECONDS,
headers={"User-Agent": USER_AGENT},
)
response.raise_for_status()
return response.text
def is_catalog_app_link(href: str) -> bool:
if not href:
return False
parsed = urlparse(href)
path = parsed.path.rstrip("/")
return path.startswith("/catalog/") and path != "/catalog"
def parse_catalog(html: str) -> Dict[str, AppSnapshot]:
soup = BeautifulSoup(html, "html.parser")
cards_root = soup.find(id="catalog-cards")
candidates = cards_root.find_all("a", href=True) if cards_root else soup.find_all("a", href=True)
snapshots: Dict[str, AppSnapshot] = {}
for anchor in candidates:
raw_href = anchor.get("href", "")
full_url = urljoin(CATALOG_URL, raw_href)
if not is_catalog_app_link(urlparse(full_url).path):
continue
text = normalize_text(anchor.get_text(" ", strip=True))
if not text:
continue
name = text.split(" Train:")[0].strip()
train = ""
added = ""
summary = ""
if " Train:" in text:
remainder = text.split(" Train:", 1)[1].strip()
if " Added:" in remainder:
train_part, after_added = remainder.split(" Added:", 1)
train = train_part.strip()
pieces = after_added.split(" ", 1)
added = pieces[0].strip()
summary = pieces[1].strip() if len(pieces) > 1 else ""
else:
train = remainder
else:
summary = text
app_hash = compute_hash([name, train, added, summary, full_url])
snapshots[full_url] = AppSnapshot(
name=name,
url=full_url,
train=train,
added=added,
summary=summary,
content_hash=app_hash,
)
return snapshots
def load_state(path: str) -> Dict[str, AppSnapshot]:
if not os.path.exists(path):
return {}
with open(path, "r", encoding="utf-8") as handle:
data = json.load(handle)
apps = data.get("apps", {})
loaded: Dict[str, AppSnapshot] = {}
for url, value in apps.items():
loaded[url] = AppSnapshot(
name=value.get("name", ""),
url=value.get("url", url),
train=value.get("train", ""),
added=value.get("added", ""),
summary=value.get("summary", ""),
content_hash=value.get("content_hash", ""),
)
return loaded
def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None:
directory = os.path.dirname(path)
if directory:
os.makedirs(directory, exist_ok=True)
payload = {
"updated_at": datetime.now(timezone.utc).isoformat(),
"apps": {url: asdict(snapshot) for url, snapshot in sorted(apps.items())},
}
with open(path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, ensure_ascii=False)
def format_field_change(label: str, old: str, new: str) -> str:
old_clean = old if old else "(empty)"
new_clean = new if new else "(empty)"
return f"{label}: '{old_clean}' -> '{new_clean}'"
def build_diff_message(
previous: Dict[str, AppSnapshot],
current: Dict[str, AppSnapshot],
) -> Tuple[str, List[str], int]:
prev_urls = set(previous.keys())
curr_urls = set(current.keys())
added_urls = sorted(curr_urls - prev_urls)
removed_urls = sorted(prev_urls - curr_urls)
common_urls = sorted(curr_urls & prev_urls)
changed_lines: List[str] = []
updated_count = 0
for url in common_urls:
old = previous[url]
new = current[url]
if old.content_hash == new.content_hash:
continue
updated_count += 1
details: List[str] = []
if old.name != new.name:
details.append(format_field_change("name", old.name, new.name))
if old.train != new.train:
details.append(format_field_change("train", old.train, new.train))
if old.added != new.added:
details.append(format_field_change("added", old.added, new.added))
if old.summary != new.summary:
details.append(format_field_change("summary", old.summary, new.summary))
if not details:
details.append("metadata changed")
changed_lines.append(f"~ {new.name} ({new.url})")
for detail in details:
changed_lines.append(f" - {detail}")
header = (
f"TrueNAS catalog changed at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
f"Added: {len(added_urls)} | Removed: {len(removed_urls)} | Updated: {updated_count}"
)
lines: List[str] = []
for url in added_urls:
app = current[url]
lines.append(f"+ {app.name} ({app.url})")
for url in removed_urls:
app = previous[url]
lines.append(f"- {app.name} ({app.url})")
lines.extend(changed_lines)
return header, lines, updated_count
def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]:
if not lines:
return [header]
chunks: List[str] = []
current_chunk = header
for line in lines:
candidate = f"{current_chunk}\n{line}"
if len(candidate) <= max_len:
current_chunk = candidate
continue
chunks.append(current_chunk)
current_chunk = f"{header}\n{line}"
chunks.append(current_chunk)
return chunks
def send_telegram_message(session: requests.Session, text: str) -> None:
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
logging.warning("Telegram token/chat id missing; skipping message")
return
endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TELEGRAM_CHAT_ID,
"text": text,
"disable_web_page_preview": True,
}
response = session.post(endpoint, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)
response.raise_for_status()
def send_startup_notification(session: requests.Session) -> None:
message = (
"TrueNAS catalog watcher is running ✅\n"
f"Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
f"Catalog: {CATALOG_URL}\n"
f"Interval: {CHECK_INTERVAL_SECONDS}s"
)
try:
send_telegram_message(session, message)
except requests.RequestException as exc:
logging.error("Failed to send startup Telegram message: %s", exc)
def run_once(session: requests.Session, first_run: bool) -> bool:
previous_state = load_state(STATE_PATH)
html = fetch_catalog_html(session)
current_state = parse_catalog(html)
if not current_state:
raise RuntimeError("Parsed zero catalog entries; aborting to avoid overwriting state")
if first_run and not previous_state:
save_state(STATE_PATH, current_state)
logging.info("Initial snapshot saved with %d apps", len(current_state))
return False
header, diff_lines, _ = build_diff_message(previous_state, current_state)
changed = bool(diff_lines)
if changed:
logging.info("Catalog change detected with %d line items", len(diff_lines))
for message in split_message(header, diff_lines):
send_telegram_message(session, message)
else:
logging.info("No catalog changes detected")
save_state(STATE_PATH, current_state)
return changed
def validate_env() -> None:
if CHECK_INTERVAL_SECONDS < 30:
raise ValueError("CHECK_INTERVAL_SECONDS must be >= 30")
def main() -> int:
configure_logging()
try:
validate_env()
except Exception as exc:
logging.error("Invalid environment: %s", exc)
return 2
logging.info("Starting TrueNAS catalog watcher")
logging.info("Catalog URL: %s", CATALOG_URL)
logging.info("State file: %s", STATE_PATH)
logging.info("Interval: %ss", CHECK_INTERVAL_SECONDS)
session = requests.Session()
send_startup_notification(session)
first_loop = True
while True:
try:
run_once(session, first_loop)
except requests.RequestException as exc:
logging.error("Network error: %s", exc)
except Exception as exc:
logging.exception("Watcher iteration failed: %s", exc)
first_loop = False
time.sleep(CHECK_INTERVAL_SECONDS)
if __name__ == "__main__":
sys.exit(main())