Files
workhive-caldav-sync/sync.py
LockeShor a519e46482
All checks were successful
Docker Image / build (push) Successful in 2m29s
work yay
2026-06-08 22:34:54 -04:00

199 lines
7.2 KiB
Python

from bs4 import BeautifulSoup
import requests
from datetime import datetime, timedelta
from caldav import DAVClient
import hashlib
import re
import os
from dotenv import load_dotenv
# load environment from .env when present
load_dotenv()
import hashlib
# Workhive endpoint and credentials. Everything here comes from environment
# variables (optionally loaded from a .env file by load_dotenv() above).
# Required: BASE_URL, WORKHIVE_TOKEN, CALDAV_USER, CALDAV_PASSWORD, CALDAV_LOCATION
# Optional: NAME, TIMEZONE
BASE_URL = os.environ.get('BASE_URL')
TOKEN = os.environ.get('WORKHIVE_TOKEN')
# Name searched for in the schedule's list items to find this person's shifts.
NAME = os.environ.get('NAME', 'Jonathan Slivka')

# CalDAV credentials and calendar URL (must be provided via env)
USER = os.environ.get('CALDAV_USER')
PASSWORD = os.environ.get('CALDAV_PASSWORD')
LOCATION = os.environ.get('CALDAV_LOCATION')

# IANA timezone name for the parsed shift datetimes (defaults to US Eastern)
TIMEZONE = os.environ.get('TIMEZONE', 'America/New_York')

# Fail fast with a readable message instead of erroring later mid-run.
# BASE_URL is checked too: every schedule URL is built from it, and a missing
# value would otherwise only surface as an invalid "None?facility_id=..." URL.
if not BASE_URL:
    raise SystemExit('BASE_URL not set in environment')
if not TOKEN:
    raise SystemExit('WORKHIVE_TOKEN not set in environment')
if not USER or not PASSWORD or not LOCATION:
    raise SystemExit('CALDAV_USER, CALDAV_PASSWORD, and CALDAV_LOCATION must be set in environment')

# Session cookie passed as a raw header (avoids requests' cookie encoding issues)
headers = {"Cookie": f"workhive_session={TOKEN}"}
# Week-start dates (Mondays, formatted YYYY-MM-DD) to query: last week plus
# the three weeks that follow it (the current week and the next two).
today = datetime.today()
# Step back to this week's Monday, then one more week so that shifts posted
# late for the previous week are still picked up.
monday = today - timedelta(days=today.weekday(), weeks=1)
periods = [
    (monday + timedelta(weeks=week_offset)).strftime("%Y-%m-%d")
    for week_offset in range(4)
]
print("Periods to check: " + ", ".join(periods))
# Workhive facility IDs mapped to the display names used for each shift.
facilities = dict(
    fac_8ed0d011c748="Cary Swim Club",
    fac_f38dd7211e7e="Scottish Hills",
)

# One schedule URL per (facility, week) pair: all weeks of the first facility,
# then all weeks of the next.
urls = []
for facility_id in facilities:
    for week_start in periods:
        urls.append(f"{BASE_URL}?facility_id={facility_id}&period={week_start}")
# Scrape every facility/week schedule page and collect the shifts assigned to
# NAME. Each entry in `shifts` is a dict holding the facility display name,
# the raw date and time text from the page, and the parsed naive start/end
# datetimes.


def _period_start_from_url(url):
    """Return the datetime for the URL's ``period=YYYY-MM-DD`` parameter.

    Falls back to the current date when the parameter is missing or is not a
    full ISO date.
    """
    if "period=" in url:
        period_param = url.split("period=")[1].split("&")[0]
        try:
            return datetime.strptime(period_param, "%Y-%m-%d")
        except ValueError:
            pass
    return datetime.today()


