"""Sync one employee's Workhive shifts into a CalDAV calendar.

The script downloads the weekly schedule page of every configured facility
for last week, this week and the following two weeks, collects each shift
whose list item contains NAME, and creates one calendar event per shift in
the CalDAV calendar whose URL equals CALDAV_LOCATION. Events already on the
calendar (same UID, or same summary and start time) are skipped.

Configuration is read from the environment (optionally via a .env file):
BASE_URL, WORKHIVE_TOKEN, CALDAV_USER, CALDAV_PASSWORD and CALDAV_LOCATION
are required; NAME and TIMEZONE are optional.
"""

import hashlib
import os
import re
from datetime import datetime, timedelta, timezone

import requests
from bs4 import BeautifulSoup
from caldav import DAVClient
from dotenv import load_dotenv

# Load environment from .env when present.
load_dotenv()

# Workhive settings and credentials (must be set via environment variables).
BASE_URL = os.environ.get('BASE_URL')
TOKEN = os.environ.get('WORKHIVE_TOKEN')
NAME = os.environ.get('NAME', 'Jonathan Slivka')

# CalDAV credentials (must be provided via env).
USER = os.environ.get('CALDAV_USER')
PASSWORD = os.environ.get('CALDAV_PASSWORD')
LOCATION = os.environ.get('CALDAV_LOCATION')

# Timezone label written as TZID on every event. The parsed datetimes are
# naive wall-clock times; this value states which zone they belong to.
TIMEZONE = os.environ.get('TIMEZONE', 'America/New_York')

if not BASE_URL:
    raise SystemExit('BASE_URL not set in environment')
if not TOKEN:
    raise SystemExit('WORKHIVE_TOKEN not set in environment')
if not USER or not PASSWORD or not LOCATION:
    raise SystemExit('CALDAV_USER, CALDAV_PASSWORD, and CALDAV_LOCATION must be set in environment')

# Seconds to wait for the schedule server before giving up on a request.
REQUEST_TIMEOUT = 30
# Accepted shift timestamp layouts: abbreviated month (%b), then full (%B).
SHIFT_FORMATS = ("%Y %b %d %I:%M %p", "%Y %B %d %I:%M %p")
# Local-time stamp layout used for DTSTART/DTEND (always 15 characters).
ICAL_STAMP = '%Y%m%dT%H%M%S'
# A parsed date further than this from its period start has the wrong year.
YEAR_DRIFT = timedelta(days=180)

UID_RE = re.compile(r'^UID:(.+)$', re.M)
SUMMARY_RE = re.compile(r'^SUMMARY:(.+)$', re.M)
DTSTART_RE = re.compile(r'^DTSTART(?:;TZID=[^:]+)?:([0-9T]+)', re.M)


def parse_shift_times(year, month, day, shift_time):
    """Parse a shift time range such as "2:50 PM -8:15 PM" on a given date.

    Returns a ``(start, end)`` tuple of naive datetimes. ``end`` is None when
    the text has no end time; ``start`` is None when no format matched.
    """
    # Start and end are parsed separately, splitting on the dash.
    parts = [p.strip() for p in shift_time.split('-')]
    start_str = parts[0]
    end_str = parts[1] if len(parts) >= 2 else None

    start = end = None
    for fmt in SHIFT_FORMATS:
        try:
            start = datetime.strptime(f"{year} {month} {day} {start_str}", fmt)
            if end_str:
                end = datetime.strptime(f"{year} {month} {day} {end_str}", fmt)
            break
        except ValueError:
            continue
    return start, end


def resolve_shift_times(period, month, day, shift_time):
    """Parse a shift using the year of *period*, correcting New Year weeks.

    The schedule header shows only month and day, so the year is taken from
    the ``YYYY-MM-DD`` period. In a week that spans New Year the shift may
    fall in the adjacent year; a result more than YEAR_DRIFT away from the
    period start is therefore re-parsed with the year shifted by one.
    """
    period_start = datetime.strptime(period, "%Y-%m-%d")
    year = period_start.year
    start, end = parse_shift_times(year, month, day, shift_time)
    if start is not None:
        drift = start - period_start
        if drift < -YEAR_DRIFT:
            start, end = parse_shift_times(year + 1, month, day, shift_time)
        elif drift > YEAR_DRIFT:
            start, end = parse_shift_times(year - 1, month, day, shift_time)
    return start, end


# With cookie passed as raw header (avoid requests' cookie encoding issues).
headers = {"Cookie": f"workhive_session={TOKEN}"}

# Mondays of last week, this week and the next two weeks. Starting from last
# week catches any late-posted shifts.
today = datetime.today()
monday = today - timedelta(days=today.weekday()) - timedelta(weeks=1)
periods = [(monday + timedelta(weeks=i)).strftime("%Y-%m-%d") for i in range(4)]
print("Periods to check: " + ", ".join(periods))

facilities = {
    "fac_8ed0d011c748": "Cary Swim Club",
    "fac_f38dd7211e7e": "Scottish Hills",
}

