FastAPI

Posted by neverset on September 5, 2020

FastAPI is a high performance web application framework based on python 3.6+, it is similar to flask but more advanced than flask github page: https://github.com/tiangolo/fastapi advantages:

  • Async/Await support
  • Built-in validation and serialization
  • fast performance thanks to starletter and pydantic
  • simple and intuitive API and http method
  • less error prone (output nett json even in error)
  • auto-generated interactive documentation: http://localhost:port/docs (current api docs) http://localhost:port/redocs (api docs for all versions, can be downloads as json for publication)
  • flexiable route system selection
  • uses Pydantic to handle all data validation, data serialization and automatic model documentation
  • inherits Starlette, so all callable objects in Starlette can be directly referenced in FastAPI
  • based on API open standards:Swagger UI and Redoc
  • powerful dependency injection system
  • optionally declare a response parameter in the function to set the header and cookie
  • use Python type hints to declare parameters disadvantage:
  • reletively new, less third-party library

basic features

FastAPI recommends using uvicorn to run the service. Uvicorn is a lightning fast ASGI server built on uvloop and httptools:

uvicorn main:app --reload

we can add some description configuration in fastAPI, which will show in docs

app = FastAPI(
title="My Super Project",
description="This is a very fancy project, with auto docs for the API and everything",
version="6.6.6",)
#change json download links
app = FastAPI(openapi_url="/api/v1/openapi.json")
#change docs links and deactivate redoc links
app = FastAPI(docs_url="/documentation", redoc_url=None)

query parameter and verification

#url to query: http://localhost:port/items/?skip=0&limit=10
from fastapi 
import FastAPIapp = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):    
    return fake_items_db[skip : skip + limit]

#use Query() to add restrictions on query parameters to verify the query parameters
from fastapi import FastAPI, Query

@app.get("/items/")
#define query parameter as list of string with default parameters
async def read_item(q: List[str]=Query(["fool","bar"])):
    query_items={"q":q}
    return query_items

@app.get("/items/1/")
#define query parameter as str with min and max length
#the default value is None, if use ... then it is empty
#regex defines regular expression of the string 
#if deprecated=True, the parameter is abandoned, will not be used any more
async def read_item1(q: str=Query(None, min_length=3, max_length=50, regex="^nice", deprecated=True)):
    results={"items":[{"item_id": "Foo"},{"item_id":"Bar"}]}
    if q:
        results.update({"q": q})
    return results

@app.get("/items/2")
#add an alias to the query parameter, the original query parameter name can not be used any more!!
async def read_item2(q: str=Query(None, alias="item-query")):
    results={"items":[{"item_id":"Foo"},{"item_id":"Bar"}]}
    if q:
        results.update({"q":q})
    return results

path parameter and verification

use path parameter should be avoid, better to use query parameter, better to use query to pass in parameter

from fastapi import FastAPI, Path, Query

app=FastAPI()

@app.get("/items/{item_id}")
async def read_items(
    #ge(greater or equal), le(smaller or equal), gt(greater than), lt(smaller than)
    item_id: int = Path(..., title="The ID of the item to get", ge=50, le=100),
    q: str       = Query(None, alias="item-query"),
    size: float  = Query(1, gt=0, lt=10.5)):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results

request body

by delcaring request: Request FastAPI will automatically pass that parameter in the function

from fastapi import FastAPI, Body
from pydantic import BaseModel
from pydantic import ValidationError
from datetime import datetime
from typing import List

# create request body class based on pydantic BaseModel

class Item(BaseModel):
    name: str = "test data model"
    description: str = None
    price: float = 123
    tax: float = None

app = FastAPI()

#this post api run with request body, which can be tested with docs api
@app.post("/items/")
#insert importance as request body parameter
#embed option to embed value into the key
async def create_item(item: Item, importance: int = Body(..., embed=True)):
    return item
#error handling 
try:
    Item(price='not float'])
except ValidationError as e:
    print(e.json())

response body

respons body can be defined with contents and header.

#filter input date and give modified output data
from typing import union
from fastapi import FastAPI
from pydantic import BaseModel, EmailStr
app=FastAPI()

