Skip to content
Extraits de code Groupes Projets
Valider 00cfb475 rédigé par Brieuc Dubois's avatar Brieuc Dubois Validation de Serge Bibauw
Parcourir les fichiers

Backend authentication flow and protected routes

parent 9fa7d283
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
......@@ -12,11 +12,11 @@ def get_user_by_username(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username.lower()).first()
def get_user_by_username_and_password(db: Session, user: schemas.UserLogin):
user_db = get_user_by_username(db, user.username)
def get_user_by_username_and_password(db: Session, username: str, password: str):
user_db = get_user_by_username(db, username)
if user_db is None:
return None
if not Hasher.verify_password(user.password, user_db.password):
if not Hasher.verify_password(password, user_db.password):
return None
return user_db
......
......@@ -9,4 +9,12 @@ engine = create_engine(config.DATABASE_URL, connect_args={"check_same_thread": F
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
\ No newline at end of file
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
\ No newline at end of file
from passlib.context import CryptContext
from typing import Union, Any
from jose import jwt
from datetime import datetime, timedelta
from pydantic import ValidationError
from fastapi import HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
import config
import schemas
import models
from database import get_db
import crud
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
reuseable_oauth = OAuth2PasswordBearer(
tokenUrl="/login",
scheme_name="JWT"
)
class Hasher():
@staticmethod
def verify_password(plain_password, hashed_password):
......@@ -18,22 +30,39 @@ class Hasher():
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(subject: Union[str, Any], expires_delta: int = None) -> str:
def create_access_token(user: models.User, expires_delta: int = None) -> str:
if expires_delta is not None:
expires_delta = datetime.utcnow() + expires_delta
else:
expires_delta = datetime.utcnow() + timedelta(minutes=config.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expires_delta, "sub": str(subject)}
to_encode = {"exp": expires_delta, "sub": str(user.id), "type": user.type, "username": user.username, "is_active": user.is_active}
encoded_jwt = jwt.encode(to_encode, config.JWT_SECRET_KEY, config.ALGORITHM)
return encoded_jwt
def create_refresh_token(subject: Union[str, Any], expires_delta: int = None) -> str:
def create_refresh_token(user: models.User, expires_delta: int = None) -> str:
if expires_delta is not None:
expires_delta = datetime.utcnow() + expires_delta
else:
expires_delta = datetime.utcnow() + timedelta(minutes=config.REFRESH_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expires_delta, "sub": str(subject)}
to_encode = {"exp": expires_delta, "sub": str(user.id), "type": user.type, "username": user.username, "is_active": user.is_active}
encoded_jwt = jwt.encode(to_encode, config.JWT_REFRESH_SECRET_KEY, config.ALGORITHM)
return encoded_jwt
\ No newline at end of file
return encoded_jwt
def get_jwt_user(token: str = Depends(reuseable_oauth), db: Session = Depends(get_db)):
try:
payload = jwt.decode(token, config.JWT_SECRET_KEY, algorithms=[config.ALGORITHM])
token_data = schemas.TokenPayload(**payload)
if datetime.utcfromtimestamp(token_data.exp) < datetime.utcnow():
raise HTTPException(status_code=401, detail="Token has expired")
db_user = crud.get_user(db, user_id=int(token_data.sub))
except(jwt.JWTError, jwt.ExpiredSignatureError, ValidationError, ValueError) as e:
print(e)
raise HTTPException(status_code=403, detail="Invalid token")
return db_user
\ No newline at end of file
from fastapi import FastAPI, status, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from pydantic import BaseModel, parse_obj_as
from fastapi.security import OAuth2PasswordRequestForm
import schemas, crud
from database import SessionLocal, Base, engine
from models import UserType
from database import SessionLocal, Base, engine, get_db
import hashing
from utils import check_user_level
Base.metadata.create_all(bind=engine)
......@@ -12,22 +15,14 @@ Base.metadata.create_all(bind=engine)
app = FastAPI()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/health", status_code=status.HTTP_200_OK)
def health():
return {}
@app.post("/login/", response_model=schemas.Token)
def login(user: schemas.UserLogin, db: Session = Depends(get_db)):
db_user = crud.get_user_by_username_and_password(db, user)
@app.post("/login", response_model=schemas.Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
db_user = crud.get_user_by_username_and_password(db, form_data.username, form_data.password)
if db_user is None:
raise HTTPException(status_code=400, detail="Incorrect username or password")
......@@ -37,14 +32,17 @@ def login(user: schemas.UserLogin, db: Session = Depends(get_db)):
}
@app.post("/users/", status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
@app.post("/users", status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db), current_user: schemas.User = Depends(hashing.get_jwt_user)):
if not check_user_level(current_user, UserType.ADMIN):
raise HTTPException(status_code=401, detail="You do not have permission to create a user")
db_user = crud.get_user_by_username(db, username=user.username)
if db_user:
raise HTTPException(status_code=400, detail="User already registered")
if crud.create_user(db=db, user=user):
return
return "OK"
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
......@@ -53,6 +51,6 @@ def read_user(user_id: int, db: Session = Depends(get_db)):
raise HTTPException(status_code=404, detail="User not found")
return db_user
@app.get("/users/", response_model=list[schemas.User])
@app.get("/users", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
return crud.get_users(db, skip=skip, limit=limit)
\ No newline at end of file
......@@ -11,6 +11,14 @@ class User(BaseModel):
class Config:
from_attributes = True
def toJSON(self):
return {
"id": self.id,
"username": self.username,
"type": self.type,
"is_active": self.is_active
}
class UserCreate(BaseModel):
username: str
password: str
......@@ -25,5 +33,12 @@ class Token(BaseModel):
access_token: str
refresh_token: str
class Config:
from_attributes = True
class TokenPayload(BaseModel):
sub: str = None
exp: int = None
class Config:
from_attributes = True
\ No newline at end of file
import models
def check_user_level(user: models.User, required_level: models.UserType):
if user.type > required_level.value:
return False
return True
\ No newline at end of file
......@@ -3,3 +3,4 @@ fastapi>=0.110.0,<0.111.0
sqlalchemy>=2.0.0,<2.1.0
passlib>=1.7.0,<1.8.0
python-jose>=3.3.0,<3.4.0
python-multipart>=0.0.0,<0.1.0
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter