Skip to content
Extraits de code Groupes Projets

Comparer les révisions

Les modifications sont affichées comme si la révision source était fusionnée avec la révision cible. En savoir plus sur la comparaison des révisions.

Source

Sélectionner le projet cible
No results found

Cible

Sélectionner le projet cible
  • sbibauw/languagelab
1 résultat
Afficher les modifications
Validations sur la source (33)
Affichage de
avec 1285 ajouts et 1102 suppressions
import os
import secrets
import logging
# Database
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///../languagelab.sqlite")
......
......@@ -7,6 +7,9 @@ import models
import schemas
from hashing import Hasher
from crud.tests import *
from crud.studies import *
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
......@@ -29,7 +32,7 @@ def get_users(db: Session, skip: int = 0):
return db.query(models.User).offset(skip).all()
def create_user(db: Session, user: schemas.UserCreate):
def create_user(db: Session, user: schemas.UserCreate) -> models.User:
password = Hasher.get_password_hash(user.password)
nickname = user.nickname if user.nickname else user.email.split("@")[0]
db_user = models.User(
......@@ -279,227 +282,3 @@ def create_test_typing_entry(
db.commit()
db.refresh(db_entry)
return db_entry
def create_survey(db: Session, survey: schemas.SurveyCreate):
db_survey = models.SurveySurvey(**survey.dict())
db.add(db_survey)
db.commit()
db.refresh(db_survey)
return db_survey
def get_survey(db: Session, survey_id: int):
return (
db.query(models.SurveySurvey)
.filter(models.SurveySurvey.id == survey_id)
.first()
)
def get_surveys(db: Session, skip: int = 0):
return db.query(models.SurveySurvey).offset(skip).all()
def delete_survey(db: Session, survey_id: int):
db.query(models.SurveySurvey).filter(models.SurveySurvey.id == survey_id).delete()
db.commit()
def add_group_to_survey(db: Session, survey_id: int, group: schemas.SurveyGroup):
db_survey = (
db.query(models.SurveySurvey)
.filter(models.SurveySurvey.id == survey_id)
.first()
)
db_survey.groups.append(group)
db.commit()
db.refresh(db_survey)
return db_survey
def remove_group_from_survey(db: Session, survey_id: int, group_id: int):
survey = (
db.query(models.SurveySurvey)
.filter(models.SurveySurvey.id == survey_id)
.first()
)
survey_group = (
db.query(models.SurveyGroup).filter(models.SurveyGroup.id == group_id).first()
)
if survey_group in survey.groups:
survey.groups.remove(survey_group)
db.commit()
def create_survey_group(db: Session, survey_group: schemas.SurveyGroupCreate):
db_survey_group = models.SurveyGroup(**survey_group.dict())
db.add(db_survey_group)
db.commit()
db.refresh(db_survey_group)
return db_survey_group
def get_survey_group(db: Session, survey_group_id: int) -> schemas.SurveyGroup:
return (
db.query(models.SurveyGroup)
.filter(models.SurveyGroup.id == survey_group_id)
.first()
)
def get_survey_groups(db: Session, skip: int = 0):
return db.query(models.SurveyGroup).offset(skip).all()
def delete_survey_group(db: Session, survey_group_id: int):
db.query(models.SurveyGroup).filter(
models.SurveyGroup.id == survey_group_id
).delete()
db.commit()
def add_item_to_survey_group(db: Session, group_id: int, item: models.SurveyQuestion):
db_survey_group = (
db.query(models.SurveyGroup).filter(models.SurveyGroup.id == group_id).first()
)
db_survey_group.questions.append(item)
db.commit()
db.refresh(db_survey_group)
return db_survey_group
def remove_item_from_survey_group(db: Session, group_id: int, item_id: int):
survey_group = (
db.query(models.SurveyGroup).filter(models.SurveyGroup.id == group_id).first()
)
survey_question = (
db.query(models.SurveyQuestion)
.filter(models.SurveyQuestion.id == item_id)
.first()
)
if survey_question in survey_group.questions:
survey_group.questions.remove(survey_question)
db.commit()
def create_survey_question(db: Session, survey_question: schemas.SurveyQuestionCreate):
db_survey_question = models.SurveyQuestion(**survey_question.dict())
db.add(db_survey_question)
db.commit()
db.refresh(db_survey_question)
return db_survey_question
def get_survey_question(db: Session, survey_question_id: int):
return (
db.query(models.SurveyQuestion)
.filter(models.SurveyQuestion.id == survey_question_id)
.first()
)
def get_survey_questions(db: Session, skip: int = 0):
return db.query(models.SurveyQuestion).offset(skip).all()
def delete_survey_question(db: Session, survey_question_id: int):
db.query(models.SurveyQuestion).filter(
models.SurveyQuestion.id == survey_question_id
).delete()
db.commit()
def create_survey_response(db: Session, survey_response: schemas.SurveyResponseCreate):
db_survey_response = models.SurveyResponse(**survey_response.dict())
db.add(db_survey_response)
db.commit()
db.refresh(db_survey_response)
return db_survey_response
def get_survey_responses(db: Session, sid: str, skip: int = 0):
return (
db.query(models.SurveyResponse)
.filter(models.SurveyResponse.sid == sid)
.offset(skip)
.all()
)
def create_survey_response_info(
db: Session, survey_response_info: schemas.SurveyResponseInfoCreate
):
db_survey_response_info = models.SurveyResponseInfo(**survey_response_info.dict())
db.add(db_survey_response_info)
db.commit()
db.refresh(db_survey_response_info)
return db_survey_response_info
def create_study(db: Session, study: schemas.StudyCreate):
db_study = models.Study(**study.dict())
db.add(db_study)
db.commit()
db.refresh(db_study)
return db_study
def get_study(db: Session, study_id: int):
return db.query(models.Study).filter(models.Study.id == study_id).first()
def get_studies(db: Session, skip: int = 0):
return db.query(models.Study).offset(skip).all()
def update_study(db: Session, study: schemas.StudyCreate, study_id: int):
db.query(models.Study).filter(models.Study.id == study_id).update(
{**study.model_dump(exclude_unset=True)}
)
db.commit()
def delete_study(db: Session, study_id: int):
db.query(models.Study).filter(models.Study.id == study_id).delete()
db.commit()
def add_user_to_study(db: Session, study_id: int, user: schemas.User):
db_study = db.query(models.Study).filter(models.Study.id == study_id).first()
db_study.users.append(user)
db.commit()
db.refresh(db_study)
return db_study
def remove_user_from_study(db: Session, study_id: int, user: schemas.User):
study = db.query(models.Study).filter(models.Study.id == study_id).first()
user = db.query(models.User).filter(models.User.id == user.id).first()
study.users.remove(user)
db.commit()
return study
def add_survey_to_study(db: Session, study_id: int, survey: schemas.Survey):
db_study = db.query(models.Study).filter(models.Study.id == study_id).first()
db_study.surveys.append(survey)
db.commit()
db.refresh(db_study)
return db_study
def remove_survey_from_study(db: Session, study_id: int, survey: schemas.Survey):
study = db.query(models.Study).filter(models.Study.id == study_id).first()
survey = (
db.query(models.SurveySurvey)
.filter(models.SurveySurvey.id == survey.id)
.first()
)
study.surveys.remove(survey)
db.commit()
return study
from typing import Optional
from sqlalchemy.orm import Session
import models
import schemas
def create_study(db: Session, study: schemas.StudyCreate) -> models.Study:
db_study = models.Study(
**study.model_dump(exclude_unset=True, exclude={"user_ids", "test_ids"})
)
if study.model_fields_set & {"user_ids", "test_ids"}:
if db_study:
if "user_ids" in study.model_fields_set:
db_study.users = (
db.query(models.User)
.filter(models.User.id.in_(study.user_ids))
.all()
)
if "test_ids" in study.model_fields_set:
db_study.tests = (
db.query(models.Test)
.filter(models.Test.id.in_(study.test_ids))
.all()
)
db.add(db_study)
db.commit()
db.refresh(db_study)
return db_study
def get_study(db: Session, study_id: int) -> Optional[models.Study]:
return db.query(models.Study).filter(models.Study.id == study_id).first()
def get_studies(db: Session, skip: int = 0) -> list[models.Study]:
return db.query(models.Study).offset(skip).all()
def update_study(db: Session, study: schemas.StudyCreate, study_id: int) -> None:
db.query(models.Study).filter(models.Study.id == study_id).update(
{**study.model_dump(exclude_unset=True, exclude={"user_ids", "test_ids"})}
)
if study.model_fields_set & {"user_ids", "test_ids"}:
if (
study_obj := db.query(models.Study)
.filter(models.Study.id == study_id)
.first()
):
if "user_ids" in study.model_fields_set:
study_obj.users = (
db.query(models.User)
.filter(models.User.id.in_(study.user_ids))
.all()
)
if "test_ids" in study.model_fields_set:
study_obj.tests = (
db.query(models.Test)
.filter(models.Test.id.in_(study.test_ids))
.all()
)
db.commit()
def delete_study(db: Session, study_id: int) -> None:
db.query(models.Study).filter(models.Study.id == study_id).delete()
db.commit()
from sqlalchemy.orm import Session
import models
import schemas
def create_test(db: Session, test: schemas.TestCreate) -> models.Test:
db_test = models.Test(**test.model_dump())
db.add(db_test)
db.commit()
db.refresh(db_test)
return db_test
def get_tests(db: Session, skip: int = 0) -> list[models.Test]:
return db.query(models.Test).offset(skip).all()
def get_test(db: Session, test_id: int) -> models.Test | None:
return db.query(models.Test).filter(models.Test.id == test_id).first()
def delete_test(db: Session, test_id: int) -> None:
db.query(models.Test).filter(models.Test.id == test_id).delete()
db.query(models.TestTask).filter(models.TestTask.test_id == test_id).delete()
db.query(models.TestTyping).filter(models.TestTyping.test_id == test_id).delete()
db.commit()
def add_group_to_test_task(db: Session, test: models.Test, group: models.TestTaskGroup):
test.groups.append(group)
db.commit()
db.refresh(test)
def remove_group_from_test_task(
db: Session, testTask: models.TestTask, group: models.TestTaskGroup
):
try:
testTask.groups.remove(group)
db.commit()
db.refresh(testTask)
except ValueError:
pass
def create_group(
db: Session, group: schemas.TestTaskGroupCreate
) -> models.TestTaskGroup:
db_group = models.TestTaskGroup(**group.model_dump())
db.add(db_group)
db.commit()
db.refresh(db_group)
return db_group
def get_group(db: Session, group_id: int) -> models.TestTaskGroup | None:
return (
db.query(models.TestTaskGroup)
.filter(models.TestTaskGroup.id == group_id)
.first()
)
def delete_group(db: Session, group_id: int) -> None:
db.query(models.TestTaskGroup).filter(models.TestTaskGroup.id == group_id).delete()
db.commit()
def add_question_to_group(
db: Session, group: models.TestTaskGroup, question: models.TestTaskQuestion
):
group.questions.append(question)
db.commit()
db.refresh(group)
def remove_question_from_group(
db: Session, group: models.TestTaskGroup, question: models.TestTaskQuestion
):
try:
group.questions.remove(question)
db.commit()
db.refresh(group)
except ValueError:
pass
def create_question(db: Session, question: schemas.TestTaskQuestionCreate):
db_question = models.TestTaskQuestion(**question.model_dump())
db.add(db_question)
db.commit()
db.refresh(db_question)
return db_question
def get_question(db: Session, question_id: int):
return (
db.query(models.TestTaskQuestion)
.filter(models.TestTaskQuestion.id == question_id)
.first()
)
def delete_question(db: Session, question_id: int):
db.query(models.TestTaskQuestion).filter(
models.TestTaskQuestion.id == question_id
).delete()
db.query(models.TestTaskQuestionQCM).filter(
models.TestTaskQuestionQCM.question_id == question_id
).delete()
db.commit()
return None
def create_test_entry(db: Session, entry: schemas.TestEntryCreate):
db_entry = models.TestEntry(**entry.model_dump())
db.add(db_entry)
db.commit()
db.refresh(db_entry)
return db_entry
from collections import defaultdict
import datetime
from typing import Annotated
from fastapi import (
APIRouter,
FastAPI,
......@@ -8,7 +6,6 @@ from fastapi import (
Depends,
HTTPException,
BackgroundTasks,
Header,
Response,
)
from sqlalchemy.orm import Session
......@@ -29,6 +26,8 @@ from database import Base, engine, get_db, SessionLocal
from utils import check_user_level
import config
from security import jwt_cookie, get_jwt_user
from routes.tests import testRouter
from routes.studies import studiesRouter
websocket_users = defaultdict(lambda: defaultdict(set))
websocket_users_global = defaultdict(set)
......@@ -72,8 +71,6 @@ usersRouter = APIRouter(prefix="/users", tags=["users"])
sessionsRouter = APIRouter(prefix="/sessions", tags=["sessions"])
studyRouter = APIRouter(prefix="/studies", tags=["studies"])
websocketRouter = APIRouter(prefix="/ws", tags=["websocket"])
webhookRouter = APIRouter(prefix="/webhooks", tags=["webhook"])
surveyRouter = APIRouter(prefix="/surveys", tags=["surveys"])
@v1Router.get("/health", status_code=status.HTTP_204_NO_CONTENT)
......@@ -963,190 +960,6 @@ def propagate_presence(
return
@studyRouter.post("", status_code=status.HTTP_201_CREATED)
def study_create(
study: schemas.StudyCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to create a study"
)
return crud.create_study(db, study).id
@studyRouter.get("", response_model=list[schemas.Study])
def studies_read(
db: Session = Depends(get_db),
):
return crud.get_studies(db)
@studyRouter.get("/{study_id}", response_model=schemas.Study)
def study_read(
study_id: int,
db: Session = Depends(get_db),
):
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
return study
@studyRouter.patch("/{study_id}", status_code=status.HTTP_204_NO_CONTENT)
def study_update(
study_id: int,
studyUpdate: schemas.StudyCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to update a study"
)
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
crud.update_study(db, studyUpdate, study_id)
@studyRouter.delete("/{study_id}", status_code=status.HTTP_204_NO_CONTENT)
def study_delete(
study_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to delete a study"
)
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
crud.delete_study(db, study_id)
@studyRouter.post("/{study_id}/users/{user_id}", status_code=status.HTTP_201_CREATED)
def study_add_user(
study_id: int,
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
user = crud.get_user(db, user_id)
if user != current_user and not check_user_level(
current_user, models.UserType.ADMIN
):
raise HTTPException(
status_code=401,
detail="You do not have permission to add a user to a study",
)
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
if user is None:
raise HTTPException(status_code=404, detail="User not found")
if user in study.users:
raise HTTPException(status_code=400, detail="User already exists in this study")
crud.add_user_to_study(db, study_id, user)
@studyRouter.delete(
"/{study_id}/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT
)
def study_delete_user(
study_id: int,
user_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to add a user to a study",
)
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
user = crud.get_user(db, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
if user not in study.users:
raise HTTPException(status_code=400, detail="User does not exist in this study")
crud.remove_user_from_study(db, study_id, user)
@studyRouter.post(
"/{study_id}/surveys/{survey_id}", status_code=status.HTTP_201_CREATED
)
def study_add_survey(
study_id: int,
survey_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to add a survey to a study",
)
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
survey = crud.get_survey(db, survey_id)
if survey is None:
raise HTTPException(status_code=404, detail="Survey not found")
if survey in study.surveys:
raise HTTPException(
status_code=400, detail="Survey already exists in this study"
)
crud.add_survey_to_study(db, study_id, survey)
@studyRouter.delete(
"/{study_id}/surveys/{survey_id}", status_code=status.HTTP_204_NO_CONTENT
)
def study_delete_survey(
study_id: int,
survey_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to add a survey to a study",
)
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(status_code=404, detail="Study not found")
survey = crud.get_survey(db, survey_id)
if survey is None:
raise HTTPException(status_code=404, detail="Survey not found")
if survey not in study.surveys:
raise HTTPException(
status_code=400, detail="Survey does not exist in this study"
)
crud.remove_survey_from_study(db, study_id, survey)
@websocketRouter.websocket("/sessions/{session_id}")
async def websocket_session(
session_id: int, token: str, websocket: WebSocket, db: Session = Depends(get_db)
......@@ -1199,7 +1012,7 @@ async def websocket_session(
@websocketRouter.websocket("/global")
async def websocket_session(
async def websocket_global(
token: str, websocket: WebSocket, db: Session = Depends(get_db)
):
try:
......@@ -1237,356 +1050,12 @@ async def websocket_session(
pass
@webhookRouter.post("/sessions", status_code=status.HTTP_202_ACCEPTED)
async def webhook_session(
webhook: schemas.CalComWebhook,
x_cal_signature_256: Annotated[str | None, Header()] = None,
db: Session = Depends(get_db),
):
# TODO: Fix. Signature is a hash, not the secret
# https://cal.com/docs/core-features/webhooks#adding-a-custom-payload-template
# if x_cal_signature_256 != config.CALCOM_SECRET:
# raise HTTPException(status_code=401, detail="Invalid secret")
if webhook.triggerEvent == "BOOKING_CREATED":
start_time = datetime.datetime.fromisoformat(webhook.payload["startTime"])
start_time -= datetime.timedelta(hours=1)
end_time = datetime.datetime.fromisoformat(webhook.payload["endTime"])
end_time += datetime.timedelta(hours=1)
attendes = webhook.payload["attendees"]
emails = [attendee["email"] for attendee in attendes if attendee != None]
print(emails)
db_users = [
crud.get_user_by_email(db, email) for email in emails if email != None
]
print(db_users)
users = [user for user in db_users if user != None]
if users:
db_session = crud.create_session_with_users(db, users, start_time, end_time)
else:
raise HTTPException(status_code=404, detail="Users not found")
return
raise HTTPException(status_code=400, detail="Invalid trigger event")
@surveyRouter.post("", status_code=status.HTTP_201_CREATED)
def create_survey(
survey: schemas.SurveyCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to create a survey"
)
return crud.create_survey(db, survey).id
@surveyRouter.get("", response_model=list[schemas.Survey])
def get_surveys(
db: Session = Depends(get_db),
):
return crud.get_surveys(db)
@surveyRouter.get("/{survey_id}", response_model=schemas.Survey)
def get_survey(
survey_id: int,
db: Session = Depends(get_db),
):
survey = crud.get_survey(db, survey_id)
if survey is None:
raise HTTPException(status_code=404, detail="Survey not found")
return survey
@surveyRouter.delete("/{survey_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_survey(
survey_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401, detail="You do not have permission to delete a survey"
)
crud.delete_survey(db, survey_id)
@surveyRouter.post("/{survey_id}/groups", status_code=status.HTTP_201_CREATED)
def add_group_to_survey(
survey_id: int,
groupc: schemas.SurveySurveyAddGroup,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to add a group to a survey",
)
survey = crud.get_survey(db, survey_id)
if survey is None:
raise HTTPException(status_code=404, detail="Survey not found")
group = crud.get_survey_group(db, groupc.group_id)
if group is None:
raise HTTPException(status_code=404, detail="Survey group not found")
return crud.add_group_to_survey(db, survey_id, group)
@surveyRouter.delete(
"/{survey_id}/groups/{group_id}", status_code=status.HTTP_204_NO_CONTENT
)
def remove_group_from_survey(
survey_id: int,
group_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to remove a group from a survey",
)
if not crud.get_survey(db, survey_id):
raise HTTPException(status_code=404, detail="Survey not found")
if not crud.get_survey_group(db, group_id):
raise HTTPException(status_code=404, detail="Survey group not found")
crud.remove_group_from_survey(db, survey_id, group_id)
@surveyRouter.post("/groups", status_code=status.HTTP_201_CREATED)
def create_survey_group(
group: schemas.SurveyGroupCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to create a survey group",
)
return crud.create_survey_group(db, group).id
@surveyRouter.get("/groups", response_model=list[schemas.SurveyGroup])
def get_survey_groups(
db: Session = Depends(get_db),
):
return crud.get_survey_groups(db)
@surveyRouter.get("/groups/{group_id}", response_model=schemas.SurveyGroup)
def get_survey_group(
group_id: int,
db: Session = Depends(get_db),
):
group = crud.get_survey_group(db, group_id)
if group is None:
raise HTTPException(status_code=404, detail="Survey group not found")
return group
@surveyRouter.delete("/groups/{group_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_survey_group(
group_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to delete a survey group",
)
if not crud.get_survey_group(db, group_id):
raise HTTPException(status_code=404, detail="Survey group not found")
crud.delete_survey_group(db, group_id)
@surveyRouter.post("/groups/{group_id}/items", status_code=status.HTTP_201_CREATED)
def add_item_to_survey_group(
group_id: int,
question: schemas.SurveyGroupAddQuestion,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to add an item to a survey group",
)
item = crud.get_survey_question(db, question.question_id)
if item is None:
raise HTTPException(status_code=404, detail="Survey question not found")
return crud.add_item_to_survey_group(db, group_id, item)
@surveyRouter.delete(
"/groups/{group_id}/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT
)
def remove_item_from_survey_group(
group_id: int,
item_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to remove an item from a survey group",
)
if not crud.get_survey_group(db, group_id):
raise HTTPException(status_code=404, detail="Survey group not found")
if not crud.get_survey_question(db, item_id):
raise HTTPException(status_code=404, detail="Survey question not found")
crud.remove_item_from_survey_group(db, group_id, item_id)
@surveyRouter.post("/items", status_code=status.HTTP_201_CREATED)
def create_survey_question(
question: schemas.SurveyQuestionCreate,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to create a survey question",
)
return crud.create_survey_question(db, question).id
@surveyRouter.get("/items", response_model=list[schemas.SurveyQuestion])
def get_survey_questions(
db: Session = Depends(get_db),
):
return crud.get_survey_questions(db)
@surveyRouter.get("/items/{question_id}", response_model=schemas.SurveyQuestion)
def get_survey_question(
question_id: int,
db: Session = Depends(get_db),
):
question = crud.get_survey_question(db, question_id)
if question is None:
raise HTTPException(status_code=404, detail="Survey question not found")
return question
@surveyRouter.delete("/items/{question_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_survey_question(
question_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to delete a survey question",
)
if not crud.get_survey_question(db, question_id):
raise HTTPException(status_code=404, detail="Survey question not found")
crud.delete_survey_question(db, question_id)
@surveyRouter.post("/responses", status_code=status.HTTP_201_CREATED)
def create_survey_response(
response: schemas.SurveyResponseCreate,
db: Session = Depends(get_db),
):
return crud.create_survey_response(db, response).id
@surveyRouter.get("/responses/{survey_id}", response_model=list[schemas.SurveyResponse])
def get_survey_responses(
survey_id: int,
db: Session = Depends(get_db),
current_user: schemas.User = Depends(get_jwt_user),
):
if not check_user_level(current_user, models.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail="You do not have permission to view survey responses",
)
if not crud.get_survey(db, survey_id):
raise HTTPException(status_code=404, detail="Survey not found")
return crud.get_survey_responses(db, survey_id)
@surveyRouter.post("/info/{survey_id}", status_code=status.HTTP_201_CREATED)
def create_survey_info(
survey_id: int,
info: schemas.SurveyResponseInfoCreate,
db: Session = Depends(get_db),
):
if not crud.get_survey(db, survey_id):
raise HTTPException(status_code=404, detail="Survey not found")
return crud.create_survey_response_info(db, info)
@surveyRouter.get("/{survey_id}/score/{sid}", response_model=dict)
def get_survey_score(
survey_id: int,
sid: str,
db: Session = Depends(get_db),
):
if not crud.get_survey(db, survey_id):
raise HTTPException(status_code=404, detail="Survey not found")
responses = crud.get_survey_responses(db, sid)
score = 0
total = 0
for response in responses:
question = crud.get_survey_question(db, response.question_id)
if not question:
continue
total += 1
if response.selected_id == question.correct:
score += 1
return {
"survey_id": survey_id,
"score": round((score / total) * 100 if total > 0 else 0, 2),
}
v1Router.include_router(authRouter)
v1Router.include_router(usersRouter)
v1Router.include_router(sessionsRouter)
v1Router.include_router(studyRouter)
v1Router.include_router(websocketRouter)
v1Router.include_router(webhookRouter)
v1Router.include_router(surveyRouter)
v1Router.include_router(testRouter)
v1Router.include_router(studiesRouter)
apiRouter.include_router(v1Router)
app.include_router(apiRouter)
......@@ -180,134 +180,3 @@ class MessageFeedback(Base):
def raw(self):
return [self.id, self.message_id, self.start, self.end, self.content, self.date]
class TestTyping(Base):
__tablename__ = "test_typing"
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=datetime_aware)
code = Column(String)
entries = relationship("TestTypingEntry", backref="typing")
class TestTypingEntry(Base):
__tablename__ = "test_typing_entry"
id = Column(Integer, primary_key=True, index=True)
typing_id = Column(Integer, ForeignKey("test_typing.id"), index=True)
exerciceId = Column(Integer)
position = Column(Integer)
downtime = Column(Integer)
uptime = Column(Integer)
keyCode = Column(Integer)
keyValue = Column(String)
class SurveyGroupQuestion(Base):
__tablename__ = "survey_group_questions"
group_id = Column(Integer, ForeignKey("survey_groups.id"), primary_key=True)
question_id = Column(Integer, ForeignKey("survey_questions.id"), primary_key=True)
class SurveyQuestion(Base):
__tablename__ = "survey_questions"
id = Column(Integer, primary_key=True, index=True)
question = Column(String)
correct = Column(Integer)
option1 = Column(String)
option2 = Column(String)
option3 = Column(String)
option4 = Column(String)
option5 = Column(String)
option6 = Column(String)
option7 = Column(String)
option8 = Column(String)
class SurveyGroup(Base):
__tablename__ = "survey_groups"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
demo = Column(String, default=False)
questions = relationship(
"SurveyQuestion", secondary="survey_group_questions", backref="group"
)
class SurveySurvey(Base):
__tablename__ = "survey_surveys"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
groups = relationship(
"SurveyGroup", secondary="survey_survey_groups", backref="survey"
)
studies = relationship("Study", secondary="study_surveys", back_populates="surveys")
class SurveySurveyGroup(Base):
__tablename__ = "survey_survey_groups"
survey_id = Column(Integer, ForeignKey("survey_surveys.id"), primary_key=True)
group_id = Column(Integer, ForeignKey("survey_groups.id"), primary_key=True)
class SurveyResponse(Base):
__tablename__ = "survey_responses"
id = Column(Integer, primary_key=True, index=True)
code = Column(String)
sid = Column(String)
uid = Column(Integer, ForeignKey("users.id"), default=None)
created_at = Column(DateTime, default=datetime_aware)
survey_id = Column(Integer, ForeignKey("survey_surveys.id"))
group_id = Column(Integer, ForeignKey("survey_groups.id"))
question_id = Column(Integer, ForeignKey("survey_questions.id"))
selected_id = Column(Integer)
response_time = Column(Float)
text = Column(String)
class SurveyResponseInfo(Base):
__tablename__ = "survey_response_info"
id = Column(Integer, primary_key=True, index=True)
sid = Column(String)
birthyear = Column(Integer)
gender = Column(String)
primary_language = Column(String)
other_language = Column(String)
education = Column(String)
class Study(Base):
__tablename__ = "studies"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
description = Column(String)
start_date = Column(DateTime)
end_date = Column(DateTime)
chat_duration = Column(Integer)
users = relationship("User", secondary="study_users", back_populates="studies")
surveys = relationship(
"SurveySurvey", secondary="study_surveys", back_populates="studies"
)
class StudyUser(Base):
__tablename__ = "study_users"
study_id = Column(Integer, ForeignKey("studies.id"), primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
class StudySurvey(Base):
__tablename__ = "study_surveys"
study_id = Column(Integer, ForeignKey("studies.id"), primary_key=True)
survey_id = Column(Integer, ForeignKey("survey_surveys.id"), primary_key=True)
from sqlalchemy import (
Column,
Float,
Integer,
String,
Boolean,
ForeignKey,
DateTime,
UniqueConstraint,
)
from sqlalchemy.orm import relationship
from enum import Enum
from database import Base
import datetime
from utils import datetime_aware
from models.studies import *
from models.tests import *
class UserType(Enum):
ADMIN = 0
TUTOR = 1
STUDENT = 2
class Contact(Base):
__tablename__ = "contacts"
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True, index=True)
contact_id = Column(Integer, ForeignKey("users.id"), primary_key=True, index=True)
UniqueConstraint("user_id", "contact_id", name="unique_contact")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
nickname = Column(String, index=True)
password = Column(String)
type = Column(Integer, default=UserType.STUDENT.value)
is_active = Column(Boolean, default=True)
availability = Column(String, default=0)
ui_language = Column(String, default="fr")
home_language = Column(String, default="en")
target_language = Column(String, default="fr")
birthdate = Column(DateTime, default=None)
gender = Column(String, default=None)
calcom_link = Column(String, default="")
last_survey = Column(DateTime, default=None)
sessions = relationship(
"Session", secondary="user_sessions", back_populates="users"
)
contacts = relationship(
"User",
secondary="contacts",
primaryjoin=(id == Contact.user_id),
secondaryjoin=(id == Contact.contact_id),
back_populates="contacts",
)
contact_of = relationship(
"User",
secondary="contacts",
primaryjoin=(id == Contact.contact_id),
secondaryjoin=(id == Contact.user_id),
back_populates="contacts",
)
studies = relationship("Study", secondary="study_users", back_populates="users")
class UserSurveyWeekly(Base):
__tablename__ = "users_survey_weekly"
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=datetime_aware)
user_id = Column(Integer, ForeignKey("users.id"))
q1 = Column(Float)
q2 = Column(Float)
q3 = Column(Float)
q4 = Column(Float)
class Session(Base):
__tablename__ = "sessions"
id = Column(Integer, primary_key=True, index=True)
created_at = Column(DateTime, default=datetime_aware)
is_active = Column(Boolean, default=True)
start_time = Column(DateTime, default=datetime_aware)
end_time = Column(
DateTime,
default=lambda: datetime_aware() + datetime.timedelta(hours=12),
)
language = Column(String, default="fr")
users = relationship("User", secondary="user_sessions", back_populates="sessions")
class SessionSatisfy(Base):
__tablename__ = "session_satisfy"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"))
session_id = Column(Integer, ForeignKey("sessions.id"))
created_at = Column(DateTime, default=datetime_aware)
usefullness = Column(Integer)
easiness = Column(Integer)
remarks = Column(String)
class UserSession(Base):
__tablename__ = "user_sessions"
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True, index=True)
session_id = Column(String, ForeignKey("sessions.id"), primary_key=True, index=True)
class Message(Base):
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
message_id = Column(String)
content = Column(String)
user_id = Column(Integer, ForeignKey("users.id"))
session_id = Column(Integer, ForeignKey("sessions.id"))
created_at = Column(DateTime, default=datetime_aware)
reply_to_message_id = Column(
Integer, ForeignKey("messages.message_id"), nullable=True
)
feedbacks = relationship("MessageFeedback", backref="message")
replies = relationship(
"Message", backref="parent_message", remote_side=[message_id]
)
def raw(self):
return [
self.id,
self.message_id,
self.content,
self.user_id,
self.session_id,
self.reply_to_message_id,
self.created_at,
]
feedbacks = relationship("MessageFeedback", backref="message")
class MessageMetadata(Base):
__tablename__ = "message_metadata"
id = Column(Integer, primary_key=True, index=True)
message_id = Column(Integer, ForeignKey("messages.id"))
message = Column(String)
date = Column(Integer)
def raw(self):
return [self.id, self.message_id, self.message, self.date]
class MessageFeedback(Base):
__tablename__ = "message_feedbacks"
id = Column(Integer, primary_key=True, index=True)
message_id = Column(Integer, ForeignKey("messages.id"))
start = Column(Integer)
end = Column(Integer)
content = Column(String, default="")
date = Column(DateTime, default=datetime_aware)
def raw(self):
return [self.id, self.message_id, self.start, self.end, self.content, self.date]
from database import Base
from sqlalchemy import (
Column,
Integer,
String,
ForeignKey,
DateTime,
)
from sqlalchemy.orm import relationship
class Study(Base):
__tablename__ = "studies"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
description = Column(String)
start_date = Column(DateTime)
end_date = Column(DateTime)
nb_session = Column(Integer)
consent_participation = Column(String)
consent_privacy = Column(String)
consent_rights = Column(String)
consent_study_data = Column(String)
users = relationship("User", secondary="study_users")
tests = relationship("Test", secondary="study_tests")
class StudyUser(Base):
__tablename__ = "study_users"
study_id = Column(Integer, ForeignKey("studies.id"), primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"), primary_key=True)
class StudyTest(Base):
__tablename__ = "study_tests"
study_id = Column(Integer, ForeignKey("studies.id"), primary_key=True)
test_id = Column(Integer, ForeignKey("tests.id"), primary_key=True)
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Integer, String
from sqlalchemy.orm import relationship, validates
from utils import datetime_aware
from database import Base
class TestTyping(Base):
__tablename__ = "test_typings"
test_id = Column(Integer, ForeignKey("tests.id"), primary_key=True)
explanations = Column(String, nullable=True)
text = Column(String, nullable=False)
repeat = Column(Integer, nullable=False, default=1)
duration = Column(Integer, nullable=False, default=0)
test = relationship(
"Test", uselist=False, back_populates="test_typing", lazy="selectin"
)
class TestTask(Base):
__tablename__ = "test_tasks"
test_id = Column(Integer, ForeignKey("tests.id"), primary_key=True)
test = relationship(
"Test", uselist=False, back_populates="test_task", lazy="selectin"
)
groups = relationship(
"TestTaskGroup",
secondary="test_task_task_groups",
lazy="selectin",
)
class Test(Base):
__tablename__ = "tests"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
test_typing = relationship(
"TestTyping",
uselist=False,
back_populates="test",
lazy="selectin",
)
test_task = relationship(
"TestTask",
uselist=False,
back_populates="test",
lazy="selectin",
)
@validates("test_typing")
def adjust_test_typing(self, _, value) -> TestTyping | None:
if value:
if isinstance(value, dict):
return TestTyping(**value, test_id=self.id)
else:
value.test_id = self.id
return value
@validates("test_task")
def adjust_test_task(self, _, value) -> TestTask | None:
if value:
if isinstance(value, dict):
return TestTask(**value, test_id=self.id)
else:
value.test_id = self.id
return value
class TestTaskTaskGroup(Base):
__tablename__ = "test_task_task_groups"
test_task_id = Column(Integer, ForeignKey("test_tasks.test_id"), primary_key=True)
group_id = Column(Integer, ForeignKey("test_task_groups.id"), primary_key=True)
class TestTaskGroup(Base):
__tablename__ = "test_task_groups"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, nullable=False)
demo = Column(Boolean, default=False)
randomize = Column(Boolean, default=True)
questions = relationship(
"TestTaskQuestion",
secondary="test_task_group_questions",
lazy="selectin",
)
class TestTaskGroupQuestion(Base):
__tablename__ = "test_task_group_questions"
group_id = Column(Integer, ForeignKey("test_task_groups.id"), primary_key=True)
question_id = Column(
Integer, ForeignKey("test_task_questions.id"), primary_key=True
)
class TestTaskQuestionQCM(Base):
__tablename__ = "test_task_questions_qcm"
question_id = Column(
Integer, ForeignKey("test_task_questions.id"), primary_key=True
)
correct = Column(Integer, nullable=True)
option1 = Column(String, nullable=True)
option2 = Column(String, nullable=True)
option3 = Column(String, nullable=True)
option4 = Column(String, nullable=True)
option5 = Column(String, nullable=True)
option6 = Column(String, nullable=True)
option7 = Column(String, nullable=True)
option8 = Column(String, nullable=True)
question = relationship("TestTaskQuestion", back_populates="question_qcm")
class TestTaskQuestion(Base):
__tablename__ = "test_task_questions"
id = Column(Integer, primary_key=True, index=True)
question = Column(String, nullable=True)
question_qcm = relationship(
"TestTaskQuestionQCM",
uselist=False,
back_populates="question",
lazy="selectin",
)
@validates("question_qcm")
def adjust_question_qcm(self, _, value) -> TestTaskQuestionQCM | None:
if value:
if isinstance(value, dict):
return TestTaskQuestionQCM(**value, question_id=self.id)
else:
value.question_id = self.id
return value
class TestEntryTaskQCM(Base):
__tablename__ = "test_entries_task_qcm"
entry_id = Column(
Integer, ForeignKey("test_entries_task.entry_id"), primary_key=True
)
selected_id = Column(Integer, nullable=False)
entry_task = relationship(
"TestEntryTask",
uselist=False,
back_populates="entry_task_qcm",
lazy="selectin",
)
class TestEntryTaskGapfill(Base):
__tablename__ = "test_entries_task_gapfill"
entry_id = Column(
Integer, ForeignKey("test_entries_task.entry_id"), primary_key=True, index=True
)
text = Column(String, nullable=False)
entry_task = relationship(
"TestEntryTask",
uselist=False,
back_populates="entry_task_gapfill",
lazy="selectin",
)
class TestEntryTask(Base):
__tablename__ = "test_entries_task"
entry_id = Column(
Integer, ForeignKey("test_entries.id"), primary_key=True, index=True
)
test_group_id = Column(Integer, ForeignKey("test_task_groups.id"), index=True)
test_question_id = Column(Integer, ForeignKey("test_task_questions.id"), index=True)
response_time = Column(Float, nullable=False)
entry_task_qcm = relationship(
"TestEntryTaskQCM",
uselist=False,
back_populates="entry_task",
lazy="selectin",
)
entry_task_gapfill = relationship(
"TestEntryTaskGapfill",
uselist=False,
back_populates="entry_task",
lazy="selectin",
)
entry = relationship("TestEntry", uselist=False, back_populates="entry_task")
@validates("entry_task_qcm")
def adjust_entry_qcm(self, _, value) -> TestEntryTaskQCM | None:
if value:
if isinstance(value, dict):
return TestEntryTaskQCM(**value, entry_id=self.entry_id)
else:
value.entry_id = self.entry_id
return value
@validates("entry_task_gapfill")
def adjust_entry_gapfill(self, _, value) -> TestEntryTaskGapfill | None:
if value:
if isinstance(value, dict):
return TestEntryTaskGapfill(**value, entry_id=self.entry_id)
else:
value.entry_id = self.entry_id
return value
class TestEntryTyping(Base):
__tablename__ = "test_entries_typing"
entry_id = Column(
Integer, ForeignKey("test_entries.id"), primary_key=True, index=True
)
position = Column(Integer, nullable=False)
downtime = Column(Integer, nullable=False)
uptime = Column(Integer, nullable=False)
key_code = Column(Integer, nullable=False)
key_value = Column(String, nullable=False)
entry = relationship(
"TestEntry", uselist=False, back_populates="entry_typing", lazy="selectin"
)
class TestEntry(Base):
__tablename__ = "test_entries"
id = Column(Integer, primary_key=True, index=True)
code = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), default=None, nullable=True)
test_id = Column(Integer, ForeignKey("tests.id"), nullable=False)
created_at = Column(DateTime, default=datetime_aware)
entry_task = relationship(
"TestEntryTask", uselist=False, back_populates="entry", lazy="selectin"
)
entry_typing = relationship(
"TestEntryTyping", uselist=False, back_populates="entry", lazy="selectin"
)
@validates("entry_task")
def adjust_entry_task(self, _, value) -> TestEntryTask | None:
if value:
if isinstance(value, dict):
return TestEntryTask(**value, entry_id=self.id)
else:
value.entry_id = self.id
return value
@validates("entry_typing")
def adjust_entry_typing(self, _, value) -> TestEntryTyping | None:
if value:
if isinstance(value, dict):
return TestEntryTyping(**value, entry_id=self.id)
else:
value.entry_id = self.id
return value
from typing import Callable
from fastapi import HTTPException
import schemas
from utils import check_user_level
def require_admin(error: str):
def decorator(func: Callable):
def wrapper(*args, current_user: schemas.User, **kwargs):
if not check_user_level(current_user, schemas.UserType.ADMIN):
raise HTTPException(
status_code=401,
detail=error,
)
return func(*args, current_user=current_user, **kwargs)
return wrapper
return decorator
from fastapi import APIRouter, Depends, HTTPException, status
import crud
import schemas
from database import get_db
from models import Session
from routes.decorators import require_admin
studiesRouter = APIRouter(prefix="/studies", tags=["Studies"])
@require_admin("You do not have permission to create a study.")
@studiesRouter.post("", status_code=status.HTTP_201_CREATED)
def create_study(
study: schemas.StudyCreate,
db: Session = Depends(get_db),
):
return crud.create_study(db, study).id
@studiesRouter.get("", response_model=list[schemas.Study])
def get_studies(
skip: int = 0,
db: Session = Depends(get_db),
):
return crud.get_studies(db, skip)
@studiesRouter.get("/{study_id}", response_model=schemas.Study)
def get_study(
study_id: int,
db: Session = Depends(get_db),
):
study = crud.get_study(db, study_id)
if study is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Study not found"
)
return study
@require_admin("You do not have permission to patch a study.")
@studiesRouter.patch("/{study_id}", status_code=status.HTTP_204_NO_CONTENT)
def update_study(
study_id: int,
study: schemas.StudyCreate,
db: Session = Depends(get_db),
):
return crud.update_study(db, study, study_id)
@require_admin("You do not have permission to delete a study.")
@studiesRouter.delete("/{study_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_study(
study_id: int,
db: Session = Depends(get_db),
):
return crud.delete_study(db, study_id)
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
import crud
import schemas
from database import get_db
from routes.decorators import require_admin
testRouter = APIRouter(prefix="/tests", tags=["Tests"])
@require_admin("You do not have permission to create a test.")
@testRouter.post("", status_code=status.HTTP_201_CREATED)
def create_test(
test: schemas.TestCreate,
db: Session = Depends(get_db),
):
return crud.create_test(db, test).id
@testRouter.get("")
def get_tests(
skip: int = 0,
db: Session = Depends(get_db),
):
return crud.get_tests(db, skip)
@testRouter.get("/{test_id}", response_model=schemas.Test)
def get_test(
test_id: int,
db: Session = Depends(get_db),
):
test = crud.get_test(db, test_id)
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Test not found"
)
return test
@require_admin("You do not have permission to: delete a test.")
@testRouter.delete("/{test_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_test(
test_id: int,
db: Session = Depends(get_db),
):
return crud.delete_test(db, test_id)
@require_admin("You do not have permission to add a group to a task test.")
@testRouter.post("/{test_id}/groups/{group_id}", status_code=status.HTTP_201_CREATED)
def add_group_to_test(
test_id: int,
group_id: int,
db: Session = Depends(get_db),
):
test = crud.get_test(db, test_id)
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Test not found"
)
if test.test_task is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Test does not have a task",
)
group = crud.get_group(db, group_id)
if group is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
return crud.add_group_to_test_task(db, test.test_task, group)
@require_admin("You do not have permission to remove a group from a task test.")
@testRouter.delete(
"/{test_id}/groups/{group_id}", status_code=status.HTTP_204_NO_CONTENT
)
def remove_group_from_test(
test_id: int,
group_id: int,
db: Session = Depends(get_db),
):
test = crud.get_test(db, test_id)
if test is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Test not found"
)
if test.test_task is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Test does not have a task",
)
group = crud.get_group(db, group_id)
if group is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
return crud.remove_group_from_test_task(db, test.test_task, group)
@require_admin("You do not have permission to create a group.")
@testRouter.post("/groups", status_code=status.HTTP_201_CREATED)
def create_group(
group: schemas.TestTaskGroupCreate,
db: Session = Depends(get_db),
):
return crud.create_group(db, group).id
@require_admin("You do not have permission to delete a group.")
@testRouter.delete("/groups/{group_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_group(
group_id: int,
db: Session = Depends(get_db),
):
return crud.delete_group(db, group_id)
@require_admin("You do not have permission to add a question to a group.")
@testRouter.post(
"/groups/{group_id}/questions/{question_id}", status_code=status.HTTP_201_CREATED
)
def add_question_to_group(
group_id: int,
question_id: int,
db: Session = Depends(get_db),
):
group = crud.get_group(db, group_id)
if group is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
question = crud.get_question(db, question_id)
if question is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Question not found"
)
return crud.add_question_to_group(db, group, question)
@require_admin("You do not have permission to remove a question from a group.")
@testRouter.delete(
"/groups/{group_id}/questions/{question_id}", status_code=status.HTTP_204_NO_CONTENT
)
def remove_question_from_group(
group_id: int,
question_id: int,
db: Session = Depends(get_db),
):
group = crud.get_group(db, group_id)
if group is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Group not found"
)
question = crud.get_question(db, question_id)
if question is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Question not found"
)
return crud.remove_question_from_group(db, group, question)
@require_admin("You do not have permission to create a question.")
@testRouter.post("/questions", status_code=status.HTTP_201_CREATED)
def create_question(
question: schemas.TestTaskQuestionCreate,
db: Session = Depends(get_db),
):
return crud.create_question(db, question).id
@require_admin("You do not have permission to delete a question.")
@testRouter.delete("/questions/{question_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_question(
question_id: int,
db: Session = Depends(get_db),
):
return crud.delete_question(db, question_id)
@testRouter.post("/entries", status_code=status.HTTP_201_CREATED)
def create_entry(
entry: schemas.TestEntryCreate,
db: Session = Depends(get_db),
):
return crud.create_test_entry(db, entry).id
from pydantic import BaseModel, NaiveDatetime
from models import UserType
from schemas.studies import *
from schemas.tests import *
from schemas.users import *
class LoginData(BaseModel):
......@@ -15,86 +17,6 @@ class RegisterData(BaseModel):
is_tutor: bool
class User(BaseModel):
id: int
email: str
nickname: str
type: int
bio: str | None
is_active: bool
ui_language: str | None
home_language: str | None
target_language: str | None
birthdate: NaiveDatetime | None
gender: str | None = None
calcom_link: str | None
last_survey: NaiveDatetime | None = None
availabilities: list[dict] | None = []
tutor_list: list[str] | None = []
my_tutor: str | None = None
my_slots: list[dict] | None = []
class Config:
from_attributes = True
def to_dict(self):
return {
"id": self.id,
"email": self.email,
"nickname": self.nickname,
"type": self.type,
"availability": self.availability,
"is_active": self.is_active,
"ui_language": self.ui_language,
"home_language": self.home_language,
"target_language": self.target_language,
"birthdate": self.birthdate.isoformat() if self.birthdate else None,
}
class UserCreate(BaseModel):
email: str
nickname: str | None = None
password: str
type: int = UserType.STUDENT.value
bio: str | None = None
is_active: bool = True
ui_language: str | None = None
home_language: str | None = None
target_language: str | None = None
birthdate: NaiveDatetime | None = None
gender: str | None = None
calcom_link: str | None = None
last_survey: NaiveDatetime | None = None
availabilities: list[dict] | None = []
tutor_list: list[str] | None = []
my_tutor: str | None = None
my_slots: list[dict] | None = []
class UserUpdate(BaseModel):
email: str | None = None
nickname: str | None = None
password: str | None = None
type: int | None = None
bio: str | None = None
is_active: bool | None = None
ui_language: str | None = None
home_language: str | None = None
target_language: str | None = None
birthdate: NaiveDatetime | None = None
gender: str | None = None
calcom_link: str | None = None
last_survey: NaiveDatetime | None = None
availabilities: list[dict] | None = []
tutor_list: list[str] | None = []
my_tutor: str | None = None
my_slots: list[dict] | None = None
class Config:
from_attributes = True
class ContactCreate(BaseModel):
user_id: int
......@@ -355,22 +277,3 @@ class SurveyResponseInfo(BaseModel):
primary_language: str
other_language: str
education: str
class Study(BaseModel):
id: int
title: str
description: str
start_date: NaiveDatetime
end_date: NaiveDatetime
chat_duration: int
users: list[User]
surveys: list[Survey]
class StudyCreate(BaseModel):
title: str
description: str
start_date: NaiveDatetime
end_date: NaiveDatetime
chat_duration: int = 30 * 60
from pydantic import BaseModel, NaiveDatetime
from schemas.users import User
from schemas.tests import Test
class StudyCreate(BaseModel):
title: str
description: str
start_date: NaiveDatetime
end_date: NaiveDatetime
nb_session: int = 8
consent_participation: str
consent_privacy: str
consent_rights: str
consent_study_data: str
user_ids: list[int] = []
test_ids: list[int] = []
class Study(BaseModel):
id: int
title: str
description: str
start_date: NaiveDatetime
end_date: NaiveDatetime
nb_session: int = 8
consent_participation: str
consent_privacy: str
consent_rights: str
consent_study_data: str
users: list[User] = []
tests: list[Test] = []
from typing_extensions import Self
from pydantic import BaseModel, model_validator
class TestTaskQuestionQCMCreate(BaseModel):
correct: int
option1: str | None = None
option2: str | None = None
option3: str | None = None
option4: str | None = None
option5: str | None = None
option6: str | None = None
option7: str | None = None
option8: str | None = None
class TestTaskQuestionCreate(BaseModel):
# TODO remove
id: int | None = None
question: str | None = None
question_qcm: TestTaskQuestionQCMCreate | None = None
class TestTaskGroupQuestionAdd(BaseModel):
question_id: int
group_id: int
class TestTaskGroupAdd(BaseModel):
group_id: int
task_id: int
class TestTaskQuestionQCM(BaseModel):
correct: int
options: list[str]
model_config = {"from_attributes": True}
@model_validator(mode="before")
@classmethod
def extract_options(cls, data):
if hasattr(data, "__dict__"):
result = {"correct": getattr(data, "correct", None), "options": []}
for i in range(1, 9):
option_value = getattr(data, f"option{i}", None)
if option_value is not None and option_value != "":
result["options"].append(option_value)
return result
return data
class TestTaskQuestion(BaseModel):
id: int
question: str | None = None
question_qcm: TestTaskQuestionQCM | None = None
class TestTaskGroupCreate(BaseModel):
# TODO remove
id: int | None = None
title: str
demo: bool = False
randomize: bool = True
class TestTypingCreate(BaseModel):
explanations: str
text: str
repeat: int | None = None
duration: int | None = None
class TestTaskGroup(TestTaskGroupCreate):
# id: int
questions: list[TestTaskQuestion] = []
class TestTaskCreate(BaseModel):
groups: list[TestTaskGroup] = []
class TestTask(TestTaskCreate):
pass
class TestCreate(BaseModel):
# TODO remove
id: int | None = None
title: str
test_typing: TestTypingCreate | None = None
test_task: TestTaskCreate | None = None
@model_validator(mode="after")
def check_test_type(self) -> Self:
if self.test_typing is None and self.test_task is None:
raise ValueError("TypingTest or TaskTest must be provided")
if self.test_typing is not None and self.test_task is not None:
raise ValueError(
"TypingTest and TaskTest cannot be provided at the same time"
)
return self
class TestTyping(TestTypingCreate):
pass
class Test(BaseModel):
id: int
title: str
test_typing: TestTyping | None = None
test_task: TestTask | None = None
class TestTaskEntryQCMCreate(BaseModel):
selected_id: int
class TestTaskEntryGapfillCreate(BaseModel):
text: str
class TestTaskEntryCreate(BaseModel):
test_group_id: int
test_question_id: int
response_time: float
entry_task_qcm: TestTaskEntryQCMCreate | None = None
entry_task_gapfill: TestTaskEntryGapfillCreate | None = None
@model_validator(mode="after")
def check_entry_type(self) -> Self:
if self.entry_task_qcm is None and self.entry_task_gapfill is None:
raise ValueError("QCM or Gapfill must be provided")
if self.entry_task_qcm is not None and self.entry_task_gapfill is not None:
raise ValueError("QCM and Gapfill cannot be provided at the same time")
return self
class TestTypingEntryCreate(BaseModel):
position: int
downtime: int
uptime: int
key_code: int
key_value: str
class TestEntryCreate(BaseModel):
code: str
user_id: int | None = None
test_id: int
entry_task: TestTaskEntryCreate | None = None
entry_typing: TestTypingEntryCreate | None = None
@model_validator(mode="after")
def check_entry_type(self) -> Self:
if self.entry_task is None and self.entry_typing is None:
raise ValueError("Task or Typing must be provided")
if self.entry_task is not None and self.entry_typing is not None:
raise ValueError("Task and Typing cannot be provided at the same time")
return self
from pydantic import BaseModel, NaiveDatetime
from models import UserType
class User(BaseModel):
id: int
email: str
nickname: str
type: int
bio: str | None
is_active: bool
ui_language: str | None
home_language: str | None
target_language: str | None
birthdate: NaiveDatetime | None
gender: str | None = None
calcom_link: str | None
last_survey: NaiveDatetime | None = None
availabilities: list[dict] | None = []
tutor_list: list[str] | None = []
my_tutor: str | None = None
my_slots: list[dict] | None = []
class Config:
from_attributes = True
def to_dict(self):
return {
"id": self.id,
"email": self.email,
"nickname": self.nickname,
"type": self.type,
"availability": self.availability,
"is_active": self.is_active,
"ui_language": self.ui_language,
"home_language": self.home_language,
"target_language": self.target_language,
"birthdate": self.birthdate.isoformat() if self.birthdate else None,
}
class UserCreate(BaseModel):
email: str
nickname: str | None = None
password: str
type: int = UserType.STUDENT.value
bio: str | None = None
is_active: bool = True
ui_language: str | None = None
home_language: str | None = None
target_language: str | None = None
birthdate: NaiveDatetime | None = None
gender: str | None = None
calcom_link: str | None = None
last_survey: NaiveDatetime | None = None
availabilities: list[dict] | None = []
tutor_list: list[str] | None = []
my_tutor: str | None = None
my_slots: list[dict] | None = []
class UserUpdate(BaseModel):
email: str | None = None
nickname: str | None = None
password: str | None = None
type: int | None = None
bio: str | None = None
is_active: bool | None = None
ui_language: str | None = None
home_language: str | None = None
target_language: str | None = None
birthdate: NaiveDatetime | None = None
gender: str | None = None
calcom_link: str | None = None
last_survey: NaiveDatetime | None = None
availabilities: list[dict] | None = []
tutor_list: list[str] | None = []
my_tutor: str | None = None
my_slots: list[dict] | None = None
class Config:
from_attributes = True
from fastapi.testclient import TestClient
import datetime
from .main import app
import config
client = TestClient(app)
def test_read_main():
response = client.get("/health")
assert response.status_code == 204
def test_webhook_create():
response = client.post(
"/api/v1/webhooks/sessions",
headers={"X-Cal-Signature-256": config.CALCOM_SECRET},
json={
"triggerEvent": "BOOKING_CREATED",
"createdAt": "2023-05-24T09:30:00.538Z",
"payload": {
"type": "60min",
"title": "60min between Pro Example and John Doe",
"description": "",
"additionalNotes": "",
"customInputs": {},
"startTime": (
datetime.datetime.now() + datetime.timedelta(days=1, hours=1)
).isoformat(),
"endTime": (
datetime.datetime.now() + datetime.timedelta(days=1, hours=2)
).isoformat(),
"organizer": {
"id": 5,
"name": "Pro Example",
"email": "pro@example.com",
"username": "pro",
"timeZone": "Asia/Kolkata",
"language": {"locale": "en"},
"timeFormat": "h:mma",
},
"responses": {
"name": {"label": "your_name", "value": "John Doe"},
"email": {
"label": "email_address",
"value": "john.doe@example.com",
},
"location": {
"label": "location",
"value": {"optionValue": "", "value": "inPerson"},
},
"notes": {"label": "additional_notes"},
"guests": {"label": "additional_guests"},
"rescheduleReason": {"label": "reschedule_reason"},
},
"userFieldsResponses": {},
"attendees": [
{
"email": "admin@admin.tld",
"name": "John Doe",
"timeZone": "Asia/Kolkata",
"language": {"locale": "en"},
}
],
"location": "Calcom HQ",
"destinationCalendar": {
"id": 10,
"integration": "apple_calendar",
"externalId": "https://caldav.icloud.com/1234567/calendars/XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX/",
"userId": 5,
"eventTypeId": None,
"credentialId": 1,
},
"hideCalendarNotes": False,
"requiresConfirmation": None,
"eventTypeId": 7,
"seatsShowAttendees": True,
"seatsPerTimeSlot": None,
"uid": "bFJeNb2uX8ANpT3JL5EfXw",
"appsStatus": [
{
"appName": "Apple Calendar",
"type": "apple_calendar",
"success": 1,
"failures": 0,
"errors": [],
"warnings": [],
}
],
"eventTitle": "60min",
"eventDescription": "",
"price": 0,
"currency": "usd",
"length": 60,
"bookingId": 91,
"metadata": {},
"status": "ACCEPTED",
},
},
)
assert response.status_code == 202, (
response.status_code,
response.content.decode("utf-8"),
)
Fichier ajouté
......@@ -345,7 +345,8 @@
"deleteConfirm": "Êtes-vous sûr de vouloir supprimer cette étude ? Cette action est irréversible.",
"startDate": "Date de début",
"endDate": "Date de fin",
"chatDuration": "Durée des sessions (en secondes)",
"chatDuration": "Durée des sessions (en minutes)",
"nbSession": "Nombre de sessions",
"updated": "Étude mise à jour avec succès",
"noChanges": "Aucune modification",
"updateError": "Erreur lors de la mise à jour de l'étude",
......@@ -364,7 +365,30 @@
"addUserError": "Erreur lors de l'ajout de l'utilisateur",
"addUserSuccess": "Utilisateur ajouté à l'étude",
"deleteConfirm": "Êtes-vous sûr de vouloir supprimer cette étude ? Cette action est irréversible.",
"createTitle": "Créer une nouvelle étude"
"createTitle": "Créer une nouvelle étude",
"editTitle": "Modification de l'étude",
"typingTest": "Activer le test de frappe",
"hasToLoggin": "Exiger que les participants soient inscrits et connectés pour passer le test",
"andOr": "et/ou",
"typingTestDuration": "Durée (en secondes)",
"typingTestRepetition": "Nombre de fois à répéter",
"typingTestText": "Texte",
"typingTestInfoNote": "Si aucune durée n'est fournis le mode \"plus vite que possible\" sera activé.",
"consentParticipation": "Si vous acceptez de participer, vous serez invité·e à participer à des sessions de tutorat en ligne avec un tuteur de langue étrangère. Vous serez également invité à remplir des questionnaires avant et après les sessions de tutorat. Les sessions de tutorat seront enregistrées pour analyse ultérieure.Nous vous demandons de prévoir de réaliser un minimum de 8 sessions d'une heure de tutorat (donc 8 heures au total), au cours d'une période de 1 à 3 mois. Vous pouvez bien sûr en réaliser plus si vous le souhaitez. Vous pouvez cependant arrêter de participer à l'étude à tout moment.",
"consentPrivacy": "Les données collectées (par exemple, les transcriptions des conversations, les résultats de tests, les mesures de frappe, les informations sur les participants comme l'age ou le genre) seront traitées de manière confidentielle et anonyme. Elles seront conservées après leur anonymisation intégrale et ne pourront être utilisées qu'à des fins scientifiques ou pédagogiques. Elles pourront éventuellement être partagées avec d'autres chercheurs ou enseignants, mais toujours dans ce cadre strictement de recherche ou d'enseignement.",
"consentRights": "Votre participation à cette étude est volontaire. Vous pouvez à tout moment décider de ne plus participer à l'étude sans avoir à vous justifier. Vous pouvez également demander à ce que vos données soient supprimées à tout moment. Si vous avez des questions ou des préoccupations concernant cette étude, vous pouvez contacter le responsable de l'étude.",
"consentStudyData": "Informations sur l'étude.",
"tab": {
"study": "Étude",
"code": "Code",
"tests": "Tests",
"end": "Fin"
},
"complete": "Merci pour votre participation !"
},
"tests": {
"taskTests": "Tests de langue",
"typingTests": "Tests de frappe"
},
"button": {
"create": "Créer",
......@@ -462,7 +486,10 @@
"users": "Utilisateurs",
"description": "Description",
"email": "E-mail",
"toggle": "Participants"
"toggle": "Participants",
"groups": "groupes",
"questions": "questions",
"tests": "tests"
}
},
"inputs": {
......