"""Flask + SQLite backend for a pool check-in desk.

The module serves three static HTML pages and a JSON API for families,
members, check-in state and swim-test state, plus a handful of admin
maintenance endpoints (backup, CSV export, REINDEX, VACUUM, bulk resets).
The schema is created on import by ``initialize_database()``.
"""

import csv
import datetime as dt
import difflib
import re
import shutil
import sqlite3
from collections.abc import Iterator
from contextlib import closing, contextmanager
from pathlib import Path

from flask import Flask, jsonify, request, send_file

BASE_DIR = Path(__file__).resolve().parent
DB_PATH = BASE_DIR / "pool_checkin.sqlite3"
BACKUP_DIR = BASE_DIR / "backups"
EXPORT_DIR = BASE_DIR / "exports"

app = Flask(__name__)

# Code-only configuration: change this constant to update the swim-test age rule.
DEFAULT_MIN_SWIM_TEST_AGE = 13

# Shared SELECT prefixes so every endpoint reads the same column set.
_MEMBER_SELECT = (
    "SELECT id, first_name, last_name, age, family_id, "
    "is_checked_in, swim_test_passed, can_use_deep_end FROM members"
)
_FAMILY_SELECT = "SELECT id, family_name, primary_phone, guest_passes FROM families"


# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------


def dict_row_factory(cursor, row):
    """Row factory that maps each result row to a ``{column_name: value}`` dict."""
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


