@@ -2,12 +2,13 @@ import hashlib
import json
import logging
import os
import random
import re
import sys
import time
from dataclasses import dataclass , asdict
from datetime import datetime , timezone
from typing import Dict , List , Tuple
from typing import Dict , List , Optional , Tuple
from urllib . parse import urljoin , urlparse
import requests
@@ -25,7 +26,11 @@ USER_AGENT = os.getenv(
)
LOG_LEVEL = os . getenv ( " LOG_LEVEL " , " INFO " ) . upper ( )
MAX_MESSAGE_LEN = 3900
MAX_SCREENSHOTS_PER_APP = int ( os . getenv ( " MAX_SCREENSHOTS_PER_APP " , " 3 " ) )
MAX_SCREENSHOTS_PER_APP = int ( os . getenv ( " MAX_SCREENSHOTS_PER_APP " , " 10 " ) )
TELEGRAM_POLL_SECONDS = int ( os . getenv ( " TELEGRAM_POLL_SECONDS " , " 10 " ) )
MEDIA_BASE_URL = os . getenv ( " MEDIA_BASE_URL " , " https://media.sys.truenas.net " )
last_telegram_update_id : Optional [ int ] = None
@dataclass
@@ -155,7 +160,7 @@ def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None:
def format_field_change ( label : str , old : str , new : str ) - > str :
old_clean = old if old else " (empty) "
new_clean = new if new else " (empty) "
return f " { label } : ' { old_clean } ' -> ' { new_clean } ' "
return f " • { label } : ' { old_clean } ' → ' { new_clean } ' "
def collect_diffs (
@@ -189,11 +194,13 @@ def collect_diffs(
details . append ( format_field_change ( " summary " , old . summary , new . summary ) )
if not details :
details . append ( " metadata changed " )
details . append ( " • metadata changed" )
changed_lines . append ( f " ~ { new . name } ( { new . url } ) ")
changed_lines . append ( f " 🔄 { new . name } " )
changed_lines . append ( f " 🔗 { new . url } " )
for detail in details :
changed_lines . append ( f " - { detail } " )
changed_lines . append ( f " { detail } " )
changed_lines . append ( " " )
return added_urls , removed_urls , changed_lines , updated_count
@@ -208,16 +215,28 @@ def build_summary_message(
removed_count = len ( removed_urls )
header = (
f " TrueNAS c atalog changed at { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
f " Added: { added_count } | Removed: { removed_count } | Updated: { updated_count } "
" 📣 TrueNAS C atalog Update \n "
" ━━━━━━━━━━━━━━━━━━━━━━ \n "
f " 🕒 { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
f " ➕ Added: { added_count } | ➖ Removed: { removed_count } | 🔄 Updated: { updated_count } "
)
lines : List [ str ] = [ ]
if removed_urls :
lines . append ( " 🗑️ Removed Apps " )
lines . append ( " " )
for url in removed_urls :
app = previous [ url ]
lines . append ( f " - { app . name } ( { app . url } ) ")
lines . append ( f " ➖ { app . name } " )
lines . append ( f " 🔗 { app . url } " )
lines . append ( " " )
if changed_lines :
lines . append ( " ✏️ Updated Apps " )
lines . append ( " " )
lines . extend ( changed_lines )
while lines and lines [ - 1 ] == " " :
lines . pop ( )
return header , lines
@@ -228,6 +247,57 @@ def truncate_text(value: str, limit: int) -> str:
return f " { text [ : max ( 0 , limit - 1 ) ] . rstrip ( ) } … "
def extract_app_id_from_url(app_url: str) -> str:
    """Derive an app identifier from the path component of ``app_url``.

    The path is split on ``/`` and empty segments are discarded. The segment
    that directly follows the first ``catalog`` segment is returned when one
    exists; otherwise the final path segment is returned.

    Args:
        app_url: URL of an app page.

    Returns:
        The identifier segment, or an empty string when the URL has no path
        segments at all.
    """
    segments = [piece for piece in urlparse(app_url).path.strip("/").split("/") if piece]
    if not segments:
        return ""
    try:
        # ValueError: no "catalog" segment; IndexError: "catalog" is the last one.
        return segments[segments.index("catalog") + 1]
    except (ValueError, IndexError):
        return segments[-1]
def _screenshot_url_is_image(session: requests.Session, url: str) -> bool:
    """Return True when ``url`` answers 200 with an image (or unspecified) content type."""
    try:
        # stream=True defers the body download: only the status line and
        # headers are needed to decide whether the screenshot exists.
        response = session.get(
            url,
            timeout=REQUEST_TIMEOUT_SECONDS,
            headers={"User-Agent": USER_AGENT},
            stream=True,
        )
    except requests.RequestException:
        return False
    try:
        if response.status_code != 200:
            return False
        content_type = str(response.headers.get("Content-Type", "")).lower()
        # A missing Content-Type header is accepted; a non-image one is not.
        return not content_type or "image" in content_type
    finally:
        # The body is never read, so release the connection explicitly.
        response.close()


def build_storj_screenshot_urls(session: requests.Session, app_id: str) -> List[str]:
    """Probe the media host for an app's numbered screenshots.

    For each index from 1 to ``MAX_SCREENSHOTS_PER_APP`` the URL
    ``<MEDIA_BASE_URL>/apps/<app_id>/screenshots/screenshot<index>.<ext>`` is
    tried with each known image extension in turn; the first extension that
    responds successfully is kept for that index. Probing stops at the first
    index for which no extension matches.

    Args:
        session: HTTP session used for the probe requests.
        app_id: Identifier of the app; an empty value yields no URLs.

    Returns:
        The screenshot URLs that responded, in index order.
    """
    if not app_id:
        return []

    base_url = MEDIA_BASE_URL.rstrip("/")
    image_extensions = ("png", "jpg", "jpeg", "webp", "gif")
    screenshot_urls: List[str] = []

    for index in range(1, MAX_SCREENSHOTS_PER_APP + 1):
        candidates = (
            f"{base_url}/apps/{app_id}/screenshots/screenshot{index}.{extension}"
            for extension in image_extensions
        )
        # The generator is lazy, so probing stops at the first extension that hits.
        matched_url = next(
            (url for url in candidates if _screenshot_url_is_image(session, url)),
            None,
        )
        if matched_url is None:
            # Screenshots are numbered contiguously from 1; a gap ends the scan.
            break
        screenshot_urls.append(matched_url)

    return screenshot_urls
def fetch_new_app_page_details ( session : requests . Session , app_url : str ) - > Dict [ str , object ] :
response = session . get (
app_url ,
@@ -239,68 +309,8 @@ def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[
soup = BeautifulSoup ( response . text , " html.parser " )
page_title = normalize_text ( soup . title . get_text ( " " , strip = True ) ) if soup . title else " "
screenshot_candidates : List [ Tuple [ int , int , str ] ] = [ ]
seen_urls : set [ str ] = set ( )
og_image = soup . find ( " meta " , attrs = { " property " : " og:image " } )
if og_image and og_image . get ( " content " ) :
og_image_url = urljoin ( app_url , str ( og_image [ " content " ] ) . strip ( ) )
if og_image_url . startswith ( " http " ) :
screenshot_candidates . append ( ( 2 , 0 , og_image_url ) )
for index , tag in enumerate ( soup . find_all ( " img " , src = True ) , start = 1 ) :
raw_src = str ( tag . get ( " src " , " " ) ) . strip ( )
if not raw_src or raw_src . startswith ( " data: " ) :
continue
image_url = urljoin ( app_url , raw_src )
if not image_url . startswith ( " http " ) or image_url in seen_urls :
continue
seen_urls . add ( image_url )
width_value = str ( tag . get ( " width " , " " ) ) . strip ( )
height_value = str ( tag . get ( " height " , " " ) ) . strip ( )
if width_value . isdigit ( ) and int ( width_value ) < 200 :
continue
if height_value . isdigit ( ) and int ( height_value ) < 120 :
continue
descriptor = " " . join (
[
str ( tag . get ( " alt " , " " ) ) ,
str ( tag . get ( " title " , " " ) ) ,
" " . join ( tag . get ( " class " , [ ] ) ) ,
str ( tag . get ( " id " , " " ) ) ,
image_url ,
]
) . lower ( )
if any ( skip in descriptor for skip in [ " logo " , " favicon " , " icon " , " avatar " , " badge " ] ) :
continue
score = 0
if " screenshot " in descriptor or " screen-shot " in descriptor or " screen shot " in descriptor :
score + = 4
if " gallery " in descriptor or " carousel " in descriptor or " preview " in descriptor :
score + = 2
if re . search ( r " \ .(png|jpe?g|webp)( \ ?|$) " , image_url , flags = re . IGNORECASE ) :
score + = 1
if score > 0 :
screenshot_candidates . append ( ( score , index , image_url ) )
screenshot_candidates . sort ( key = lambda item : ( - item [ 0 ] , item [ 1 ] ) )
screenshot_urls : List [ str ] = [ ]
emitted : set [ str ] = set ( )
for _ , _ , image_url in screenshot_candidates :
if image_url in emitted :
continue
emitted . add ( image_url )
screenshot_urls . append ( image_url )
if len ( screenshot_urls ) > = MAX_SCREENSHOTS_PER_APP :
break
app_id = extract_app_id_from_url ( app_url )
screenshot_urls = build_storj_screenshot_urls ( session , app_id )
return {
" page_title " : page_title ,
@@ -310,22 +320,33 @@ def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[
def build_new_app_message ( app : AppSnapshot , page_title : str = " " , screenshot_count : int = 0 ) - > str :
lines : List [ str ] = [
" 🆕 New TrueNAS a pp detected " ,
f " Detected: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } " ,
f " Name: { app . name } ",
f " URL: { app . url } " ,
" 🆕 New TrueNAS A pp " ,
" ━━━━━━━━━━━━━━ " ,
" " ,
f " 🕒 Detected: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } " ,
" " ,
" ℹ ️ App Details" ,
f " 📦 Name: { app . name } " ,
f " 🔗 URL: { app . url } " ,
]
if app . train :
lines . append ( f " Train: { app . train } " )
lines . append ( f " 🚂 Train: { app . train } " )
if app . added :
lines . append ( f " Added date : { app . added } " )
lines . append ( f " 📅 Added: { app . added } " )
if app . summary :
lines . append ( f " Catalog summary: { truncate_text ( app . summary , 700 ) } ")
lines . append ( " " )
lines . append ( " 📝 Summary " )
lines . append ( truncate_text ( app . summary , 700 ) )
if page_title :
lines . append ( f " Page title: { truncate_text ( page_title , 180 ) } ")
lines . append ( " " )
lines . append ( f " 📄 Page: { truncate_text ( page_title , 180 ) } " )
if screenshot_count > 0 :
lines . append ( f " Screenshots: { screenshot_count } attached ")
lines . append ( " " )
lines . append ( f " 🖼️ Screenshots: { screenshot_count } attached " )
while lines and lines [ - 1 ] == " " :
lines . pop ( )
message = " \n " . join ( lines )
if len ( message ) < = MAX_MESSAGE_LEN :
@@ -337,6 +358,11 @@ def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_cou
return " \n " . join ( trimmed_lines )
def build_random_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
    """Build the message sent for a randomly picked app.

    The text is produced by ``build_new_app_message`` and the first occurrence
    of its "new app" heading is swapped for the "random app" heading.

    Args:
        app: Snapshot of the app to describe.
        page_title: Optional title of the app's page.
        screenshot_count: Number of screenshots that will be attached.

    Returns:
        The formatted message text.
    """
    new_app_text = build_new_app_message(
        app,
        page_title=page_title,
        screenshot_count=screenshot_count,
    )
    before, heading, after = new_app_text.partition("🆕 New TrueNAS App")
    if not heading:
        # Heading not present: leave the text untouched.
        return new_app_text
    return f"{before}🎲 Random TrueNAS App{after}"
def split_message ( header : str , lines : List [ str ] , max_len : int = MAX_MESSAGE_LEN ) - > List [ str ] :
if not lines :
return [ header ]
@@ -390,12 +416,130 @@ def send_telegram_photo(session: requests.Session, photo_url: str, caption: str
response . raise_for_status ( )
def get_telegram_updates(session: requests.Session, offset: Optional[int]) -> List[Dict[str, object]]:
    """Fetch ``message`` updates from the Telegram Bot API ``getUpdates`` endpoint.

    Args:
        session: HTTP session used for the request.
        offset: Value for the ``offset`` query parameter; omitted from the
            request when ``None``.

    Returns:
        The ``result`` list of the response. An empty list is returned when no
        bot token is configured, when the response is not flagged ``ok``, or
        when ``result`` is not a list.

    Raises:
        requests.HTTPError: If the response has an error status
            (via ``raise_for_status``).
    """
    if not TELEGRAM_BOT_TOKEN:
        return []

    # Long-poll window sent to Telegram, clamped to the range 1..25 seconds.
    long_poll_seconds = min(max(1, REQUEST_TIMEOUT_SECONDS), 25)
    query: Dict[str, object] = {
        "timeout": long_poll_seconds,
        "allowed_updates": json.dumps(["message"]),
    }
    if offset is not None:
        query["offset"] = offset

    response = session.get(
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates",
        params=query,
        # HTTP timeout is kept 5 seconds above the base request timeout.
        timeout=REQUEST_TIMEOUT_SECONDS + 5,
    )
    response.raise_for_status()

    body = response.json()
    updates = body.get("result", []) if body.get("ok") else []
    return updates if isinstance(updates, list) else []
def send_random_app_response(session: requests.Session) -> None:
    """Send a randomly chosen catalog app, with its screenshots, to Telegram.

    Apps come from the saved state at ``STATE_PATH``; when that is empty the
    catalog is fetched and parsed instead. If there are still no apps, a
    warning message is sent and nothing else happens. Failures while fetching
    page details or sending individual screenshots are logged and skipped.

    Args:
        session: HTTP session used for all requests.
    """
    # Fall back to a live catalog fetch only when the saved state is empty.
    catalog = load_state(STATE_PATH) or parse_catalog(fetch_catalog_html(session))
    if not catalog:
        send_telegram_message(session, "⚠️ Unable to fetch apps right now. Try again in a moment.")
        return

    app = random.choice(list(catalog.values()))

    page_title = ""
    screenshot_urls: List[str] = []
    try:
        details = fetch_new_app_page_details(session, app.url)
        page_title = str(details.get("page_title", ""))
        raw_screenshots = details.get("screenshot_urls", [])
        if isinstance(raw_screenshots, list):
            # Keep only entries that look like absolute http(s) URLs.
            screenshot_urls = [
                str(entry) for entry in raw_screenshots if str(entry).startswith("http")
            ]
    except requests.RequestException as exc:
        logging.warning("Unable to fetch random app page details for %s: %s", app.url, exc)

    total = len(screenshot_urls)
    text = build_random_app_message(app, page_title=page_title, screenshot_count=total)
    send_telegram_message(session, text)

    for position, photo_url in enumerate(screenshot_urls, start=1):
        try:
            send_telegram_photo(
                session,
                photo_url,
                caption=f"🖼️ {app.name} • Screenshot {position}/{total}",
            )
        except requests.RequestException as exc:
            # One failed photo must not stop the remaining ones.
            logging.warning("Failed to send random app screenshot for %s: %s", app.name, exc)
def initialize_telegram_offset(session: requests.Session) -> None:
    """Record the newest pending Telegram update id so old commands are skipped.

    Sets the module-level ``last_telegram_update_id`` to the highest
    ``update_id`` currently queued for the bot. Does nothing when the bot token
    or chat id is not configured, when the request fails (a warning is
    logged), or when no updates are queued.

    Args:
        session: HTTP session used for the request.
    """
    global last_telegram_update_id

    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return

    try:
        # A negative offset asks Telegram for updates counted from the END of
        # the queue and discards everything older. Without it only the oldest
        # batch is returned, so with a large backlog the id recorded here
        # would not be the newest and stale commands would be replayed later.
        updates = get_telegram_updates(session, offset=-1)
    except requests.RequestException as exc:
        logging.warning("Unable to initialize Telegram updates offset: %s", exc)
        return

    update_ids = [
        item["update_id"]
        for item in updates
        # Skip malformed entries instead of failing on them.
        if isinstance(item, dict) and isinstance(item.get("update_id"), int)
    ]
    if update_ids:
        last_telegram_update_id = max(update_ids)
def poll_telegram_commands(session: requests.Session) -> None:
    """Fetch new Telegram updates and answer ``/random`` commands.

    Every update with an integer ``update_id`` advances the module-level
    ``last_telegram_update_id``, whether or not it is acted on. Only messages
    whose chat id equals ``TELEGRAM_CHAT_ID`` and whose first word starts with
    ``/random`` (case-insensitive) trigger a random-app response.

    Args:
        session: HTTP session used for all requests.
    """
    global last_telegram_update_id

    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        return

    # Ask only for updates newer than the last one seen.
    next_offset = None if last_telegram_update_id is None else last_telegram_update_id + 1

    for update in get_telegram_updates(session, offset=next_offset):
        seen_id = update.get("update_id")
        if isinstance(seen_id, int):
            last_telegram_update_id = seen_id

        message = update.get("message")
        chat = message.get("chat") if isinstance(message, dict) else None
        if not isinstance(chat, dict):
            continue

        # NOTE(review): compared as strings — assumes TELEGRAM_CHAT_ID is a str; confirm.
        chat_id = str(chat.get("id", ""))
        if chat_id != TELEGRAM_CHAT_ID:
            continue

        text = str(message.get("text", "")).strip()
        command = text.split(" ", 1)[0].lower() if text else ""
        if not command.startswith("/random"):
            continue

        logging.info("Received /random command from Telegram chat %s", chat_id)
        try:
            send_random_app_response(session)
        except requests.RequestException as exc:
            logging.warning("Failed to send /random response: %s", exc)
            send_telegram_message(session, "⚠️ Failed to fetch a random app right now. Please try again.")
def send_startup_notification ( session : requests . Session ) - > None :
message = (
" TrueNAS c atalog w atcher is running ✅ \n "
f " Started: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
f " Catalog: { CATALOG_URL } \n "
f " Interval: { CHECK_INTERVAL_SECONDS } s "
" ✅ TrueNAS C atalog W atcher Online \n "
" ━━━━━━━━━━━━━━━━━━━━━━━━ \n "
" \n "
f " 🕒 Started: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
f " 🌐 Catalog: { CATALOG_URL } \n "
f " ⏱️ Interval: { CHECK_INTERVAL_SECONDS } s "
)
try :
send_telegram_message ( session , message )
@@ -450,7 +594,7 @@ def run_once(session: requests.Session, first_run: bool) -> bool:
send_telegram_photo (
session ,
screenshot_url ,
caption = f " { app . name } s creenshot { index } / { len ( screenshot_urls ) } " ,
caption = f " 🖼️ { app . name } • S creenshot { index } / { len ( screenshot_urls ) } " ,
)
except requests . RequestException as exc :
logging . warning ( " Failed to send screenshot for %s : %s " , app . name , exc )
@@ -475,6 +619,8 @@ def run_once(session: requests.Session, first_run: bool) -> bool:
def validate_env() -> None:
    """Check that the configured intervals are not below their minimums.

    Raises:
        ValueError: If ``CHECK_INTERVAL_SECONDS`` is below 30 or
            ``TELEGRAM_POLL_SECONDS`` is below 2. The first failing check wins.
    """
    minimums = (
        ("CHECK_INTERVAL_SECONDS", CHECK_INTERVAL_SECONDS, 30),
        ("TELEGRAM_POLL_SECONDS", TELEGRAM_POLL_SECONDS, 2),
    )
    for setting, value, floor in minimums:
        if value < floor:
            raise ValueError(f"{setting} must be >= {floor}")
def main ( ) - > int :
@@ -493,18 +639,32 @@ def main() -> int:
session = requests . Session ( )
send_startup_notification ( session )
initialize_telegram_offset ( session )
first_loop = True
next_check_at = time . time ( )
while True :
try :
run_once ( session , first_loop )
except requests . RequestException as exc :
logging . error ( " Network error: %s " , exc )
except Exception as exc :
logging . exception ( " Watcher iteration failed : %s " , exc )
now = time . time ( )
if now > = next_check_at :
try :
run_once ( session , first_loop )
except requests . Request Exception as exc :
logging . error ( " Network error : %s " , exc )
except Exception as exc :
logging . exception ( " Watcher iteration failed: %s " , exc )
first_loop = False
time . sleep ( CHECK_INTERVAL_SECONDS )
first_loop = False
next_check_at = now + CHECK_INTERVAL_SECONDS
try :
poll_telegram_commands ( session )
except requests . RequestException as exc :
logging . warning ( " Telegram polling failed: %s " , exc )
except Exception as exc :
logging . exception ( " Telegram command processing failed: %s " , exc )
sleep_for = min ( TELEGRAM_POLL_SECONDS , max ( 1 , int ( next_check_at - time . time ( ) ) ) )
time . sleep ( sleep_for )
if __name__ == " __main__ " :