TECH

How to Access DynamoDB in FastAPI

JARRETT RETZ July 16th, 2021 programming python fastapi api dynamodb aws aioboto3

Update

As I read through more remote corners of the FastAPI documentation I happened across a few paragraphs that made me pause and reconsider my DynamoDB access using aioboto3. The first is:

If you are using a third party library that communicates with something (a database, an API, the file system, etc) and doesn't have support for using await, (this is currently the case for most database libraries), then declare your path operation functions as normally, with just def...

https://fastapi.tiangolo.com/async/#in-a-hurry

Although libraries exist for asynchronous code with AWS resources (aioboto3) FastAPI is perfectly fine with using standard, blocking libraries. This point is clarified in this second paragraph.

When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

If you are coming from another async framework that does not work in the way described above and you are used to define trivial compute-only path operation functions with plain def for a tiny performance gain (about 100 nanoseconds), please note that in FastAPI the effect would be quite opposite. In these cases, it's better to use async def unless your path operation functions use code that performs blocking I/O.

Still, in both situations, chances are that FastAPI will still be faster than (or at least comparable to) your previous framework.

https://fastapi.tiangolo.com/async/#path-operation-functions

The code I wrote in the original version of this article works, but it's really unnecessary.

Implications for my original article

FastAPI already considers the problem of popular tested database libraries that do not support async/await syntax.

Using the aioboto3 library forced me to do a few things:

  1. Create a client on every route
  2. Serialize and deserialize response and request objects
  3. Convert some documented code (AWS docs use boto3) to work with aioboto3.

Something else, not related to how the library works, was wrapping my entire route in try and except block. This made it difficult to return errors the way described in the documentation.

Furthermore, and to reiterate, I'm not creating blocking code in FastAPI.

So, let's take a look at what the new company resource CRUD operations look like. And, how we can set up the Dynamo resource once in a separate file.

# dynamo_client.py
import boto3
from main import settings_args

dynamodb = boto3.resource("dynamodb", **settings_args)
# companyRouter.py
from botocore.exceptions import ClientError
from src.company.models import Company, UpdateCompany
from fastapi import APIRouter, status, HTTPException, Path, Response, Query, Depends
from typing import Optional, Any
from src.exceptions import not_found_exception
from src.dynamodb_client import dynamodb
# ... other imports

# .. router code

table_name = "companies"

@companyRouter.get(
    "/{company}",
    # ... open api stuff
)
def read_company(
    company: str = Path(
        ...,
        # ... open api stuff
    )
):
    table = dynamodb.Table(table_name)

    try:
        response = table.get_item(Key=dict(id=company))
    except ClientError as e:
        raise HTTPException(status_code=500, detail=str(e))

    try:
        item = response["Item"]
    except KeyError:
        raise not_found_exception
    return item


@companyRouter.put(
    "",
    # ... open api stuff
)
def create_company(company: UpdateCompany):
    company_dict = company.dict()
    company_dict["id"] = company_dict["name"].lower()

    table = dynamodb.Table(table_name)

    try:
        table.put_item(Item=company_dict)
    except ClientError as e:
        raise HTTPException(status_code=500, detail=str(e))

    logger.info(f'COMPANY: {company_dict["name"]} added to companies.')
    logger.debug(company_dict)

    return company_dict


@companyRouter.put(
    "/{company}",
    # ... open api stuff
)
def update_company(
    payload: UpdateCompany,
    company: str = Path(
      # ... open api stuff
    ),
):
    table = dynamodb.Table(table_name)

    try:
        response = table.get_item(Key=dict(id=company))
    except ClientError as e:
        raise HTTPException(status_code=500, detail=str(e))

    try:
        response = response["Item"]
    except KeyError:
        raise not_found_exception

    existing_company = Company(**response)
    update_data = payload.dict(exclude_unset=True)
    updated_company = existing_company.copy(update=update_data)

    updated_company_dict = updated_company.dict()

    try:
        table.update_item(
            Key=dict(id=company),
            UpdateExpression="set #n=:n, keywords=:kw",
            ExpressionAttributeValues={
                ":n": updated_company_dict["name"],
                ":kw": updated_company_dict["keywords"],
            },
            ExpressionAttributeNames={"#n": "name"},
            ReturnValues="UPDATED_NEW",
        )
    except ClientError as e:
        raise HTTPException(status_code=500, detail=str(e))

    return updated_company_dict


