# db/crud.py from sqlalchemy.orm import Session from . import models from api import schemas from core.security import get_password_hash, verify_password # --- User CRUD --- def get_user(db: Session, user_id: int): """Obtiene un usuario por su ID.""" return db.query(models.User).filter(models.User.id == user_id).first() def get_user_by_username(db: Session, username: str): """Obtiene un usuario por su nombre de usuario.""" return db.query(models.User).filter(models.User.username == username).first() def get_user_by_email(db: Session, email: str): """Obtiene un usuario por su email.""" return db.query(models.User).filter(models.User.email == email).first() def get_users(db: Session, skip: int = 0, limit: int = 100): """Obtiene una lista de usuarios con paginación.""" return db.query(models.User).offset(skip).limit(limit).all() def create_user(db: Session, user: schemas.UserCreate): """Crea un nuevo usuario en la base de datos.""" hashed_password = get_password_hash(user.password) db_user = models.User( username=user.username, email=user.email, hashed_password=hashed_password ) db.add(db_user) db.commit() db.refresh(db_user) return db_user def update_user_password(db: Session, user: models.User, new_password: str): """Actualiza la contraseña de un usuario.""" hashed_password = get_password_hash(new_password) user.hashed_password = hashed_password db.add(user) db.commit() db.refresh(user) return user def authenticate_user(db: Session, username: str, password: str): """ Autentica a un usuario. Retorna el objeto de usuario si es exitoso, de lo contrario, retorna None. """ user = get_user_by_username(db, username) if not user: return None if not verify_password(password, user.hashed_password): return None return user