shifts = []
for facility, facility_name in facilities.items():
    for period in periods:
        url = f"{BASE_URL}?facility_id={facility}&period={period}"
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        # Fail loudly on HTTP errors instead of parsing an error page.
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        for li in soup.find_all("li"):
            if NAME not in li.get_text().strip():
                continue

            card = li.find_parent("div", class_="text-white")
            td = li.find_parent('td')
            if card is None or td is None:
                # NAME appeared in a list item that is not inside a shift
                # card within a table cell; there is no shift to read here.
                continue

            # The time range is the second line of the card's second child.
            shift_time = list(card.children)[1].get_text().strip().split("\n")[1].strip()

            # The cell's column position selects the matching header cell.
            row = td.find_parent('tr')
            cells = row.find_all(['td', 'th'], recursive=False)
            col_index = next((i for i, c in enumerate(cells) if c is td), None)
            if col_index is None:
                raise ValueError(f"Could not locate schedule column for shift {shift_time!r}")

            header_row = row.find_parent('table').find('thead').find('tr')
            header_ths = header_row.find_all('th', recursive=False)

            # The header text is split into two lines: the first is used as
            # the month, and the second space-separated token of the second
            # line as the day.
            shift_date_text = header_ths[col_index].get_text().strip()
            month, day = shift_date_text.split("\n")
            day = day.split(" ")[1]
            shift_date = f"{month} {day}"

            # e.g. date=Jun 18, time=2:50 PM -8:15 PM
            start_dt, end_dt = resolve_shift_times(period, month, day, shift_time)
            if start_dt is None:
                raise ValueError(f"Could not parse shift time: {shift_time!r} with date {shift_date!r}")

            shifts.append({
                "facility": facility_name,
                "date": shift_date,
                "time": shift_time,
                "start_datetime": start_dt,
                "end_datetime": end_dt,
            })

for shift in shifts:
    print(f"{shift['facility']} - {shift['date']} - {shift['time']}")
    print(f" Start: {shift['start_datetime']}")
    print(f" End: {shift['end_datetime']}")
    print()

# Push to CalDAV server with duplicate checking.
client = DAVClient(LOCATION, username=USER, password=PASSWORD)
cals = client.principal().calendars()

# Pick the calendar whose URL equals CALDAV_LOCATION, ignoring a trailing
# slash. str() is applied because the url attribute need not be a plain str.
target_url = LOCATION.rstrip('/')
work_calendar = next(
    (c for c in cals if str(getattr(c, 'url', '')).rstrip('/') == target_url),
    None,
)

if work_calendar is None:
    print("No CalDAV calendar found; skipping push")
else:
    print(f"Using calendar: {getattr(work_calendar, 'id', 'unknown')}")

    # Read the calendar once and index what is already there, instead of
    # re-fetching every event for every shift.
    known_uids = set()
    known_slots = set()
    for event in work_calendar.events():
        raw = event.data
        txt = raw.decode('utf-8', 'ignore') if isinstance(raw, bytes) else str(raw)
        uid_match = UID_RE.search(txt)
        if uid_match:
            known_uids.add(uid_match.group(1).strip())
        summary_match = SUMMARY_RE.search(txt)
        dtstart_match = DTSTART_RE.search(txt)
        if summary_match and dtstart_match:
            # Keep only the leading 15-character local stamp so it compares
            # equal to an ICAL_STAMP-formatted start time.
            known_slots.add(
                (summary_match.group(1).strip(), dtstart_match.group(1).strip()[:15])
            )

    for shift in shifts:
        summary = f"{shift['facility']} Shift"
        start_dt = shift['start_datetime']
        # A shift without an end time becomes a zero-length event.
        end_dt = shift['end_datetime'] or start_dt

        # Deterministic UID: the same shift always maps to the same event.
        uid_src = f"{shift['facility']}|{start_dt.isoformat()}|{end_dt.isoformat()}"
        uid = hashlib.sha1(uid_src.encode()).hexdigest() + "@workhive-sync"

        dtstart_str = start_dt.strftime(ICAL_STAMP)
        dtend_str = end_dt.strftime(ICAL_STAMP)

        if uid in known_uids or (summary, dtstart_str) in known_slots:
            print(f"Skipping existing event: {summary} at {start_dt}")
            continue

        # DTSTAMP is a required VEVENT property (RFC 5545) and must be UTC.
        dtstamp_str = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        ical = (
            'BEGIN:VCALENDAR\r\n'
            'VERSION:2.0\r\n'
            'PRODID:-//workhive-caldav-sync//EN\r\n'
            'BEGIN:VEVENT\r\n'
            f'UID:{uid}\r\n'
            f'DTSTAMP:{dtstamp_str}\r\n'
            f'SUMMARY:{summary}\r\n'
            f'DTSTART;TZID={TIMEZONE}:{dtstart_str}\r\n'
            f'DTEND;TZID={TIMEZONE}:{dtend_str}\r\n'
            f'DESCRIPTION:Work shift at {shift["facility"]} on {shift["date"]} from {shift["time"]}\r\n'
            'END:VEVENT\r\n'
            'END:VCALENDAR\r\n'
        )
        work_calendar.add_event(ical)
        # Remember the new event so a repeated shift in this run is skipped.
        known_uids.add(uid)
        known_slots.add((summary, dtstart_str))
        print(f"Added event for shift on {shift['date']} at {shift['facility']}")