class UserIn(BaseModel):
    username: str
    password: str
    email:EmailStr
    full_name: str=None

class UserOut(BaseModel):
    username: str
    email: EmailStr
    full_name: str = None

class UserOut_fakedPW(BaseModel):
    username: str
    faked_password: str
    email:EmailStr
    full_name: str=None

class Item(BaseModel):
    name: str
    descriptoin: str=None
    price: float
    tax: float=10.2
    tags: List[str]=[]

class BaseItem(BaseModel):
    description: str
    type: str

class CarItem(BaseItem):
    type= "car"

class PlaneItem(BaseItem):
    type="plane"
    size: int

item={
    "foo": {"name":"Foo", "price": 50.3},
    "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.3, "tax": 10.8, "tags":[]},
}

#filter out passord for output
@app.post("/user/", response_model=UserOut)
async def create_user(*, user:UserIn):
    return user

#encrypte password
#json object needs to be dictionarized to be taken as dictionary in python
def fake_user(user_in: UserIn):
    hashed_password="secret"+user_in.password
    encrypted_user=UserOut_fakedPW(**user_in.dict(),faked_password=hashed_password)

#select item and return item with available attributes
#the unavailabe attributes defined in Item class will not be returned
@app.get("/items/{item_id}", response_model=Item, response_model_exclude_unset=True)
async def read_item(item_id: str):
    return items(item_id)

#select item and return item without excluded attributes
#the unavaible attributes defined in Item class will still be kept with default value
@app.get("/items/{item_id}/public", response_model=Item, response_model_exclude={"price"})
async def read_item_public_data(item_id:str):
    return items[item_id]

@app.get("/items/{item_id}/name",
response_model=Item,
response_model_include=["name", "description", "tax"],)
async def read_item_name(item_id: str):
    return items[item_id]

#union two data model
@app.get("/items/{item_id}", response_model=Union[PlaneItem, CarItem])
async def read_item(item_id: str):
    return items[item_id]

update

update can be done with app.put() or app.patch(), difference is Idempotency: put->update with creating new if necessary, patch->only update

from fastapi import FastAPI
from pydantic import BaseModel

class Item(BaseModel):
    name: str
    description: str = None
    price: float = None
    tax: float = 10.2
    tags: List[str]=[]

items={
    "foo": {"name":"Foo", "price": 50.3},
    "bar": {"name": "Bar", "description": "The bartenders", "price": 62, "tax": 20.2},
    "baz": {"name": "Baz", "description": None, "price": 50.3, "tax": 10.8, "tags":[]},
}

app = FastAPI()
@app.put("/items/{item_id}")
# * means keywords only arguments, no auto-filling
async def create_item(*, item_id: int, item: Item, q: str = None):
    #** means to unpack the dictionary
    result = {"item_id": item_id, **item.dict()}
    if q:
        result.update({"q": q})
    return result

@app.patch("/items/{item_id}", response_model=Item)
async def update_item(item_id: str, item:Item):
    stored_item_data=items[item_id]
    stored_item_model=Item(**stored_item_data)
    update_data=item.dict(exclude_unset=True)
    print("update_data", update_data)
    updated_item=stored_item_model.copy(update=update_data)
    print("updated_item", jsonable_encoder(updated_item))
    items[item_id]=jsonable_encoder(updated_item)
    print("items[item_id]", items[item_id])
    return updated_item

body fields

uing bady field to verify arguments in path, query or request

from typing import List, Set, Dict
from fastapi import Body, FastAPI
from pydantic import BaseModel, Field

app = FastAPI()
class Item(BaseModel):
    name: str
    description: str = Field(None, title="The description of the item", max_length=6)
    price: float     = Field(..., gt=0, description="The price must be greater than zero")
    tax: float       = None

#this is a nested body class
class Item(BaseModel):
    name: str
    description: str  = None
    price: float
    tax: float        = None
    tags0: list       = []
    tags1: List[str]  = []
    tags2: Set[str]   = set()