def _parse_shift_datetimes(month, day, shift_time, period_start):
    """Parse schedule text into naive ``(start, end)`` datetimes.

    ``month`` and ``day`` come from the column header (e.g. ``Jun`` / ``18``)
    and ``shift_time`` is a range such as ``2:50 PM -8:15 PM``. The year is
    taken from ``period_start``. ``end`` is ``None`` when the text has no end
    time or the end time cannot be parsed.

    Raises:
        ValueError: if the start time cannot be parsed.
    """
    parts = [p.strip() for p in shift_time.split('-')]
    start_str = parts[0]
    end_str = parts[1] if len(parts) >= 2 else None

    start_dt = end_dt = None
    date_prefix = f"{period_start.year} {month} {day}"
    # The header may use an abbreviated (%b) or a full (%B) month name.
    for fmt in ("%Y %b %d %I:%M %p", "%Y %B %d %I:%M %p"):
        try:
            start_dt = datetime.strptime(f"{date_prefix} {start_str}", fmt)
        except ValueError:
            continue
        if end_str:
            try:
                end_dt = datetime.strptime(f"{date_prefix} {end_str}", fmt)
            except ValueError:
                # Keep the shift with an unknown end rather than dropping it.
                end_dt = None
        break
    if start_dt is None:
        shift_date = f"{month} {day}"
        raise ValueError(f"Could not parse shift time: {shift_time!r} with date {shift_date!r}")

    # The header carries no year. A week that straddles New Year would put
    # its January days in the period's (previous) year, so a date that lands
    # about a year away from the period start is moved to the adjacent year.
    half_year = timedelta(days=180)
    gap = start_dt - period_start
    year_shift = 1 if gap < -half_year else -1 if gap > half_year else 0
    if year_shift:
        start_dt = start_dt.replace(year=start_dt.year + year_shift)
        if end_dt is not None:
            end_dt = end_dt.replace(year=end_dt.year + year_shift)

    # An end time earlier than the start means the shift runs past midnight.
    if end_dt is not None and end_dt < start_dt:
        end_dt += timedelta(days=1)
    return start_dt, end_dt


shifts = []
for url in urls:
    # A timeout keeps a stalled connection from hanging the whole sync.
    response = requests.get(url, headers=headers, timeout=30)
    if not response.ok:
        # An error page contains no schedule; report it instead of silently
        # parsing it as an empty week.
        print(f"Skipping {url}: HTTP {response.status_code}")
        continue
    soup = BeautifulSoup(response.content, "html.parser")

    # Facility and week are the same for every entry on the page.
    facility = url.split("facility_id=")[1].split("&")[0]
    facility_name = facilities.get(facility, "Unknown Facility")
    period_start = _period_start_from_url(url)

    for li in soup.find_all("li"):
        if NAME not in li.get_text().strip():
            continue

        # The time range is the second line of text in the second child of
        # the enclosing "text-white" div.
        shift_time = list(li.find_parent("div", class_="text-white").children)[1].get_text().strip().split("\n")[1].strip()

        # Find which column of the table row this entry sits in ...
        td = li.find_parent('td')
        row = td.find_parent('tr')
        cells = row.find_all(['td', 'th'], recursive=False)
        col_index = next((i for i, c in enumerate(cells) if c is td), None)
        if col_index is None:
            raise ValueError(f"Could not locate the schedule column for shift {shift_time!r}")

        # ... and read the date from the header cell of that same column.
        table = row.find_parent('table')
        thead = table.find('thead')
        header_row = thead.find('tr')
        header_ths = header_row.find_all('th', recursive=False)
        # the <th> holds the month and the day on separate lines
        shift_date_text = header_ths[col_index].get_text().strip()
        month, day = shift_date_text.split("\n")
        day = day.split(" ")[1]
        shift_date = f"{month} {day}"

        # e.g. date "Jun 18", time "2:50 PM -8:15 PM"
        start_dt, end_dt = _parse_shift_datetimes(month, day, shift_time, period_start)
        shifts.append({
            "facility": facility_name,
            "date": shift_date,
            "time": shift_time,
            "start_datetime": start_dt,
            "end_datetime": end_dt,
        })
# Print a short report of every scraped shift, one blank line after each.
for entry in shifts:
    report = (
        f"{entry['facility']} - {entry['date']} - {entry['time']}\n"
        f" Start: {entry['start_datetime']}\n"
        f" End: {entry['end_datetime']}\n"
    )
    # print() adds the final newline, which yields the blank separator line.
    print(report)
