Files
truenas-catalog-notify/watcher.py
LockeShor 7471c3d36d
All checks were successful
Docker Image / build (push) Successful in 1m7s
use correct screenshots
2026-03-02 18:31:01 -05:00

662 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import hashlib
import json
import logging
import os
import random
import re
import sys
import time
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
# --- Configuration: every knob is overridable via environment variables ---
# Catalog listing page that is scraped on each check.
CATALOG_URL = os.getenv("CATALOG_URL", "https://apps.truenas.com/catalog")
# JSON file persisting the last-seen snapshot of all apps between runs.
STATE_PATH = os.getenv("STATE_PATH", "/data/catalog_state.json")
# Seconds between catalog re-checks (validate_env enforces >= 30).
CHECK_INTERVAL_SECONDS = int(os.getenv("CHECK_INTERVAL_SECONDS", "1800"))
# Telegram credentials; when either is empty, sends become no-op warnings.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
# Per-request HTTP timeout used for catalog, media, and Telegram calls.
REQUEST_TIMEOUT_SECONDS = int(os.getenv("REQUEST_TIMEOUT_SECONDS", "30"))
USER_AGENT = os.getenv(
    "USER_AGENT",
    "truenas-catalog-notify/1.0 (+https://apps.truenas.com/catalog)",
)
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# Maximum characters per outgoing Telegram message chunk (see split_message);
# presumably kept below Telegram's own message-size limit — confirm.
MAX_MESSAGE_LEN = 3900
# How many screenshot<N>.png files to probe per app on the media CDN.
MAX_SCREENSHOTS_PER_APP = int(os.getenv("MAX_SCREENSHOTS_PER_APP", "3"))
# Seconds between Telegram getUpdates polls (validate_env enforces >= 2).
TELEGRAM_POLL_SECONDS = int(os.getenv("TELEGRAM_POLL_SECONDS", "10"))
# Base URL of the CDN that hosts app screenshots.
MEDIA_BASE_URL = os.getenv("MEDIA_BASE_URL", "https://media.sys.truenas.net")
# Highest Telegram update_id processed so far; None until the first poll.
last_telegram_update_id: Optional[int] = None
@dataclass
class AppSnapshot:
    """Normalized record of one app as scraped from the catalog page."""

    # Display name of the app (text before the " Train:" marker).
    name: str
    # Absolute URL of the app's catalog page; also the state-map key.
    url: str
    # Value parsed after the " Train:" marker; "" when absent.
    train: str
    # Date token parsed after the " Added:" marker; "" when absent.
    added: str
    # Remaining free-text description; "" when absent.
    summary: str
    # SHA-256 over the fields above; used to detect metadata changes.
    content_hash: str
def configure_logging() -> None:
    """Initialise root logging at LOG_LEVEL, falling back to INFO."""
    chosen_level = getattr(logging, LOG_LEVEL, logging.INFO)
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=chosen_level,
    )
def normalize_text(value: str) -> str:
    """Collapse every run of whitespace (including newlines) to one space."""
    tokens = value.split()
    return " ".join(tokens)
def compute_hash(parts: List[str]) -> str:
    """Return the hex SHA-256 of *parts* joined with the '||' separator."""
    joined = "||".join(parts)
    hasher = hashlib.sha256(joined.encode("utf-8"))
    return hasher.hexdigest()