#using dictionary with key/value type verification
@app.post("/index-weights/")
async def create_index_weights(weights: Dict[int, float]): 
    return weights

@app.put("/items/{item_id}")
async def update_item(*, item_id: int, item: Item = Body(..., 
        example={   # example是Body里没有的字段;不会添加任何验证,而只会添加注释;不是example也不行
        "name": "Foo",
        "description": "A very nice Item",
        "price": 0,
        "toooo": 3.2,
        # "toooooooooo": 3.2, # 超过的键值对,会全部显示原来的Item},
    embed=True)):
    results = {"item_id": item_id, "item": item}
    # to insert item_id into same dictionary of item, we have to do it this way
    results={"item_id": item_id, "item": item.name,...}
    return results

define cookie parameter for verification

from fastapi import Cookie, FastAPI
app=FastAPI()

@app.get("/items/")
async def read_items(*,ads_id: str=Cookie(None)):
    return {"ads_id": ads_id}

request header verification from fastapi import Header, FastAPI app=FastAPI()

@app.get("/items/")
async def read_items(*, user_agent: str = Header(None), users_agent: str = Header(None)):
    return  {"User-Agent": user_agent},{"AAAAA": user_agent},{'ABCD': users_agent}

extra data type

from datetime import datetime, time, timedelta
from uuid import UUID
from uuid import uuid1
from fastapi import Body, FastAPI

status code and error handling

using status code can redirect to error page if error occurs from fastapi import FastAPI, HTTPException from starlette import status from fastapi.exceptions import RequestValidationError #from starlette.responses import PlainTextResponse #from starlette.exception import HTTPException as StarletteHTTPException

class unicornException(Exception):
    def __init__(self, name: str):
        self.name=name

app=FastAPI()
items={"foo": "The foo body"}

#overwrite error handler, it is also ok to use the default handler in Exception. execuation order: error occurs->run method in decorator->run self defined handler
@app.exception_handler(UnicornException)
async def unicorn_exception_handler(request: Request, exc: UnicornException):
    return JSONResponse(
        status_code=418,
        #f allows execuation of {exc.name}
        #returned response body is defined in content
        content={"message", f"Oops!{exc.name} did something..."},
    )

@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    return PlainTextResponse(str(exc.detail), status_code=exc.status_code)

#request error handling
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    return PlainTextResponse(str(exc), status_code=400)

@app.post("items/", status_code=status.HTTP_404_NOT_FOUND)
async def create_item(name: str):
    return {"name": name}

@app.get("/items/{item_id}")
async def read_item(item_id: str):
    if item_id not in items:
        raise HTTPException(status_code=HTTP_404_NOT_FOUND,
        detail="Item not found",
        #insert error message in header
        headers={"X-Error": "This is my error message"},
        )
    pass
    return {"item": items[item_id]}

@app.get("/unicorns/{name}")
async def read_unicorn(name: str):
    if name=="yolo":
        raise UnicornException(name=name)
    return {"unicorn_name": name}

template engine and staticFiles

with template engine it is possible to design web application with seperate frontend and backend FastAPI has no default template engine for configuration (not like flask using jinja2), which means that it has high flexibility on template selection

#using jinja2
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI()
#mount static directory with name, so that in html can use local files independent of directionary location
app.mount("/static", StaticFiles(directory="static"), name="static")
#import template from local directory 
templates = Jinja2Templates(directory="templates")

@app.get("/data/{data}")
async def read_data(request: Request, data: str):
#In the returned Template response, the context object of the request must be together with the incoming parameters in the same dictionary
    # "data" is a key in html, value data will be passed into html with this function
    return templates.TemplateResponse("index.html", {"request": request, "data": data})

Form data

form is able to extract table data from html

from starlette.requests import Request
from fastapi import FastAPI, Form
from starlette.templating import jinja2Templates

app=FastAPI()
templates=jinja2Templates(directory="template")

@app.post("/user/")
#...is empty parameter, means this parameter has to be filled,otherwise exception
async def users(username: str=Form(...), password: str=Form(...)):
    return {"username": username, "password":password}

@app.get("/")
async def index(request: Request):
    #to use jinja2 template the request parameter is obligatory
    return templates.TemplateResponse("post.html", {"request":request})

jsonable_encoder

convert python data into json format

from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel

fake_db={}

class Item(BaseModel):
    title: str
    timestamp: datetime
    description: str=None

app=FastAPI()

#put to update an item, if the specifed item doesnot exist, it will create a new item
@app.put("item/{id}")
def update_item(id:str, item:Item):
    json_compatible_item_data=jsonable_encoder(item)
    fake_db[id]=json_compatible_item_data

dependency

run dependent method before the running method and get the return of dependent method back to running method

from fastapi import Depends, FastAPI
import time

app=FastAPI()

async def common_parameters(q:str=None, skip=int=0, limit: int=100):
    limit=66
    return {"q":q, "skip":skip, "limit":limit}

fake_items_db=[{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]

#query parameter class
class CommonQueryParams:
    def __init__(self, q: str=None, skip: int=0, limit: int=100):
        self.q=q
        self.skip=skip
        self.limit=limit

#depends on function
@app.get("/items/")
async def read_items(commons: dict=Depends(common_parameters)):
    common["skip"]+=10
    return common

#depends on class, so that common is declared as query parameter
async def read_items(commons: CommonQueryParams=Depends(CommonQueryParams)):
    response={}
    if commons.q:
        response.update({"q": commons.q})
    
    items=fakeitems_db[commons.skip:commons.skip+commons.limit]
    response.update({"items": items})
    return response

Authority(OAuth2)

from typing import Optional
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from passlib.context import CryptContext # libraryto deal with hash encryption

SECRET_KEY="09dsffs"
ALGORITHM="HS256"

#instantize hash encryption class
pwd_context=CryptContext(schemes=["bcrypt"], deprecated="auto")

app=FastAPI()
#instantize the OAuth2 class
oauth2_scheme=OAuth2PasswordBearer(tokenUrl="/token")

fake_users_db={
    "johndoe":{
        "username":"johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "12345",
        "disabled": False,
    }
}

class User(BaseModel):
    username: str
    email: Optional[str]= None
    full_name: Optional[str]=None
    disabled: Optional[bool]=None

class Token(BaseModel):
    access_token: str
    token_tpye: str

def fake_decode_token(token):
    return User(username=token+"fakedecoded", email="abc@example.com", full_name="john Doe")

#get user info(token=user info) from oauth2 and formulate it into User format
async def get_current_user(token: str=Depends(oauth2_scheme)):

    payload=jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    username: str=payload.get("sub")
    user=get_user(fake_users_db, username=username)
    #this is only an example decoding of token
    #user=fake_decode_token(token)
    return user

#return bool 
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

#create jwt token with valid period
def create_access_token(*,data:dict, expires_delta: timedelta=None):
    to_encode=data.copy()
    expire=datetime.utcnow()+expires_delta
    #append expire date to username
    to_encode.update({"exp":expire})
    encoded_jwt=jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

#login with jwt access token
#this is only an example, the password and username is stored uncrypted in header, which is not safe
@app.post("/token", response_model=Token)
async def login_jwt_token(form_data: OAuth2PasswordRequestForm = Depends()):
    #first to veriry username and passoword, here is neglected
    #second to create token
    access_token=create_access_token(data={"sub": form_data.username}, expires_delta=timedelta(minutes=30))
    #or simple use username as token
    #access_token=form_data.username
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User=Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="inactive user")
    return current_user

Okta

1) add .env file with following contents