def get_connection() -> sqlite3.Connection:
    """Open a new connection with dict rows and foreign-key enforcement enabled.

    The caller owns the returned connection and must close it; prefer
    ``db_session()`` which handles commit/rollback and closing.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = dict_row_factory
    conn.execute("PRAGMA foreign_keys = ON;")
    return conn


@contextmanager
def db_session() -> Iterator[sqlite3.Connection]:
    """Yield a connection, commit on success, roll back on error, always close.

    ``with sqlite3.Connection`` on its own only manages the transaction and
    leaves the connection open, so it is wrapped here to avoid leaking one
    connection per request.
    """
    conn = get_connection()
    try:
        with conn:
            yield conn
    finally:
        conn.close()


def _fetch_member(conn: sqlite3.Connection, member_id: int) -> dict | None:
    """Return the member row with ``member_id`` or ``None`` if it does not exist."""
    return conn.execute(f"{_MEMBER_SELECT} WHERE id = ?;", (member_id,)).fetchone()


def _fetch_family(conn: sqlite3.Connection, family_id: int) -> dict | None:
    """Return the family row with ``family_id`` or ``None`` if it does not exist."""
    return conn.execute(f"{_FAMILY_SELECT} WHERE id = ?;", (family_id,)).fetchone()


# ---------------------------------------------------------------------------
# Request / validation helpers
# ---------------------------------------------------------------------------


def _json_object() -> dict:
    """Return the request's JSON body if it is an object, otherwise ``{}``.

    Non-object JSON (lists, strings, numbers) is treated as an empty body so
    the endpoint's own validation produces a 400 instead of an AttributeError.
    """
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _clean_str(value: object) -> str:
    """Return ``value`` stripped when it is a string, otherwise ``""``."""
    return value.strip() if isinstance(value, str) else ""


def _is_int(value: object) -> bool:
    """True for real integers only; ``bool`` is rejected although it subclasses ``int``."""
    return isinstance(value, int) and not isinstance(value, bool)


# ---------------------------------------------------------------------------
# Text matching helpers
# ---------------------------------------------------------------------------


def normalize_text(value: str) -> str:
    """Lower-case ``value``, drop everything except a-z, 0-9 and spaces, and strip."""
    return re.sub(r"[^a-z0-9 ]+", "", (value or "").lower()).strip()


def fuzzy_score(query: str, candidate: str) -> float:
    """Score how well ``query`` matches ``candidate``.

    Returns 0.0 when either side is empty after normalization. A substring
    hit scores above 1.0 (``1 + len(query) / len(candidate)``); otherwise the
    score is the best ``SequenceMatcher`` ratio against the whole candidate
    or any single whitespace-separated token of it.
    """
    q = normalize_text(query)
    c = normalize_text(candidate)
    if not q or not c:
        return 0.0
    if q in c:
        return 1.0 + (len(q) / max(len(c), 1))
    base = difflib.SequenceMatcher(None, q, c).ratio()
    token_max = max(
        (difflib.SequenceMatcher(None, q, token).ratio() for token in c.split()),
        default=0.0,
    )
    return max(base, token_max)


def normalize_phone_for_storage(phone_raw: str) -> str | None:
    """Format a phone number for storage.

    Returns ``None`` when the input holds no digits, ``NNN-NNN-NNNN`` for ten
    digits and ``+1 NNN-NNN-NNNN`` for eleven digits starting with ``1``.

    Raises:
        ValueError: for any other digit count.
    """
    digits = re.sub(r"\D", "", phone_raw or "")
    if not digits:
        return None
    if len(digits) == 10:
        return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
    if len(digits) == 11 and digits.startswith("1"):
        d = digits[1:]
        return f"+1 {d[:3]}-{d[3:6]}-{d[6:]}"
    raise ValueError("Phone number must include area code (10 digits).")


def _ordering_bonus(query_norm: str, first_norm: str, full_norm: str) -> tuple[float, float]:
    """Return ``(prefix_bonus, position_bonus)`` used to rank search hits.

    Ordered matching boosts: exact > prefix > early-position > fuzzy.
    Both bonuses are 0.0 when the normalized query is empty.
    """
    prefix_bonus = 0.0
    position_bonus = 0.0
    if not query_norm:
        return prefix_bonus, position_bonus

    if first_norm == query_norm:
        prefix_bonus = 1.6
    elif first_norm.startswith(query_norm):
        prefix_bonus = 1.25
    elif full_norm.startswith(query_norm):
        prefix_bonus = 1.05

    # The earlier the query appears, the larger the bonus; never below 0.0.
    if query_norm in first_norm:
        position_bonus = max(position_bonus, 0.35 - (first_norm.find(query_norm) * 0.03))
    if query_norm in full_norm:
        position_bonus = max(position_bonus, 0.2 - (full_norm.find(query_norm) * 0.015))
    return prefix_bonus, position_bonus


# ---------------------------------------------------------------------------
# Schema and payload shaping
# ---------------------------------------------------------------------------


def initialize_database() -> None:
    """Create the backup/export directories and the schema if they are missing.

    Also adds the ``guest_passes`` column to a pre-existing ``families`` table
    that lacks it.
    """
    BACKUP_DIR.mkdir(exist_ok=True)
    EXPORT_DIR.mkdir(exist_ok=True)
    with db_session() as conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS families (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                family_name TEXT NOT NULL,
                primary_phone TEXT,
                guest_passes INTEGER NOT NULL DEFAULT 0 CHECK(guest_passes >= 0),
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            );

            CREATE TABLE IF NOT EXISTS members (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                age INTEGER NOT NULL CHECK(age >= 0),
                family_id INTEGER,
                is_checked_in INTEGER NOT NULL DEFAULT 0 CHECK(is_checked_in IN (0, 1)),
                swim_test_passed INTEGER NOT NULL DEFAULT 0 CHECK(swim_test_passed IN (0, 1)),
                can_use_deep_end INTEGER NOT NULL DEFAULT 0 CHECK(can_use_deep_end IN (0, 1)),
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (family_id) REFERENCES families(id) ON DELETE SET NULL
            );

            CREATE INDEX IF NOT EXISTS idx_members_name ON members(last_name, first_name);
            CREATE INDEX IF NOT EXISTS idx_members_family ON members(family_id);
            CREATE INDEX IF NOT EXISTS idx_members_checked_in ON members(is_checked_in);
            """
        )
        family_columns = {
            row["name"] for row in conn.execute("PRAGMA table_info(families);").fetchall()
        }
        if "guest_passes" not in family_columns:
            conn.execute(
                "ALTER TABLE families "
                "ADD COLUMN guest_passes INTEGER NOT NULL DEFAULT 0 CHECK(guest_passes >= 0);"
            )


