# truenas-catalog-notify — 672 lines, 22 KiB, Python
import hashlib
|
||
import json
|
||
import logging
|
||
import os
|
||
import random
|
||
import re
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass, asdict
|
||
from datetime import datetime, timezone
|
||
from typing import Dict, List, Optional, Tuple
|
||
from urllib.parse import urljoin, urlparse
|
||
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
|
||
# --- Configuration (all overridable via environment variables) ---

# Catalog page to scrape for app listings.
CATALOG_URL = os.getenv("CATALOG_URL", "https://apps.truenas.com/catalog")
# JSON file where the last-seen catalog snapshot is persisted between runs.
STATE_PATH = os.getenv("STATE_PATH", "/data/catalog_state.json")
# Seconds between catalog checks (validate_env enforces >= 30).
CHECK_INTERVAL_SECONDS = int(os.getenv("CHECK_INTERVAL_SECONDS", "1800"))
# Telegram credentials; notifications are skipped when either is empty.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
# Timeout applied to every outbound HTTP request.
REQUEST_TIMEOUT_SECONDS = int(os.getenv("REQUEST_TIMEOUT_SECONDS", "30"))
# User-Agent header sent with catalog/media requests.
USER_AGENT = os.getenv(
    "USER_AGENT",
    "truenas-catalog-notify/1.0 (+https://apps.truenas.com/catalog)",
)
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# Max chars per Telegram message; kept below Telegram's 4096 hard limit.
MAX_MESSAGE_LEN = 3900
# Upper bound on screenshots probed/sent per app.
MAX_SCREENSHOTS_PER_APP = int(os.getenv("MAX_SCREENSHOTS_PER_APP", "3"))
# Seconds between Telegram getUpdates polls (validate_env enforces >= 2).
TELEGRAM_POLL_SECONDS = int(os.getenv("TELEGRAM_POLL_SECONDS", "10"))
# Base URL of the media host that serves app screenshots.
MEDIA_BASE_URL = os.getenv("MEDIA_BASE_URL", "https://media.sys.truenas.net")

# Highest Telegram update_id processed so far; None until first poll.
last_telegram_update_id: Optional[int] = None
|
||
|
||
|
||
@dataclass
class AppSnapshot:
    """One catalog entry as scraped from the listing page."""

    name: str          # display name of the app
    url: str           # absolute URL of the app's catalog page
    train: str         # release train text from the listing ("" if absent)
    added: str         # "Added" date string as shown on the page ("" if absent)
    summary: str       # free-text summary ("" if absent)
    content_hash: str  # sha256 over the fields above; used for change detection
|
||
|
||
|
||
def configure_logging() -> None:
    """Configure root logging from the LOG_LEVEL env setting (default INFO)."""
    chosen_level = getattr(logging, LOG_LEVEL, logging.INFO)
    line_format = "%(asctime)s %(levelname)s %(message)s"
    logging.basicConfig(level=chosen_level, format=line_format)
|
||
|
||
|
||
def normalize_text(value: str) -> str:
    """Collapse every whitespace run in *value* to a single space and trim."""
    tokens = value.split()
    return " ".join(tokens)
