TECH

JWT and Cookie Auth in FastAPI

JARRETT RETZ July 9th, 2021 programming python fastapi authorization authentication oauth2 api api security cookies jwt

Introduction

Setting up authentication or authorization in an application is an important and often time-consuming step. I hope that if you are experimenting with different kinds of authentication, this article will help you understand how JWTs (JSON Web tokens) and cookies can work together in FastAPI.

In this article I:

  • Provide context from other tutorials on the big picture
  • Discuss how these JWTs and cookies can work together for security
  • Share Python code for implementation in FastAPI

I leave out a few items:

  • The function code for communicating with a database through FastAPI (instead, there is a function call, but the function implementation is not discussed)
  • Front-end Javascript for communicating with the API

React Admin

React Admin is an open-source React framework for B2B applications. I use it for an application I'm developing because it's actively maintained, easy-to-use, and provides a nice user interface out-of-the-box.

React Admin's blog has an article, Handling JWT in Admin Apps the Right Way, by Alexis Janvier.

Next, I'll outline a few points from the article and how it got me into the situation of handling JWTs and cookies with FastAPI.

Combining JWTs and Cookies

First, JWTs should not be kept in the browser's local storage. This is done all the time in tutorials, despite being a known bad practice. The aforementioned article goes into slightly more depth. Still, more importantly, it walks through a way to store JWTs in memory (you'll have to check out the tutorial on how to do this) that allows the application to use JWTs still but not store them insecurely.

Second, JWTs are useful, but they're not as secure as using a SameSite, HTTP Only cookie. This is because an HTTP Only cookie is not exposed to Javascript (preventing XSS attacks).

Thirdly, regardless of the move to store JWTs in memory, it's still good practice, says Alexis in the article, to keep a short expiration on the JWT token. However, to constantly refresh the token and provide a better user experience by handling multiple tab sessions, refreshing the page, and making authenticated requests, we need to use a cookie.

The JWT and Cookie Workflow

I'll outline an application workflow that keeps the user authenticated during their session. We're going to assume that the user authenticates successfully and everything goes as planned.

  • The user sends log in request to the server ("/token") with username and password:
    1. The server creates a cookie with an ID, user ID, and validity timestamp
    2. Next, the server generates a JWT and sets expiration for the access token
    3. The server returns the JWT in the response body and sets a cookie, with the cookie ID as the value
    4. The client (browser) saves the JWT in memory
    5. Then, the client sets a timeout to call the server ("/refresh-token") just before the access token expires (this keeps the user logged in with a valid JWT during their session. (Note: I'm using access token and JWT interchangeably)
  • Later, the client calls the server ("/refresh-token") just before the token expiration. For this example, let's say fives minutes.
    1. The server checks the cookie ID (sent with the request) to see if it's stored in the cookie database.
    2. If yes, the server checks the validity timestamp on the cookie stored in the database to see if it's still valid.
    3. If yes, it uses the user ID property to get the username and generate a fresh JWT.
    4. The server returns a fresh JWT and expiry time.
    5. The client saves the new JWT in memory and restarts the timeout to call the server ("/refresh-token") again in five minutes.

That's a lot to outline, but it all seems much more simple after writing the code. Finally, It's worth noting that the JWT expires quickly, but the cookie ID can be set to expiry in twenty-four hours, one week, or more.

FastAPI

FastAPI is a Python API framework, and you are probably familiar with it if you're reading this article.

I followed FastAPI's documentation to set up OAuth2 with password hashing and JWT bearer tokens.

That tutorial uses a fake DB object for users, and I set a fake DB object for tokens. Additionally, it covers hashing passwords, creating and validating JWTs. I'll try to include relevant code, but if I miss something, be sure to check there for more guidance.

The Token Route (User Auth)

The /token route is the login route. It validates that the user exists and has entered the right password. Also, if checks with a cookie if the user provided a cookie and if that cookie is still valid.

Either way, it's returning a JWT, expiration time, and cookie.

# userRouter.py
from fastapi import (
    APIRouter,
    HTTPException,
    Depends,
    status,
    Cookie,
)
from .models import User, UserInDB, Token, TokenData, RefreshToken
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional, Union
from fastapi.responses import JSONResponse
import uuid

#...

@userRouter.post("/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    refreshTokenId: Optional[str] = Cookie(None),
):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    refreshTokenId: str = None

    # TODO: Check for existing token
    existingRefreshToken = getExistingRefreshToken(user.username)

    # TODO: Create current timestamp
    currentTimestamp = round(datetime.now().timestamp()) # seconds

    # TODO: Check if there is a token and the valid time is in the future
    tokenStillValid = existingRefreshToken.validity_timestamp > currentTimestamp

    if existingRefreshToken is not None and tokenStillValid:
        # TODO: If it is, set the refreshToken id to the existing token id.
        refreshTokenId = existingRefreshToken.id
    else:
        if existingRefreshToken is not None and tokenStillValid is False:
        # TODO: If there is an existing token and ID but it's invalid, delete the token
            delete_token(existingRefreshToken.id)

        seconds_till_expiration = ACCESS_TOKEN_EXPIRE_MINUTES * 60
        # TODO: Initialize new token data
        new_token_data = RefreshToken(
            id=uuid.uuid4(), user_id=user.id, validity_timestamp=currentTimestamp + seconds_till_expiration
        )

        # TODO: Create a new refresh token
        new_refresh_token_id = save_token(new_token_data)

        # TODO: Set the refreshTokenId to the newRefreshToken id
        refreshTokenId = new_refresh_token_id

    # TODO: Initialize time object for cookie creation
    # current time + 5 minutes then conver to miliseconds
    seconds_till_expiration = ACCESS_TOKEN_EXPIRE_MINUTES * 60
    cookie_token_expiration = currentTimestamp + seconds_till_expiration
    cookie_token_expiration_in_ms =  cookie_token_expiration * 1000 # miliseconds

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    response = JSONResponse(
        content={
            "access_token": access_token,
            "token_type": "bearer",
            "token_expiry": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        }
    )

    # TODO: Set the cookie with name, refreshtokenid, and cookie options
    response.set_cookie(
        key="invoice_processing",
        value=refreshTokenId,
        expires=cookie_token_expiration_in_ms,
        httponly=True,
    )

    return response
    
# ...

The Refresh Token Route

The /refresh-token route receives the cookie and checks whether the cookie ID is valid. It's valid if it exists and has a timestamp greater than the current timestamp (i.e., 'now').

If the cookie ID passes both of those checks, it retrieves the username—using the user ID property saved with the cookie in the database—and generates a new JWT access token to return to the client.

# userRouter.py

# ...

@userRouter.get("/refresh-token", response_model=Token)
async def get_refresh_token(
    invoice_processing: str = Cookie(None, description="Refresh token id")
):
    # TODO: Check if token ID is in database
    tokenFromDb = get_token_with_token_id(fake_token_db, invoice_processing)

    if tokenFromDb is None:
        raise refresh_token_exception

    # TODO: Create current timestamp
    currentTimestamp = round(datetime.now().timestamp())

    # TODO: Check if the timestamp is less than the current timestamp
    if tokenFromDb.validity_timestamp < currentTimestamp:
        # TODO: If it is, delete the token from the db
        delete_token(invoice_processing)

        # TODO: Return 400 'The refresh token is expired'
        raise refresh_token_exception

    # TODO: Use the user ID to on the token object to fetch the user
    user = get_user(fake_users_db, tokenFromDb.user_id)

    # TODO: Check if the user exists and the user is not disabled.
    if user is None or user.disabled is True:
        # TODO: If either are true, return a 400 'Invalid credentials'
        raise user_exception

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    # TODO: Put expiration time in congif object
    response = JSONResponse(
        content={
            "access_token": access_token,
            "token_type": "bearer",
            "token_expiry": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        }
    )

    return response
    
# ...

Helper Functions and Dictionaries

To get the routes tested and working with the frontend, I wrote a few example functions for fetching data from a fake database. Additionally, I created dictionaries to act as mock databases returning hard-coded data. Below are the functions and objects I used. These are not fit for production, but they help us understand how this workflow will eventually work.

# ...

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # 'secret'
        "disabled": False,
    },
}