def to_member_payload(row: dict, min_swim_test_age: int) -> dict:
    """Convert a ``members`` row into the camelCase JSON shape used by the API.

    ``requiresSwimTest`` is true when the member's age is below
    ``min_swim_test_age``.
    """
    requires_swim_test = row["age"] < min_swim_test_age
    return {
        "id": row["id"],
        "firstName": row["first_name"],
        "lastName": row["last_name"],
        "fullName": f"{row['first_name']} {row['last_name']}",
        "age": row["age"],
        "familyId": row["family_id"],
        "checkedIn": bool(row["is_checked_in"]),
        "swimTestPassed": bool(row["swim_test_passed"]),
        "requiresSwimTest": requires_swim_test,
    }


def _family_payload(family: dict) -> dict:
    """Convert a ``families`` row into the camelCase JSON shape used by the API."""
    return {
        "id": family["id"],
        "familyName": family["family_name"],
        "primaryPhone": family["primary_phone"],
        "guestPasses": family["guest_passes"],
    }


# ---------------------------------------------------------------------------
# HTML pages
# ---------------------------------------------------------------------------


@app.route("/")
def homepage():
    """Serve the landing page."""
    return send_file(BASE_DIR / "index.html")


@app.route("/family/<int:family_id>")
def family_page(family_id: int):
    """Serve the family page shell."""
    # The family id is consumed client-side from the URL path.
    return send_file(BASE_DIR / "family.html")


@app.route("/manage")
def manage_page():
    """Serve the management page."""
    return send_file(BASE_DIR / "manage.html")


# ---------------------------------------------------------------------------
# Read endpoints
# ---------------------------------------------------------------------------


@app.route("/api/dashboard", methods=["GET"])
def api_dashboard():
    """Return headline counts for members, families, check-ins and swim tests."""
    with db_session() as conn:
        totals = conn.execute(
            """
            SELECT
                COUNT(*) AS total_members,
                SUM(CASE WHEN family_id IS NULL THEN 1 ELSE 0 END) AS individual_members,
                SUM(CASE WHEN is_checked_in = 1 THEN 1 ELSE 0 END) AS checked_in_members,
                SUM(CASE WHEN age < ? THEN 1 ELSE 0 END) AS children_count,
                SUM(CASE WHEN age < ? AND swim_test_passed = 1 THEN 1 ELSE 0 END)
                    AS children_swim_test_passed
            FROM members;
            """,
            (DEFAULT_MIN_SWIM_TEST_AGE, DEFAULT_MIN_SWIM_TEST_AGE),
        ).fetchone()
        family_count = conn.execute(
            "SELECT COUNT(*) AS count FROM families;"
        ).fetchone()["count"]

    # SUM() over zero rows yields NULL, hence the `or 0` fallbacks.
    return jsonify(
        {
            "totalMembers": totals["total_members"] or 0,
            "individuals": totals["individual_members"] or 0,
            "families": family_count,
            "checkedIn": totals["checked_in_members"] or 0,
            "children": totals["children_count"] or 0,
            "childrenSwimPassed": totals["children_swim_test_passed"] or 0,
            "minSwimTestAge": DEFAULT_MIN_SWIM_TEST_AGE,
        }
    )