//xxxx represents the client ID and client secret generated with your Okta account
OKTA_CLIENT_ID=xxxx
OKTA_CLIENT_SECRET=xxxx
OKTA_ISSUER="https://SOMETHING_HERE.oktapreview.com/oauth2/default"
OKTA_AUDIENCE="api://default" 2) Retrieving an Access Token in FastAPI calling an external authentication endpoint like Okta requires a bit more custom code and since the documentation’s API call originates from the browser, further security considerations are needed. The easiest way to request an access token is to use the Python HTTPX library to call the Okta /token endpoint from your API.

#install httpx
pip install httpx
pip freeze > requirements.txt

#main.py
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer
import httpx
from starlette.config import Config

# Load environment variables
config = Config('.env')
app = FastAPI()

# Define the auth scheme and access token URL
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token')

# Call the Okta API to get an access token
def retrieve_token(authorization, issuer, scope='items'):
    headers = {
        'accept': 'application/json',
        'authorization': authorization,
        'cache-control': 'no-cache',
        'content-type': 'application/x-www-form-urlencoded'
    }
    data = {
        'grant_type': 'client_credentials',
        'scope': scope,
    }
    url = issuer + '/v1/token'

    response = httpx.post(url, headers=headers, data=data)

    if response.status_code == httpx.codes.OK:
        return response.json()
    else:
        raise HTTPException(status_code=400, detail=response.text)


