"""FastAPI application: login, dashboard, block pick-up and manager workflows.

Employees see open blocks (shifts) for their location and can request them.
Managers and the CEO can post new blocks, review the candidates who requested
a block, and assign those candidates to the block's slots.
"""

from datetime import datetime, date, time, timedelta

from fastapi import FastAPI, Request, Form, Depends
from fastapi.responses import RedirectResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
from starlette.staticfiles import StaticFiles
from sqlalchemy.orm import Session

from .settings import settings
from .db import SessionLocal, engine, Base
from .models import (
    User,
    Location,
    Block,
    BlockCandidate,
    BlockAssignment,
)
from .auth import authenticate

# Create DB tables on startup (OK for MVP; later migrate to Alembic)
Base.metadata.create_all(bind=engine)

app = FastAPI(title=settings.APP_NAME)

# Static assets
app.mount("/static", StaticFiles(directory="app/static"), name="static")

# Cookie-based session
app.add_middleware(
    SessionMiddleware,
    secret_key=settings.SECRET_KEY,
    https_only=settings.COOKIE_SECURE,
)

templates = Jinja2Templates(directory="app/templates")

# Roles that may be requested when posting a block.
_ALLOWED_BLOCK_ROLES = frozenset(
    {"forklift_operator", "classification", "supervisor", "driver"}
)

# Standard shift time options offered by the "new block" form
# (value submitted by the form, label shown to the manager).
_SHIFT_START_OPTIONS = (
    ("10:00", "10:00 AM"),
    ("18:00", "6:00 PM"),
)
_SHIFT_END_OPTIONS = (
    ("21:00", "9:00 PM"),
    ("03:00", "3:00 AM (next day)"),
)


# -------------------------
# DB dependency
# -------------------------
def get_db():
    """Yield a SQLAlchemy session and always close it when the request ends."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# -------------------------
# Auth helpers
# -------------------------
def current_user(request: Request, db: Session) -> User | None:
    """Return the active user stored in the session cookie, or None.

    None is returned both when the session carries no ``uid`` and when the
    referenced user does not exist or is no longer active.
    """
    uid = request.session.get("uid")
    if not uid:
        return None
    return (
        db.query(User)
        .filter(User.id == uid, User.is_active == True)  # noqa: E712 (SQL expression)
        .first()
    )


def is_manager(user: User) -> bool:
    """Return True when the user has manager-level rights (manager or CEO)."""
    return user.role in ("manager", "ceo")


# -------------------------
# Parse helpers for manager form
# -------------------------
def parse_date_yyyy_mm_dd(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string; raises ValueError on a bad format."""
    return datetime.strptime(value.strip(), "%Y-%m-%d").date()


def parse_time_hh_mm(value: str) -> time:
    """Parse a 24-hour ``HH:MM`` string; raises ValueError on a bad format."""
    return datetime.strptime(value.strip(), "%H:%M").time()


def build_datetimes(
    start_date_str: str, start_time_str: str, end_time_str: str
) -> tuple[datetime, datetime]:
    """
    Builds start_dt and end_dt from:
      - start_date (YYYY-MM-DD)
      - start_time (HH:MM)
      - end_time (HH:MM)

    If end_time is earlier than (or equal to) start_time, we assume the shift
    ends next day (overnight).

    Both returned datetimes are naive. Raises ValueError when any of the
    three strings does not match its expected format.
    """
    d = parse_date_yyyy_mm_dd(start_date_str)
    t_start = parse_time_hh_mm(start_time_str)
    t_end = parse_time_hh_mm(end_time_str)

    start_dt = datetime.combine(d, t_start)
    end_dt = datetime.combine(d, t_end)
    if end_dt <= start_dt:
        end_dt += timedelta(days=1)
    return start_dt, end_dt


def _render_new_block_form(
    request: Request, user: User, locations: list, error: str | None = None
):
    """Render the "new block" form, optionally with a validation error."""
    return templates.TemplateResponse(
        "manager_new_block.html",
        {
            "request": request,
            "user": user,
            "locations": locations,
            "error": error,
            # default date = today (UTC)
            "default_date": datetime.utcnow().date().strftime("%Y-%m-%d"),
            # Fresh lists per render so templates never share mutable state.
            "shift_options": list(_SHIFT_START_OPTIONS),
            "end_options": list(_SHIFT_END_OPTIONS),
        },
    )


# -------------------------
# Routes: Login / Logout
# -------------------------
@app.get("/", response_class=HTMLResponse)
def root(request: Request, db: Session = Depends(get_db)):
    """Send signed-in users to the dashboard and everyone else to login."""
    user = current_user(request, db)
    if not user:
        return RedirectResponse("/login", status_code=302)
    return RedirectResponse("/dashboard", status_code=302)


@app.get("/login", response_class=HTMLResponse)
def login_page(request: Request):
    """Render the empty login form."""
    return templates.TemplateResponse("login.html", {"request": request, "error": None})


