from flask_restx import Resource, reqparse from jwt import InvalidTokenError import services from controllers.console.auth.error import ( AuthenticationFailedError, EmailCodeError, InvalidEmailError, ) from controllers.console.error import AccountBannedError from controllers.console.wraps import only_edition_enterprise, setup_required from controllers.web import web_ns from libs.helper import email from libs.password import valid_password from services.account_service import AccountService from services.webapp_auth_service import WebAppAuthService @web_ns.route("/login") class LoginApi(Resource): """Resource for web app email/password login.""" @setup_required @only_edition_enterprise @web_ns.doc("web_app_login") @web_ns.doc(description="Authenticate user for web application access") @web_ns.doc( responses={ 200: "Authentication successful", 400: "Bad request - invalid email or password format", 401: "Authentication failed - email or password mismatch", 403: "Account banned or login disabled", 404: "Account not found", } ) def post(self): """Authenticate user and login.""" parser = reqparse.RequestParser() parser.add_argument("email", type=email, required=True, location="json") parser.add_argument("password", type=valid_password, required=True, location="json") args = parser.parse_args() try: account = WebAppAuthService.authenticate(args["email"], args["password"]) except services.errors.account.AccountLoginError: raise AccountBannedError() except services.errors.account.AccountPasswordError: raise AuthenticationFailedError() except services.errors.account.AccountNotFoundError: raise AuthenticationFailedError() token = WebAppAuthService.login(account=account) return {"result": "success", "data": {"access_token": token}} # class LogoutApi(Resource): # @setup_required # def get(self): # account = cast(Account, flask_login.current_user) # if isinstance(account, flask_login.AnonymousUserMixin): # return {"result": "success"} # flask_login.logout_user() # return {"result": "success"} @web_ns.route("/email-code-login") class EmailCodeLoginSendEmailApi(Resource): @setup_required @only_edition_enterprise @web_ns.doc("send_email_code_login") @web_ns.doc(description="Send email verification code for login") @web_ns.doc( responses={ 200: "Email code sent successfully", 400: "Bad request - invalid email format", 404: "Account not found", } ) def post(self): parser = reqparse.RequestParser() parser.add_argument("email", type=email, required=True, location="json") parser.add_argument("language", type=str, required=False, location="json") args = parser.parse_args() if args["language"] is not None and args["language"] == "zh-Hans": language = "zh-Hans" else: language = "en-US" account = WebAppAuthService.get_user_through_email(args["email"]) if account is None: raise AuthenticationFailedError() else: token = WebAppAuthService.send_email_code_login_email(account=account, language=language) return {"result": "success", "data": token} @web_ns.route("/email-code-login/validity") class EmailCodeLoginApi(Resource): @setup_required @only_edition_enterprise @web_ns.doc("verify_email_code_login") @web_ns.doc(description="Verify email code and complete login") @web_ns.doc( responses={ 200: "Email code verified and login successful", 400: "Bad request - invalid code or token", 401: "Invalid token or expired code", 404: "Account not found", } ) def post(self): parser = reqparse.RequestParser() parser.add_argument("email", type=str, required=True, location="json") parser.add_argument("code", type=str, required=True, location="json") parser.add_argument("token", type=str, required=True, location="json") args = parser.parse_args() user_email = args["email"] token_data = WebAppAuthService.get_email_code_login_data(args["token"]) if token_data is None: raise InvalidTokenError() if token_data["email"] != args["email"]: raise InvalidEmailError() if token_data["code"] != args["code"]: raise EmailCodeError() WebAppAuthService.revoke_email_code_login_token(args["token"]) account = WebAppAuthService.get_user_through_email(user_email) if not account: raise AuthenticationFailedError() token = WebAppAuthService.login(account=account) AccountService.reset_login_error_rate_limit(args["email"]) return {"result": "success", "data": {"access_token": token}}