# Get auth token endpoint
@app.post('/token')
def login(request: Request):
    return retrieve_token(
        request.headers['authorization'],
        config('OKTA_ISSUER'),
        'items'
    )

3) Creating a Protected Endpoint

# Add these to your imports
from typing import List
from pydantic import BaseModel

...

# Validate the token
def validate(token: str = Depends(oauth2_scheme)):
    # TODO: Add token validation logic
    return True


# Data model
class Item(BaseModel):
    id: int
    name: str


# Protected, get items route
@app.get('/items', response_model=List[Item])
def read_items(valid: bool = Depends(validate)):
    return [
        Item.parse_obj({'id': 1, 'name': 'red ball'}),
        Item.parse_obj({'id': 2, 'name': 'blue square'}),
        Item.parse_obj({'id': 3, 'name': 'purple ellipse'}),
    ]

4) Validating Access Tokens Remotely

def validate_remotely(token, issuer, clientId, clientSecret):
    headers = {
        'accept': 'application/json',
        'cache-control': 'no-cache',
        'content-type': 'application/x-www-form-urlencoded',
    }
    data = {
        'client_id': clientId,
        'client_secret': clientSecret,
        'token': token,
    }
    url = issuer + '/v1/introspect'

    response = httpx.post(url, headers=headers, data=data)

    return response.status_code == httpx.codes.OK and response.json()['active']

def validate(token: str = Depends(oauth2_scheme)):
    res = validate_remotely(
        token,
        config('OKTA_ISSUER'),
        config('OKTA_CLIENT_ID'),
        config('OKTA_CLIENT_SECRET')
    )

    if res:
        return True
    else:
        raise HTTPException(status_code=400)

5) validating access token locally access tokens are generally short-lived (an hour by default), you might prefer to validate the tokens locally

#pip install okta_jwt
#pip freeze > requirements.txt

...
from okta_jwt.jwt import validate_token as validate_locally

...

def validate(token: str = Depends(oauth2_scheme)):
    try:
        res = validate_locally(
            token,
            config('OKTA_ISSUER'),
            config('OKTA_AUDIENCE'),
            config('OKTA_CLIENT_ID')
        )
        return bool(res)
    except Exception:
        raise HTTPException(status_code=403)
...

Blueprint (route)

use Include_route to register the children API router on the core application route

import time
from typing import List
from starlette.templating import Jinja2Templates
from fastapi import Depends, FastAPI, HTTPException
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates
from app import models
from app.database.database import SessionLocal, engine
from app.home import user,index

app = FastAPI()
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="templates")
#register  userRouter on the app
app.include_router(index.userRouter)
#defining subroute with prefix option, so that in the children router no prefix for every router is needed
app.include_router(user.userRouter,prefix="/user")

#use APIRouter class to create routers for different views, the views can be registered on the one or multi routers
# user.pyfrom starlette.templating import Jinja2Templatesfrom app import schemas, models
from app.database.database import get_db
from app.home import crud
from fastapi import Depends, HTTPException, Form
from sqlalchemy.orm import Session
from app.models import User
from sqlalchemy.orm import Session
from fastapi import APIRouter, HTTPException,Request
from fastapi.responses import RedirectResponse