@companyRouter.delete(
    "/{company}",
    # ... open api stuff
)
def delete_company(
    company: str = Path(
        ...,
        # ... open api stuff
    ),
):
    table = dynamodb.Table(table_name)
    try:
        table.delete_item(Key=dict(id=company))
    except ClientError as e:
        raise HTTPException(status_code=500, detail=str(e))

    return dict(id=company)

This code is 10x easier to read and write. Again, I save myself a lot of 'trouble' by choosing to use the boto3 library with path operations using def instead of async def.

I recommend glancing through the original article below to get more context on what I was doing and where I am coming from!

Introduction

This article is a short article on accessing DynamoDB from a FastAPI server. First, I'll talk a little about FastAPI, some libraries used to access DynamoDB, and then share the code that works for me at the moment.

FastAPI

FastAPI is a Python API framework that's asynchronous, supports automatic OpenAPI spec generation, uses static types, and enforces HTTP response and request schemas at runtime.

It's relatively young, but I like it, and I hope work on it continues.

DynamoDB

To explain DynamoDB, I'll borrow from the AWS website:

Amazon DynamoDB is a key-value and document database that delivers single-digit millisecond performance at any scale. It's a fully managed, multi-region, multi-active, durable database with built-in security, backup and restores, and in-memory caching for internet-scale applications. DynamoDB can handle more than 10 trillion requests per day and can support peaks of more than 20 million requests per second.

It's a casual database that I have chosen to help store objects, sometimes the objects have different properties, so it's been helpful so far.

Fetching Data from DynamoDB with FastAPI

Most of the examples on the web and AWS documentation example use the boto3 library to connect to AWS resources.

However, this library does not work for async Python development. Therefore, a few other libraries are available to handle async operations. They include:

I use aioboto3, which is built on top of aiobotocore and boto3. The basic description for the package, from the documentation, is:

With aioboto3, you can now use the higher-level APIs provided by boto3 in an asynchronous manner. Mainly I developed this as I wanted to use the boto3 dynamodb Table object in some async microservices.

Credentials

To use the module, I need to grab my AWS credentials. I set these up in a file named aws_session.py:

from . import api
import aiobotocore

settings = api.get_settings()

session = aiobotocore.get_session()

settings_args = dict(
    aws_access_key_id=settings.AWS_ACCESS_KEY,
    aws_secret_access_key=settings.AWS_SECRET_KEY,
    aws_session_token=settings.AWS_SESSION_TOKEN,
    region_name="region",
)

You'll first have to understand how to pass variables in FastAPI.

In my router files, after importing the objects I need from the file, I'll create asynchronous sessions with the session object and my AWS credentials. Then I can use the library in a similar manner to boto3, with a few caveats.

I don't set up a client for each table. Instead, I'm passing the table when. Also, I have to serialize and deserialize to fit DynamoDB's JSON typing format. I have functions to help me with that, but I think the process could improve.

The below code details how I set up some small routes to help with CRUD operations on a resource.

File Outline

The code shared below only shows the general outline with select function arguments and operations. It does not show all the imports, route arguments, and in-between lines of code.

# ... other imports
from app.aws_session import settings_args, session

# Dynamodb scan arguments
dynamo_kwargs = {
    "companies": {
        # ...
        # "ExpressionAttributeNames": {"#n": "name"},
    }
}


@companyRouter.get("", summary="Fetches company data")
async def read_all(
    #...
):
    #...
    async with session.create_client(
        "dynamodb", **settings_args, use_ssl=False
    ) as client:
        table_name = "companies"

        # Try to fetch items
        try:
            dynamo_response = await client.scan(
                TableName=table_name, **dynamo_kwargs["companies"]
            )

            items = dynamo_response["Items"]

            items = [deserialize_item(item) for item in items]
            #...
            return items
        except Exception as e:
            #...