@app.route("/api/checked-in", methods=["GET"])
def api_checked_in_members():
    """List checked-in members, or fuzzy-search all members when ``q`` is given.

    Without ``q`` only checked-in members are returned, sorted by name.
    With ``q`` a member matches when the query is close to their first name,
    to the last name of anyone in their family, or (for members without a
    family) to their own last name; results are ranked by ``matchScore``.
    """
    query = request.args.get("q", "").strip()
    has_query = bool(query)
    threshold = 0.62
    query_norm = normalize_text(query)

    with db_session() as conn:
        rows = conn.execute(
            """
            SELECT
                m.id,
                m.first_name,
                m.last_name,
                m.age,
                m.family_id,
                m.is_checked_in,
                m.swim_test_passed,
                f.family_name
            FROM members m
            LEFT JOIN families f ON m.family_id = f.id
            ORDER BY m.last_name, m.first_name;
            """
        ).fetchall()

    # Last-name score per row, computed once and reused by both passes below.
    last_scores = [
        fuzzy_score(query, row["last_name"]) if has_query else 0.0 for row in rows
    ]

    # Best last-name score seen within each family, so relatives with a
    # different surname still surface when one family member matches.
    family_last_name_score: dict[int, float] = {}
    if has_query:
        for row, last_score in zip(rows, last_scores):
            family_id = row["family_id"]
            if family_id is None:
                continue
            current = family_last_name_score.get(family_id, 0.0)
            family_last_name_score[family_id] = max(current, last_score)

    candidates = []
    for row, last_score in zip(rows, last_scores):
        checked_in = bool(row["is_checked_in"])
        if not has_query and not checked_in:
            continue

        first_score = fuzzy_score(query, row["first_name"]) if has_query else 1.0
        family_score = (
            family_last_name_score.get(row["family_id"], 0.0) if has_query else 0.0
        )

        matches_query = (
            not has_query
            or first_score >= threshold
            or family_score >= threshold
            or (row["family_id"] is None and last_score >= threshold)
        )
        if not matches_query:
            continue

        full_name = f"{row['first_name']} {row['last_name']}"
        family_name = row["family_name"] if row["family_name"] else "Individual"

        prefix_bonus, position_bonus = (0.0, 0.0)
        if has_query:
            prefix_bonus, position_bonus = _ordering_bonus(
                query_norm,
                normalize_text(row["first_name"]),
                normalize_text(full_name),
            )

        family_boost = family_score * 0.85
        match_score = max(
            first_score + prefix_bonus + position_bonus, last_score, family_boost
        )

        candidates.append(
            {
                "id": row["id"],
                "firstName": row["first_name"],
                "lastName": row["last_name"],
                "fullName": full_name,
                "age": row["age"],
                "checkedIn": checked_in,
                "familyName": family_name,
                "requiresSwimTest": row["age"] < DEFAULT_MIN_SWIM_TEST_AGE,
                "swimTestPassed": bool(row["swim_test_passed"]),
                "matchScore": round(match_score, 4),
                "firstScore": round(first_score, 4),
                "familyScore": round(family_score, 4),
                "lastScore": round(last_score, 4),
            }
        )

    if has_query:
        candidates.sort(
            key=lambda item: (
                -item["matchScore"],
                -item["firstScore"],
                -item["familyScore"],
                not item["checkedIn"],
                item["firstName"].lower(),
                item["lastName"].lower(),
            )
        )
    else:
        candidates.sort(
            key=lambda item: (
                not item["checkedIn"],
                item["lastName"].lower(),
                item["firstName"].lower(),
            )
        )

    return jsonify({"items": candidates})


@app.route("/api/individuals", methods=["GET"])
def api_individuals():
    """List members without a family, optionally filtered by substring ``q``.

    ``q`` is matched case-insensitively against ``"<full name> <id>"``.
    """
    query = request.args.get("q", "").strip().lower()

    with db_session() as conn:
        rows = conn.execute(
            f"{_MEMBER_SELECT} WHERE family_id IS NULL ORDER BY last_name, first_name;"
        ).fetchall()

    filtered = []
    for row in rows:
        payload = to_member_payload(row, DEFAULT_MIN_SWIM_TEST_AGE)
        searchable = f"{payload['fullName']} {payload['id']}".lower()
        if query and query not in searchable:
            continue
        filtered.append(payload)

    return jsonify({"items": filtered})


# ---------------------------------------------------------------------------
# Families
# ---------------------------------------------------------------------------


def _create_family():
    """Handle ``POST /api/families``: validate the body and insert one family."""
    body = _json_object()
    family_name = _clean_str(body.get("familyName"))
    raw_phone = body.get("primaryPhone")

    if not family_name:
        return jsonify({"error": "`familyName` is required."}), 400
    if raw_phone and not isinstance(raw_phone, str):
        return jsonify({"error": "`primaryPhone` must be a string."}), 400

    try:
        normalized_phone = normalize_phone_for_storage((raw_phone or "").strip())
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    with db_session() as conn:
        result = conn.execute(
            "INSERT INTO families (family_name, primary_phone, guest_passes) VALUES (?, ?, 0);",
            (family_name, normalized_phone),
        )
        family = _fetch_family(conn, result.lastrowid)

    return jsonify({"item": _family_payload(family)}), 201


