Compare commits
7 Commits
e505fbe25a
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a6857e33a | ||
|
|
7f41783a9b | ||
|
|
a3290d1d15 | ||
|
|
7471c3d36d | ||
|
|
512a54487d | ||
|
|
abf7d58157 | ||
|
|
46476aba51 |
@@ -7,3 +7,5 @@ __pycache__/
|
|||||||
.git/
|
.git/
|
||||||
.gitignore
|
.gitignore
|
||||||
data/
|
data/
|
||||||
|
.env
|
||||||
|
.gitea/
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
.venv/
|
.venv/
|
||||||
__pycache__/
|
__pycache__/
|
||||||
.git/
|
.git/
|
||||||
|
.env
|
||||||
10
README.md
10
README.md
@@ -14,10 +14,15 @@ The watcher pulls `https://apps.truenas.com/catalog` over plain HTTP (no browser
|
|||||||
|
|
||||||
On changes, it sends Telegram messages with:
|
On changes, it sends Telegram messages with:
|
||||||
|
|
||||||
- one detailed message per newly added app (name, URL, train, added date, catalog summary, plus extra details parsed from the app page when available)
|
- one detailed message per newly added app (name, URL, train, added date, catalog summary, and page title)
|
||||||
|
- screenshot images from the app page, posted as Telegram photos (up to the configured per-app limit)
|
||||||
- removed apps (`-`)
|
- removed apps (`-`)
|
||||||
- updated apps (`~`) and field-level diffs
|
- updated apps (`~`) and field-level diffs
|
||||||
|
|
||||||
|
It also listens for Telegram commands from the configured chat:
|
||||||
|
|
||||||
|
- `/random` returns one random app using the same detailed format, including screenshots when available
|
||||||
|
|
||||||
## Environment variables
|
## Environment variables
|
||||||
|
|
||||||
- `TELEGRAM_BOT_TOKEN` (required for notifications)
|
- `TELEGRAM_BOT_TOKEN` (required for notifications)
|
||||||
@@ -27,6 +32,9 @@ On changes, it sends Telegram messages with:
|
|||||||
- `CATALOG_URL` (default: `https://apps.truenas.com/catalog`)
|
- `CATALOG_URL` (default: `https://apps.truenas.com/catalog`)
|
||||||
- `REQUEST_TIMEOUT_SECONDS` (default: `30`)
|
- `REQUEST_TIMEOUT_SECONDS` (default: `30`)
|
||||||
- `LOG_LEVEL` (default: `INFO`)
|
- `LOG_LEVEL` (default: `INFO`)
|
||||||
|
- `MAX_SCREENSHOTS_PER_APP` (default: `3`)
|
||||||
|
- `TELEGRAM_POLL_SECONDS` (default: `10`)
|
||||||
|
- `MEDIA_BASE_URL` (default: `https://media.sys.truenas.net`)
|
||||||
|
|
||||||
## Build
|
## Build
|
||||||
|
|
||||||
|
|||||||
402
watcher.py
402
watcher.py
@@ -2,12 +2,13 @@ import hashlib
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
from dataclasses import dataclass, asdict
|
from dataclasses import dataclass, asdict
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Dict, List, Tuple
|
from typing import Dict, List, Optional, Tuple
|
||||||
from urllib.parse import urljoin, urlparse
|
from urllib.parse import urljoin, urlparse
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
@@ -25,6 +26,11 @@ USER_AGENT = os.getenv(
|
|||||||
)
|
)
|
||||||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||||
MAX_MESSAGE_LEN = 3900
|
MAX_MESSAGE_LEN = 3900
|
||||||
|
MAX_SCREENSHOTS_PER_APP = int(os.getenv("MAX_SCREENSHOTS_PER_APP", "10"))
|
||||||
|
TELEGRAM_POLL_SECONDS = int(os.getenv("TELEGRAM_POLL_SECONDS", "10"))
|
||||||
|
MEDIA_BASE_URL = os.getenv("MEDIA_BASE_URL", "https://media.sys.truenas.net")
|
||||||
|
|
||||||
|
last_telegram_update_id: Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -154,7 +160,7 @@ def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None:
|
|||||||
def format_field_change(label: str, old: str, new: str) -> str:
|
def format_field_change(label: str, old: str, new: str) -> str:
|
||||||
old_clean = old if old else "(empty)"
|
old_clean = old if old else "(empty)"
|
||||||
new_clean = new if new else "(empty)"
|
new_clean = new if new else "(empty)"
|
||||||
return f"{label}: '{old_clean}' -> '{new_clean}'"
|
return f"• {label}: '{old_clean}' → '{new_clean}'"
|
||||||
|
|
||||||
|
|
||||||
def collect_diffs(
|
def collect_diffs(
|
||||||
@@ -188,11 +194,13 @@ def collect_diffs(
|
|||||||
details.append(format_field_change("summary", old.summary, new.summary))
|
details.append(format_field_change("summary", old.summary, new.summary))
|
||||||
|
|
||||||
if not details:
|
if not details:
|
||||||
details.append("metadata changed")
|
details.append("• metadata changed")
|
||||||
|
|
||||||
changed_lines.append(f"~ {new.name} ({new.url})")
|
changed_lines.append(f"🔄 {new.name}")
|
||||||
|
changed_lines.append(f" 🔗 {new.url}")
|
||||||
for detail in details:
|
for detail in details:
|
||||||
changed_lines.append(f" - {detail}")
|
changed_lines.append(f" {detail}")
|
||||||
|
changed_lines.append("")
|
||||||
|
|
||||||
return added_urls, removed_urls, changed_lines, updated_count
|
return added_urls, removed_urls, changed_lines, updated_count
|
||||||
|
|
||||||
@@ -207,16 +215,28 @@ def build_summary_message(
|
|||||||
removed_count = len(removed_urls)
|
removed_count = len(removed_urls)
|
||||||
|
|
||||||
header = (
|
header = (
|
||||||
f"TrueNAS catalog changed at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
"📣 TrueNAS Catalog Update\n"
|
||||||
f"Added: {added_count} | Removed: {removed_count} | Updated: {updated_count}"
|
"━━━━━━━━━━━━━━━━━━━━━━\n"
|
||||||
|
f"🕒 {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
||||||
|
f"➕ Added: {added_count} | ➖ Removed: {removed_count} | 🔄 Updated: {updated_count}"
|
||||||
)
|
)
|
||||||
|
|
||||||
lines: List[str] = []
|
lines: List[str] = []
|
||||||
|
if removed_urls:
|
||||||
|
lines.append("🗑️ Removed Apps")
|
||||||
|
lines.append("")
|
||||||
for url in removed_urls:
|
for url in removed_urls:
|
||||||
app = previous[url]
|
app = previous[url]
|
||||||
lines.append(f"- {app.name} ({app.url})")
|
lines.append(f"➖ {app.name}")
|
||||||
|
lines.append(f" 🔗 {app.url}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
if changed_lines:
|
||||||
|
lines.append("✏️ Updated Apps")
|
||||||
|
lines.append("")
|
||||||
lines.extend(changed_lines)
|
lines.extend(changed_lines)
|
||||||
|
while lines and lines[-1] == "":
|
||||||
|
lines.pop()
|
||||||
return header, lines
|
return header, lines
|
||||||
|
|
||||||
|
|
||||||
@@ -227,6 +247,57 @@ def truncate_text(value: str, limit: int) -> str:
|
|||||||
return f"{text[: max(0, limit - 1)].rstrip()}…"
|
return f"{text[: max(0, limit - 1)].rstrip()}…"
|
||||||
|
|
||||||
|
|
||||||
|
def extract_app_id_from_url(app_url: str) -> str:
|
||||||
|
path_parts = [part for part in urlparse(app_url).path.strip("/").split("/") if part]
|
||||||
|
if not path_parts:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
if "catalog" in path_parts:
|
||||||
|
catalog_index = path_parts.index("catalog")
|
||||||
|
if catalog_index + 1 < len(path_parts):
|
||||||
|
return path_parts[catalog_index + 1]
|
||||||
|
|
||||||
|
return path_parts[-1]
|
||||||
|
|
||||||
|
|
||||||
|
def build_storj_screenshot_urls(session: requests.Session, app_id: str) -> List[str]:
|
||||||
|
if not app_id:
|
||||||
|
return []
|
||||||
|
|
||||||
|
screenshot_urls: List[str] = []
|
||||||
|
image_extensions = ["png", "jpg", "jpeg", "webp", "gif"]
|
||||||
|
for index in range(1, MAX_SCREENSHOTS_PER_APP + 1):
|
||||||
|
matched_for_index = False
|
||||||
|
for extension in image_extensions:
|
||||||
|
screenshot_url = (
|
||||||
|
f"{MEDIA_BASE_URL.rstrip('/')}/apps/{app_id}/screenshots/screenshot{index}.{extension}"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
response = session.get(
|
||||||
|
screenshot_url,
|
||||||
|
timeout=REQUEST_TIMEOUT_SECONDS,
|
||||||
|
headers={"User-Agent": USER_AGENT},
|
||||||
|
)
|
||||||
|
except requests.RequestException:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
continue
|
||||||
|
|
||||||
|
content_type = str(response.headers.get("Content-Type", "")).lower()
|
||||||
|
if content_type and "image" not in content_type:
|
||||||
|
continue
|
||||||
|
|
||||||
|
screenshot_urls.append(screenshot_url)
|
||||||
|
matched_for_index = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not matched_for_index:
|
||||||
|
break
|
||||||
|
|
||||||
|
return screenshot_urls
|
||||||
|
|
||||||
|
|
||||||
def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[str, object]:
|
def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[str, object]:
|
||||||
response = session.get(
|
response = session.get(
|
||||||
app_url,
|
app_url,
|
||||||
@@ -238,119 +309,44 @@ def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[
|
|||||||
soup = BeautifulSoup(response.text, "html.parser")
|
soup = BeautifulSoup(response.text, "html.parser")
|
||||||
|
|
||||||
page_title = normalize_text(soup.title.get_text(" ", strip=True)) if soup.title else ""
|
page_title = normalize_text(soup.title.get_text(" ", strip=True)) if soup.title else ""
|
||||||
description = ""
|
app_id = extract_app_id_from_url(app_url)
|
||||||
for attrs in (
|
screenshot_urls = build_storj_screenshot_urls(session, app_id)
|
||||||
{"property": "og:description"},
|
|
||||||
{"name": "description"},
|
|
||||||
{"name": "twitter:description"},
|
|
||||||
):
|
|
||||||
tag = soup.find("meta", attrs=attrs)
|
|
||||||
if tag and tag.get("content"):
|
|
||||||
description = normalize_text(str(tag["content"]))
|
|
||||||
if description:
|
|
||||||
break
|
|
||||||
|
|
||||||
headings: List[str] = []
|
|
||||||
for tag in soup.find_all(["h1", "h2", "h3"]):
|
|
||||||
heading = normalize_text(tag.get_text(" ", strip=True))
|
|
||||||
if not heading:
|
|
||||||
continue
|
|
||||||
if heading not in headings:
|
|
||||||
headings.append(heading)
|
|
||||||
if len(headings) >= 6:
|
|
||||||
break
|
|
||||||
|
|
||||||
external_links: List[str] = []
|
|
||||||
seen_links = set()
|
|
||||||
for anchor in soup.find_all("a", href=True):
|
|
||||||
href = str(anchor.get("href", "")).strip()
|
|
||||||
if not href or href.startswith("#"):
|
|
||||||
continue
|
|
||||||
full_href = urljoin(app_url, href)
|
|
||||||
if not full_href.startswith("http"):
|
|
||||||
continue
|
|
||||||
if full_href.startswith(CATALOG_URL):
|
|
||||||
continue
|
|
||||||
if full_href in seen_links:
|
|
||||||
continue
|
|
||||||
seen_links.add(full_href)
|
|
||||||
|
|
||||||
label = normalize_text(anchor.get_text(" ", strip=True))
|
|
||||||
if not label:
|
|
||||||
label = full_href
|
|
||||||
label = truncate_text(label, 60)
|
|
||||||
external_links.append(f"{label} -> {full_href}")
|
|
||||||
if len(external_links) >= 5:
|
|
||||||
break
|
|
||||||
|
|
||||||
detected_fields: List[str] = []
|
|
||||||
body_text = normalize_text(soup.get_text(" ", strip=True))
|
|
||||||
label_patterns = {
|
|
||||||
"Version": r"(?:App\s+Version|Version)\s*[:\-]\s*([^\n\r|]{1,80})",
|
|
||||||
"Chart": r"(?:Chart\s+Version|Helm\s+Chart)\s*[:\-]\s*([^\n\r|]{1,80})",
|
|
||||||
"Category": r"Category\s*[:\-]\s*([^\n\r|]{1,80})",
|
|
||||||
"Maintainer": r"Maintainer(?:s)?\s*[:\-]\s*([^\n\r|]{1,120})",
|
|
||||||
"Homepage": r"Homepage\s*[:\-]\s*([^\n\r|]{1,160})",
|
|
||||||
"Source": r"Source\s*[:\-]\s*([^\n\r|]{1,160})",
|
|
||||||
}
|
|
||||||
for label, pattern in label_patterns.items():
|
|
||||||
match = re.search(pattern, body_text, flags=re.IGNORECASE)
|
|
||||||
if match:
|
|
||||||
value = truncate_text(match.group(1), 120)
|
|
||||||
detected_fields.append(f"{label}: {value}")
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"page_title": page_title,
|
"page_title": page_title,
|
||||||
"description": description,
|
"screenshot_urls": screenshot_urls,
|
||||||
"headings": headings,
|
|
||||||
"external_links": external_links,
|
|
||||||
"detected_fields": detected_fields,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def build_new_app_message(session: requests.Session, app: AppSnapshot) -> str:
|
def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
|
||||||
lines: List[str] = [
|
lines: List[str] = [
|
||||||
"🆕 New TrueNAS app detected",
|
"🆕 New TrueNAS App",
|
||||||
f"Detected: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
|
"━━━━━━━━━━━━━━",
|
||||||
f"Name: {app.name}",
|
"",
|
||||||
f"URL: {app.url}",
|
f"🕒 Detected: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
|
||||||
|
"",
|
||||||
|
"ℹ️ App Details",
|
||||||
|
f"📦 Name: {app.name}",
|
||||||
|
f"🔗 URL: {app.url}",
|
||||||
]
|
]
|
||||||
|
|
||||||
if app.train:
|
if app.train:
|
||||||
lines.append(f"Train: {app.train}")
|
lines.append(f"🚂 Train: {app.train}")
|
||||||
if app.added:
|
if app.added:
|
||||||
lines.append(f"Added date: {app.added}")
|
lines.append(f"📅 Added: {app.added}")
|
||||||
if app.summary:
|
if app.summary:
|
||||||
lines.append(f"Catalog summary: {truncate_text(app.summary, 700)}")
|
lines.append("")
|
||||||
|
lines.append("📝 Summary")
|
||||||
try:
|
lines.append(truncate_text(app.summary, 700))
|
||||||
details = fetch_new_app_page_details(session, app.url)
|
|
||||||
except requests.RequestException as exc:
|
|
||||||
logging.warning("Unable to fetch app details for %s: %s", app.url, exc)
|
|
||||||
details = {}
|
|
||||||
|
|
||||||
page_title = str(details.get("page_title", "")) if details else ""
|
|
||||||
if page_title:
|
if page_title:
|
||||||
lines.append(f"Page title: {truncate_text(page_title, 180)}")
|
lines.append("")
|
||||||
|
lines.append(f"📄 Page: {truncate_text(page_title, 180)}")
|
||||||
|
if screenshot_count > 0:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(f"🖼️ Screenshots: {screenshot_count} attached")
|
||||||
|
|
||||||
description = str(details.get("description", "")) if details else ""
|
while lines and lines[-1] == "":
|
||||||
if description:
|
lines.pop()
|
||||||
lines.append(f"Description: {truncate_text(description, 1000)}")
|
|
||||||
|
|
||||||
detected_fields = details.get("detected_fields", []) if details else []
|
|
||||||
if isinstance(detected_fields, list):
|
|
||||||
for field in detected_fields[:6]:
|
|
||||||
lines.append(str(field))
|
|
||||||
|
|
||||||
headings = details.get("headings", []) if details else []
|
|
||||||
if isinstance(headings, list) and headings:
|
|
||||||
lines.append(f"Headings: {truncate_text(' | '.join(headings[:6]), 320)}")
|
|
||||||
|
|
||||||
external_links = details.get("external_links", []) if details else []
|
|
||||||
if isinstance(external_links, list) and external_links:
|
|
||||||
lines.append("External links:")
|
|
||||||
for link in external_links[:5]:
|
|
||||||
lines.append(f"- {truncate_text(str(link), 220)}")
|
|
||||||
|
|
||||||
message = "\n".join(lines)
|
message = "\n".join(lines)
|
||||||
if len(message) <= MAX_MESSAGE_LEN:
|
if len(message) <= MAX_MESSAGE_LEN:
|
||||||
@@ -362,6 +358,11 @@ def build_new_app_message(session: requests.Session, app: AppSnapshot) -> str:
|
|||||||
return "\n".join(trimmed_lines)
|
return "\n".join(trimmed_lines)
|
||||||
|
|
||||||
|
|
||||||
|
def build_random_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
|
||||||
|
message = build_new_app_message(app, page_title=page_title, screenshot_count=screenshot_count)
|
||||||
|
return message.replace("🆕 New TrueNAS App", "🎲 Random TrueNAS App", 1)
|
||||||
|
|
||||||
|
|
||||||
def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]:
|
def split_message(header: str, lines: List[str], max_len: int = MAX_MESSAGE_LEN) -> List[str]:
|
||||||
if not lines:
|
if not lines:
|
||||||
return [header]
|
return [header]
|
||||||
@@ -398,12 +399,147 @@ def send_telegram_message(session: requests.Session, text: str) -> None:
|
|||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
|
|
||||||
|
def send_telegram_photo(session: requests.Session, photo_url: str, caption: str = "") -> None:
|
||||||
|
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
|
||||||
|
logging.warning("Telegram token/chat id missing; skipping photo")
|
||||||
|
return
|
||||||
|
|
||||||
|
endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
|
||||||
|
payload = {
|
||||||
|
"chat_id": TELEGRAM_CHAT_ID,
|
||||||
|
"photo": photo_url,
|
||||||
|
}
|
||||||
|
if caption:
|
||||||
|
payload["caption"] = truncate_text(caption, 900)
|
||||||
|
|
||||||
|
response = session.post(endpoint, json=payload, timeout=REQUEST_TIMEOUT_SECONDS)
|
||||||
|
response.raise_for_status()
|
||||||
|
|
||||||
|
|
||||||
|
def get_telegram_updates(session: requests.Session, offset: Optional[int]) -> List[Dict[str, object]]:
|
||||||
|
if not TELEGRAM_BOT_TOKEN:
|
||||||
|
return []
|
||||||
|
|
||||||
|
endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
|
||||||
|
params: Dict[str, object] = {
|
||||||
|
"timeout": min(max(1, REQUEST_TIMEOUT_SECONDS), 25),
|
||||||
|
"allowed_updates": json.dumps(["message"]),
|
||||||
|
}
|
||||||
|
if offset is not None:
|
||||||
|
params["offset"] = offset
|
||||||
|
|
||||||
|
response = session.get(endpoint, params=params, timeout=REQUEST_TIMEOUT_SECONDS + 5)
|
||||||
|
response.raise_for_status()
|
||||||
|
payload = response.json()
|
||||||
|
if not payload.get("ok"):
|
||||||
|
return []
|
||||||
|
|
||||||
|
result = payload.get("result", [])
|
||||||
|
return result if isinstance(result, list) else []
|
||||||
|
|
||||||
|
|
||||||
|
def send_random_app_response(session: requests.Session) -> None:
|
||||||
|
state = load_state(STATE_PATH)
|
||||||
|
if not state:
|
||||||
|
html = fetch_catalog_html(session)
|
||||||
|
state = parse_catalog(html)
|
||||||
|
|
||||||
|
if not state:
|
||||||
|
send_telegram_message(session, "⚠️ Unable to fetch apps right now. Try again in a moment.")
|
||||||
|
return
|
||||||
|
|
||||||
|
app = random.choice(list(state.values()))
|
||||||
|
|
||||||
|
page_title = ""
|
||||||
|
screenshot_urls: List[str] = []
|
||||||
|
try:
|
||||||
|
details = fetch_new_app_page_details(session, app.url)
|
||||||
|
page_title = str(details.get("page_title", ""))
|
||||||
|
screenshot_data = details.get("screenshot_urls", [])
|
||||||
|
if isinstance(screenshot_data, list):
|
||||||
|
screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")]
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Unable to fetch random app page details for %s: %s", app.url, exc)
|
||||||
|
|
||||||
|
send_telegram_message(
|
||||||
|
session,
|
||||||
|
build_random_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)),
|
||||||
|
)
|
||||||
|
|
||||||
|
for index, screenshot_url in enumerate(screenshot_urls, start=1):
|
||||||
|
try:
|
||||||
|
send_telegram_photo(
|
||||||
|
session,
|
||||||
|
screenshot_url,
|
||||||
|
caption=f"🖼️ {app.name} • Screenshot {index}/{len(screenshot_urls)}",
|
||||||
|
)
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Failed to send random app screenshot for %s: %s", app.name, exc)
|
||||||
|
|
||||||
|
|
||||||
|
def initialize_telegram_offset(session: requests.Session) -> None:
|
||||||
|
global last_telegram_update_id
|
||||||
|
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
updates = get_telegram_updates(session, offset=None)
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Unable to initialize Telegram updates offset: %s", exc)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not updates:
|
||||||
|
return
|
||||||
|
|
||||||
|
update_ids = [item.get("update_id") for item in updates if isinstance(item.get("update_id"), int)]
|
||||||
|
if update_ids:
|
||||||
|
last_telegram_update_id = max(update_ids)
|
||||||
|
|
||||||
|
|
||||||
|
def poll_telegram_commands(session: requests.Session) -> None:
|
||||||
|
global last_telegram_update_id
|
||||||
|
|
||||||
|
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
|
||||||
|
return
|
||||||
|
|
||||||
|
offset = last_telegram_update_id + 1 if last_telegram_update_id is not None else None
|
||||||
|
updates = get_telegram_updates(session, offset=offset)
|
||||||
|
|
||||||
|
for update in updates:
|
||||||
|
update_id = update.get("update_id")
|
||||||
|
if isinstance(update_id, int):
|
||||||
|
last_telegram_update_id = update_id
|
||||||
|
|
||||||
|
message = update.get("message")
|
||||||
|
if not isinstance(message, dict):
|
||||||
|
continue
|
||||||
|
|
||||||
|
chat = message.get("chat")
|
||||||
|
if not isinstance(chat, dict):
|
||||||
|
continue
|
||||||
|
chat_id = str(chat.get("id", ""))
|
||||||
|
if chat_id != TELEGRAM_CHAT_ID:
|
||||||
|
continue
|
||||||
|
|
||||||
|
text = str(message.get("text", "")).strip()
|
||||||
|
command = text.split(" ", 1)[0].lower() if text else ""
|
||||||
|
if command.startswith("/random"):
|
||||||
|
logging.info("Received /random command from Telegram chat %s", chat_id)
|
||||||
|
try:
|
||||||
|
send_random_app_response(session)
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Failed to send /random response: %s", exc)
|
||||||
|
send_telegram_message(session, "⚠️ Failed to fetch a random app right now. Please try again.")
|
||||||
|
|
||||||
|
|
||||||
def send_startup_notification(session: requests.Session) -> None:
|
def send_startup_notification(session: requests.Session) -> None:
|
||||||
message = (
|
message = (
|
||||||
"TrueNAS catalog watcher is running ✅\n"
|
"✅ TrueNAS Catalog Watcher Online\n"
|
||||||
f"Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
"━━━━━━━━━━━━━━━━━━━━━━━━\n"
|
||||||
f"Catalog: {CATALOG_URL}\n"
|
"\n"
|
||||||
f"Interval: {CHECK_INTERVAL_SECONDS}s"
|
f"🕒 Started: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
|
||||||
|
f"🌐 Catalog: {CATALOG_URL}\n"
|
||||||
|
f"⏱️ Interval: {CHECK_INTERVAL_SECONDS}s"
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
send_telegram_message(session, message)
|
send_telegram_message(session, message)
|
||||||
@@ -437,7 +573,31 @@ def run_once(session: requests.Session, first_run: bool) -> bool:
|
|||||||
|
|
||||||
for url in added_urls:
|
for url in added_urls:
|
||||||
app = current_state[url]
|
app = current_state[url]
|
||||||
send_telegram_message(session, build_new_app_message(session, app))
|
page_title = ""
|
||||||
|
screenshot_urls: List[str] = []
|
||||||
|
try:
|
||||||
|
details = fetch_new_app_page_details(session, app.url)
|
||||||
|
page_title = str(details.get("page_title", ""))
|
||||||
|
screenshot_data = details.get("screenshot_urls", [])
|
||||||
|
if isinstance(screenshot_data, list):
|
||||||
|
screenshot_urls = [str(item) for item in screenshot_data if str(item).startswith("http")]
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Unable to fetch app page details for %s: %s", app.url, exc)
|
||||||
|
|
||||||
|
send_telegram_message(
|
||||||
|
session,
|
||||||
|
build_new_app_message(app, page_title=page_title, screenshot_count=len(screenshot_urls)),
|
||||||
|
)
|
||||||
|
|
||||||
|
for index, screenshot_url in enumerate(screenshot_urls, start=1):
|
||||||
|
try:
|
||||||
|
send_telegram_photo(
|
||||||
|
session,
|
||||||
|
screenshot_url,
|
||||||
|
caption=f"🖼️ {app.name} • Screenshot {index}/{len(screenshot_urls)}",
|
||||||
|
)
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Failed to send screenshot for %s: %s", app.name, exc)
|
||||||
|
|
||||||
header, summary_lines = build_summary_message(
|
header, summary_lines = build_summary_message(
|
||||||
added_count=len(added_urls),
|
added_count=len(added_urls),
|
||||||
@@ -459,6 +619,8 @@ def run_once(session: requests.Session, first_run: bool) -> bool:
|
|||||||
def validate_env() -> None:
|
def validate_env() -> None:
|
||||||
if CHECK_INTERVAL_SECONDS < 30:
|
if CHECK_INTERVAL_SECONDS < 30:
|
||||||
raise ValueError("CHECK_INTERVAL_SECONDS must be >= 30")
|
raise ValueError("CHECK_INTERVAL_SECONDS must be >= 30")
|
||||||
|
if TELEGRAM_POLL_SECONDS < 2:
|
||||||
|
raise ValueError("TELEGRAM_POLL_SECONDS must be >= 2")
|
||||||
|
|
||||||
|
|
||||||
def main() -> int:
|
def main() -> int:
|
||||||
@@ -477,9 +639,13 @@ def main() -> int:
|
|||||||
|
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
send_startup_notification(session)
|
send_startup_notification(session)
|
||||||
|
initialize_telegram_offset(session)
|
||||||
first_loop = True
|
first_loop = True
|
||||||
|
next_check_at = time.time()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
now = time.time()
|
||||||
|
if now >= next_check_at:
|
||||||
try:
|
try:
|
||||||
run_once(session, first_loop)
|
run_once(session, first_loop)
|
||||||
except requests.RequestException as exc:
|
except requests.RequestException as exc:
|
||||||
@@ -488,7 +654,17 @@ def main() -> int:
|
|||||||
logging.exception("Watcher iteration failed: %s", exc)
|
logging.exception("Watcher iteration failed: %s", exc)
|
||||||
|
|
||||||
first_loop = False
|
first_loop = False
|
||||||
time.sleep(CHECK_INTERVAL_SECONDS)
|
next_check_at = now + CHECK_INTERVAL_SECONDS
|
||||||
|
|
||||||
|
try:
|
||||||
|
poll_telegram_commands(session)
|
||||||
|
except requests.RequestException as exc:
|
||||||
|
logging.warning("Telegram polling failed: %s", exc)
|
||||||
|
except Exception as exc:
|
||||||
|
logging.exception("Telegram command processing failed: %s", exc)
|
||||||
|
|
||||||
|
sleep_for = min(TELEGRAM_POLL_SECONDS, max(1, int(next_check_at - time.time())))
|
||||||
|
time.sleep(sleep_for)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user