Newer
Older
from fastapi import APIRouter, FastAPI, Form, status, Depends, HTTPException, BackgroundTasks, Header
from fastapi.security import OAuth2PasswordRequestForm
from contextlib import asynccontextmanager
from database import Base, engine, get_db, SessionLocal
from utils import check_user_level
websocket_users = defaultdict(lambda: defaultdict(set))
@asynccontextmanager
async def lifespan(app: FastAPI):
db = SessionLocal()
try:
if crud.get_user_by_email(db, config.ADMIN_EMAIL) is None:
crud.create_user(db, schemas.UserCreate(email=config.ADMIN_EMAIL, nickname=config.ADMIN_NICKNAME,
password=config.ADMIN_PASSWORD, type=models.UserType.ADMIN.value))
finally:
db.close()
yield
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=config.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
apiRouter = APIRouter(prefix="/api")
v1Router = APIRouter(prefix="/v1")
authRouter = APIRouter(prefix="/auth", tags=["auth"])
usersRouter = APIRouter(prefix="/users", tags=["users"])
sessionsRouter = APIRouter(prefix="/sessions", tags=["sessions"])
websocketRouter = APIRouter(prefix="/ws", tags=["websocket"])
webhookRouter = APIRouter(prefix="/webhooks", tags=["webhook"])
stats = APIRouter(prefix="/stats", tags=["stats"])
@app.get("/health", status_code=status.HTTP_204_NO_CONTENT)
def login(email: Annotated[str, Form()], password: Annotated[str, Form()], db: Session = Depends(get_db)):
db_user = crud.get_user_by_email_and_password(db, email, password)
raise HTTPException(
status_code=401, detail="Incorrect email or password")
return {
"access_token": hashing.create_access_token(db_user),
"refresh_token": hashing.create_refresh_token(db_user),
@authRouter.post("/register", status_code=status.HTTP_201_CREATED)
def register(email: Annotated[str, Form()], password: Annotated[str, Form()], nickname: Annotated[str, Form()], db: Session = Depends(get_db)):
db_user = crud.get_user_by_email(db, email=email)
if db_user:
raise HTTPException(status_code=400, detail="User already registered")
user_data = schemas.UserCreate(
email=email, password=password, nickname=nickname)
user = crud.create_user(db=db, user=user_data)
return user.id
@authRouter.post("/refresh", response_model=schemas.Token)
def refresh_token(current_user: models.User = Depends(hashing.get_jwt_user_from_refresh_token)):
return {
"access_token": hashing.create_access_token(current_user),
"refresh_token": hashing.create_refresh_token(current_user),
}
@usersRouter.post("", status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to create a user")
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="User already registered")
user = crud.create_user(db=db, user=user)
return user.id
@usersRouter.get("/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and current_user.id != user_id:
raise HTTPException(
status_code=401, detail="You do not have permission to view this user")
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
@usersRouter.get("", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to view users")
return crud.get_users(db, skip=skip, limit=limit)
@usersRouter.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to delete a user")
if not crud.delete_user(db, user_id):
raise HTTPException(status_code=404, detail="User not found")
@usersRouter.get("/{user_id}/sessions", response_model=list[schemas.Session])
def read_user_sessions(user_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and current_user.id != user_id:
raise HTTPException(
status_code=401, detail="You do not have permission to view this user's sessions")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user.sessions
@usersRouter.post("/{user_id}/metadata", status_code=status.HTTP_201_CREATED)
def create_user_metadata(user_id: int, metadata: schemas.UserMetadataCreate, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and current_user.id != user_id:
raise HTTPException(
status_code=401, detail="You do not have permission to create metadata for this user")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return crud.create_user_metadata(db, db_user.id, metadata).id
@usersRouter.get("/{user_id}/metadata", response_model=schemas.UserMetadata)
def read_user_metadata(user_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and current_user.id != user_id:
raise HTTPException(
status_code=401, detail="You do not have permission to view this user's metadata")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
metadata = crud.get_user_metadata(db, db_user.id)
if metadata is None:
raise HTTPException(status_code=404, detail="Metadata not found")
return metadata
@usersRouter.post("/{user_id}/tests/typing", status_code=status.HTTP_201_CREATED)
def create_test_typing(user_id: int, test: schemas.TestTypingCreate, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and current_user.id != user_id:
raise HTTPException(
status_code=401, detail="You do not have permission to create a test for this user")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return crud.create_test_typing(db, test, db_user).id
@sessionsRouter.post("", response_model=schemas.Session)
def create_session(db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.TUTOR):
raise HTTPException(
status_code=401, detail="You do not have permission to create a session")
@sessionsRouter.get("/{session_id}", response_model=schemas.Session)
def read_session(session_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.ADMIN) and current_user not in db_session.users:
raise HTTPException(
status_code=401, detail="You do not have permission to view this session")
@sessionsRouter.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_session(session_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to delete a session")
@sessionsRouter.patch("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
def update_session(session_id: int, session: schemas.SessionUpdate, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.ADMIN) and (current_user.type != models.UserType.TUTOR or current_user not in db_session.users):
raise HTTPException(
status_code=401, detail="You do not have permission to update this session")
@sessionsRouter.post("/{session_id}/users/{user_id}", status_code=status.HTTP_201_CREATED)
def add_user_to_session(session_id: int, user_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and (current_user.id != user_id or current_user.type != models.UserType.TUTOR):
raise HTTPException(
status_code=401, detail="You do not have permission to add a user to a session")
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_session.users.append(db_user)
db.commit()
db.refresh(db_session)
@sessionsRouter.delete("/{session_id}/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_user_from_session(session_id: int, user_id: int, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, models.UserType.ADMIN) and (current_user.id != user_id or current_user.type != models.UserType.TUTOR):
raise HTTPException(
status_code=401, detail="You do not have permission to remove a user from a session")
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_session.users.remove(db_user)
db.commit()
@sessionsRouter.get("", response_model=list[schemas.Session])
def read_sessions(skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if check_user_level(current_user, models.UserType.ADMIN):
return crud.get_all_sessions(db, skip=skip, limit=limit)
return crud.get_sessions(db, current_user, skip=skip, limit=limit)
@sessionsRouter.get("/{session_id}/messages", response_model=list[schemas.Message])
def read_messages(session_id: int, skip: int = 0, limit: int = 100, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.ADMIN) and current_user not in db_session.users:
raise HTTPException(
status_code=401, detail="You do not have permission to view messages in this session")
return crud.get_messages(db, session_id, skip=skip, limit=limit)
async def send_websoket_message(session_id: int, message: schemas.Message):
content = json.dumps({
"type": "message",
"action": "create",
"data": message.to_dict()
})
for _, user_websockets in websocket_users[session_id].items():
for user_websocket in user_websockets:
await user_websocket.send_text(content)
def store_metadata(db, message_id, metadata: list[schemas.MessageMetadataCreate]):
for m in metadata:
crud.create_message_metadata(db, message_id, m)
pass
@sessionsRouter.post("/{session_id}/messages", status_code=status.HTTP_201_CREATED)
def create_message(session_id: int, entryMessage: schemas.MessageCreate, background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.ADMIN) and current_user not in db_session.users:
raise HTTPException(
status_code=401, detail="You do not have permission to create a message in this session")
message = crud.create_message(db, entryMessage, current_user, db_session)
background_tasks.add_task(
store_metadata, db, message.id, entryMessage.metadata)
background_tasks.add_task(
send_websoket_message, session_id, schemas.Message.model_validate(message))
@websocketRouter.websocket("/{token}/{session_id}")
async def websocket_session(token: str, session_id: int, websocket: WebSocket, db: Session = Depends(get_db)):
current_user = hashing.get_jwt_user(token=token, db=db)
if current_user is None:
raise HTTPException(status_code=401, detail="Invalid token")
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.ADMIN) and current_user not in db_session.users:
raise HTTPException(
status_code=401, detail="You do not have permission to access this session")
await websocket.accept()
websocket_users[session_id][current_user.id].add(websocket)
try:
while True:
data = await websocket.receive_text()
for user_id, user_websockets in websocket_users[session_id].items():
if user_id == current_user.id:
continue
for user_websocket in user_websockets:
await user_websocket.send_text(data)
except:
websocket_users[session_id][current_user.id].remove(websocket)
try:
await websocket.close()
except:
pass
@webhookRouter.post("/sessions", status_code=status.HTTP_202_ACCEPTED)
async def webhook_session(webhook: schemas.CalComWebhook, x_cal_signature_256: Annotated[str | None, Header()] = None, db: Session = Depends(get_db)):
if x_cal_signature_256 != config.CALCOM_SECRET:
raise HTTPException(status_code=401, detail="Invalid secret")
if webhook.triggerEvent == "BOOKING_CREATED":
start_time = datetime.datetime.fromisoformat(
webhook.payload["startTime"])
end_time = datetime.datetime.fromisoformat(webhook.payload["endTime"])
end_time += datetime.timedelta(hours=1)
attendes = webhook.payload["attendees"]
emails = [attendee["email"]
for attendee in attendes if attendee != None]
db_users = [crud.get_user_by_email(db, email)
for email in emails if email != None]
users = [user for user in db_users if user != None]
if users:
db_session = crud.create_session_with_users(
db, users, start_time, end_time)
else:
raise HTTPException(status_code=404, detail="Users not found")
raise HTTPException(status_code=400, detail="Invalid trigger event")
v1Router.include_router(authRouter)
v1Router.include_router(usersRouter)
v1Router.include_router(sessionsRouter)
apiRouter.include_router(v1Router)
app.include_router(apiRouter)