@app.route("/api/families", methods=["GET", "POST"])
def api_families():
    """Create a family (POST) or list families with their members (GET).

    On GET, an optional ``q`` fuzzy-filters families by family name or any
    member's full name (minimum score 0.42) and sorts by descending score.
    """
    if request.method == "POST":
        return _create_family()

    query = request.args.get("q", "").strip().lower()

    with db_session() as conn:
        families = conn.execute(f"{_FAMILY_SELECT} ORDER BY family_name;").fetchall()
        members = conn.execute(
            f"{_MEMBER_SELECT} WHERE family_id IS NOT NULL "
            "ORDER BY age DESC, last_name, first_name;"
        ).fetchall()

    members_by_family: dict[int, list[dict]] = {}
    for row in members:
        item = to_member_payload(row, DEFAULT_MIN_SWIM_TEST_AGE)
        members_by_family.setdefault(item["familyId"], []).append(item)

    payload = []
    for family in families:
        family_members = members_by_family.get(family["id"], [])
        family_name = family["family_name"]

        if query:
            candidates = [family_name] + [m["fullName"] for m in family_members]
            score = max(
                (fuzzy_score(query, candidate) for candidate in candidates),
                default=0.0,
            )
            if score < 0.42:
                continue
        else:
            score = 1.0

        payload.append(
            {
                **_family_payload(family),
                "members": family_members,
                "matchScore": round(score, 4),
            }
        )

    payload.sort(key=lambda item: (-item["matchScore"], item["familyName"].lower()))
    return jsonify({"items": payload})


@app.route("/api/families/<int:family_id>", methods=["GET"])
def api_family_detail(family_id: int):
    """Return one family with its members, or 404 if the family does not exist."""
    with db_session() as conn:
        family = _fetch_family(conn, family_id)
        if family is None:
            return jsonify({"error": "Family not found."}), 404

        member_rows = conn.execute(
            f"{_MEMBER_SELECT} WHERE family_id = ? ORDER BY age DESC, last_name, first_name;",
            (family_id,),
        ).fetchall()

    return jsonify(
        {
            "item": {
                **_family_payload(family),
                "minSwimTestAge": DEFAULT_MIN_SWIM_TEST_AGE,
                "members": [
                    to_member_payload(row, DEFAULT_MIN_SWIM_TEST_AGE)
                    for row in member_rows
                ],
            }
        }
    )


@app.route("/api/families/<int:family_id>/guest-passes", methods=["PATCH"])
def api_update_family_guest_passes(family_id: int):
    """Set a family's guest-pass count from the ``guestPasses`` body field."""
    guest_passes = _json_object().get("guestPasses")

    if not _is_int(guest_passes) or guest_passes < 0:
        return jsonify({"error": "`guestPasses` must be a non-negative integer."}), 400

    with db_session() as conn:
        result = conn.execute(
            "UPDATE families SET guest_passes = ? WHERE id = ?;",
            (guest_passes, family_id),
        )
        if result.rowcount == 0:
            return jsonify({"error": "Family not found."}), 404
        family = _fetch_family(conn, family_id)

    return jsonify({"item": _family_payload(family), "message": "Guest passes updated."})


@app.route("/api/families/<int:family_id>", methods=["DELETE"])
def api_delete_family(family_id: int):
    """Delete a family together with all of its members."""
    with db_session() as conn:
        family = _fetch_family(conn, family_id)
        if family is None:
            return jsonify({"error": "Family not found."}), 404

        # Members are removed explicitly; the FK alone would only null their
        # family_id (ON DELETE SET NULL).
        deleted_members = conn.execute(
            "DELETE FROM members WHERE family_id = ?;",
            (family_id,),
        ).rowcount
        conn.execute("DELETE FROM families WHERE id = ?;", (family_id,))

    return jsonify(
        {
            "message": (
                f"Deleted {family['family_name']} family and {deleted_members} member"
                f"{'s' if deleted_members != 1 else ''}."
            ),
            "deletedFamilyId": family_id,
            "deletedMembers": deleted_members,
        }
    )