|
||
|
||
|
||
def compute_hash(parts: List[str]) -> str:
    """Return the hex SHA-256 digest of *parts* joined with '||'."""
    joined = "||".join(parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
|
||
|
||
|
||
def fetch_catalog_html(session: requests.Session) -> str:
    """Download and return the raw HTML of the catalog listing page.

    Raises requests.HTTPError on a non-2xx response.
    """
    resp = session.get(
        CATALOG_URL,
        headers={"User-Agent": USER_AGENT},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()
    return resp.text
|
||
|
||
|
||
def is_catalog_app_link(href: str) -> bool:
    """Return True when *href* points at an individual app page under /catalog/."""
    if not href:
        return False
    trimmed_path = urlparse(href).path.rstrip("/")
    # "/catalog" itself is the listing page, not an app entry.
    return trimmed_path != "/catalog" and trimmed_path.startswith("/catalog/")
|
||
|
||
|
||
def parse_catalog(html: str) -> Dict[str, AppSnapshot]:
    """Parse the catalog listing HTML into {absolute app URL: AppSnapshot}.

    Anchors are taken from the #catalog-cards container when present,
    otherwise from the whole document. The anchor text is assumed to be
    laid out as "<name> Train: <train> Added: <date> <summary>" —
    NOTE(review): confirm against the live page markup.
    """
    soup = BeautifulSoup(html, "html.parser")
    cards_root = soup.find(id="catalog-cards")
    # Fall back to scanning every link when the cards container is missing.
    candidates = cards_root.find_all("a", href=True) if cards_root else soup.find_all("a", href=True)

    snapshots: Dict[str, AppSnapshot] = {}
    for anchor in candidates:
        raw_href = anchor.get("href", "")
        # Resolve relative hrefs against the catalog base URL.
        full_url = urljoin(CATALOG_URL, raw_href)
        if not is_catalog_app_link(urlparse(full_url).path):
            continue

        text = normalize_text(anchor.get_text(" ", strip=True))
        if not text:
            continue

        # Everything before " Train:" is the app name.
        name = text.split(" Train:")[0].strip()
        train = ""
        added = ""
        summary = ""

        if " Train:" in text:
            remainder = text.split(" Train:", 1)[1].strip()
            if " Added:" in remainder:
                train_part, after_added = remainder.split(" Added:", 1)
                train = train_part.strip()
                # First token after "Added:" is the date; the rest is the summary.
                pieces = after_added.strip().split(" ", 1)
                added = pieces[0].strip()
                summary = pieces[1].strip() if len(pieces) > 1 else ""
            else:
                train = remainder
        else:
            # No "Train:" marker: treat the whole anchor text as the summary.
            summary = text

        # Hash the visible fields so any metadata change is detectable later.
        app_hash = compute_hash([name, train, added, summary, full_url])
        snapshots[full_url] = AppSnapshot(
            name=name,
            url=full_url,
            train=train,
            added=added,
            summary=summary,
            content_hash=app_hash,
        )

    return snapshots
|
||
|
||
|
||
def load_state(path: str) -> Dict[str, AppSnapshot]:
    """Load the previously saved catalog snapshot from *path*.

    Returns an empty dict when the file does not exist, cannot be read,
    or does not contain a valid JSON object. Previously a corrupt state
    file raised out of every run_once() call and wedged the watcher loop
    permanently (save_state is only reached after a successful load);
    now the bad file is logged, ignored, and rewritten on the next
    successful check.
    """
    if not os.path.exists(path):
        return {}

    try:
        with open(path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        logging.warning("Ignoring unreadable state file %s: %s", path, exc)
        return {}

    # Tolerate a top-level value that is not an object (e.g. a bare list).
    apps = data.get("apps", {}) if isinstance(data, dict) else {}
    loaded: Dict[str, AppSnapshot] = {}
    for url, value in apps.items():
        # Missing fields default to "" so older state files still load.
        loaded[url] = AppSnapshot(
            name=value.get("name", ""),
            url=value.get("url", url),
            train=value.get("train", ""),
            added=value.get("added", ""),
            summary=value.get("summary", ""),
            content_hash=value.get("content_hash", ""),
        )
    return loaded
|
||
|
||
|
||
def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None:
    """Write the snapshot dict to *path* as pretty-printed JSON.

    Parent directories are created as needed; apps are stored sorted by URL
    under an "apps" key alongside an "updated_at" UTC timestamp.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    serialized_apps = {url: asdict(snap) for url, snap in sorted(apps.items())}
    payload = {
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "apps": serialized_apps,
    }
    with open(path, "w", encoding="utf-8") as fp:
        json.dump(payload, fp, indent=2, ensure_ascii=False)
|
||
|
||
|
||
def format_field_change(label: str, old: str, new: str) -> str:
    """Render one bullet line describing a field's old → new transition."""
    shown_old = old or "(empty)"
    shown_new = new or "(empty)"
    return f"• {label}: '{shown_old}' → '{shown_new}'"
|
||
|
||
|
||
def collect_diffs(
    previous: Dict[str, AppSnapshot],
    current: Dict[str, AppSnapshot],
) -> Tuple[List[str], List[str], List[str], int]:
    """Diff two snapshots keyed by app URL.

    Returns (added URLs, removed URLs, formatted change lines for updated
    apps, count of updated apps). URL lists are sorted; change lines are
    grouped per app and separated by blank lines.
    """
    old_keys = set(previous)
    new_keys = set(current)

    added_urls = sorted(new_keys - old_keys)
    removed_urls = sorted(old_keys - new_keys)

    changed_lines: List[str] = []
    updated_count = 0
    for url in sorted(new_keys & old_keys):
        before, after = previous[url], current[url]
        # The content hash is the single source of truth for "changed".
        if before.content_hash == after.content_hash:
            continue
        updated_count += 1

        details = [
            format_field_change(label, getattr(before, label), getattr(after, label))
            for label in ("name", "train", "added", "summary")
            if getattr(before, label) != getattr(after, label)
        ]
        if not details:
            # Hash differs but no tracked field did (e.g. URL component).
            details = ["• metadata changed"]

        changed_lines.append(f"🔄 {after.name}")
        changed_lines.append(f" 🔗 {after.url}")
        changed_lines.extend(f" {detail}" for detail in details)
        changed_lines.append("")

    return added_urls, removed_urls, changed_lines, updated_count
|
||
|
||
|
||
def build_summary_message(
    added_count: int,
    removed_urls: List[str],
    changed_lines: List[str],
    updated_count: int,
    previous: Dict[str, AppSnapshot],
) -> Tuple[str, List[str]]:
    """Build the summary header plus detail lines for a catalog diff.

    *previous* supplies display data for removed apps. Trailing blank
    lines are stripped from the returned detail list.
    """
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    header = (
        "📣 TrueNAS Catalog Update\n"
        "━━━━━━━━━━━━━━━━━━━━━━\n"
        f"🕒 {timestamp}\n"
        f"➕ Added: {added_count} | ➖ Removed: {len(removed_urls)} | 🔄 Updated: {updated_count}"
    )

    body: List[str] = []
    if removed_urls:
        body.append("🗑️ Removed Apps")
        body.append("")
        for url in removed_urls:
            gone = previous[url]
            body.append(f"➖ {gone.name}")
            body.append(f" 🔗 {gone.url}")
            body.append("")

    if changed_lines:
        body.append("✏️ Updated Apps")
        body.append("")
        body.extend(changed_lines)

    while body and body[-1] == "":
        body.pop()
    return header, body
|
||
|
||
|
||
def truncate_text(value: str, limit: int) -> str:
    """Whitespace-normalize *value* and cap it at *limit* chars with an ellipsis."""
    normalized = normalize_text(value)
    if len(normalized) > limit:
        # Reserve one char for the ellipsis; drop trailing spaces before it.
        return f"{normalized[: max(0, limit - 1)].rstrip()}…"
    return normalized
|
||
|
||
|
||
def extract_app_id_from_url(app_url: str) -> str:
    """Pull the app identifier (the path segment after 'catalog') from a URL.

    Falls back to the last path segment when no 'catalog' segment is
    followed by anything; returns "" for an empty path.
    """
    segments = [seg for seg in urlparse(app_url).path.strip("/").split("/") if seg]
    if not segments:
        return ""

    if "catalog" in segments:
        idx = segments.index("catalog")
        if idx + 1 < len(segments):
            return segments[idx + 1]

    return segments[-1]
|
||
|
||
|
||
def build_storj_screenshot_urls(session: requests.Session, app_id: str) -> List[str]:
    """Probe the media host for screenshot URLs belonging to *app_id*.

    Tries screenshot1..N (N = MAX_SCREENSHOTS_PER_APP) and, for each index,
    each known image extension until one responds 200 with an image-like
    Content-Type. Stops at the first index with no match, so screenshots
    are assumed to be numbered contiguously from 1. Returns the URLs found.
    """
    if not app_id:
        return []

    screenshot_urls: List[str] = []
    image_extensions = ["png", "jpg", "jpeg", "webp"]
    for index in range(1, MAX_SCREENSHOTS_PER_APP + 1):
        matched_for_index = False
        for extension in image_extensions:
            screenshot_url = (
                f"{MEDIA_BASE_URL.rstrip('/')}/apps/{app_id}/screenshots/screenshot{index}.{extension}"
            )
            try:
                response = session.get(
                    screenshot_url,
                    timeout=REQUEST_TIMEOUT_SECONDS,
                    headers={"User-Agent": USER_AGENT},
                )
            except requests.RequestException:
                # Network failure for this candidate: try the next extension.
                continue

            if response.status_code != 200:
                continue

            # Only reject explicit non-image types; a missing header passes.
            content_type = str(response.headers.get("Content-Type", "")).lower()
            if content_type and "image" not in content_type:
                continue

            screenshot_urls.append(screenshot_url)
            matched_for_index = True
            break

        if not matched_for_index:
            # Contiguous numbering: no hit at this index means we're done.
            break

    return screenshot_urls
|
||
|
||
|
||
def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[str, object]:
    """Fetch an app's page; return its title and probed screenshot URLs.

    Returns {"page_title": str, "screenshot_urls": list[str]}.
    Raises requests.HTTPError on a non-2xx page response.
    """
    page_response = session.get(
        app_url,
        headers={"User-Agent": USER_AGENT},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    page_response.raise_for_status()

    parsed = BeautifulSoup(page_response.text, "html.parser")
    title = ""
    if parsed.title:
        title = normalize_text(parsed.title.get_text(" ", strip=True))

    app_id = extract_app_id_from_url(app_url)
    return {
        "page_title": title,
        "screenshot_urls": build_storj_screenshot_urls(session, app_id),
    }
|
||
|
||
|
||
def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
    """Compose the Telegram notification text for a newly added app.

    Optional page title and screenshot count are appended when provided.
    The result is kept within MAX_MESSAGE_LEN by capping long lines at 280
    chars and, if still too long, dropping trailing lines (never below 8).
    """
    detected_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    lines: List[str] = [
        "🆕 New TrueNAS App",
        "━━━━━━━━━━━━━━",
        "",
        f"🕒 Detected: {detected_at}",
        "",
        "ℹ️ App Details",
        f"📦 Name: {app.name}",
        f"🔗 URL: {app.url}",
    ]

    if app.train:
        lines.append(f"🚂 Train: {app.train}")
    if app.added:
        lines.append(f"📅 Added: {app.added}")
    if app.summary:
        lines.extend(["", "📝 Summary", truncate_text(app.summary, 700)])
    if page_title:
        lines.extend(["", f"📄 Page: {truncate_text(page_title, 180)}"])
    if screenshot_count > 0:
        lines.extend(["", f"🖼️ Screenshots: {screenshot_count} attached"])

    # Drop trailing blank lines.
    while lines and not lines[-1]:
        lines.pop()

    message = "\n".join(lines)
    if len(message) <= MAX_MESSAGE_LEN:
        return message

    # Over budget: first cap each line, then drop lines from the end.
    capped = [truncate_text(line, 280) if len(line) > 280 else line for line in lines]
    while len("\n".join(capped)) > MAX_MESSAGE_LEN and len(capped) > 8:
        capped.pop()
    return "\n".join(capped)
|
||
|
||
|
||
def build_random_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
    """Like build_new_app_message, but titled as a /random command reply."""
    base_text = build_new_app_message(app, page_title=page_title, screenshot_count=screenshot_count)
    # Only the first (title) occurrence is swapped.
    return base_text.replace("🆕 New TrueNAS App", "🎲 Random TrueNAS App", 1)
|
||
|
||
|
||
def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]:
    """Pack *lines* under repeated copies of *header* into <= max_len chunks.

    Every chunk starts with *header*; a line that would overflow the
    current chunk starts a new one.
    """
    if not lines:
        return [header]

    chunks: List[str] = []
    current = header
    for line in lines:
        extended = f"{current}\n{line}"
        if len(extended) > max_len:
            chunks.append(current)
            current = f"{header}\n{line}"
        else:
            current = extended

    chunks.append(current)
    return chunks
|
||
|
||
|
||
def send_telegram_message(session: requests.Session, text: str) -> None:
    """POST *text* to the configured Telegram chat; no-op when unconfigured.

    Raises requests.HTTPError when the API call fails.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        logging.warning("Telegram token/chat id missing; skipping message")
        return

    api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    body = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": text,
        "disable_web_page_preview": True,
    }
    session.post(api_url, json=body, timeout=REQUEST_TIMEOUT_SECONDS).raise_for_status()
|
||
|
||
|
||
def send_telegram_photo(session: requests.Session, photo_url: str, caption: str = "") -> None:
    """Send a photo by URL to the configured Telegram chat; no-op when unconfigured.

    Captions are truncated to 900 chars. Raises requests.HTTPError on failure.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        logging.warning("Telegram token/chat id missing; skipping photo")
        return

    body = {
        "chat_id": TELEGRAM_CHAT_ID,
        "photo": photo_url,
    }
    if caption:
        body["caption"] = truncate_text(caption, 900)

    api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
    session.post(api_url, json=body, timeout=REQUEST_TIMEOUT_SECONDS).raise_for_status()
|
||
|
||
|
||
def get_telegram_updates(session: requests.Session, offset: Optional[int]) -> List[Dict[str, object]]:
    """Long-poll Telegram's getUpdates API and return the update list.

    Returns [] when no bot token is configured or the API reports not-ok.
    Network/HTTP errors propagate as requests exceptions.
    """
    if not TELEGRAM_BOT_TOKEN:
        return []

    query: Dict[str, object] = {
        # Keep the server-side long-poll below our client-side timeout.
        "timeout": min(max(1, REQUEST_TIMEOUT_SECONDS), 25),
        "allowed_updates": json.dumps(["message"]),
    }
    if offset is not None:
        query["offset"] = offset

    api_url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
    response = session.get(api_url, params=query, timeout=REQUEST_TIMEOUT_SECONDS + 5)
    response.raise_for_status()

    body = response.json()
    if not body.get("ok"):
        return []
    updates = body.get("result", [])
    return updates if isinstance(updates, list) else []
|
||
|
||
|
||
def send_random_app_response(session: requests.Session) -> None:
    """Reply to a /random command with one randomly chosen catalog app.

    Prefers the persisted state; falls back to a live catalog fetch when
    no state exists yet. Sends the app message first, then each probed
    screenshot as a separate photo (best-effort).
    """
    state = load_state(STATE_PATH)
    if not state:
        # No saved snapshot yet: fetch and parse the catalog directly.
        html = fetch_catalog_html(session)
        state = parse_catalog(html)

    if not state:
        send_telegram_message(session, "⚠️ Unable to fetch apps right now. Try again in a moment.")
        return

    app = random.choice(list(state.values()))

    page_title = ""
    screenshot_urls: List[str] = []
    try:
        details = fetch_new_app_page_details(session, app.url)
        page_title = str(details.get("page_title", ""))
        screenshot_data = details.get("screenshot_urls", [])
        if isinstance(screenshot_data, list):
            # Keep only absolute http(s) URLs.
            screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")]
    except requests.RequestException as exc:
        # Page details are optional; still send the basic message below.
        logging.warning("Unable to fetch random app page details for %s: %s", app.url, exc)

    send_telegram_message(
        session,
        build_random_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)),
    )

    for index, screenshot_url in enumerate(screenshot_urls, start=1):
        try:
            send_telegram_photo(
                session,
                screenshot_url,
                caption=f"🖼️ {app.name} • Screenshot {index}/{len(screenshot_urls)}",
            )
        except requests.RequestException as exc:
            # One failed photo shouldn't abort the remaining ones.
            logging.warning("Failed to send random app screenshot for %s: %s", app.name, exc)
|
||
|
||
|
||
def initialize_telegram_offset(session: requests.Session) -> None:
    """Record the newest pending update_id so startup skips any backlog.

    Best-effort: network failures are logged and ignored, leaving the
    offset at None.
    """
    global last_telegram_update_id
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        return

    try:
        pending = get_telegram_updates(session, offset=None)
    except requests.RequestException as exc:
        logging.warning("Unable to initialize Telegram updates offset: %s", exc)
        return

    ids = [entry.get("update_id") for entry in pending if isinstance(entry.get("update_id"), int)]
    if ids:
        last_telegram_update_id = max(ids)
|
||
|
||
|
||
def poll_telegram_commands(session: requests.Session) -> None:
    """Drain pending Telegram updates and handle supported bot commands.

    Advances the module-level last_telegram_update_id for every update
    seen (even malformed ones) so nothing is reprocessed. Only messages
    from the configured chat are honored; the sole command is /random.
    """
    global last_telegram_update_id

    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return

    # Telegram's getUpdates offset is "last processed update_id + 1".
    offset = last_telegram_update_id + 1 if last_telegram_update_id is not None else None
    updates = get_telegram_updates(session, offset=offset)

    for update in updates:
        update_id = update.get("update_id")
        if isinstance(update_id, int):
            last_telegram_update_id = update_id

        message = update.get("message")
        if not isinstance(message, dict):
            continue

        chat = message.get("chat")
        if not isinstance(chat, dict):
            continue
        chat_id = str(chat.get("id", ""))
        # Ignore messages from any chat other than the configured one.
        if chat_id != TELEGRAM_CHAT_ID:
            continue

        text = str(message.get("text", "")).strip()
        command = text.split(" ", 1)[0].lower() if text else ""
        # startswith also matches bot-suffixed forms like "/random@MyBot".
        if command.startswith("/random"):
            logging.info("Received /random command from Telegram chat %s", chat_id)
            try:
                send_random_app_response(session)
            except requests.RequestException as exc:
                logging.warning("Failed to send /random response: %s", exc)
                send_telegram_message(session, "⚠️ Failed to fetch a random app right now. Please try again.")
|
||
|
||
|
||
def send_startup_notification(session: requests.Session) -> None:
    """Announce watcher startup via Telegram; failures are logged, not raised."""
    started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
    text = (
        "✅ TrueNAS Catalog Watcher Online\n"
        "━━━━━━━━━━━━━━━━━━━━━━━━\n"
        "\n"
        f"🕒 Started: {started_at}\n"
        f"🌐 Catalog: {CATALOG_URL}\n"
        f"⏱️ Interval: {CHECK_INTERVAL_SECONDS}s"
    )
    try:
        send_telegram_message(session, text)
    except requests.RequestException as exc:
        logging.error("Failed to send startup Telegram message: %s", exc)
|
||
|
||
|
||
def run_once(session: requests.Session, first_run: bool) -> bool:
    """Perform one catalog check: diff against saved state and notify.

    Returns True when any change was detected. On the very first run with
    no saved state, only snapshots the catalog (no notifications). Raises
    RuntimeError when parsing yields zero apps — likely a markup change —
    rather than wiping the saved state.
    """
    previous_state = load_state(STATE_PATH)
    html = fetch_catalog_html(session)
    current_state = parse_catalog(html)

    if not current_state:
        raise RuntimeError("Parsed zero catalog entries; aborting to avoid overwriting state")

    if first_run and not previous_state:
        # Baseline snapshot: nothing to diff against yet.
        save_state(STATE_PATH, current_state)
        logging.info("Initial snapshot saved with %d apps", len(current_state))
        return False

    added_urls, removed_urls, changed_lines, updated_count = collect_diffs(previous_state, current_state)
    changed = bool(added_urls or removed_urls or changed_lines)

    if changed:
        logging.info(
            "Catalog change detected (added=%d, removed=%d, updated=%d)",
            len(added_urls),
            len(removed_urls),
            updated_count,
        )

        # Each newly added app gets its own rich message plus screenshots.
        for url in added_urls:
            app = current_state[url]
            page_title = ""
            screenshot_urls: List[str] = []
            try:
                details = fetch_new_app_page_details(session, app.url)
                page_title = str(details.get("page_title", ""))
                screenshot_data = details.get("screenshot_urls", [])
                if isinstance(screenshot_data, list):
                    # Keep only absolute http(s) URLs.
                    screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")]
            except requests.RequestException as exc:
                # Detail fetch is best-effort; announce the app regardless.
                logging.warning("Unable to fetch app page details for %s: %s", app.url, exc)

            send_telegram_message(
                session,
                build_new_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)),
            )

            for index, screenshot_url in enumerate(screenshot_urls, start=1):
                try:
                    send_telegram_photo(
                        session,
                        screenshot_url,
                        caption=f"🖼️ {app.name} • Screenshot {index}/{len(screenshot_urls)}",
                    )
                except requests.RequestException as exc:
                    # One failed photo shouldn't abort the remaining ones.
                    logging.warning("Failed to send screenshot for %s: %s", app.name, exc)

        # Removed/updated apps go into length-chunked summary messages.
        header, summary_lines = build_summary_message(
            added_count=len(added_urls),
            removed_urls=removed_urls,
            changed_lines=changed_lines,
            updated_count=updated_count,
            previous=previous_state,
        )
        if summary_lines:
            for message in split_message(header, summary_lines):
                send_telegram_message(session, message)
    else:
        logging.info("No catalog changes detected")

    save_state(STATE_PATH, current_state)
    return changed