@companyRouter.get(
    "/{company}",
    #...
)
async def read_company(
    #...
):
    async with session.create_client(
        "dynamodb", **settings_args, use_ssl=False
    ) as client:
        table_name = "companies"

        try:
            response = await client.get_item(
                TableName=table_name, 
                Key=dict(id=dict(S=company)) # manually serialize
            )

            return deserialize_item(response["Item"])
        except Exception as e:
            #...


@companyRouter.put("", status_code=status.HTTP_201_CREATED)
async def create_company(company: UpdateCompany):
    async with session.create_client(
        "dynamodb", **settings_args, use_ssl=False
    ) as client:
        table_name = "companies"
        company_dict = company.dict()

        company_dict['id'] = company_dict['name'].lower()

        try:
            await client.put_item(
                TableName=table_name, 
                Item=serialize_item(company_dict) # use serialize function for dict
            )

            #...
            
            return company_dict
        except Exception as e:
            logger.error(e, exc_info=True)
            return e


@companyRouter.put("/{company}")
async def update_company(
    #...
):
    async with session.create_client(
        "dynamodb", **settings_args, use_ssl=False
    ) as client:
        try:
            table_name = "companies"

            response = await client.get_item(
                TableName=table_name, Key=dict(id=dict(S=company))
            )

            response = deserialize_item(response["Item"])

            existing_company = Company(**response)

            update_data = payload.dict(exclude_unset=True)

            updated_company = existing_company.copy(update=update_data)

            updated_company_dict = updated_company.dict()

            serialized_company = serialize_item(updated_company_dict)

            await client.update_item(
                TableName='companies',
                Key={
                    "id": dict(S=company),
                },
                UpdateExpression= #..,
                ExpressionAttributeValues={
                    #...
                },
                ExpressionAttributeNames={
                    #...
                },
                ReturnValues="UPDATED_NEW",
            )
            return updated_company
        except Exception as e:
            #...


@companyRouter.delete("/{company}", status_code=status.HTTP_202_ACCEPTED)
async def delete_company(
    #...
):
    async with session.create_client(
        "dynamodb", **settings_args, use_ssl=False
    ) as client:
        try:
            table_name = "companies"

            response = await client.get_item(
                TableName=table_name, Key=dict(id=dict(S=company))
            )

            response = deserialize_item(response["Item"])

            await client.delete_item(
                TableName="companies", Key=dict(id=dict(S=company))
            )

            #...
        except Exception as e:
            #...

@companyRouter.delete("", status_code=status.HTTP_202_ACCEPTED)
async def delete_invoices(
    # ...
):
    async with session.create_client(
        "dynamodb", **settings_args, use_ssl=False
    ) as client:
        try:
            table_name = "companies"
            filter_to_dict = ast.literal_eval(filter)

            deletedIds = []
            if "id" in filter_to_dict:
                for id in filter_to_dict["id"]:
                    await client.delete_item(
                        TableName=table_name, Key=dict(id=dict(S=id))
                    )

                    deletedIds.append(id)

            return deletedIds
        except Exception as e:
            # ...

Sometimes, if the input is only a string, I'll manually create the dictionary to send to DynamoDB. Other times, if the object is significant, I'll use the serialization function borrowed (and slightly adapted) from boto3.

Using DynamoDB requires Expression and Projection arguments. If I'm reusing many of those arguments, I'll put them in a dictionary and access them by table name.

dynamo_response= await client.scan(
  TableName=table_name,
  **dynamo_kwargs["companies"] # spread dictionary arguments for expressions and projections
)

Conclusion

This setup works for me so far, so I'll continue to use it until I see something better! I didn't include all of the code from the file, but if you are curious, I hope you could see the general function structures and how you could set up your FastAPI routes. Thanks for reading!


Have a thought about the article?

Send JRTS a message!

We'll use this email to respond to your message.
Contact