def fetch_catalog_html(session: requests.Session) -> str:
    """Download the catalog listing page, raising on HTTP error statuses."""
    resp = session.get(
        CATALOG_URL,
        headers={"User-Agent": USER_AGENT},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    return resp.text
def is_catalog_app_link(href: str) -> bool:
    """True when *href* points at an individual app page under /catalog/.

    The bare /catalog listing page itself does not count.
    """
    if not href:
        return False
    path = urlparse(href).path.rstrip("/")
    if path == "/catalog":
        return False
    return path.startswith("/catalog/")
def parse_catalog(html: str) -> Dict[str, AppSnapshot]:
    """Parse catalog listing HTML into AppSnapshot records keyed by URL.

    Each app anchor's visible text is expected to look roughly like
    "<name> Train: <train> Added: <date> <summary>". Missing markers
    degrade gracefully: without " Train:" the whole text becomes the
    summary; without " Added:" the remainder becomes the train.
    """
    soup = BeautifulSoup(html, "html.parser")
    # Prefer the dedicated card container; fall back to scanning every
    # anchor on the page when the id is absent.
    cards_root = soup.find(id="catalog-cards")
    candidates = cards_root.find_all("a", href=True) if cards_root else soup.find_all("a", href=True)
    snapshots: Dict[str, AppSnapshot] = {}
    for anchor in candidates:
        raw_href = anchor.get("href", "")
        # Resolve relative hrefs against the catalog URL before filtering.
        full_url = urljoin(CATALOG_URL, raw_href)
        if not is_catalog_app_link(urlparse(full_url).path):
            continue
        text = normalize_text(anchor.get_text(" ", strip=True))
        if not text:
            continue
        # Everything before the " Train:" marker is the app name.
        name = text.split(" Train:")[0].strip()
        train = ""
        added = ""
        summary = ""
        if " Train:" in text:
            remainder = text.split(" Train:", 1)[1].strip()
            if " Added:" in remainder:
                train_part, after_added = remainder.split(" Added:", 1)
                train = train_part.strip()
                # First token after "Added:" is the date; the rest (if any)
                # is treated as the summary.
                pieces = after_added.strip().split(" ", 1)
                added = pieces[0].strip()
                summary = pieces[1].strip() if len(pieces) > 1 else ""
            else:
                train = remainder
        else:
            summary = text
        # Hash all visible fields so any metadata change is detectable
        # by collect_diffs via a single comparison.
        app_hash = compute_hash([name, train, added, summary, full_url])
        snapshots[full_url] = AppSnapshot(
            name=name,
            url=full_url,
            train=train,
            added=added,
            summary=summary,
            content_hash=app_hash,
        )
    return snapshots
def load_state(path: str) -> Dict[str, AppSnapshot]:
    """Read the persisted snapshot map from *path*; {} when no file exists.

    Missing fields in stored records default to "" so older state files
    still load.
    """
    if not os.path.exists(path):
        return {}
    with open(path, "r", encoding="utf-8") as fh:
        stored = json.load(fh)
    result: Dict[str, AppSnapshot] = {}
    for app_url, record in stored.get("apps", {}).items():
        result[app_url] = AppSnapshot(
            name=record.get("name", ""),
            url=record.get("url", app_url),
            train=record.get("train", ""),
            added=record.get("added", ""),
            summary=record.get("summary", ""),
            content_hash=record.get("content_hash", ""),
        )
    return result
def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None:
    """Persist the snapshot map to *path* as pretty-printed JSON.

    Creates parent directories as needed and stamps the payload with a
    UTC timestamp; apps are written in sorted-URL order for stable diffs.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    serialized = {app_url: asdict(snap) for app_url, snap in sorted(apps.items())}
    document = {
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "apps": serialized,
    }
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(document, fh, indent=2, ensure_ascii=False)
def format_field_change(label: str, old: str, new: str) -> str:
    """Render a single field diff as "label: 'old' → 'new'".

    Empty values are shown as the placeholder "(empty)" so the direction
    of the change stays readable in the Telegram message.
    """
    old_display = old if old else "(empty)"
    new_display = new if new else "(empty)"
    # BUG FIX: the old and new values ran together with no separator
    # ("'old''new'" — the arrow was likely lost to the file's stripped
    # Unicode); restore an explicit arrow between them.
    return f"{label}: '{old_display}' → '{new_display}'"
def collect_diffs(
    previous: Dict[str, AppSnapshot],
    current: Dict[str, AppSnapshot],
) -> Tuple[List[str], List[str], List[str], int]:
    """Diff two snapshot maps keyed by app URL.

    Returns (added_urls, removed_urls, changed_lines, updated_count):
    sorted URL lists for additions and removals, pre-formatted message
    lines describing each updated app, and the count of updated apps.
    """
    prev_urls = set(previous.keys())
    curr_urls = set(current.keys())
    added_urls = sorted(curr_urls - prev_urls)
    removed_urls = sorted(prev_urls - curr_urls)
    common_urls = sorted(curr_urls & prev_urls)
    changed_lines: List[str] = []
    updated_count = 0
    for url in common_urls:
        old = previous[url]
        new = current[url]
        # content_hash covers all scraped fields; equal hash means no change.
        if old.content_hash == new.content_hash:
            continue
        updated_count += 1
        details: List[str] = []
        if old.name != new.name:
            details.append(format_field_change("name", old.name, new.name))
        if old.train != new.train:
            details.append(format_field_change("train", old.train, new.train))
        if old.added != new.added:
            details.append(format_field_change("added", old.added, new.added))
        if old.summary != new.summary:
            details.append(format_field_change("summary", old.summary, new.summary))
        # Hash differed yet no tracked field did (e.g. hash inputs beyond
        # these four fields changed) — emit a generic marker line.
        if not details:
            details.append("• metadata changed")
        changed_lines.append(f"🔄 {new.name}")
        changed_lines.append(f" 🔗 {new.url}")
        for detail in details:
            changed_lines.append(f" {detail}")
        # Blank separator line between consecutive updated apps.
        changed_lines.append("")
    return added_urls, removed_urls, changed_lines, updated_count
def build_summary_message(
    added_count: int,
    removed_urls: List[str],
    changed_lines: List[str],
    updated_count: int,
    previous: Dict[str, AppSnapshot],
) -> Tuple[str, List[str]]:
    """Compose the change-summary header and its body lines.

    The header carries a UTC timestamp and the add/remove/update counts;
    body lines cover removed and updated apps only (added apps receive
    dedicated per-app messages in run_once). Removed app details are
    looked up in *previous* since they no longer exist in current state.
    NOTE(review): several literals start with a bare space where an emoji
    was likely stripped from the file — confirm against original output.
    """
    removed_count = len(removed_urls)
    header = (
        "📣 TrueNAS Catalog Update\n"
        "━━━━━━━━━━━━━━━━━━━━━━\n"
        f"🕒 {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
        f" Added: {added_count} | Removed: {removed_count} | 🔄 Updated: {updated_count}"
    )
    lines: List[str] = []
    if removed_urls:
        lines.append("🗑️ Removed Apps")
        lines.append("")
        for url in removed_urls:
            app = previous[url]
            lines.append(f" {app.name}")
            lines.append(f" 🔗 {app.url}")
            lines.append("")
    if changed_lines:
        lines.append("✏️ Updated Apps")
        lines.append("")
        lines.extend(changed_lines)
    # Trim trailing blank lines so chunks never end with empty padding.
    while lines and lines[-1] == "":
        lines.pop()
    return header, lines
def truncate_text(value: str, limit: int) -> str:
    """Whitespace-normalize *value* and cap it at *limit* characters.

    When truncation occurs the text is cut to limit - 1 characters and an
    ellipsis is appended, so the result is at most *limit* characters and
    the cut is visible to the reader.
    """
    # Same collapsing as normalize_text(); inlined so this block is
    # self-contained.
    text = " ".join(value.split())
    if len(text) <= limit:
        return text
    # BUG FIX: the slice reserved one character for a truncation marker
    # but the marker itself was missing (likely lost to the file's
    # stripped Unicode), silently clipping text. Append the ellipsis.
    return f"{text[: max(0, limit - 1)].rstrip()}…"
def extract_app_id_from_url(app_url: str) -> str:
    """Pull the app identifier out of a catalog URL.

    Prefers the path segment immediately following "catalog"; otherwise
    falls back to the final path segment. Returns "" for an empty path.
    """
    segments = [seg for seg in urlparse(app_url).path.strip("/").split("/") if seg]
    if not segments:
        return ""
    try:
        after_catalog = segments.index("catalog") + 1
    except ValueError:
        return segments[-1]
    return segments[after_catalog] if after_catalog < len(segments) else segments[-1]
def build_storj_screenshot_urls(session: requests.Session, app_id: str) -> List[str]:
    """Probe the media CDN for sequentially numbered screenshot PNGs.

    Checks screenshot1.png, screenshot2.png, ... up to
    MAX_SCREENSHOTS_PER_APP and stops at the first miss (network error,
    non-200 status, or a non-image Content-Type). Returns the URLs that
    answered with an image.
    """
    if not app_id:
        return []
    base = MEDIA_BASE_URL.rstrip("/")
    found: List[str] = []
    for shot_number in range(1, MAX_SCREENSHOTS_PER_APP + 1):
        candidate = f"{base}/apps/{app_id}/screenshots/screenshot{shot_number}.png"
        try:
            resp = session.get(
                candidate,
                headers={"User-Agent": USER_AGENT},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException:
            break
        if resp.status_code != 200:
            break
        mime = str(resp.headers.get("Content-Type", "")).lower()
        if mime and "image" not in mime:
            break
        found.append(candidate)
    return found
def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[str, object]:
    """Fetch an app's page and gather its <title> text plus screenshot URLs.

    Returns a dict with keys "page_title" (str, possibly "") and
    "screenshot_urls" (list of str). Raises on HTTP error statuses.
    """
    resp = session.get(
        app_url,
        headers={"User-Agent": USER_AGENT},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    parsed = BeautifulSoup(resp.text, "html.parser")
    title_text = ""
    if parsed.title:
        title_text = normalize_text(parsed.title.get_text(" ", strip=True))
    shots = build_storj_screenshot_urls(session, extract_app_id_from_url(app_url))
    return {
        "page_title": title_text,
        "screenshot_urls": shots,
    }
def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
    """Format the Telegram announcement for a newly detected app.

    Optional sections (train, added date, summary, page title, screenshot
    count) are included only when non-empty. The result is capped at
    MAX_MESSAGE_LEN by first clamping long lines to 280 chars, then
    dropping trailing lines (never below 8 lines).
    NOTE(review): the " App Details" literal starts with a bare space
    where an emoji was likely stripped — confirm against original output.
    """
    lines: List[str] = [
        "🆕 New TrueNAS App",
        "━━━━━━━━━━━━━━",
        "",
        f"🕒 Detected: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
        "",
        " App Details",
        f"📦 Name: {app.name}",
        f"🔗 URL: {app.url}",
    ]
    if app.train:
        lines.append(f"🚂 Train: {app.train}")
    if app.added:
        lines.append(f"📅 Added: {app.added}")
    if app.summary:
        lines.append("")
        lines.append("📝 Summary")
        lines.append(truncate_text(app.summary, 700))
    if page_title:
        lines.append("")
        lines.append(f"📄 Page: {truncate_text(page_title, 180)}")
    if screenshot_count > 0:
        lines.append("")
        lines.append(f"🖼️ Screenshots: {screenshot_count} attached")
    # Drop trailing blank lines before joining.
    while lines and lines[-1] == "":
        lines.pop()
    message = "\n".join(lines)
    if len(message) <= MAX_MESSAGE_LEN:
        return message
    # Over budget: clamp each over-long line to 280 characters first...
    trimmed_lines = [line if len(line) <= 280 else truncate_text(line, 280) for line in lines]
    # ...then drop lines from the end until the message fits.
    while len("\n".join(trimmed_lines)) > MAX_MESSAGE_LEN and len(trimmed_lines) > 8:
        trimmed_lines.pop()
    return "\n".join(trimmed_lines)
def build_random_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
    """Build a /random reply by re-titling the new-app message template."""
    message = build_new_app_message(app, page_title=page_title, screenshot_count=screenshot_count)
    # Swap only the first occurrence, i.e. the header line.
    return message.replace("🆕 New TrueNAS App", "🎲 Random TrueNAS App", 1)
def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]:
    """Pack *lines* into one or more messages, each starting with *header*.

    A new chunk is started whenever appending the next line would push
    the current chunk past *max_len*. With no lines at all, a single
    header-only message is returned.
    """
    if not lines:
        return [header]
    messages: List[str] = []
    buffer = header
    for entry in lines:
        extended = f"{buffer}\n{entry}"
        if len(extended) > max_len:
            messages.append(buffer)
            buffer = f"{header}\n{entry}"
        else:
            buffer = extended
    messages.append(buffer)
    return messages
def send_telegram_message(session: requests.Session, text: str) -> None:
    """POST *text* to the configured Telegram chat; no-op when unconfigured.

    Raises requests.HTTPError on non-2xx responses from the Bot API.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        logging.warning("Telegram token/chat id missing; skipping message")
        return
    api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    body = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "disable_web_page_preview": True,
    }
    resp = session.post(api_url, json=body, timeout=REQUEST_TIMEOUT_SECONDS)
    resp.raise_for_status()