# ---------------------------------------------------------------------------
# Members
# ---------------------------------------------------------------------------


@app.route("/api/members", methods=["POST"])
def api_create_member():
    """Create a member inside an existing family.

    Expects ``firstName``, ``lastName``, a non-negative integer ``age`` and an
    integer ``familyId``. New members start checked out with no swim test;
    ``can_use_deep_end`` is seeded from the age rule.
    """
    body = _json_object()
    first_name = _clean_str(body.get("firstName"))
    last_name = _clean_str(body.get("lastName"))
    age = body.get("age")
    family_id = body.get("familyId")

    if not first_name or not last_name:
        return jsonify({"error": "`firstName` and `lastName` are required."}), 400
    if not _is_int(age) or age < 0:
        return jsonify({"error": "`age` must be a non-negative integer."}), 400
    if family_id is None:
        return jsonify({"error": "`familyId` is required for this form."}), 400
    if not _is_int(family_id):
        return jsonify({"error": "`familyId` must be an integer."}), 400

    with db_session() as conn:
        family = conn.execute(
            "SELECT id FROM families WHERE id = ?;",
            (family_id,),
        ).fetchone()
        if family is None:
            return jsonify({"error": "Family not found."}), 404

        seed_can_use_deep_end = int(age >= DEFAULT_MIN_SWIM_TEST_AGE)
        result = conn.execute(
            """
            INSERT INTO members
                (first_name, last_name, age, family_id,
                 is_checked_in, swim_test_passed, can_use_deep_end)
            VALUES (?, ?, ?, ?, 0, 0, ?);
            """,
            (first_name, last_name, age, family_id, seed_can_use_deep_end),
        )
        member = _fetch_member(conn, result.lastrowid)

    return jsonify({"member": to_member_payload(member, DEFAULT_MIN_SWIM_TEST_AGE)}), 201


def _set_member_flag(member_id: int, column: str, body_key: str):
    """Set one boolean member column from a boolean JSON body field.

    ``column`` is always a literal supplied by the calling view, never user
    input, so interpolating it into the statement is safe.
    """
    value = _json_object().get(body_key)
    if not isinstance(value, bool):
        return jsonify({"error": f"`{body_key}` must be a boolean."}), 400

    with db_session() as conn:
        result = conn.execute(
            f"UPDATE members SET {column} = ? WHERE id = ?;",
            (int(value), member_id),
        )
        if result.rowcount == 0:
            return jsonify({"error": "Member not found."}), 404
        member = _fetch_member(conn, member_id)

    return jsonify({"member": to_member_payload(member, DEFAULT_MIN_SWIM_TEST_AGE)})


@app.route("/api/members/<int:member_id>/checkin", methods=["PATCH"])
def api_update_checkin(member_id: int):
    """Check a member in or out according to the boolean ``checkedIn`` field."""
    return _set_member_flag(member_id, "is_checked_in", "checkedIn")


@app.route("/api/members/<int:member_id>/swim-test", methods=["PATCH"])
def api_update_swim_test(member_id: int):
    """Record a member's swim-test result from the boolean ``swimTestPassed`` field."""
    return _set_member_flag(member_id, "swim_test_passed", "swimTestPassed")


# ---------------------------------------------------------------------------
# Admin / maintenance
# ---------------------------------------------------------------------------


@app.route("/api/admin/backup", methods=["POST"])
def api_admin_backup():
    """Write a timestamped snapshot of the database into ``BACKUP_DIR``.

    Uses SQLite's online backup API rather than a raw file copy so the
    snapshot is consistent even if another request is writing concurrently.
    """
    stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = BACKUP_DIR / f"pool_backup_{stamp}.sqlite3"
    with closing(sqlite3.connect(DB_PATH)) as source, closing(
        sqlite3.connect(backup_path)
    ) as target:
        source.backup(target)
    return jsonify({"message": f"Backup created at {backup_path.name}"})