fake_token_db = {
    "42d6496a-4a64-4492-a1bd-9a8d554e156d": {
        "user_id": "johndoe",
        "id": "42d6496a-4a64-4492-a1bd-9a8d554e156d",
        "validity_timestamp": 2625502317, # way in the future
    },
}

user_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="User account error.",
    headers={"WWW-Authenticate": "Bearer"},
)

refresh_token_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="The refresh token has expired or was not found.",
    headers={"WWW-Authenticate": "Bearer"},
)

userRouter = APIRouter(
    prefix="/user",
    tags=["User"],
    # dependencies=[Depends(get_token_header)],
    responses={404: {"description": "Not found"}},
)

##
# Token
##
SECRET_KEY = "secret key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 5
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


##
# OAuth2 for Login
##

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/user/token")


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    else:
        return None


def get_token(db, username: str) -> Union[RefreshToken, None]:
    # TODO: check for token in db
    for k, v in db.items():
        if v["user_id"] == username:
            return RefreshToken(**v)
    return None


def get_token_with_token_id(db, tokenId: str) -> Union[RefreshToken, None]:
    # TODO: check for token in db
    if tokenId in db:
        return RefreshToken(**db[tokenId])
    return None


def delete_token(token_id: str) -> None:
    # TODO: Delete token from db
    return None


def save_token(token_data: RefreshToken) -> str:
    fake_token_db[token_data.id] = token_data.dict()
    return token_data.id


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=5)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
    
# ...

Models

After writing the article I noticed the file used models. So I'm going to include those models below.

# models.py

from typing import Optional
from pydantic import BaseModel
import uuid

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

class RefreshToken(BaseModel):
    id: uuid.UUID
    user_id: str
    validity_timestamp: float

Conclusion

The server still has a long way to go if it wants to make it into production, but this is a great starting point and hopefully clears up confusion and relieves built-up frustration for anyone trying to roll their own auth.

As mentioned, there are two tutorials and an entire frontend framework (React Admin) that are necessary to read and understand before grasping the entire essence of this article. Therefore, if you find yourself confused about where something came from or why it's set up to run how it is, check the linked tutorials above.

Thanks for reading!


Have a thought about the article?

Send JRTS a message!

We'll use this email to respond to your message.
Contact