def send_telegram_photo(session: requests.Session, photo_url: str, caption: str = "") -> None:
    """Send a photo by URL to the configured Telegram chat; no-op when unconfigured.

    Captions are capped at 900 characters. Raises requests.HTTPError on
    non-2xx responses from the Bot API.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        logging.warning("Telegram token/chat id missing; skipping photo")
        return
    api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
    body = {
        "chat_id": TELEGRAM_CHAT_ID,
        "photo": photo_url,
    }
    if caption:
        body["caption"] = truncate_text(caption, 900)
    resp = session.post(api_url, json=body, timeout=REQUEST_TIMEOUT_SECONDS)
    resp.raise_for_status()
def get_telegram_updates(session: requests.Session, offset: Optional[int]) -> List[Dict[str, object]]:
    """Call Telegram getUpdates (long-poll) and return the update list.

    Returns [] when no bot token is configured, the API reports not-ok,
    or the result payload is malformed. The long-poll timeout is clamped
    to 1–25 seconds; the HTTP timeout gets 5 extra seconds of headroom.
    """
    if not TELEGRAM_BOT_TOKEN:
        return []
    api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
    query: Dict[str, object] = {
        "timeout": min(max(1, REQUEST_TIMEOUT_SECONDS), 25),
        "allowed_updates": json.dumps(["message"]),
    }
    if offset is not None:
        query["offset"] = offset
    resp = session.get(api_url, params=query, timeout=REQUEST_TIMEOUT_SECONDS + 5)
    resp.raise_for_status()
    body = resp.json()
    if not body.get("ok"):
        return []
    updates = body.get("result", [])
    if isinstance(updates, list):
        return updates
    return []
def send_random_app_response(session: requests.Session) -> None:
    """Reply to /random with a randomly chosen app plus its screenshots.

    Uses the persisted state when available; otherwise scrapes the
    catalog on demand. Page-detail and screenshot failures are logged
    and the text message is still sent (best effort).
    """
    state = load_state(STATE_PATH)
    if not state:
        # No saved state yet — fetch the live catalog instead.
        html = fetch_catalog_html(session)
        state = parse_catalog(html)
    if not state:
        send_telegram_message(session, "⚠️ Unable to fetch apps right now. Try again in a moment.")
        return
    app = random.choice(list(state.values()))
    page_title = ""
    screenshot_urls: List[str] = []
    try:
        details = fetch_new_app_page_details(session, app.url)
        page_title = str(details.get("page_title", ""))
        screenshot_data = details.get("screenshot_urls", [])
        if isinstance(screenshot_data, list):
            # Keep only well-formed absolute URLs.
            screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")]
    except requests.RequestException as exc:
        # Best effort: still announce the app without page extras.
        logging.warning("Unable to fetch random app page details for %s: %s", app.url, exc)
    send_telegram_message(
        session,
        build_random_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)),
    )
    for index, screenshot_url in enumerate(screenshot_urls, start=1):
        try:
            send_telegram_photo(
                session,
                screenshot_url,
                caption=f"🖼️ {app.name} • Screenshot {index}/{len(screenshot_urls)}",
            )
        except requests.RequestException as exc:
            logging.warning("Failed to send random app screenshot for %s: %s", app.name, exc)
def initialize_telegram_offset(session: requests.Session) -> None:
    """Record the newest pending update_id so old commands are not replayed.

    Best effort: network failures are logged and the offset stays None.
    """
    global last_telegram_update_id
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        return
    try:
        pending = get_telegram_updates(session, offset=None)
    except requests.RequestException as exc:
        logging.warning("Unable to initialize Telegram updates offset: %s", exc)
        return
    seen_ids = [
        entry.get("update_id")
        for entry in pending
        if isinstance(entry.get("update_id"), int)
    ]
    if seen_ids:
        last_telegram_update_id = max(seen_ids)
def poll_telegram_commands(session: requests.Session) -> None:
    """Drain pending Telegram updates and handle /random commands.

    Only text messages from the configured chat are honored. The global
    last_telegram_update_id advances for every update seen (even ignored
    ones) so the same update is never fetched twice.
    """
    global last_telegram_update_id
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return
    # Ask for updates strictly after the last one we processed.
    offset = last_telegram_update_id + 1 if last_telegram_update_id is not None else None
    updates = get_telegram_updates(session, offset=offset)
    for update in updates:
        update_id = update.get("update_id")
        if isinstance(update_id, int):
            last_telegram_update_id = update_id
        message = update.get("message")
        if not isinstance(message, dict):
            continue
        chat = message.get("chat")
        if not isinstance(chat, dict):
            continue
        # Ignore messages from any chat other than the configured one.
        chat_id = str(chat.get("id", ""))
        if chat_id != TELEGRAM_CHAT_ID:
            continue
        text = str(message.get("text", "")).strip()
        # First whitespace-separated token, lowercased, is the command.
        command = text.split(" ", 1)[0].lower() if text else ""
        if command.startswith("/random"):
            logging.info("Received /random command from Telegram chat %s", chat_id)
            try:
                send_random_app_response(session)
            except requests.RequestException as exc:
                logging.warning("Failed to send /random response: %s", exc)
                send_telegram_message(session, "⚠️ Failed to fetch a random app right now. Please try again.")
def send_startup_notification(session: requests.Session) -> None:
    """Announce watcher startup on Telegram; failures are logged, not raised."""
    message = (
        "✅ TrueNAS Catalog Watcher Online\n"
        "━━━━━━━━━━━━━━━━━━━━━━━━\n"
        "\n"
        f"🕒 Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
        f"🌐 Catalog: {CATALOG_URL}\n"
        f"⏱️ Interval: {CHECK_INTERVAL_SECONDS}s"
    )
    try:
        send_telegram_message(session, message)
    except requests.RequestException as exc:
        # Startup must not abort just because Telegram is unreachable.
        logging.error("Failed to send startup Telegram message: %s", exc)
def run_once(session: requests.Session, first_run: bool) -> bool:
    """Fetch the catalog, diff against saved state, notify, and persist.

    Returns True when any change was detected. Raises RuntimeError when
    the page parses to zero apps, so a broken scrape cannot clobber
    known-good state.
    """
    previous_state = load_state(STATE_PATH)
    html = fetch_catalog_html(session)
    current_state = parse_catalog(html)
    if not current_state:
        raise RuntimeError("Parsed zero catalog entries; aborting to avoid overwriting state")
    # Very first run with no prior state: record a baseline silently
    # instead of announcing every existing app.
    if first_run and not previous_state:
        save_state(STATE_PATH, current_state)
        logging.info("Initial snapshot saved with %d apps", len(current_state))
        return False
    added_urls, removed_urls, changed_lines, updated_count = collect_diffs(previous_state, current_state)
    changed = bool(added_urls or removed_urls or changed_lines)
    if changed:
        logging.info(
            "Catalog change detected (added=%d, removed=%d, updated=%d)",
            len(added_urls),
            len(removed_urls),
            updated_count,
        )
        # Each newly added app gets its own rich message plus screenshots.
        for url in added_urls:
            app = current_state[url]
            page_title = ""
            screenshot_urls: List[str] = []
            try:
                details = fetch_new_app_page_details(session, app.url)
                page_title = str(details.get("page_title", ""))
                screenshot_data = details.get("screenshot_urls", [])
                if isinstance(screenshot_data, list):
                    # Keep only well-formed absolute URLs.
                    screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")]
            except requests.RequestException as exc:
                # Best effort: still announce the app without page extras.
                logging.warning("Unable to fetch app page details for %s: %s", app.url, exc)
            send_telegram_message(
                session,
                build_new_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)),
            )
            for index, screenshot_url in enumerate(screenshot_urls, start=1):
                try:
                    send_telegram_photo(
                        session,
                        screenshot_url,
                        caption=f"🖼️ {app.name} • Screenshot {index}/{len(screenshot_urls)}",
                    )
                except requests.RequestException as exc:
                    logging.warning("Failed to send screenshot for %s: %s", app.name, exc)
        # Removals and updates are batched into summary chunk(s).
        header, summary_lines = build_summary_message(
            added_count=len(added_urls),
            removed_urls=removed_urls,
            changed_lines=changed_lines,
            updated_count=updated_count,
            previous=previous_state,
        )
        if summary_lines:
            for message in split_message(header, summary_lines):
                send_telegram_message(session, message)
    else:
        logging.info("No catalog changes detected")
    # Persist the fresh snapshot regardless of whether anything changed.
    save_state(STATE_PATH, current_state)
    return changed
def validate_env() -> None:
    """Fail fast on interval settings, raising ValueError when too small."""
    limits = (
        (CHECK_INTERVAL_SECONDS, 30, "CHECK_INTERVAL_SECONDS must be >= 30"),
        (TELEGRAM_POLL_SECONDS, 2, "TELEGRAM_POLL_SECONDS must be >= 2"),
    )
    for value, minimum, message in limits:
        if value < minimum:
            raise ValueError(message)
def main() -> int:
    """Run the watcher: periodic catalog diffs plus Telegram command polling.

    Returns 2 on invalid configuration; otherwise loops forever, checking
    the catalog every CHECK_INTERVAL_SECONDS and polling Telegram roughly
    every TELEGRAM_POLL_SECONDS in between.
    """
    configure_logging()
    try:
        validate_env()
    except Exception as exc:
        logging.error("Invalid environment: %s", exc)
        return 2
    logging.info("Starting TrueNAS catalog watcher")
    logging.info("Catalog URL: %s", CATALOG_URL)
    logging.info("State file: %s", STATE_PATH)
    logging.info("Interval: %ss", CHECK_INTERVAL_SECONDS)
    session = requests.Session()
    send_startup_notification(session)
    initialize_telegram_offset(session)
    is_first_iteration = True
    due_at = time.time()
    while True:
        now = time.time()
        if now >= due_at:
            # Catalog check errors never kill the loop — log and carry on.
            try:
                run_once(session, is_first_iteration)
            except requests.RequestException as exc:
                logging.error("Network error: %s", exc)
            except Exception as exc:
                logging.exception("Watcher iteration failed: %s", exc)
            is_first_iteration = False
            due_at = now + CHECK_INTERVAL_SECONDS
        # Command polling is likewise best effort.
        try:
            poll_telegram_commands(session)
        except requests.RequestException as exc:
            logging.warning("Telegram polling failed: %s", exc)
        except Exception as exc:
            logging.exception("Telegram command processing failed: %s", exc)
        # Sleep until the next poll, but never past the next catalog check.
        time.sleep(min(TELEGRAM_POLL_SECONDS, max(1, int(due_at - time.time()))))
if __name__ == "__main__":
    # Run the watcher loop when executed as a script; main() returns an
    # exit code only on configuration failure.
    sys.exit(main())