# Push the scraped shifts to the CalDAV calendar at LOCATION, skipping any
# shift that is already present (matched by UID, or by SUMMARY + start time).
from datetime import timezone  # only needed for the DTSTAMP written below

client = DAVClient(LOCATION, username=USER, password=PASSWORD)
cals = client.principal().calendars()

# Select the calendar whose URL equals LOCATION (ignoring a trailing slash).
# str() makes the comparison independent of the type the library uses for URLs.
target_url = LOCATION.rstrip('/')
work_calendar = None
for c in cals:
    if str(getattr(c, 'url', '')).rstrip('/') == target_url:
        work_calendar = c
        break

if work_calendar is None:
    print("No CalDAV calendar found; skipping push")
else:
    print(f"Using calendar: {getattr(work_calendar, 'id', 'unknown')}")

    uid_re = re.compile(r'^UID:(.+)$', re.M)
    summary_re = re.compile(r'^SUMMARY:(.+)$', re.M)
    dtstart_re = re.compile(r'^DTSTART(?:;TZID=[^:]+)?:([0-9T]+)', re.M)

    # Index the existing events once, instead of downloading and re-parsing
    # the whole calendar again for every shift.
    existing_uids = set()
    existing_slots = set()  # (summary, 'YYYYMMDDTHHMMSS') pairs
    for e in work_calendar.events():
        raw = e.data
        txt = raw.decode('utf-8', 'ignore') if isinstance(raw, bytes) else str(raw)
        # Look only at the VEVENT part: a VTIMEZONE component placed before
        # it has DTSTART lines of its own that would otherwise match first.
        vevent_at = txt.find('BEGIN:VEVENT')
        if vevent_at != -1:
            txt = txt[vevent_at:]
        m = uid_re.search(txt)
        if m:
            existing_uids.add(m.group(1).strip())
        msum = summary_re.search(txt)
        mdt = dtstart_re.search(txt)
        if msum and mdt:
            # The first 15 characters are the YYYYMMDDTHHMMSS part.
            existing_slots.add((msum.group(1).strip(), mdt.group(1).strip()[:15]))

    # DTSTAMP is a required VEVENT property (RFC 5545) and must be in UTC.
    dtstamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')

    for shift in shifts:
        summary = f"{shift['facility']} Shift"
        start_dt = shift['start_datetime']
        # A shift without a parsed end time becomes a zero-length event.
        end_dt = shift['end_datetime'] or start_dt
        dtstart_str = start_dt.strftime('%Y%m%dT%H%M%S')
        dtend_str = end_dt.strftime('%Y%m%dT%H%M%S')

        # Deterministic UID so that re-running the sync yields the same event.
        uid_src = f"{shift['facility']}|{start_dt.isoformat()}|{end_dt.isoformat()}"
        uid = hashlib.sha1(uid_src.encode()).hexdigest() + "@workhive-sync"

        if uid in existing_uids or (summary, dtstart_str) in existing_slots:
            print(f"Skipping existing event: {summary} at {start_dt}")
            continue

        # Times are written as wall-clock values in the configured TIMEZONE.
        # NOTE(review): no VTIMEZONE component is sent, so the server has to
        # resolve the TZID name itself - confirm for the server in use.
        ical = (
            'BEGIN:VCALENDAR\r\n'
            'VERSION:2.0\r\n'
            'PRODID:-//workhive-caldav-sync//EN\r\n'
            'BEGIN:VEVENT\r\n'
            f'UID:{uid}\r\n'
            f'DTSTAMP:{dtstamp}\r\n'
            f'SUMMARY:{summary}\r\n'
            f'DTSTART;TZID={TIMEZONE}:{dtstart_str}\r\n'
            f'DTEND;TZID={TIMEZONE}:{dtend_str}\r\n'
            f'DESCRIPTION:Work shift at {shift["facility"]} on {shift["date"]} from {shift["time"]}\r\n'
            'END:VEVENT\r\n'
            'END:VCALENDAR\r\n'
        )
        work_calendar.add_event(ical)
        # Remember the new event so a repeated shift in this run is skipped.
        existing_uids.add(uid)
        existing_slots.add((summary, dtstart_str))
        print(f"Added event for shift on {shift['date']} at {shift['facility']}")