Skip to content

Authentication Router

The auth router will generate a set of endpoints for authenticating users.

  • POST /register
  • POST /login
  • POST /logout
  • POST /token
  • POST /token/refresh
  • GET /confirm
  • POST /confirm
  • POST /confirm/{token}
  • POST /{id}/change_username

Setup The Authentication service

To Setup the authentication service, you will need to add all requirements to the object AuthService.

from authx.services.auth import AuthService
from authx.backend import UsersRepo
from authx.core.jwt import JWTBackend

AuthService.setup(
        repo = UsersRepo,
        auth_backend = JWTBackend,
        debug = True,
        base_url = 'http://localhost:8000',
        site = 'authx',
        recaptcha_secret = None,
        smtp_username = None,
        smtp_password = None,
        smtp_host = None,
        smtp_tls = False,
        display_name = 'authx',
    )

This one gonna help use to use the authentication service, that we provide.

from authx import Authentication
from fastapi import FastAPI

app = FastAPI()
auth = Authentication()

app.include_router(auth.auth_router, prefix="/api/users")

Register

As we know we will use the POST method to register a new user, also a callback method.

This Route is based on:

@router.post("/register", name="auth:register")
    async def register(*, request: Request, response: Response):
        data = await request.json()
        service = AuthService()

        tokens = await service.register(data)
        set_tokens_in_response(response, tokens)
        return None

The Service.register method will return the Access and Refresh tokens, and if we have a validation error it will return the error 400.

Login

Same logical way as register, we use the same Authentication route.

Info

app.include_router(auth.auth_router, prefix="/api/users") include all authentication routers.

@router.post("/login", name="auth:login")
    async def login(*, request: Request, response: Response):
        data = await request.json()
        service = AuthService()
        ip = request.client.host
        tokens = await service.login(data, ip)
        set_tokens_in_response(response, tokens)
        return None

For the login Service we provide some params:

  • data: The data that we will use to login. (login, password).
  • ip: The IP of the client.

if the data is not valid, we will return the error 400, if the data is valid, we will return the tokens (access and refresh).

Info

There is also an HTTPException relate to 404 if the user is not found, also the 429 relate to brute force attempts.

Info

The HTTP 429 Too Many Requests response status code indicates the user has sent too many requests in a given amount of time ("rate limiting"). A Retry-After header might be included to this response indicating how long to wait before making a new request.

Logout

As we know logout always mean that we will remove the access_cookies_name and refresh_cookies_name from the response.

@router.post("/logout", name="auth:logout")
    async def logout(*, response: Response):
        response.delete_cookie(access_cookie_name)
        response.delete_cookie(refresh_cookie_name)
        return None

The response.delete_cookie is a function in the starlette a lightweight ASGI framework/toolkit, which is ideal for building high performance asyncio services.

Token

After login and all the steps we have a function relate to get a new token based on user a class that initialize the user object and use data (login, password) as an argument.

@router.post("/token", name="auth:token")
    async def token(*, user: User = Depends(get_authenticated_user)):
        return user.data
  • User Data:
def __init__(self, data=None):
        self.data = data
        if data is None:
            self.is_authenticated = False
            self.is_admin = False
            self.id = None
            self.username = None
        else:
            self.is_authenticated = True
            self.is_admin = "admin" in self.data.get("permissions")
            self.id = int(self.data.get("id"))
            self.username = self.data.get("username")

Refresh Token

We have also a way to get a new refresh token, this is the same as the token method, but we will use the refresh_token instead of access_token, it take request & response as arguments, use also starlette to set the refresh_cookie_name in the response.

@router.post("/token/refresh", name="auth:refresh_access_token")
    async def refresh_access_token(
        *, request: Request, response: Response,
    ):
        service = AuthService()
        refresh_token = request.cookies.get(refresh_cookie_name)
        if refresh_token is None:
            raise HTTPException(401)

        access_token = await service.refresh_access_token(refresh_token)
        set_access_token_in_response(response, access_token)
        return {"access": access_token}
  • The service.refresh_access_token will return the new access token, or it will raise;
  • 401: if the refresh token is not valid. (Refresh or Ban).
  • 500: if the refresh token is expired.

