@@ -2,12 +2,13 @@ import hashlib
import json
import json
import logging
import logging
import os
import os
import random
import re
import re
import sys
import sys
import time
import time
from dataclasses import dataclass , asdict
from dataclasses import dataclass , asdict
from datetime import datetime , timezone
from datetime import datetime , timezone
from typing import Dict , List , Tuple
from typing import Dict , List , Optional , Tuple
from urllib . parse import urljoin , urlparse
from urllib . parse import urljoin , urlparse
import requests
import requests
@@ -25,7 +26,11 @@ USER_AGENT = os.getenv(
)
)
LOG_LEVEL = os . getenv ( " LOG_LEVEL " , " INFO " ) . upper ( )
LOG_LEVEL = os . getenv ( " LOG_LEVEL " , " INFO " ) . upper ( )
MAX_MESSAGE_LEN = 3900
MAX_MESSAGE_LEN = 3900
MAX_SCREENSHOTS_PER_APP = int ( os . getenv ( " MAX_SCREENSHOTS_PER_APP " , " 3 " ) )
MAX_SCREENSHOTS_PER_APP = int ( os . getenv ( " MAX_SCREENSHOTS_PER_APP " , " 10 " ) )
TELEGRAM_POLL_SECONDS = int ( os . getenv ( " TELEGRAM_POLL_SECONDS " , " 10 " ) )
MEDIA_BASE_URL = os . getenv ( " MEDIA_BASE_URL " , " https://media.sys.truenas.net " )
last_telegram_update_id : Optional [ int ] = None
@dataclass
@dataclass
@@ -155,7 +160,7 @@ def save_state(path: str, apps: Dict[str, AppSnapshot]) -> None:
def format_field_change ( label : str , old : str , new : str ) - > str :
def format_field_change ( label : str , old : str , new : str ) - > str :
old_clean = old if old else " (empty) "
old_clean = old if old else " (empty) "
new_clean = new if new else " (empty) "
new_clean = new if new else " (empty) "
return f " { label } : ' { old_clean } ' -> ' { new_clean } ' "
return f " • { label } : ' { old_clean } ' → ' { new_clean } ' "
def collect_diffs (
def collect_diffs (
@@ -189,11 +194,13 @@ def collect_diffs(
details . append ( format_field_change ( " summary " , old . summary , new . summary ) )
details . append ( format_field_change ( " summary " , old . summary , new . summary ) )
if not details :
if not details :
details . append ( " metadata changed " )
details . append ( " • metadata changed" )
changed_lines . append ( f " ~ { new . name } ( { new . url } ) ")
changed_lines . append ( f " 🔄 { new . name } " )
changed_lines . append ( f " 🔗 { new . url } " )
for detail in details :
for detail in details :
changed_lines . append ( f " - { detail } " )
changed_lines . append ( f " { detail } " )
changed_lines . append ( " " )
return added_urls , removed_urls , changed_lines , updated_count
return added_urls , removed_urls , changed_lines , updated_count
@@ -208,16 +215,28 @@ def build_summary_message(
removed_count = len ( removed_urls )
removed_count = len ( removed_urls )
header = (
header = (
f " TrueNAS c atalog changed at { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
" 📣 TrueNAS C atalog Update \n "
f " Added: { added_count } | Removed: { removed_count } | Updated: { updated_count } "
" ━━━━━━━━━━━━━━━━━━━━━━ \n "
f " 🕒 { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
f " ➕ Added: { added_count } | ➖ Removed: { removed_count } | 🔄 Updated: { updated_count } "
)
)
lines : List [ str ] = [ ]
lines : List [ str ] = [ ]
if removed_urls :
lines . append ( " 🗑️ Removed Apps " )
lines . append ( " " )
for url in removed_urls :
for url in removed_urls :
app = previous [ url ]
app = previous [ url ]
lines . append ( f " - { app . name } ( { app . url } ) ")
lines . append ( f " ➖ { app . name } " )
lines . append ( f " 🔗 { app . url } " )
lines . append ( " " )
if changed_lines :
lines . append ( " ✏️ Updated Apps " )
lines . append ( " " )
lines . extend ( changed_lines )
lines . extend ( changed_lines )
while lines and lines [ - 1 ] == " " :
lines . pop ( )
return header , lines
return header , lines
@@ -228,6 +247,57 @@ def truncate_text(value: str, limit: int) -> str:
return f " { text [ : max ( 0 , limit - 1 ) ] . rstrip ( ) } … "
return f " { text [ : max ( 0 , limit - 1 ) ] . rstrip ( ) } … "
def extract_app_id_from_url(app_url: str) -> str:
    """Return the app identifier embedded in a catalog page URL.

    The identifier is the path segment that directly follows the first
    ``catalog`` segment. When the path has no ``catalog`` segment, or
    ``catalog`` is the final segment, the last path segment is returned
    instead.

    Args:
        app_url: URL of an app page.

    Returns:
        The identifier, or ``""`` when the URL is empty or has no path
        segments.
    """
    if not app_url:
        return ""

    # Separator and marker are written without surrounding spaces: a padded
    # " / " separator never occurs in a URL path, so the path would come back
    # as one unsplit segment and the "catalog" lookup could never match.
    path_parts = [part for part in urlparse(app_url).path.split("/") if part]
    if not path_parts:
        return ""

    try:
        return path_parts[path_parts.index("catalog") + 1]
    except (ValueError, IndexError):
        # ValueError: no "catalog" segment; IndexError: "catalog" is last.
        return path_parts[-1]
def _is_reachable_image(session: requests.Session, url: str) -> bool:
    """Return True when ``url`` answers 200 and does not declare a non-image type."""
    try:
        # stream=True fetches only the status line and headers; the context
        # manager closes the response so the image body is never downloaded
        # just to prove the URL exists.
        with session.get(
            url,
            timeout=REQUEST_TIMEOUT_SECONDS,
            headers={"User-Agent": USER_AGENT},
            stream=True,
        ) as response:
            if response.status_code != 200:
                return False
            content_type = str(response.headers.get("Content-Type", "")).lower()
            # A missing Content-Type is accepted; only an explicit non-image
            # type rejects the candidate.
            return not content_type or "image" in content_type
    except requests.RequestException:
        return False


def build_storj_screenshot_urls(session: requests.Session, app_id: str) -> List[str]:
    """Probe the media host for an app's numbered screenshots.

    Candidate URLs have the form
    ``<MEDIA_BASE_URL>/apps/<app_id>/screenshots/screenshot<N>.<ext>`` for
    ``N`` from 1 to ``MAX_SCREENSHOTS_PER_APP``. For each ``N`` the
    extensions are tried in order and the first reachable one is kept. The
    scan stops at the first ``N`` for which no extension is reachable.

    Args:
        session: HTTP session used for the probe requests.
        app_id: App identifier; an empty value yields no URLs.

    Returns:
        The reachable screenshot URLs in ascending index order. Request
        errors count as "not reachable" and are not raised.
    """
    if not app_id:
        return []

    # Loop-invariant prefix, built once.
    base_url = f"{MEDIA_BASE_URL.rstrip('/')}/apps/{app_id}/screenshots"
    image_extensions = ("png", "jpg", "jpeg", "webp", "gif")

    screenshot_urls: List[str] = []
    for index in range(1, MAX_SCREENSHOTS_PER_APP + 1):
        candidates = (
            f"{base_url}/screenshot{index}.{extension}"
            for extension in image_extensions
        )
        match = next(
            (url for url in candidates if _is_reachable_image(session, url)),
            None,
        )
        if match is None:
            # A gap in the numbering ends the scan.
            break
        screenshot_urls.append(match)
    return screenshot_urls
def fetch_new_app_page_details ( session : requests . Session , app_url : str ) - > Dict [ str , object ] :
def fetch_new_app_page_details ( session : requests . Session , app_url : str ) - > Dict [ str , object ] :
response = session . get (
response = session . get (
app_url ,
app_url ,
@@ -239,68 +309,8 @@ def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[
soup = BeautifulSoup ( response . text , " html.parser " )
soup = BeautifulSoup ( response . text , " html.parser " )
page_title = normalize_text ( soup . title . get_text ( " " , strip = True ) ) if soup . title else " "
page_title = normalize_text ( soup . title . get_text ( " " , strip = True ) ) if soup . title else " "
app_id = extract_app_id_from_url ( app_url )
screenshot_candidates : List [ Tuple [ int , int , str ] ] = [ ]
screenshot_urls = build_storj_screenshot_urls ( session , app_id )
seen_urls : set [ str ] = set ( )
og_image = soup . find ( " meta " , attrs = { " property " : " og:image " } )
if og_image and og_image . get ( " content " ) :
og_image_url = urljoin ( app_url , str ( og_image [ " content " ] ) . strip ( ) )
if og_image_url . startswith ( " http " ) :
screenshot_candidates . append ( ( 2 , 0 , og_image_url ) )
for index , tag in enumerate ( soup . find_all ( " img " , src = True ) , start = 1 ) :
raw_src = str ( tag . get ( " src " , " " ) ) . strip ( )
if not raw_src or raw_src . startswith ( " data: " ) :
continue
image_url = urljoin ( app_url , raw_src )
if not image_url . startswith ( " http " ) or image_url in seen_urls :
continue
seen_urls . add ( image_url )
width_value = str ( tag . get ( " width " , " " ) ) . strip ( )
height_value = str ( tag . get ( " height " , " " ) ) . strip ( )
if width_value . isdigit ( ) and int ( width_value ) < 200 :
continue
if height_value . isdigit ( ) and int ( height_value ) < 120 :
continue
descriptor = " " . join (
[
str ( tag . get ( " alt " , " " ) ) ,
str ( tag . get ( " title " , " " ) ) ,
" " . join ( tag . get ( " class " , [ ] ) ) ,
str ( tag . get ( " id " , " " ) ) ,
image_url ,
]
) . lower ( )
if any ( skip in descriptor for skip in [ " logo " , " favicon " , " icon " , " avatar " , " badge " ] ) :
continue
score = 0
if " screenshot " in descriptor or " screen-shot " in descriptor or " screen shot " in descriptor :
score + = 4
if " gallery " in descriptor or " carousel " in descriptor or " preview " in descriptor :
score + = 2
if re . search ( r " \ .(png|jpe?g|webp)( \ ?|$) " , image_url , flags = re . IGNORECASE ) :
score + = 1
if score > 0 :
screenshot_candidates . append ( ( score , index , image_url ) )
screenshot_candidates . sort ( key = lambda item : ( - item [ 0 ] , item [ 1 ] ) )
screenshot_urls : List [ str ] = [ ]
emitted : set [ str ] = set ( )
for _ , _ , image_url in screenshot_candidates :
if image_url in emitted :
continue
emitted . add ( image_url )
screenshot_urls . append ( image_url )
if len ( screenshot_urls ) > = MAX_SCREENSHOTS_PER_APP :
break
return {
return {
" page_title " : page_title ,
" page_title " : page_title ,
@@ -310,22 +320,33 @@ def fetch_new_app_page_details(session: requests.Session, app_url: str) -> Dict[
def build_new_app_message ( app : AppSnapshot , page_title : str = " " , screenshot_count : int = 0 ) - > str :
def build_new_app_message ( app : AppSnapshot , page_title : str = " " , screenshot_count : int = 0 ) - > str :
lines : List [ str ] = [
lines : List [ str ] = [
" 🆕 New TrueNAS a pp detected " ,
" 🆕 New TrueNAS A pp " ,
f " Detected: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } " ,
" ━━━━━━━━━━━━━━ " ,
f " Name: { app . name } ",
" " ,
f " URL: { app . url } " ,
f " 🕒 Detected: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } " ,
" " ,
" ℹ ️ App Details" ,
f " 📦 Name: { app . name } " ,
f " 🔗 URL: { app . url } " ,
]
]
if app . train :
if app . train :
lines . append ( f " Train: { app . train } " )
lines . append ( f " 🚂 Train: { app . train } " )
if app . added :
if app . added :
lines . append ( f " Added date : { app . added } " )
lines . append ( f " 📅 Added: { app . added } " )
if app . summary :
if app . summary :
lines . append ( f " Catalog summary: { truncate_text ( app . summary , 700 ) } ")
lines . append ( " " )
lines . append ( " 📝 Summary " )
lines . append ( truncate_text ( app . summary , 700 ) )
if page_title :
if page_title :
lines . append ( f " Page title: { truncate_text ( page_title , 180 ) } ")
lines . append ( " " )
lines . append ( f " 📄 Page: { truncate_text ( page_title , 180 ) } " )
if screenshot_count > 0 :
if screenshot_count > 0 :
lines . append ( f " Screenshots: { screenshot_count } attached ")
lines . append ( " " )
lines . append ( f " 🖼️ Screenshots: { screenshot_count } attached " )
while lines and lines [ - 1 ] == " " :
lines . pop ( )
message = " \n " . join ( lines )
message = " \n " . join ( lines )
if len ( message ) < = MAX_MESSAGE_LEN :
if len ( message ) < = MAX_MESSAGE_LEN :
@@ -337,6 +358,11 @@ def build_new_app_message(app: AppSnapshot, page_title: str = "", screenshot_cou
return " \n " . join ( trimmed_lines )
return " \n " . join ( trimmed_lines )
def build_random_app_message(app: AppSnapshot, page_title: str = "", screenshot_count: int = 0) -> str:
    """Build the message for a randomly picked app.

    The text is produced by ``build_new_app_message`` and then the first
    occurrence of the "new app" title is swapped for the "random app" title.

    Args:
        app: App to describe.
        page_title: Optional page title, forwarded unchanged.
        screenshot_count: Number of screenshots, forwarded unchanged.

    Returns:
        The message text with the relabelled title.
    """
    message = build_new_app_message(app, page_title=page_title, screenshot_count=screenshot_count)
    # The search text carries no surrounding spaces; a padded literal could
    # not match the title line and the relabel would silently do nothing.
    return message.replace("🆕 New TrueNAS App", "🎲 Random TrueNAS App", 1)
def split_message ( header : str , lines : List [ str ] , max_len : int = MAX_MESSAGE_LEN ) - > List [ str ] :
def split_message ( header : str , lines : List [ str ] , max_len : int = MAX_MESSAGE_LEN ) - > List [ str ] :
if not lines :
if not lines :
return [ header ]
return [ header ]
@@ -390,12 +416,130 @@ def send_telegram_photo(session: requests.Session, photo_url: str, caption: str
response . raise_for_status ( )
response . raise_for_status ( )
def get_telegram_updates(session: requests.Session, offset: Optional[int]) -> List[Dict[str, object]]:
    """Fetch pending ``message`` updates from the Telegram Bot API.

    Args:
        session: HTTP session used for the request.
        offset: Value for the ``offset`` request parameter; ``None`` omits
            the parameter.

    Returns:
        The update objects from the response's ``result`` list. An empty
        list is returned when no bot token is configured, when the body is
        not valid JSON, or when the payload is not an ``ok`` object carrying
        a list.

    Raises:
        requests.RequestException: On transport errors or a non-2xx status.
    """
    if not TELEGRAM_BOT_TOKEN:
        return []

    endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getUpdates"
    params: Dict[str, object] = {
        # Long-poll wait, clamped to 1..25 seconds.
        "timeout": min(max(1, REQUEST_TIMEOUT_SECONDS), 25),
        "allowed_updates": json.dumps(["message"]),
    }
    if offset is not None:
        params["offset"] = offset

    # The HTTP timeout exceeds the long-poll wait so the server can answer
    # before the client gives up.
    response = session.get(endpoint, params=params, timeout=REQUEST_TIMEOUT_SECONDS + 5)
    response.raise_for_status()

    try:
        payload = response.json()
    except ValueError:
        # Callers only handle RequestException; a malformed body is treated
        # as "no updates" rather than crashing them.
        logging.warning("Telegram getUpdates returned a non-JSON body")
        return []

    if not isinstance(payload, dict) or not payload.get("ok"):
        return []

    result = payload.get("result", [])
    if not isinstance(result, list):
        return []
    # Callers call .get() on every item, so anything that is not an object
    # is dropped here.
    return [item for item in result if isinstance(item, dict)]
def send_random_app_response(session: requests.Session) -> None:
    """Pick a random catalog app and post it to Telegram.

    Apps come from the saved state at ``STATE_PATH``; when that is empty the
    catalog is fetched and parsed instead. If there are still no apps, a
    notice is sent and the function returns. Page details are best-effort: a
    failed page fetch is logged and the message is sent without a page title
    or screenshots. Each screenshot is sent as its own photo, and a failed
    photo is logged without stopping the remaining ones.

    Args:
        session: HTTP session used for every request.

    Raises:
        requests.RequestException: Presumably from the catalog fetch or the
            text-message sends, which are not guarded here -- confirm against
            ``fetch_catalog_html`` and ``send_telegram_message``.
    """
    state = load_state(STATE_PATH)
    if not state:
        state = parse_catalog(fetch_catalog_html(session))
    if not state:
        send_telegram_message(session, "⚠️ Unable to fetch apps right now. Try again in a moment.")
        return

    app = random.choice(list(state.values()))

    page_title = ""
    screenshot_urls: List[str] = []
    try:
        details = fetch_new_app_page_details(session, app.url)
    except requests.RequestException as exc:
        logging.warning("Unable to fetch random app page details for %s: %s", app.url, exc)
    else:
        page_title = str(details.get("page_title", ""))
        screenshot_data = details.get("screenshot_urls", [])
        if isinstance(screenshot_data, list):
            # The prefix is written without padding; a padded " http " can
            # never prefix a real URL, which would drop every screenshot.
            screenshot_urls = [
                str(item) for item in screenshot_data if str(item).startswith("http")
            ]

    total = len(screenshot_urls)
    send_telegram_message(
        session,
        build_random_app_message(app, page_title=page_title, screenshot_count=total),
    )

    for index, screenshot_url in enumerate(screenshot_urls, start=1):
        try:
            send_telegram_photo(
                session,
                screenshot_url,
                caption=f"🖼️ {app.name} • Screenshot {index}/{total}",
            )
        except requests.RequestException as exc:
            logging.warning("Failed to send random app screenshot for %s: %s", app.name, exc)
def initialize_telegram_offset(session: requests.Session) -> None:
    """Seed ``last_telegram_update_id`` so commands sent before startup are skipped.

    Does nothing when the bot token or chat id is not configured. A failed
    request is logged and leaves the global untouched.

    Args:
        session: HTTP session used for the request.
    """
    global last_telegram_update_id
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return

    try:
        # offset=-1 asks Telegram for only the newest pending update and
        # discards everything before it. Without an offset, getUpdates
        # returns the oldest pending updates (at most 100), so with a larger
        # backlog the stored id would not be the newest and stale commands
        # would be replayed on the first poll.
        updates = get_telegram_updates(session, offset=-1)
    except requests.RequestException as exc:
        logging.warning("Unable to initialize Telegram updates offset: %s", exc)
        return

    update_ids = [
        item["update_id"]
        for item in updates
        if isinstance(item, dict) and isinstance(item.get("update_id"), int)
    ]
    if update_ids:
        last_telegram_update_id = max(update_ids)
def poll_telegram_commands(session: requests.Session) -> None:
    """Fetch new Telegram updates and answer ``/random`` commands.

    Does nothing when the bot token or chat id is not configured. Only
    messages whose chat id equals ``TELEGRAM_CHAT_ID`` are considered.
    ``last_telegram_update_id`` is advanced for every update that carries an
    integer id, whether or not the update holds a command.

    Args:
        session: HTTP session used for every request.

    Raises:
        requests.RequestException: When fetching the updates fails.
    """
    global last_telegram_update_id
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return

    offset = None if last_telegram_update_id is None else last_telegram_update_id + 1
    for update in get_telegram_updates(session, offset=offset):
        update_id = update.get("update_id")
        if isinstance(update_id, int):
            # Recorded before the command is handled, so the next poll's
            # offset moves past this update even if handling raises.
            last_telegram_update_id = update_id

        message = update.get("message")
        if not isinstance(message, dict):
            continue
        chat = message.get("chat")
        if not isinstance(chat, dict):
            continue
        chat_id = str(chat.get("id", ""))
        if chat_id != TELEGRAM_CHAT_ID:
            continue

        text = str(message.get("text", "")).strip()
        command = text.split(" ", 1)[0].lower() if text else ""
        # Prefix match (not equality) also accepts forms such as
        # "/random@<botname>". The literal has no padding: a padded
        # " /random " could never match a token split on spaces.
        if not command.startswith("/random"):
            continue

        logging.info("Received /random command from Telegram chat %s", chat_id)
        try:
            send_random_app_response(session)
        except requests.RequestException as exc:
            logging.warning("Failed to send /random response: %s", exc)
            try:
                send_telegram_message(
                    session,
                    "⚠️ Failed to fetch a random app right now. Please try again.",
                )
            except requests.RequestException as notify_exc:
                # The network is likely still down; do not let the failure
                # notice abort the rest of this batch.
                logging.warning("Failed to send /random failure notice: %s", notify_exc)
def send_startup_notification ( session : requests . Session ) - > None :
def send_startup_notification ( session : requests . Session ) - > None :
message = (
message = (
" TrueNAS c atalog w atcher is running ✅ \n "
" ✅ TrueNAS C atalog W atcher Online \n "
f " Started: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
" ━━━━━━━━━━━━━━━━━━━━━━━━ \n "
f " Catalog: { CATALOG_URL } \n "
" \n "
f " Interval: { CHECK_INTERVAL_SECONDS } s "
f " 🕒 Started: { datetime . now ( timezone . utc ) . strftime ( ' % Y- % m- %d % H: % M: % S UTC ' ) } \n "
f " 🌐 Catalog: { CATALOG_URL } \n "
f " ⏱️ Interval: { CHECK_INTERVAL_SECONDS } s "
)
)
try :
try :
send_telegram_message ( session , message )
send_telegram_message ( session , message )
@@ -450,7 +594,7 @@ def run_once(session: requests.Session, first_run: bool) -> bool:
send_telegram_photo (
send_telegram_photo (
session ,
session ,
screenshot_url ,
screenshot_url ,
caption = f " { app . name } s creenshot { index } / { len ( screenshot_urls ) } " ,
caption = f " 🖼️ { app . name } • S creenshot { index } / { len ( screenshot_urls ) } " ,
)
)
except requests . RequestException as exc :
except requests . RequestException as exc :
logging . warning ( " Failed to send screenshot for %s : %s " , app . name , exc )
logging . warning ( " Failed to send screenshot for %s : %s " , app . name , exc )
@@ -475,6 +619,8 @@ def run_once(session: requests.Session, first_run: bool) -> bool:
def validate_env ( ) - > None :
def validate_env ( ) - > None :
if CHECK_INTERVAL_SECONDS < 30 :
if CHECK_INTERVAL_SECONDS < 30 :
raise ValueError ( " CHECK_INTERVAL_SECONDS must be >= 30 " )
raise ValueError ( " CHECK_INTERVAL_SECONDS must be >= 30 " )
if TELEGRAM_POLL_SECONDS < 2 :
raise ValueError ( " TELEGRAM_POLL_SECONDS must be >= 2 " )
def main ( ) - > int :
def main ( ) - > int :
@@ -493,9 +639,13 @@ def main() -> int:
session = requests . Session ( )
session = requests . Session ( )
send_startup_notification ( session )
send_startup_notification ( session )
initialize_telegram_offset ( session )
first_loop = True
first_loop = True
next_check_at = time . time ( )
while True :
while True :
now = time . time ( )
if now > = next_check_at :
try :
try :
run_once ( session , first_loop )
run_once ( session , first_loop )
except requests . RequestException as exc :
except requests . RequestException as exc :
@@ -504,7 +654,17 @@ def main() -> int:
logging . exception ( " Watcher iteration failed: %s " , exc )
logging . exception ( " Watcher iteration failed: %s " , exc )
first_loop = False
first_loop = False
time . sleep ( CHECK_INTERVAL_SECONDS )
next_check_at = now + CHECK_INTERVAL_SECONDS
try :
poll_telegram_commands ( session )
except requests . RequestException as exc :
logging . warning ( " Telegram polling failed: %s " , exc )
except Exception as exc :
logging . exception ( " Telegram command processing failed: %s " , exc )
sleep_for = min ( TELEGRAM_POLL_SECONDS , max ( 1 , int ( next_check_at - time . time ( ) ) ) )
time . sleep ( sleep_for )
if __name__ == " __main__ " :
if __name__ == " __main__ " :