Files
csc-checkin/main.py
Lockeshor 706b849515 refactor
2026-03-06 13:09:05 -05:00

739 lines
24 KiB
Python

import csv
import datetime as dt
import difflib
import re
import shutil
import sqlite3
from pathlib import Path
from flask import Flask, jsonify, request, send_file
BASE_DIR = Path(__file__).resolve().parent
DB_PATH = BASE_DIR / "pool_checkin.sqlite3"
BACKUP_DIR = BASE_DIR / "backups"
EXPORT_DIR = BASE_DIR / "exports"
app = Flask(__name__)
# Code-only configuration: change this constant to update the swim-test age rule.
DEFAULT_MIN_SWIM_TEST_AGE = 13
def dict_row_factory(cursor, row):
return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
def get_connection() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = dict_row_factory
conn.execute("PRAGMA foreign_keys = ON;")
return conn
def normalize_text(value: str) -> str:
return re.sub(r"[^a-z0-9 ]+", "", (value or "").lower()).strip()
def fuzzy_score(query: str, candidate: str) -> float:
q = normalize_text(query)
c = normalize_text(candidate)
if not q or not c:
return 0.0
if q in c:
return 1.0 + (len(q) / max(len(c), 1))
base = difflib.SequenceMatcher(None, q, c).ratio()
token_scores = [difflib.SequenceMatcher(None, q, token).ratio() for token in c.split()]
token_max = max(token_scores) if token_scores else 0.0
return max(base, token_max)
def normalize_phone_for_storage(phone_raw: str) -> str | None:
digits = re.sub(r"\D", "", phone_raw or "")
if not digits:
return None
if len(digits) == 10:
return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"
if len(digits) == 11 and digits.startswith("1"):
d = digits[1:]
return f"+1 {d[:3]}-{d[3:6]}-{d[6:]}"
raise ValueError("Phone number must include area code (10 digits).")
def initialize_database() -> None:
BACKUP_DIR.mkdir(exist_ok=True)
EXPORT_DIR.mkdir(exist_ok=True)
with get_connection() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS families (
id INTEGER PRIMARY KEY AUTOINCREMENT,
family_name TEXT NOT NULL,
primary_phone TEXT,
guest_passes INTEGER NOT NULL DEFAULT 0 CHECK(guest_passes >= 0),
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS members (
id INTEGER PRIMARY KEY AUTOINCREMENT,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
age INTEGER NOT NULL CHECK(age >= 0),
family_id INTEGER,
is_checked_in INTEGER NOT NULL DEFAULT 0 CHECK(is_checked_in IN (0, 1)),
swim_test_passed INTEGER NOT NULL DEFAULT 0 CHECK(swim_test_passed IN (0, 1)),
can_use_deep_end INTEGER NOT NULL DEFAULT 0 CHECK(can_use_deep_end IN (0, 1)),
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (family_id) REFERENCES families(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_members_name ON members(last_name, first_name);
CREATE INDEX IF NOT EXISTS idx_members_family ON members(family_id);
CREATE INDEX IF NOT EXISTS idx_members_checked_in ON members(is_checked_in);
"""
)
family_columns = {
row["name"]
for row in conn.execute("PRAGMA table_info(families);").fetchall()
}
if "guest_passes" not in family_columns:
conn.execute(
"ALTER TABLE families "
"ADD COLUMN guest_passes INTEGER NOT NULL DEFAULT 0 CHECK(guest_passes >= 0);"
)
def to_member_payload(row: dict, min_swim_test_age: int) -> dict:
requires_swim_test = row["age"] < min_swim_test_age
return {
"id": row["id"],
"firstName": row["first_name"],
"lastName": row["last_name"],
"fullName": f"{row['first_name']} {row['last_name']}",
"age": row["age"],
"familyId": row["family_id"],
"checkedIn": bool(row["is_checked_in"]),
"swimTestPassed": bool(row["swim_test_passed"]),
"requiresSwimTest": requires_swim_test,
}
@app.route("/")
def homepage():
return send_file(BASE_DIR / "index.html")
@app.route("/family/<int:family_id>")
def family_page(family_id: int):
# The family id is consumed client-side from the URL path.
return send_file(BASE_DIR / "family.html")
@app.route("/manage")
def manage_page():
return send_file(BASE_DIR / "manage.html")
@app.route("/api/dashboard", methods=["GET"])
def api_dashboard():
with get_connection() as conn:
totals = conn.execute(
"""
SELECT
COUNT(*) AS total_members,
SUM(CASE WHEN family_id IS NULL THEN 1 ELSE 0 END) AS individual_members,
SUM(CASE WHEN is_checked_in = 1 THEN 1 ELSE 0 END) AS checked_in_members,
SUM(CASE WHEN age < ? THEN 1 ELSE 0 END) AS children_count,
SUM(CASE WHEN age < ? AND swim_test_passed = 1 THEN 1 ELSE 0 END) AS children_swim_test_passed
FROM members;
"""
,
(DEFAULT_MIN_SWIM_TEST_AGE, DEFAULT_MIN_SWIM_TEST_AGE),
).fetchone()
family_count = conn.execute("SELECT COUNT(*) AS count FROM families;").fetchone()["count"]
return jsonify(
{
"totalMembers": totals["total_members"] or 0,
"individuals": totals["individual_members"] or 0,
"families": family_count,
"checkedIn": totals["checked_in_members"] or 0,
"children": totals["children_count"] or 0,
"childrenSwimPassed": totals["children_swim_test_passed"] or 0,
"minSwimTestAge": DEFAULT_MIN_SWIM_TEST_AGE,
}
)
@app.route("/api/checked-in", methods=["GET"])
def api_checked_in_members():
query = request.args.get("q", "").strip()
has_query = bool(query)
threshold = 0.62
query_norm = normalize_text(query)
with get_connection() as conn:
rows = conn.execute(
"""
SELECT
m.id,
m.first_name,
m.last_name,
m.age,
m.family_id,
m.is_checked_in,
m.swim_test_passed,
f.family_name
FROM members m
LEFT JOIN families f ON m.family_id = f.id
ORDER BY m.last_name, m.first_name;
"""
).fetchall()
family_last_name_score = {}
if has_query:
for row in rows:
family_id = row["family_id"]
if family_id is None:
continue
score = fuzzy_score(query, row["last_name"])
current = family_last_name_score.get(family_id, 0.0)
family_last_name_score[family_id] = max(current, score)
candidates = []
for row in rows:
checked_in = bool(row["is_checked_in"])
if not has_query and not checked_in:
continue
first_score = fuzzy_score(query, row["first_name"]) if has_query else 1.0
family_score = family_last_name_score.get(row["family_id"], 0.0) if has_query else 0.0
last_score = fuzzy_score(query, row["last_name"]) if has_query else 0.0
matches_query = (
not has_query
or first_score >= threshold
or family_score >= threshold
or (row["family_id"] is None and last_score >= threshold)
)
if not matches_query:
continue
full_name = f"{row['first_name']} {row['last_name']}"
family_name = row["family_name"] if row["family_name"] else "Individual"
first_norm = normalize_text(row["first_name"])
last_norm = normalize_text(row["last_name"])
full_norm = normalize_text(full_name)
# Ordered matching boosts: exact > prefix > early-position > fuzzy.
prefix_bonus = 0.0
position_bonus = 0.0
if has_query and query_norm:
if first_norm == query_norm:
prefix_bonus = 1.6
elif first_norm.startswith(query_norm):
prefix_bonus = 1.25
elif full_norm.startswith(query_norm):
prefix_bonus = 1.05
if query_norm in first_norm:
position_bonus = max(position_bonus, 0.35 - (first_norm.find(query_norm) * 0.03))
if query_norm in full_norm:
position_bonus = max(position_bonus, 0.2 - (full_norm.find(query_norm) * 0.015))
family_boost = family_score * 0.85
match_score = max(first_score + prefix_bonus + position_bonus, last_score, family_boost)
candidates.append(
{
"id": row["id"],
"firstName": row["first_name"],
"lastName": row["last_name"],
"fullName": full_name,
"age": row["age"],
"checkedIn": checked_in,
"familyName": family_name,
"requiresSwimTest": row["age"] < DEFAULT_MIN_SWIM_TEST_AGE,
"swimTestPassed": bool(row["swim_test_passed"]),
"matchScore": round(match_score, 4),
"firstScore": round(first_score, 4),
"familyScore": round(family_score, 4),
"lastScore": round(last_score, 4),
}
)
if has_query:
candidates.sort(
key=lambda item: (
-item["matchScore"],
-item["firstScore"],
-item["familyScore"],
not item["checkedIn"],
item["firstName"].lower(),
item["lastName"].lower(),
)
)
else:
candidates.sort(
key=lambda item: (
not item["checkedIn"],
item["lastName"].lower(),
item["firstName"].lower(),
)
)
return jsonify({"items": candidates})
@app.route("/api/individuals", methods=["GET"])
def api_individuals():
query = request.args.get("q", "").strip().lower()
with get_connection() as conn:
rows = conn.execute(
"""
SELECT id, first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end
FROM members
WHERE family_id IS NULL
ORDER BY last_name, first_name;
"""
).fetchall()
filtered = []
for row in rows:
payload = to_member_payload(row, DEFAULT_MIN_SWIM_TEST_AGE)
searchable = f"{payload['fullName']} {payload['id']}".lower()
if query and query not in searchable:
continue
filtered.append(payload)
return jsonify({"items": filtered})
@app.route("/api/families", methods=["GET", "POST"])
def api_families():
if request.method == "POST":
body = request.get_json(silent=True) or {}
family_name = (body.get("familyName") or "").strip()
primary_phone = (body.get("primaryPhone") or "").strip()
if not family_name:
return jsonify({"error": "`familyName` is required."}), 400
try:
normalized_phone = normalize_phone_for_storage(primary_phone)
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
with get_connection() as conn:
result = conn.execute(
"INSERT INTO families (family_name, primary_phone, guest_passes) VALUES (?, ?, 0);",
(family_name, normalized_phone),
)
family = conn.execute(
"SELECT id, family_name, primary_phone, guest_passes FROM families WHERE id = ?;",
(result.lastrowid,),
).fetchone()
return (
jsonify(
{
"item": {
"id": family["id"],
"familyName": family["family_name"],
"primaryPhone": family["primary_phone"],
"guestPasses": family["guest_passes"],
}
}
),
201,
)
query = request.args.get("q", "").strip().lower()
with get_connection() as conn:
families = conn.execute(
"SELECT id, family_name, primary_phone, guest_passes FROM families ORDER BY family_name;"
).fetchall()
members = conn.execute(
"""
SELECT id, first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end
FROM members
WHERE family_id IS NOT NULL
ORDER BY age DESC, last_name, first_name;
"""
).fetchall()
members_by_family = {}
for row in members:
item = to_member_payload(row, DEFAULT_MIN_SWIM_TEST_AGE)
members_by_family.setdefault(item["familyId"], []).append(item)
payload = []
for family in families:
family_members = members_by_family.get(family["id"], [])
family_name = family["family_name"]
score = 0.0
if query:
candidates = [family_name] + [m["fullName"] for m in family_members]
score = max((fuzzy_score(query, candidate) for candidate in candidates), default=0.0)
if score < 0.42:
continue
else:
score = 1.0
payload.append(
{
"id": family["id"],
"familyName": family_name,
"primaryPhone": family["primary_phone"],
"guestPasses": family["guest_passes"],
"members": family_members,
"matchScore": round(score, 4),
}
)
payload.sort(key=lambda item: (-item["matchScore"], item["familyName"].lower()))
return jsonify({"items": payload})
@app.route("/api/members", methods=["POST"])
def api_create_member():
body = request.get_json(silent=True) or {}
first_name = (body.get("firstName") or "").strip()
last_name = (body.get("lastName") or "").strip()
age = body.get("age")
family_id = body.get("familyId")
if not first_name or not last_name:
return jsonify({"error": "`firstName` and `lastName` are required."}), 400
if not isinstance(age, int) or age < 0:
return jsonify({"error": "`age` must be a non-negative integer."}), 400
if family_id is None:
return jsonify({"error": "`familyId` is required for this form."}), 400
if not isinstance(family_id, int):
return jsonify({"error": "`familyId` must be an integer."}), 400
with get_connection() as conn:
family = conn.execute(
"SELECT id FROM families WHERE id = ?;",
(family_id,),
).fetchone()
if family is None:
return jsonify({"error": "Family not found."}), 404
seed_can_use_deep_end = int(age >= DEFAULT_MIN_SWIM_TEST_AGE)
result = conn.execute(
"""
INSERT INTO members
(first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end)
VALUES (?, ?, ?, ?, 0, 0, ?);
""",
(first_name, last_name, age, family_id, seed_can_use_deep_end),
)
member = conn.execute(
"""
SELECT id, first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end
FROM members
WHERE id = ?;
""",
(result.lastrowid,),
).fetchone()
return jsonify({"member": to_member_payload(member, DEFAULT_MIN_SWIM_TEST_AGE)}), 201
@app.route("/api/families/<int:family_id>", methods=["GET"])
def api_family_detail(family_id: int):
with get_connection() as conn:
family = conn.execute(
"SELECT id, family_name, primary_phone, guest_passes FROM families WHERE id = ?;",
(family_id,),
).fetchone()
if family is None:
return jsonify({"error": "Family not found."}), 404
member_rows = conn.execute(
"""
SELECT id, first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end
FROM members
WHERE family_id = ?
ORDER BY age DESC, last_name, first_name;
""",
(family_id,),
).fetchall()
return jsonify(
{
"item": {
"id": family["id"],
"familyName": family["family_name"],
"primaryPhone": family["primary_phone"],
"guestPasses": family["guest_passes"],
"minSwimTestAge": DEFAULT_MIN_SWIM_TEST_AGE,
"members": [to_member_payload(row, DEFAULT_MIN_SWIM_TEST_AGE) for row in member_rows],
}
}
)
@app.route("/api/families/<int:family_id>/guest-passes", methods=["PATCH"])
def api_update_family_guest_passes(family_id: int):
body = request.get_json(silent=True) or {}
guest_passes = body.get("guestPasses")
if not isinstance(guest_passes, int) or guest_passes < 0:
return jsonify({"error": "`guestPasses` must be a non-negative integer."}), 400
with get_connection() as conn:
result = conn.execute(
"UPDATE families SET guest_passes = ? WHERE id = ?;",
(guest_passes, family_id),
)
if result.rowcount == 0:
return jsonify({"error": "Family not found."}), 404
family = conn.execute(
"SELECT id, family_name, primary_phone, guest_passes FROM families WHERE id = ?;",
(family_id,),
).fetchone()
return jsonify(
{
"item": {
"id": family["id"],
"familyName": family["family_name"],
"primaryPhone": family["primary_phone"],
"guestPasses": family["guest_passes"],
},
"message": "Guest passes updated.",
}
)
@app.route("/api/families/<int:family_id>", methods=["DELETE"])
def api_delete_family(family_id: int):
with get_connection() as conn:
family = conn.execute(
"SELECT id, family_name FROM families WHERE id = ?;",
(family_id,),
).fetchone()
if family is None:
return jsonify({"error": "Family not found."}), 404
deleted_members = conn.execute(
"DELETE FROM members WHERE family_id = ?;",
(family_id,),
).rowcount
conn.execute("DELETE FROM families WHERE id = ?;", (family_id,))
return jsonify(
{
"message": (
f"Deleted {family['family_name']} family and {deleted_members} member"
f"{'s' if deleted_members != 1 else ''}."
),
"deletedFamilyId": family_id,
"deletedMembers": deleted_members,
}
)
@app.route("/api/members/<int:member_id>/checkin", methods=["PATCH"])
def api_update_checkin(member_id: int):
body = request.get_json(silent=True) or {}
checked_in = body.get("checkedIn")
if not isinstance(checked_in, bool):
return jsonify({"error": "`checkedIn` must be a boolean."}), 400
with get_connection() as conn:
result = conn.execute(
"UPDATE members SET is_checked_in = ? WHERE id = ?;",
(int(checked_in), member_id),
)
if result.rowcount == 0:
return jsonify({"error": "Member not found."}), 404
member = conn.execute(
"""
SELECT id, first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end
FROM members
WHERE id = ?;
""",
(member_id,),
).fetchone()
return jsonify({"member": to_member_payload(member, DEFAULT_MIN_SWIM_TEST_AGE)})
@app.route("/api/members/<int:member_id>/swim-test", methods=["PATCH"])
def api_update_swim_test(member_id: int):
body = request.get_json(silent=True) or {}
passed = body.get("swimTestPassed")
if not isinstance(passed, bool):
return jsonify({"error": "`swimTestPassed` must be a boolean."}), 400
with get_connection() as conn:
result = conn.execute(
"UPDATE members SET swim_test_passed = ? WHERE id = ?;",
(int(passed), member_id),
)
if result.rowcount == 0:
return jsonify({"error": "Member not found."}), 404
member = conn.execute(
"""
SELECT id, first_name, last_name, age, family_id, is_checked_in, swim_test_passed, can_use_deep_end
FROM members
WHERE id = ?;
""",
(member_id,),
).fetchone()
return jsonify({"member": to_member_payload(member, DEFAULT_MIN_SWIM_TEST_AGE)})
@app.route("/api/admin/backup", methods=["POST"])
def api_admin_backup():
stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = BACKUP_DIR / f"pool_backup_{stamp}.sqlite3"
shutil.copy2(DB_PATH, backup_path)
return jsonify({"message": f"Backup created at {backup_path.name}"})
@app.route("/api/admin/export", methods=["POST"])
def api_admin_export():
stamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
csv_path = EXPORT_DIR / f"pool_members_{stamp}.csv"
with get_connection() as conn:
rows = conn.execute(
"""
SELECT
m.id,
m.first_name,
m.last_name,
m.age,
m.family_id,
COALESCE(f.family_name, 'Individual') AS family_name,
COALESCE(f.primary_phone, '') AS primary_phone,
m.is_checked_in,
m.swim_test_passed
FROM members m
LEFT JOIN families f ON m.family_id = f.id
ORDER BY family_name, m.last_name, m.first_name;
"""
).fetchall()
with csv_path.open("w", newline="", encoding="utf-8") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(
[
"member_id",
"first_name",
"last_name",
"age",
"family_id",
"family_name",
"primary_phone",
"checked_in",
"swim_test_passed",
"requires_swim_test",
]
)
for row in rows:
writer.writerow(
[
row["id"],
row["first_name"],
row["last_name"],
row["age"],
row["family_id"],
row["family_name"],
row["primary_phone"],
row["is_checked_in"],
row["swim_test_passed"],
int(row["age"] < DEFAULT_MIN_SWIM_TEST_AGE),
]
)
return jsonify({"message": f"Export generated: {csv_path.name}"})
@app.route("/api/admin/reindex", methods=["POST"])
def api_admin_reindex():
with get_connection() as conn:
conn.execute("REINDEX;")
return jsonify({"message": "SQLite indexes rebuilt successfully."})
@app.route("/api/admin/vacuum", methods=["POST"])
def api_admin_vacuum():
# VACUUM requires autocommit mode in SQLite.
conn = sqlite3.connect(DB_PATH, isolation_level=None)
try:
conn.execute("VACUUM;")
finally:
conn.close()
return jsonify({"message": "VACUUM completed successfully."})
@app.route("/api/admin/reset-checkins", methods=["POST"])
def api_admin_reset_checkins():
with get_connection() as conn:
conn.execute("UPDATE members SET is_checked_in = 0;")
return jsonify({"message": "All members were checked out."})
@app.route("/api/admin/reset-swim-tests", methods=["POST"])
def api_admin_reset_swim_tests():
with get_connection() as conn:
conn.execute("UPDATE members SET swim_test_passed = 0;")
conn.execute(
"UPDATE members SET can_use_deep_end = CASE WHEN age >= ? THEN 1 ELSE 0 END;",
(DEFAULT_MIN_SWIM_TEST_AGE,),
)
return jsonify({"message": "All swim tests were reset to not passed."})
@app.route("/api/admin/cleanup-orphan-members", methods=["POST"])
def api_admin_cleanup_orphan_members():
with get_connection() as conn:
deleted = conn.execute(
"""
DELETE FROM members
WHERE family_id IS NULL
OR family_id NOT IN (SELECT id FROM families);
"""
).rowcount
return jsonify(
{
"message": (
f"Removed {deleted} member"
f"{'s' if deleted != 1 else ''} with null or missing family ids."
),
"deletedMembers": deleted,
}
)
initialize_database()
if __name__ == "__main__":
app.run(debug=False)