|
||
|
||
|
||
def validate_env() -> None:
    """Reject obviously unsafe polling/interval settings before the loop starts.

    Raises ValueError when a setting is below its minimum.
    """
    limits = (
        (CHECK_INTERVAL_SECONDS, 30, "CHECK_INTERVAL_SECONDS must be >= 30"),
        (TELEGRAM_POLL_SECONDS, 2, "TELEGRAM_POLL_SECONDS must be >= 2"),
    )
    for value, minimum, message in limits:
        if value < minimum:
            raise ValueError(message)
|
||
|
||
|
||
def main() -> int:
    """Entry point: run the catalog watcher loop forever.

    Returns 2 on invalid environment configuration; otherwise the loop
    never exits. Each pass interleaves a periodic catalog check with more
    frequent Telegram command polling.
    """
    configure_logging()

    try:
        validate_env()
    except Exception as exc:
        logging.error("Invalid environment: %s", exc)
        return 2

    logging.info("Starting TrueNAS catalog watcher")
    logging.info("Catalog URL: %s", CATALOG_URL)
    logging.info("State file: %s", STATE_PATH)
    logging.info("Interval: %ss", CHECK_INTERVAL_SECONDS)

    session = requests.Session()
    send_startup_notification(session)
    # Skip any Telegram backlog accumulated while the watcher was down.
    initialize_telegram_offset(session)
    first_loop = True
    next_check_at = time.time()

    while True:
        now = time.time()
        if now >= next_check_at:
            try:
                run_once(session, first_loop)
            except requests.RequestException as exc:
                logging.error("Network error: %s", exc)
            except Exception as exc:
                # Keep the watcher alive through any single bad iteration.
                logging.exception("Watcher iteration failed: %s", exc)

            first_loop = False
            next_check_at = now + CHECK_INTERVAL_SECONDS

        try:
            poll_telegram_commands(session)
        except requests.RequestException as exc:
            logging.warning("Telegram polling failed: %s", exc)
        except Exception as exc:
            logging.exception("Telegram command processing failed: %s", exc)

        # Sleep until the next poll, but never past the next catalog check.
        sleep_for = min(TELEGRAM_POLL_SECONDS, max(1, int(next_check_at - time.time())))
        time.sleep(sleep_for)
|
||
|
||
|
||
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    sys.exit(main())
|