userRouter = APIRouter()
templates = Jinja2Templates(directory="app/templates")

@userRouter.post("/login/", response_model=schemas.UserOut)
async def login(*,request: Request, db: Session = Depends(get_db), username: str = Form(None), password: str = Form(None),):
    if request.method == "POST":
        db_user = db.query(models.User).filter(User.username == username).first()
        if not db_user:
            raise HTTPException(status_code=400, detail="not exist...")
        print("pass!!")
        return RedirectResponse('/index')

    return templates.TemplateResponse("user/login.html", {"request": request})

async def userList(*,request: Request,db: Session = Depends(get_db)):
    userList = db.query(models.User).all()
    return templates.TemplateResponse("user/user-index.html", {"request": request,'userList':userList})
    #methods: request method; path: access path; endpoint: backend method
    userRouter.add_api_route(methods=['GET','POST'],path="/login",endpoint=login)
    userRouter.add_api_route(methods=['GET','POST'],path="/list",endpoint=userList) ### prefix, tags, dependencies, response include sub router in the main router with sub-prefix and dependencies. if include fails then error response is given. the prefix and tags are inheritated by the funciton in this router, and they can be overwritten if they are also defined in the function router

app.include_router(items.router,
    prefix="/items",
    tags=["items"],
    dependencies=[Depends(get_token_header)],
    responses={404: {"description": "Not found"}},
)

Middleware

middleware do something after request is arrived and before the request is processed by fastAPI

# this is server A, which request contents on Server B
#callback time calculation
import time
from fastapi import FastAPI
from starlette.requests import Request
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    print(response.headers)
    return response

@app.get("/")
async def main():
    return {"message": "Hello World"}

if __name__ == '__main__':
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

CORS (Cross-Origin Resource Sharing )

CORS is used to share resources on differnet servers

# this is Server B, which is configured to allow request from A to request contents on B
# redirect middleware & authorize hosts
from fastapi import FastAPI
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(HTTPSRedirectMiddleware)
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"])

origins = [
"https://gzky.live",
"https://google.com",
"http://localhost:5000", # allow access resource on localhost:5000
"http://localhost:8000",]
app.add_middleware(
    CORSMiddleware, #add CORS middleware on Server B
    allow_origins=origins,   
    allow_credentials=True, 
    allow_methods=["*"],   
    allow_headers=["*"],)

@app.get("/")
async def main():
    return {"message": "Hello World"}

background tasks

tasks that are important for operation after request but donot have to be done before response can be set in background task

#asyn call backend method as task with background_tasks
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
#function to write logs
def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)

#background_tasks: BackgroundTasks will automatically assign an instance to background_tasks
def get_query(background_tasks: BackgroundTasks, q: str = None):
    if q:
        message = f"found query: {q}\n"
        #add message to background task wirte_log, add_task(method_name, arguments)
        background_tasks.add_task(write_log, message)
    return q

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

database

DB configuration

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///test.db"

#allow sqlite run with multi-threads
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
#create a session between python and database
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
#data modal instanlization
Base = declarative_base()
#
Base.metadata.create_all(bind=engine)

table structure

sql database data model class M_User(Base): tablename = “users”

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String) 
    is_active = Column(Boolean, default=True)

schemas

data type defined in fast API

class UserBase(BaseModel):
    email: str

class UserCreate(UserBase):
    password: str

class User(UserBase):
    id: int
    is_active: bool
    class Config:
        #orm(object relational mapping)help data compatible between database and python data model
        orm_mode = True

###crud operation functions to interact with database

def get_db():
    try:
        db = SessionLocal()
        yield db
    finally:
        db.close()
        print('database is closed')


#function to get user
def get_user(db: Session, user_id: int):
    CCCCCC = db.query(M_User).filter(M_User.id == user_id).first()
    print('CCCCCC :', CCCCCC)
    return CCCCCC
#function to insert user
def db_create_user(db: Session, user: UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = M_User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

request

#get and post request
@app.post("/users/", response_model=User)
#Depends can avoid concurrency on same resource
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    return db_create_user(db=db, user=user)