Newer
Older
from fastapi import (
APIRouter,
FastAPI,
status,
Depends,
HTTPException,
BackgroundTasks,
Response,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
from io import StringIO
import csv
from database import Base, engine, get_db, SessionLocal
from utils import check_user_level
from routes.tests import testRouter
from routes.studies import studiesRouter
websocket_users = defaultdict(lambda: defaultdict(set))
websocket_users_global = defaultdict(set)
@asynccontextmanager
async def lifespan(app: FastAPI):
db = SessionLocal()
try:
if crud.get_user_by_email(db, config.ADMIN_EMAIL) is None:
crud.create_user(
db,
schemas.UserCreate(
email=config.ADMIN_EMAIL,
nickname=config.ADMIN_NICKNAME,
password=config.ADMIN_PASSWORD,
type=models.UserType.ADMIN.value,
),
)
finally:
db.close()
yield
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=config.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
apiRouter = APIRouter(prefix="/tmp-api")
v1Router = APIRouter(prefix="/v1")
authRouter = APIRouter(prefix="/auth", tags=["auth"])
usersRouter = APIRouter(prefix="/users", tags=["users"])
sessionsRouter = APIRouter(prefix="/sessions", tags=["sessions"])
studyRouter = APIRouter(prefix="/studies", tags=["studies"])
websocketRouter = APIRouter(prefix="/ws", tags=["websocket"])
@v1Router.get("/health", status_code=status.HTTP_204_NO_CONTENT)
@authRouter.post("/login", status_code=200)
def login(
response: Response,
db: Session = Depends(get_db),
):
db_user = crud.get_user_by_email_and_password(db, login.email, login.password)
raise HTTPException(status_code=401, detail="Incorrect email or password")
subject = {"uid": db_user.id, "email": db_user.email, "nickname": db_user.nickname}
access_token = jwt_cookie.create_access_token(subject)
jwt_cookie.set_access_cookie(
response=response,
access_token=access_token,
refresh_token = jwt_cookie.create_refresh_token(subject)
jwt_cookie.set_refresh_cookie(
response=response,
refresh_token=refresh_token,
expires_delta=jwt_cookie.refresh_expires_delta,
@authRouter.post("/register", status_code=status.HTTP_201_CREATED)
if db_user:
raise HTTPException(status_code=400, detail="User already registered")
user_data = schemas.UserCreate(
email=register.email,
password=register.password,
nickname=register.nickname,
type=(
models.UserType.TUTOR.value
if register.is_tutor
else models.UserType.STUDENT.value
),
user = crud.create_user(db=db, user=user_data)
return user.id
@usersRouter.post("", status_code=status.HTTP_201_CREATED)
def create_user(
user: schemas.UserCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
if not check_user_level(current_user, models.UserType.ADMIN):
status_code=401, detail="You do not have permission to create a user"
)
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="User already registered")
user = crud.create_user(db=db, user=user)
return user.id
@usersRouter.get("/{user_id}", response_model=schemas.User)
def read_user(
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
status_code=401, detail="You do not have permission to view this user"
)
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
@usersRouter.get("", response_model=list[schemas.User])
def read_users(
skip: int = 0,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
@usersRouter.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
if not check_user_level(current_user, models.UserType.ADMIN):
status_code=401, detail="You do not have permission to delete a user"
)
if not crud.delete_user(db, user_id):
raise HTTPException(status_code=404, detail="User not found")
@usersRouter.patch("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def update_user(
user_id: int,
user: schemas.UserUpdate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
if not check_user_level(current_user, models.UserType.ADMIN) and (
current_user.id != user_id or user.type is not None
status_code=401, detail="You do not have permission to update this user"
)
if not crud.update_user(db, user, user_id):
raise HTTPException(status_code=404, detail="User not found")
@usersRouter.get("/by-email/{email}", response_model=schemas.User)
def read_user_by_email(
email: str,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to view this user"
)
db_user = crud.get_user_by_email(db, email)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@usersRouter.get("/{user_id}/sessions", response_model=list[schemas.Session])
def read_user_sessions(
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
status_code=401,
detail="You do not have permission to view this user's sessions",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user.sessions
@usersRouter.get(
"/{user_id}/contacts/{contact_id}/sessions", response_model=list[schemas.Session]
)
def read_user_contact_sessions(
user_id: int,
contact_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
raise HTTPException(
status_code=401,
detail="You do not have permission to view this user's sessions",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return crud.get_contact_sessions(db, user_id, contact_id)
def store_typing_entries(
db, entries: list[schemas.TestTypingEntryCreate], typing_id: int
):
for e in entries:
crud.create_test_typing_entry(db, e, typing_id)
pass
def create_test_typing(
test: schemas.TestTypingCreate,
background_tasks.add_task(store_typing_entries, db, test.entries, typing_id)
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
@usersRouter.post(
"/{user_id}/contacts/{contact_id}", status_code=status.HTTP_201_CREATED
)
def create_contact(
user_id: int,
contact_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
raise HTTPException(
status_code=401,
detail="You do not have permission to create a contact for this user",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_contact = crud.get_user(db, contact_id)
if db_contact is None:
raise HTTPException(status_code=404, detail="Contact not found")
return crud.create_contact(db, db_user, db_contact)
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
@usersRouter.post(
"/{user_id}/contacts-email/{contact_email}", status_code=status.HTTP_201_CREATED
)
def create_contact_by_email(
user_id: int,
contact_email: str,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
raise HTTPException(
status_code=401,
detail="You do not have permission to create a contact for this user",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_contact = crud.get_user_by_email(db, contact_email)
if db_contact is None:
raise HTTPException(status_code=404, detail="Contact not found")
if db_contact in db_user.contacts:
raise HTTPException(
status_code=400, detail="Contact already exists for this user"
)
return crud.create_contact(db, db_user, db_contact)
@usersRouter.get("/{user_id}/contacts", response_model=list[schemas.User])
def get_contacts(
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
raise HTTPException(
status_code=401,
detail="You do not have permission to view this user's contacts",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user.contacts + db_user.contact_of
@usersRouter.post("/{user_id}/surveys/weekly", status_code=status.HTTP_201_CREATED)
def create_weekly_survey(
user_id: int,
survey: schemas.UserSurveyWeeklyCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
raise HTTPException(
status_code=401,
detail="You do not have permission to create a survey for this user",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return crud.create_user_survey_weekly(db, user_id, survey).id
@usersRouter.post(
"/{user_id}/contacts/{contact_id}/bookings", status_code=status.HTTP_201_CREATED
)
user_id: int,
contact_id: int,
booking: schemas.SessionBookingCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user.id != user_id
):
raise HTTPException(
status_code=401,
detail="You do not have permission to create a booking for this user",
)
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_contact = crud.get_user(db, contact_id)
if db_contact is None:
raise HTTPException(status_code=404, detail="Contact not found")
return crud.create_session_with_users(
db, [db_user, db_contact], booking.start_time, booking.end_time
).id
@sessionsRouter.post("", response_model=schemas.Session)
def create_session(
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
rep = crud.create_session(db, current_user)
rep.length = 0
return rep
@sessionsRouter.get("/{session_id}", response_model=schemas.Session)
def read_session(
session_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
status_code=401, detail="You do not have permission to view this session"
)
@sessionsRouter.delete("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_session(
session_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
if not check_user_level(current_user, models.UserType.ADMIN):
status_code=401, detail="You do not have permission to delete a session"
)
@sessionsRouter.patch("/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
def update_session(
session_id: int,
session: schemas.SessionUpdate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.TUTOR):
status_code=401, detail="You do not have permission to update this session"
)
@sessionsRouter.get("/{session_id}/download/messages")
def download_session(
session_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to download this session",
)
data = crud.get_messages(db, session_id)
output = StringIO()
writer = csv.writer(output)
writer.writerow(models.Message.__table__.columns.keys())
for row in data:
writer.writerow(row.raw())
output.seek(0)
return StreamingResponse(
output,
media_type="text/csv",
headers={
"Content-Disposition": f"attachment; filename={session_id}-messages.csv"
},
)
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
@sessionsRouter.get("/download/messages")
def download_sessions_messages(
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to download messages",
)
data = crud.get_all_messages(db)
output = StringIO()
writer = csv.writer(output)
writer.writerow(models.Message.__table__.columns.keys())
for row in data:
writer.writerow(row.raw())
output.seek(0)
return StreamingResponse(
output,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=messages.csv"},
)
@sessionsRouter.get("/download/metadata")
def download_sessions_metadata(
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to download metadata",
)
data = crud.get_all_metadata(db)
output = StringIO()
writer = csv.writer(output)
writer.writerow(models.MessageMetadata.__table__.columns.keys())
for row in data:
writer.writerow(row.raw())
output.seek(0)
return StreamingResponse(
output,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=metadata.csv"},
)
@sessionsRouter.get("/download/feedbacks")
def download_sessions_feedbacks(
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to download feedbacks",
)
data = crud.get_all_feedbacks(db)
output = StringIO()
writer = csv.writer(output)
writer.writerow(models.MessageFeedback.__table__.columns.keys())
for row in data:
writer.writerow(row.raw())
output.seek(0)
return StreamingResponse(
output,
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=feedbacks.csv"},
)
async def send_websoket_add_to_session(session: schemas.Session, user_id: int):
if user_id in websocket_users_global:
for user_websocket in websocket_users_global[user_id]:
await user_websocket.send_text(
json.dumps(
{
"type": "session",
"action": "create",
"data": session.to_dict(),
}
)
)
@sessionsRouter.post(
"/{session_id}/users/{user_id}", status_code=status.HTTP_201_CREATED
)
def add_user_to_session(
session_id: int,
user_id: int,
background_tasks: BackgroundTasks,
current_user: schemas.User = Depends(get_jwt_user),
if (
db_session is not None
and current_user not in db_session.users
or db_session is None
and not check_user_level(current_user, models.UserType.TUTOR)
):
raise HTTPException(
status_code=401,
detail="You do not have permission to add a user to a session",
)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_session.users.append(db_user)
db.commit()
db.refresh(db_session)
background_tasks.add_task(
send_websoket_add_to_session,
schemas.Session.model_validate(db_session),
schemas.User.model_validate(db_user).id,
)
@sessionsRouter.delete(
"/{session_id}/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT
)
def remove_user_from_session(
session_id: int,
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
if not check_user_level(current_user, models.UserType.TUTOR):
status_code=401,
detail="You do not have permission to remove a user from a session",
)
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
db_user = crud.get_user(db, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_session.users.remove(db_user)
db.commit()
@sessionsRouter.get("", response_model=list[schemas.Session])
def read_sessions(
skip: int = 0,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
if check_user_level(current_user, models.UserType.ADMIN):
return crud.get_sessions(db, current_user, skip=skip)
@sessionsRouter.post("/{session_id}/satisfy", status_code=status.HTTP_204_NO_CONTENT)
session_id: int,
satisfy: schemas.SessionSatisfyCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
raise HTTPException(
status_code=401,
detail="You do not have permission to satisfy this session",
)
crud.create_session_satisfy(db, current_user.id, session_id, satisfy)
@sessionsRouter.get("/{session_id}/messages", response_model=list[schemas.Message])
def read_messages(
session_id: int,
skip: int = 0,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
status_code=401,
detail="You do not have permission to view messages in this session",
)
return crud.get_messages(db, session_id, skip=skip)
async def send_websoket_message(session_id: int, message: schemas.Message, action: str):
{"type": "message", "action": action, "data": message.to_dict()}
for _, user_websockets in websocket_users[session_id].items():
for user_websocket in user_websockets:
await user_websocket.send_text(content)
async def send_websoket_feedback(session_id: int, action: str, feedback: dict):
content = json.dumps({"type": "message", "action": action, "data": feedback})
for _, user_websockets in websocket_users[session_id].items():
for user_websocket in user_websockets:
await user_websocket.send_text(content)
def store_metadata(db, message_id, metadata: list[schemas.MessageMetadataCreate]):
for m in metadata:
crud.create_message_metadata(db, message_id, m)
pass
@sessionsRouter.post("/{session_id}/messages", status_code=status.HTTP_201_CREATED)
def create_message(
session_id: int,
entryMessage: schemas.MessageCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
status_code=401,
detail="You do not have permission to create a message in this session",
)
action = "create" if entryMessage.message_id is None else "update"
message = crud.create_message(db, entryMessage, current_user, db_session)
background_tasks.add_task(store_metadata, db, message.id, entryMessage.metadata)
send_websoket_message,
session_id,
schemas.Message.model_validate(message),
action,
return {
"id": message.id,
"message_id": message.message_id,
"reply_to": message.reply_to_message_id,
}
"/{session_id}/messages/{message_id}/feedback",
status_code=status.HTTP_201_CREATED,
session_id: int,
message_id: int,
feedback: schemas.MessageFeedbackCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
raise HTTPException(
status_code=401,
detail="You do not have permission to feedback this message in this session",
)
message = crud.get_message(db, message_id)
if message is None:
raise HTTPException(status_code=404, detail="Message not found")
feedback = crud.create_message_feedback(db, message_id, feedback)
background_tasks.add_task(
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
@sessionsRouter.delete(
"/{session_id}/messages/{message_id}/feedback/{feedback_id}",
status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_feedback(
session_id: int,
message_id: int,
feedback_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
raise HTTPException(
status_code=401,
detail="You do not have permission to delete feedbacks in this session",
)
db_message = crud.get_message(db, message_id)
if db_message is None:
raise HTTPException(status_code=404, detail="Message not found")
db_feedback = crud.get_message_feedback(db, feedback_id)
if db_feedback is None:
raise HTTPException(status_code=404, detail="Feedback not found")
crud.delete_message_feedback(db, feedback_id)
background_tasks.add_task(
send_websoket_feedback,
session_id,
"deleteFeedback",
{
"message_id": message_id,
"feedback_id": feedback_id,
},
)
async def send_websoket_typing(session_id: int, user_id: int):
content = json.dumps(
{"type": "message", "action": "typing", "data": {"user": user_id}}
)
for user, user_websockets in websocket_users[session_id].items():
if user != user_id:
for user_websocket in user_websockets:
await user_websocket.send_text(content)
@sessionsRouter.post("/{session_id}/typing", status_code=status.HTTP_204_NO_CONTENT)
def propagate_typing(
session_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
background_tasks.add_task(send_websoket_typing, session_id, current_user.id)
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
return
async def send_websoket_presence(session_id: int, user_id: int):
content = json.dumps(
{"type": "presence", "action": "online", "data": {"user": user_id}}
)
for user, user_websockets in websocket_users[session_id].items():
if user != user_id:
for user_websocket in user_websockets:
await user_websocket.send_text(content)
@sessionsRouter.post("/{session_id}/presence", status_code=status.HTTP_204_NO_CONTENT)
def propagate_presence(
session_id: int,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
background_tasks.add_task(send_websoket_presence, session_id, current_user.id)
session_id: int, token: str, websocket: WebSocket, db: Session = Depends(get_db)
try:
payload = jwt.decode(token, config.JWT_SECRET_KEY, algorithms=["HS256"])
except ExpiredSignatureError:
await websocket.close(code=1008, reason="Token expired")
return
except jwt.JWTError:
await websocket.close(code=1008, reason="Invalid token")
return
current_user = crud.get_user(db, user_id=payload["subject"]["uid"])
if current_user is None:
db_session = crud.get_session(db, session_id)
if db_session is None:
raise HTTPException(status_code=404, detail="Session not found")
if (
not check_user_level(current_user, models.UserType.ADMIN)
and current_user not in db_session.users
):
status_code=401, detail="You do not have permission to access this session"
)
await websocket.accept()
websocket_users[session_id][current_user.id].add(websocket)
try:
while True:
data = await websocket.receive_text()