@app.route("/api/admin/export", methods=["POST"])
def api_admin_export():
    """Write every member (with family details) to a timestamped CSV in ``EXPORT_DIR``."""
    stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = EXPORT_DIR / f"pool_members_{stamp}.csv"

    with db_session() as conn:
        rows = conn.execute(
            """
            SELECT
                m.id,
                m.first_name,
                m.last_name,
                m.age,
                m.family_id,
                COALESCE(f.family_name, 'Individual') AS family_name,
                COALESCE(f.primary_phone, '') AS primary_phone,
                m.is_checked_in,
                m.swim_test_passed
            FROM members m
            LEFT JOIN families f ON m.family_id = f.id
            ORDER BY family_name, m.last_name, m.first_name;
            """
        ).fetchall()

    with csv_path.open("w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(
            [
                "member_id",
                "first_name",
                "last_name",
                "age",
                "family_id",
                "family_name",
                "primary_phone",
                "checked_in",
                "swim_test_passed",
                "requires_swim_test",
            ]
        )
        writer.writerows(
            [
                row["id"],
                row["first_name"],
                row["last_name"],
                row["age"],
                row["family_id"],
                row["family_name"],
                row["primary_phone"],
                row["is_checked_in"],
                row["swim_test_passed"],
                int(row["age"] < DEFAULT_MIN_SWIM_TEST_AGE),
            ]
            for row in rows
        )

    return jsonify({"message": f"Export generated: {csv_path.name}"})


@app.route("/api/admin/reindex", methods=["POST"])
def api_admin_reindex():
    """Rebuild all SQLite indexes."""
    with db_session() as conn:
        conn.execute("REINDEX;")
    return jsonify({"message": "SQLite indexes rebuilt successfully."})


@app.route("/api/admin/vacuum", methods=["POST"])
def api_admin_vacuum():
    """Run ``VACUUM`` on the database file."""
    # VACUUM requires autocommit mode in SQLite.
    with closing(sqlite3.connect(DB_PATH, isolation_level=None)) as conn:
        conn.execute("VACUUM;")
    return jsonify({"message": "VACUUM completed successfully."})


@app.route("/api/admin/reset-checkins", methods=["POST"])
def api_admin_reset_checkins():
    """Mark every member as checked out."""
    with db_session() as conn:
        conn.execute("UPDATE members SET is_checked_in = 0;")
    return jsonify({"message": "All members were checked out."})


@app.route("/api/admin/reset-swim-tests", methods=["POST"])
def api_admin_reset_swim_tests():
    """Clear every swim-test result and re-seed ``can_use_deep_end`` from the age rule."""
    with db_session() as conn:
        conn.execute("UPDATE members SET swim_test_passed = 0;")
        conn.execute(
            "UPDATE members SET can_use_deep_end = CASE WHEN age >= ? THEN 1 ELSE 0 END;",
            (DEFAULT_MIN_SWIM_TEST_AGE,),
        )
    return jsonify({"message": "All swim tests were reset to not passed."})


@app.route("/api/admin/cleanup-orphan-members", methods=["POST"])
def api_admin_cleanup_orphan_members():
    """Delete members whose ``family_id`` is NULL or points at a missing family.

    NOTE(review): this also removes every member listed by
    ``/api/individuals`` (those rows have ``family_id IS NULL``) — confirm
    that is intended before exposing this action.
    """
    with db_session() as conn:
        deleted = conn.execute(
            """
            DELETE FROM members
            WHERE family_id IS NULL
               OR family_id NOT IN (SELECT id FROM families);
            """
        ).rowcount

    return jsonify(
        {
            "message": (
                f"Removed {deleted} member"
                f"{'s' if deleted != 1 else ''} with null or missing family ids."
            ),
            "deletedMembers": deleted,
        }
    )


initialize_database()


if __name__ == "__main__":
    app.run(debug=False)