Info

set_access_token_in_response take the response and the token as arguments, also set : py response.set_cookie( key=refresh_cookie_name, value=token, secure=not debug, httponly=True, max_age=refresh_expiration, )

Confirm

We have 3 steps for the Email confirmation:

ConfirmationStatus

For the get_email_confirmation_status we will use the GET method, and we will return the status of the email confirmation.

@router.get("/confirm", name="auth:get_email_confirmation_status")
    async def get_email_confirmation_status(
        *, user: User = Depends(get_authenticated_user)
    ):
        service = AuthService(user)
        return await service.get_email_confirmation_status()

The service.get_email_confirmation_status() gonna return the status of the email confirmation. {"email": sample@sample.com, "confirmed": True}

async def get_email_confirmation_status(self) -> dict:
        item = await self._repo.get(self._user.id)
        return {"email": item.get("email"), "confirmed": item.get("confirmed")}

RequestConfirmation

for the request_email_confirmation we will use the POST method, and we will return the status of the email confirmation, its take the user as an arguments or by default its depend on get_authenticated_user.

@router.post("/confirm", name="auth:request_email_confirmation")
    async def request_email_confirmation(
        *, user: User = Depends(get_authenticated_user)):
        service = AuthService(user)
        return await service.request_email_confirmation()

the service.request_email_confirmation() check the email if is confirmed or not, for example if is not confirmed its return a link to confirm this email, if confirmed raise an HTTPException 400 with the message Email already confirmed, also can show the timeout exception.

async def request_email_confirmation(self) -> None:
        item = await self._repo.get(self._user.id)
        if item.get("confirmed"):
            raise HTTPException(400)
        if not await self._repo.is_email_confirmation_available(self._user.id):
            raise HTTPException(429)
        email = item.get("email")
        await self._request_email_confirmation(email)
        return None

Confirmation

For the confirm_email function we will use the POST method, and it will return a response with a token to verify, its take the token as an arguments.

@router.post("/confirm/{token}", name="auth:confirm_email")
    async def confirm_email(*, token: str):
        service = AuthService()
        return await service.confirm_email(token)

The service.confirm_email hash the token, or looks up hash in db to update the confirmed row to row (Default is false), this could raise also 403 an HTTPException that show if there is no hash in database.

async def confirm_email(self, token: str) -> None:
        token_hash = hash_string(token)
        if not await self._repo.confirm_email(token_hash):
            raise HTTPException(403)
        return None

Change Username

At the last point of authentication we have a function to change the username, it take id, username, user as arguments, and return a response with the new username.

@router.post("/{id}/change_username", name="auth:change_username")
    async def change_username(
        *,
        id: int,
        username: str = Body("", embed=True),
        user: User = Depends(get_authenticated_user),):
        service = AuthService(user)
        if user.id == id or user.is_admin:
            return await service.change_username(id, username)
        else:
            raise HTTPException(403)
    return router

The service.change_username will return the new username, or it will raise; - 400: Username already exists. - 404: user not found.

async def change_username(self, id: int, username: str) -> None:
        new_username = self._validate_user_model(
            UserInChangeUsername, {"username": username}
        ).username
        item = await self._repo.get(id)
        old_username = item.get("username")
        if old_username == new_username:
            raise HTTPException(400, detail=get_error_message("username change same"))
        existing_user = await self._repo.get_by_username(new_username)
        if existing_user is not None:
            raise HTTPException(400, detail=get_error_message("existing username"))
        logger.info(
            f"change_username id={id} old_username={old_username} new_username={new_username}"
        )
        await self._repo.change_username(id, new_username)
        logger.info(f"change_username id={id} success")
        return None