@app.post("/login")
def login(
    request: Request,
    email: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Authenticate the credentials and store the user id in the session."""
    user = authenticate(db, email, password)
    if not user:
        return templates.TemplateResponse(
            "login.html", {"request": request, "error": "Invalid email or password."}
        )
    request.session["uid"] = user.id
    return RedirectResponse("/dashboard", status_code=302)


@app.post("/logout")
def logout(request: Request):
    """Drop the session and return to the login page."""
    request.session.clear()
    return RedirectResponse("/login", status_code=302)


# -------------------------
# Dashboard
# -------------------------
@app.get("/dashboard", response_class=HTMLResponse)
def dashboard(request: Request, db: Session = Depends(get_db)):
    """Show the open blocks visible to the signed-in user."""
    user = current_user(request, db)
    if not user:
        return RedirectResponse("/login", status_code=302)

    # Determine visible scope (CEO can see all; others see their location only)
    open_blocks = db.query(Block).filter(Block.status == "open")
    if user.role == "ceo":
        blocks = open_blocks.order_by(Block.start_dt.asc()).limit(300).all()
    elif not user.location_id:
        # A non-CEO user without a location has nothing to see.
        blocks = []
    else:
        blocks = (
            open_blocks.filter(Block.location_id == user.location_id)
            .order_by(Block.start_dt.asc())
            .limit(150)
            .all()
        )

    # Location name for display (optional)
    location_name = None
    if user.location_id:
        loc = db.query(Location).filter(Location.id == user.location_id).first()
        location_name = loc.name if loc else None

    return templates.TemplateResponse(
        "dashboard.html",
        {
            "request": request,
            "user": user,
            "blocks": blocks,
            "location_name": location_name,
            # NOTE(review): utcnow() is deprecated since Python 3.12. It is kept
            # because build_datetimes() produces naive datetimes, and a
            # timezone-aware "now" could not be compared with them.
            "now": datetime.utcnow(),
        },
    )


# -------------------------
# Employee action: Pick block
# -------------------------
@app.post("/blocks/{block_id}/pick")
def pick_block(block_id: int, request: Request, db: Session = Depends(get_db)):
    """Register the signed-in employee as a candidate for an open block.

    The request is silently ignored (redirect to the dashboard) when the block
    is not open, the user is a manager/CEO, or the user's role or location
    does not match the block. Picking the same block twice is a no-op.
    """
    user = current_user(request, db)
    if not user:
        return RedirectResponse("/login", status_code=302)

    block = (
        db.query(Block)
        .filter(Block.id == block_id, Block.status == "open")
        .first()
    )
    if not block:
        return RedirectResponse("/dashboard", status_code=302)

    # Managers/CEO do not pick blocks
    if is_manager(user):
        return RedirectResponse("/dashboard", status_code=302)

    # Role and location enforcement
    if user.role != block.role_required:
        return RedirectResponse("/dashboard", status_code=302)
    if user.location_id is None or block.location_id != user.location_id:
        return RedirectResponse("/dashboard", status_code=302)

    existing = (
        db.query(BlockCandidate)
        .filter(
            BlockCandidate.block_id == block_id,
            BlockCandidate.user_id == user.id,
            BlockCandidate.state == "requested",
        )
        .first()
    )
    if not existing:
        db.add(BlockCandidate(block_id=block_id, user_id=user.id, state="requested"))
        db.commit()

    return RedirectResponse("/dashboard", status_code=302)


# -------------------------
# Manager/CEO: Create block
# -------------------------
@app.get("/manager/blocks/new", response_class=HTMLResponse)
def manager_new_block_page(request: Request, db: Session = Depends(get_db)):
    """Render the form used by managers and the CEO to post a new block."""
    user = current_user(request, db)
    if not user or not is_manager(user):
        return RedirectResponse("/login", status_code=302)

    locations = db.query(Location).order_by(Location.name.asc()).all()
    return _render_new_block_form(request, user, locations)


@app.post("/manager/blocks/new")
def manager_create_block(
    request: Request,
    location_id: int = Form(...),
    role_required: str = Form(...),
    title: str = Form(...),
    start_date: str = Form(...),  # YYYY-MM-DD
    start_time: str = Form(...),  # HH:MM
    end_time: str = Form(...),  # HH:MM (may be next day)
    slots_total: int = Form(...),
    notes: str = Form(""),
    db: Session = Depends(get_db),
):
    """Validate the "new block" form and create an open block.

    Any validation failure re-renders the form with an error message; on
    success the user is redirected to the dashboard.
    """
    user = current_user(request, db)
    if not user or not is_manager(user):
        return RedirectResponse("/login", status_code=302)

    locations = db.query(Location).order_by(Location.name.asc()).all()

    # Managers can only post for their own location (CEO can post anywhere)
    if user.role != "ceo" and location_id != user.location_id:
        return _render_new_block_form(
            request,
            user,
            locations,
            "Managers can only post blocks for their own location.",
        )

    # Reject ids that do not correspond to a known location instead of
    # inserting a block that points at nothing.
    if location_id not in {loc.id for loc in locations}:
        return _render_new_block_form(request, user, locations, "Invalid location.")

    if role_required not in _ALLOWED_BLOCK_ROLES:
        return _render_new_block_form(
            request, user, locations, "Invalid role_required."
        )

    try:
        start_dt, end_dt = build_datetimes(start_date, start_time, end_time)
    except ValueError:
        return _render_new_block_form(
            request,
            user,
            locations,
            "Invalid date/time format. Use the picker and dropdowns.",
        )

    new_block = Block(
        location_id=location_id,
        role_required=role_required,
        title=title.strip(),
        start_dt=start_dt,
        end_dt=end_dt,
        # Basic sanity: slot >= 1 (FastAPI has already parsed the value as int)
        slots_total=max(1, slots_total),
        posted_by_user_id=user.id,
        # Empty or whitespace-only notes are stored as NULL.
        notes=notes.strip() or None,
        status="open",
    )
    db.add(new_block)
    db.commit()

    return RedirectResponse("/dashboard", status_code=302)


# -------------------------
# Manager/CEO: View block details + candidates + assignments
# -------------------------
@app.get("/manager/blocks/{block_id}", response_class=HTMLResponse)
def block_detail(request: Request, block_id: int, db: Session = Depends(get_db)):
    """Show a block with its active assignments and pending candidates."""
    user = current_user(request, db)
    if not user or not is_manager(user):
        return RedirectResponse("/login", status_code=302)

    block = db.query(Block).filter(Block.id == block_id).first()
    if not block:
        return RedirectResponse("/dashboard", status_code=302)

    # Managers may only inspect blocks of their own location.
    if user.role != "ceo" and block.location_id != user.location_id:
        return RedirectResponse("/dashboard", status_code=302)

    # Active assignments (not yet released)
    assigned_rows = (
        db.query(BlockAssignment, User)
        .join(User, User.id == BlockAssignment.user_id)
        .filter(
            BlockAssignment.block_id == block_id,
            BlockAssignment.released_at.is_(None),
        )
        .all()
    )
    assigned = [
        {"name": u.name, "email": u.email, "user_id": u.id}
        for _assignment, u in assigned_rows
    ]

    # Requested candidates
    cand_rows = (
        db.query(BlockCandidate, User)
        .join(User, User.id == BlockCandidate.user_id)
        .filter(
            BlockCandidate.block_id == block_id,
            BlockCandidate.state == "requested",
        )
        .all()
    )
    candidates = [
        {"user_id": u.id, "user_name": u.name, "user_email": u.email}
        for _candidate, u in cand_rows
    ]

    return templates.TemplateResponse(
        "block_detail.html",
        {
            "request": request,
            "block": block,
            "candidates": candidates,
            "assigned": assigned,
            "assigned_count": len(assigned),
        },
    )


# -------------------------
# Manager/CEO: Assign selected candidates
# -------------------------
@app.post("/manager/blocks/{block_id}/assign")
def assign_selected(
    request: Request,
    block_id: int,
    user_ids: list[int] = Form([]),
    db: Session = Depends(get_db),
):
    """Assign the selected candidates to the block, up to the free slots.

    Only users with a pending ("requested") candidacy for this block are
    assigned; ids that were never requested, are already actively assigned,
    or appear more than once in the form are skipped. Slots are consumed only
    by assignments that are actually created.
    """
    user = current_user(request, db)
    if not user or not is_manager(user):
        return RedirectResponse("/login", status_code=302)

    block = db.query(Block).filter(Block.id == block_id).first()
    if not block:
        return RedirectResponse("/dashboard", status_code=302)

    if user.role != "ceo" and block.location_id != user.location_id:
        return RedirectResponse("/dashboard", status_code=302)

    active_assigned = (
        db.query(BlockAssignment)
        .filter(
            BlockAssignment.block_id == block_id,
            BlockAssignment.released_at.is_(None),
        )
        .count()
    )
    # NOTE(review): the count and the inserts below are not atomic, so two
    # concurrent submissions could still over-fill the block.
    slots_left = max(0, int(block.slots_total) - int(active_assigned))
    if slots_left <= 0:
        return RedirectResponse(f"/manager/blocks/{block_id}", status_code=302)

    seen_ids: set[int] = set()
    for uid in user_ids:
        if slots_left <= 0:
            break
        # A duplicated id in the form must not create two assignments.
        if uid in seen_ids:
            continue
        seen_ids.add(uid)

        already = (
            db.query(BlockAssignment)
            .filter(
                BlockAssignment.block_id == block_id,
                BlockAssignment.user_id == uid,
                BlockAssignment.released_at.is_(None),
            )
            .first()
        )
        if already:
            continue

        cand = (
            db.query(BlockCandidate)
            .filter(
                BlockCandidate.block_id == block_id,
                BlockCandidate.user_id == uid,
                BlockCandidate.state == "requested",
            )
            .first()
        )
        if cand is None:
            # The form is untrusted input: never assign someone who did not
            # request this block.
            continue

        db.add(
            BlockAssignment(
                block_id=block_id, user_id=uid, assigned_by_user_id=user.id
            )
        )
        cand.state = "approved"
        slots_left -= 1

    db.commit()
    return RedirectResponse(f"/manager/blocks/{block_id}", status_code=302)