This commit is contained in:
Andrii Kysylevskyi 2025-12-16 21:09:33 +00:00 committed by GitHub
commit c0a27b1f49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1118 additions and 0 deletions

View File

@ -0,0 +1,341 @@
# NoSQL Databases { #nosql-databases }
**FastAPI** doesn't require you to use a SQL (relational) database.
Here we'll see an example using <a href="https://cassandra.apache.org/" class="external-link" target="_blank">Apache Cassandra</a>, a popular distributed NoSQL database.
We'll also show how <a href="https://www.scylladb.com/" class="external-link" target="_blank">ScyllaDB</a>, a Cassandra-compatible database, works using the exact same code.
/// tip
FastAPI doesn't force you to use any specific database. This tutorial demonstrates Cassandra/ScyllaDB, but you can use other databases with their respective Python drivers.
///
## Install Dependencies { #install-dependencies }
First, make sure you create your [virtual environment](../virtual-environments.md){.internal-link target=_blank}, activate it, and then install the Cassandra driver:
<div class="termy">
```console
$ pip install cassandra-driver
---> 100%
```
</div>
/// note
The same `cassandra-driver` works with both Apache Cassandra and ScyllaDB. This is what makes ScyllaDB a true drop-in replacement.
///
/// tip | Python 3.12+
If you're using Python 3.12 or newer, you'll also need an event loop implementation. Install `gevent` for best compatibility:
<div class="termy">
```console
$ pip install gevent
---> 100%
```
</div>
The `asyncore` module was removed in Python 3.12, so `cassandra-driver` requires an alternative event loop like gevent or libev.
///
## Set Up Docker Containers { #set-up-docker-containers }
For local development and testing, you can use Docker Compose to run both Cassandra and ScyllaDB.
Create a `docker-compose.yml` file:
```yaml
services:
cassandra:
image: cassandra:4.1
container_name: fastapi-cassandra
ports:
- "9043:9042"
environment:
- CASSANDRA_CLUSTER_NAME=test-cluster
healthcheck:
test: ["CMD-SHELL", "cqlsh -e 'DESCRIBE KEYSPACES'"]
interval: 10s
timeout: 5s
retries: 10
volumes:
- cassandra_data:/var/lib/cassandra
scylladb:
image: scylladb/scylla:2025.1.4
container_name: fastapi-scylladb
command: --reactor-backend epoll --smp 1 --memory 1G --overprovisioned 1 --api-address 0.0.0.0
ports:
- "9042:9042"
healthcheck:
test: ["CMD-SHELL", "cqlsh -e 'DESCRIBE KEYSPACES'"]
interval: 10s
timeout: 5s
retries: 10
volumes:
- scylladb_data:/var/lib/scylla
volumes:
cassandra_data:
scylladb_data:
```
Start the containers:
<div class="termy">
```console
$ docker-compose up -d
```
</div>
/// tip
Notice that both containers expose port 9042 (the CQL native protocol port). We map them to different host ports (9042 and 9043) so both can run simultaneously for comparison.
///
## Create the App { #create-the-app }
We'll create a task management API that demonstrates CRUD operations with Cassandra.
### Import and Create Models { #import-and-create-models }
We use **Pydantic models** for data validation, just like with SQL databases:
{* ../../docs_src/nosql_databases/tutorial001.py ln[1:17] hl[1:5,8:17] *}
/// tip
Unlike SQL databases, Cassandra uses UUIDs for primary keys. This works great in distributed systems because UUIDs can be generated independently on any node without conflicts.
///
### Create the Database Connection { #create-the-database-connection }
Create a connection class that manages the Cassandra cluster, session, and schema:
{* ../../docs_src/nosql_databases/tutorial001.py ln[20:59] hl[20:22,24:26,28:35,37:51] *}
Notice:
* **Cluster** connects to Cassandra nodes (line 21)
* **Session** executes CQL queries (line 25)
* **Keyspace** is like a database/schema in SQL (line 26)
* **Replication** strategy determines how data is distributed
/// note
`SimpleStrategy` with `replication_factor: 1` is for development. In production, use `NetworkTopologyStrategy` with multiple replicas across data centers.
///
### Initialize the Database { #initialize-the-database }
Set up the application and database connection using the modern `lifespan` context manager:
{* ../../docs_src/nosql_databases/tutorial001.py ln[65:81] hl[65,68:74,77] *}
/// tip
We use the `lifespan` context manager to handle startup and shutdown events. This is the recommended approach in modern FastAPI applications. See the <a href="https://fastapi.tiangolo.com/advanced/events/" class="internal-link" target="_blank">events documentation</a> for more details.
///
### Create a Task { #create-a-task }
Add a **POST** endpoint to create tasks:
{* ../../docs_src/nosql_databases/tutorial001.py ln[84:93] hl[84,85,86:92] *}
Key points:
* Generate UUID for primary key
* Use CQL parameterized queries (secure against injection)
* Use `toTimestamp(now())` for Cassandra timestamps
### Read Tasks { #read-tasks }
Add a **GET** endpoint to list all tasks:
{* ../../docs_src/nosql_databases/tutorial001.py ln[96:104] hl[96,97,98:99,100:103] *}
### Read One Task { #read-one-task }
Add a **GET** endpoint for a specific task:
{* ../../docs_src/nosql_databases/tutorial001.py ln[107:112] hl[107,108,109:110,111] *}
### Update a Task { #update-a-task }
Add a **PUT** endpoint to update tasks:
{* ../../docs_src/nosql_databases/tutorial001.py ln[115:131] hl[115,116,117:121,123:127] *}
### Delete a Task { #delete-a-task }
Add a **DELETE** endpoint:
{* ../../docs_src/nosql_databases/tutorial001.py ln[134:144] hl[134,135,136:140,142:143] *}
## Run the App { #run-the-app }
Save the code to `main.py` and run it:
<div class="termy">
```console
$ fastapi dev main.py
╭────────── FastAPI CLI - Development mode ───────────╮
│ │
│ Serving at: http://127.0.0.1:8000 │
│ │
│ API docs: http://127.0.0.1:8000/docs │
│ │
│ Running in development mode, for production use: │
│ │
│ fastapi run │
│ │
╰─────────────────────────────────────────────────────╯
INFO: Will watch for changes in these directories: ['/home/user/code']
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [2248755] using WatchFiles
INFO: Started server process [2248757]
INFO: Waiting for application startup.
INFO: Application startup complete.
```
</div>
## Check the API Docs { #check-the-api-docs }
Open your browser at <a href="http://127.0.0.1:8000/docs" class="external-link" target="_blank">http://127.0.0.1:8000/docs</a>.
You will see the automatic interactive API documentation (provided by Swagger UI):
<img src="/img/tutorial/nosql-databases/image01.png">
## Use ScyllaDB Instead { #use-scylladb-instead }
**ScyllaDB is Cassandra-compatible**, so you can use it with the same code.
To use ScyllaDB instead of Cassandra, change the **hostname** in the connection:
```python
# Cassandra version (tutorial001.py)
class CassandraConnection:
def __init__(self, hosts=["cassandra"], port=9042):
# ... rest of the code stays the same
```
Change to:
```python
# ScyllaDB version (tutorial001_scylla.py)
class ScyllaDBConnection:
def __init__(self, hosts=["scylladb"], port=9042):
# ... rest of the code stays the same
```
That's it! Everything else is **identical**:
* ✅ Same Cassandra driver (`cassandra-driver`)
* ✅ Same CQL queries
* ✅ Same data models
* ✅ Same API endpoints
* ✅ Same behavior
The complete ScyllaDB version is in `docs_src/nosql_databases/tutorial001_scylla.py` - the only difference is the hostname.
## Production Considerations { #production-considerations }
### Connection Pooling { #connection-pooling }
The Cassandra driver automatically manages connection pools. For production, configure:
```python
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
profile = ExecutionProfile(
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
request_timeout=15
)
cluster = Cluster(
hosts=['node1', 'node2', 'node3'],
execution_profiles={EXEC_PROFILE_DEFAULT: profile}
)
```
### Consistency Levels { #consistency-levels }
Cassandra allows tuning consistency per query:
```python
from cassandra.query import SimpleStatement
from cassandra import ConsistencyLevel
query = SimpleStatement(
"SELECT * FROM tasks WHERE id = %s",
consistency_level=ConsistencyLevel.QUORUM
)
session.execute(query, (task_id,))
```
### Error Handling { #error-handling }
Add proper error handling for production:
```python
from cassandra.cluster import NoHostAvailable
from cassandra import OperationTimedOut
try:
session.execute(query)
except NoHostAvailable:
raise HTTPException(status_code=503, detail="Database unavailable")
except OperationTimedOut:
raise HTTPException(status_code=504, detail="Query timeout")
```
### Schema Migrations { #schema-migrations }
For production, use migration tools:
* <a href="https://github.com/Cobliteam/cassandra-migrate" class="external-link" target="_blank">cassandra-migrate</a>
* Custom CQL scripts with version tracking
* Application-level schema management
## Learn More { #learn-more }
This is a quick introduction. For more advanced topics, see:
* <a href="https://cassandra.apache.org/doc/latest/" class="external-link" target="_blank">Cassandra Documentation</a>
* <a href="https://opensource.docs.scylladb.com/" class="external-link" target="_blank">ScyllaDB Documentation</a>
* <a href="https://docs.datastax.com/en/developer/python-driver/" class="external-link" target="_blank">DataStax Python Driver Documentation</a>
## Recap { #recap }
FastAPI works great with NoSQL databases like Cassandra and ScyllaDB:
* ✅ Use standard Python drivers
* ✅ Leverage Pydantic for data validation
* ✅ Get automatic API documentation
* ✅ Enjoy type safety and editor support
* ✅ Switch between compatible databases with minimal changes
FastAPI works well with NoSQL databases, providing the same benefits as with SQL databases.

View File

@ -153,6 +153,7 @@ nav:
- tutorial/middleware.md
- tutorial/cors.md
- tutorial/sql-databases.md
- tutorial/nosql-databases.md
- tutorial/bigger-applications.md
- tutorial/background-tasks.md
- tutorial/metadata.md

View File

@ -0,0 +1,147 @@
from contextlib import asynccontextmanager
from typing import List, Union
from uuid import UUID, uuid4
from cassandra.cluster import Cluster
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel, Field
class TaskBase(BaseModel):
title: str
description: Union[str, None] = None
status: str = "pending"
class TaskCreate(TaskBase):
pass
class Task(TaskBase):
id: UUID = Field(default_factory=uuid4)
class CassandraConnection:
def __init__(self, hosts=None, port=9042):
self.cluster = Cluster(hosts or ["cassandra"], port=port)
self.session = None
self.keyspace = "task_manager"
def connect(self):
self.session = self.cluster.connect()
self.create_keyspace()
self.session.set_keyspace(self.keyspace)
self.create_table()
def create_keyspace(self):
self.session.execute(
f"""
CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
"""
)
def create_table(self):
self.session.execute(
"""
CREATE TABLE IF NOT EXISTS tasks (
id uuid PRIMARY KEY,
title text,
description text,
status text,
created_at timestamp,
updated_at timestamp
)
"""
)
def close(self):
if self.session:
self.session.shutdown()
if self.cluster:
self.cluster.shutdown()
db = CassandraConnection()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
db.connect()
yield
# Shutdown
db.close()
app = FastAPI(lifespan=lifespan)
def get_db():
return db.session
@app.post("/tasks/", response_model=Task)
def create_task(task: TaskCreate, session=Depends(get_db)):
task_id = uuid4()
query = """
INSERT INTO tasks (id, title, description, status, created_at, updated_at)
VALUES (%s, %s, %s, %s, toTimestamp(now()), toTimestamp(now()))
"""
session.execute(query, (task_id, task.title, task.description, task.status))
return Task(
id=task_id, title=task.title, description=task.description, status=task.status
)
@app.get("/tasks/", response_model=List[Task])
def read_tasks(session=Depends(get_db)):
query = "SELECT id, title, description, status FROM tasks"
rows = session.execute(query)
return [
Task(id=row.id, title=row.title, description=row.description, status=row.status)
for row in rows
]
@app.get("/tasks/{task_id}", response_model=Task)
def read_task(task_id: UUID, session=Depends(get_db)):
query = "SELECT id, title, description, status FROM tasks WHERE id = %s"
row = session.execute(query, (task_id,)).one()
if not row:
raise HTTPException(status_code=404, detail="Task not found")
return Task(
id=row.id, title=row.title, description=row.description, status=row.status
)
@app.put("/tasks/{task_id}", response_model=Task)
def update_task(task_id: UUID, task: TaskCreate, session=Depends(get_db)):
# Check if task exists
check_query = "SELECT id FROM tasks WHERE id = %s"
existing = session.execute(check_query, (task_id,)).one()
if not existing:
raise HTTPException(status_code=404, detail="Task not found")
update_query = """
UPDATE tasks
SET title = %s, description = %s, status = %s, updated_at = toTimestamp(now())
WHERE id = %s
"""
session.execute(update_query, (task.title, task.description, task.status, task_id))
return Task(
id=task_id, title=task.title, description=task.description, status=task.status
)
@app.delete("/tasks/{task_id}")
def delete_task(task_id: UUID, session=Depends(get_db)):
# Check if task exists
check_query = "SELECT id FROM tasks WHERE id = %s"
existing = session.execute(check_query, (task_id,)).one()
if not existing:
raise HTTPException(status_code=404, detail="Task not found")
delete_query = "DELETE FROM tasks WHERE id = %s"
session.execute(delete_query, (task_id,))
return {"ok": True}

View File

@ -0,0 +1,147 @@
from contextlib import asynccontextmanager
from typing import List, Union
from uuid import UUID, uuid4
from cassandra.cluster import Cluster
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel, Field
class TaskBase(BaseModel):
title: str
description: Union[str, None] = None
status: str = "pending"
class TaskCreate(TaskBase):
pass
class Task(TaskBase):
id: UUID = Field(default_factory=uuid4)
class ScyllaDBConnection:
def __init__(self, hosts=None, port=9042):
self.cluster = Cluster(hosts or ["scylladb"], port=port)
self.session = None
self.keyspace = "task_manager"
def connect(self):
self.session = self.cluster.connect()
self.create_keyspace()
self.session.set_keyspace(self.keyspace)
self.create_table()
def create_keyspace(self):
self.session.execute(
f"""
CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
"""
)
def create_table(self):
self.session.execute(
"""
CREATE TABLE IF NOT EXISTS tasks (
id uuid PRIMARY KEY,
title text,
description text,
status text,
created_at timestamp,
updated_at timestamp
)
"""
)
def close(self):
if self.session:
self.session.shutdown()
if self.cluster:
self.cluster.shutdown()
db = ScyllaDBConnection()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
db.connect()
yield
# Shutdown
db.close()
app = FastAPI(lifespan=lifespan)
def get_db():
return db.session
@app.post("/tasks/", response_model=Task)
def create_task(task: TaskCreate, session=Depends(get_db)):
task_id = uuid4()
query = """
INSERT INTO tasks (id, title, description, status, created_at, updated_at)
VALUES (%s, %s, %s, %s, toTimestamp(now()), toTimestamp(now()))
"""
session.execute(query, (task_id, task.title, task.description, task.status))
return Task(
id=task_id, title=task.title, description=task.description, status=task.status
)
@app.get("/tasks/", response_model=List[Task])
def read_tasks(session=Depends(get_db)):
query = "SELECT id, title, description, status FROM tasks"
rows = session.execute(query)
return [
Task(id=row.id, title=row.title, description=row.description, status=row.status)
for row in rows
]
@app.get("/tasks/{task_id}", response_model=Task)
def read_task(task_id: UUID, session=Depends(get_db)):
query = "SELECT id, title, description, status FROM tasks WHERE id = %s"
row = session.execute(query, (task_id,)).one()
if not row:
raise HTTPException(status_code=404, detail="Task not found")
return Task(
id=row.id, title=row.title, description=row.description, status=row.status
)
@app.put("/tasks/{task_id}", response_model=Task)
def update_task(task_id: UUID, task: TaskCreate, session=Depends(get_db)):
# Check if task exists
check_query = "SELECT id FROM tasks WHERE id = %s"
existing = session.execute(check_query, (task_id,)).one()
if not existing:
raise HTTPException(status_code=404, detail="Task not found")
update_query = """
UPDATE tasks
SET title = %s, description = %s, status = %s, updated_at = toTimestamp(now())
WHERE id = %s
"""
session.execute(update_query, (task.title, task.description, task.status, task_id))
return Task(
id=task_id, title=task.title, description=task.description, status=task.status
)
@app.delete("/tasks/{task_id}")
def delete_task(task_id: UUID, session=Depends(get_db)):
# Check if task exists
check_query = "SELECT id FROM tasks WHERE id = %s"
existing = session.execute(check_query, (task_id,)).one()
if not existing:
raise HTTPException(status_code=404, detail="Task not found")
delete_query = "DELETE FROM tasks WHERE id = %s"
session.execute(delete_query, (task_id,))
return {"ok": True}

View File

@ -0,0 +1,15 @@
# Mock cassandra module to avoid Python 3.14+ import issues
# (asyncore removed in 3.12, cassandra-driver needs event loop)
import sys
from types import ModuleType
from unittest.mock import MagicMock
mock_cassandra = ModuleType("cassandra")
mock_cassandra_cluster = ModuleType("cassandra.cluster")
mock_cassandra_cluster.Cluster = MagicMock
# Set cluster attribute on cassandra module
mock_cassandra.cluster = mock_cassandra_cluster
sys.modules["cassandra"] = mock_cassandra
sys.modules["cassandra.cluster"] = mock_cassandra_cluster

View File

@ -0,0 +1,467 @@
import importlib
from typing import Any, List
from unittest.mock import MagicMock, patch
from uuid import UUID
import pytest
from dirty_equals import IsDict, IsUUID
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
class MockRow:
def __init__(self, id: UUID, title: str, description: str, status: str):
self.id = id
self.title = title
self.description = description
self.status = status
class MockResult:
def __init__(self, rows: List[MockRow]):
self._rows = rows
self._iter = iter(rows)
def __iter__(self):
return self._iter
def one(self):
if self._rows:
return self._rows[0]
return None
@pytest.fixture(
name="client",
params=["tutorial001", "tutorial001_scylla"],
)
def get_client(request: pytest.FixtureRequest):
mock_session = MagicMock()
mock_cluster_instance = MagicMock()
mock_cluster_instance.connect.return_value = mock_session
tasks_store: dict[UUID, dict[str, Any]] = {}
with patch("cassandra.cluster.Cluster") as mock_cluster:
mock_cluster.return_value = mock_cluster_instance
def mock_execute(query: str, params: tuple = ()):
if "DELETE FROM tasks" in query:
task_id = params[0]
if task_id in tasks_store:
del tasks_store[task_id]
return None
if "UPDATE tasks" in query:
title, description, status, task_id = params
if task_id in tasks_store:
tasks_store[task_id].update(
{"title": title, "description": description, "status": status}
)
return None
if "INSERT INTO tasks" in query:
task_id, title, description, status = params
tasks_store[task_id] = {
"id": task_id,
"title": title,
"description": description,
"status": status,
}
return None
if "SELECT" in query and "WHERE" not in query:
rows = [
MockRow(
id=task["id"],
title=task["title"],
description=task["description"],
status=task["status"],
)
for task in tasks_store.values()
]
return MockResult(rows)
if "SELECT" in query and "WHERE id = %s" in query:
task_id = params[0]
if task_id in tasks_store:
task = tasks_store[task_id]
return MockResult(
[
MockRow(
id=task["id"],
title=task["title"],
description=task["description"],
status=task["status"],
)
]
)
return MockResult([])
return None
mock_session.execute.side_effect = mock_execute
mod = importlib.import_module(f"docs_src.nosql_databases.{request.param}")
importlib.reload(mod)
with TestClient(mod.app) as c:
yield c
def test_crud_app(client: TestClient):
response = client.get("/tasks/")
assert response.status_code == 200, response.text
assert response.json() == []
response = client.post(
"/tasks/",
json={
"title": "Buy groceries",
"description": "Milk, eggs, bread",
"status": "pending",
},
)
assert response.status_code == 200, response.text
data = response.json()
assert "id" in data
assert data["title"] == "Buy groceries"
assert data["description"] == "Milk, eggs, bread"
assert data["status"] == "pending"
task_id = data["id"]
response = client.get(f"/tasks/{task_id}")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"id": IsUUID(4),
"title": "Buy groceries",
"description": "Milk, eggs, bread",
"status": "pending",
}
)
response = client.post(
"/tasks/",
json={
"title": "Walk the dog",
"description": "In the park",
"status": "pending",
},
)
assert response.status_code == 200, response.text
response = client.post(
"/tasks/",
json={"title": "Write code", "description": None, "status": "in_progress"},
)
assert response.status_code == 200, response.text
response = client.get("/tasks/")
assert response.status_code == 200, response.text
assert len(response.json()) == 3
response = client.put(
f"/tasks/{task_id}",
json={
"title": "Buy groceries (Updated)",
"description": "Milk, eggs, bread, cheese",
"status": "completed",
},
)
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"id": IsUUID(4),
"title": "Buy groceries (Updated)",
"description": "Milk, eggs, bread, cheese",
"status": "completed",
}
)
response = client.delete(f"/tasks/{task_id}")
assert response.status_code == 200, response.text
assert response.json() == snapshot({"ok": True})
response = client.get(f"/tasks/{task_id}")
assert response.status_code == 404, response.text
response = client.delete(f"/tasks/{task_id}")
assert response.status_code == 404, response.text
assert response.json() == snapshot({"detail": "Task not found"})
response = client.put(
f"/tasks/{task_id}",
json={
"title": "Updated non-existent task",
"description": "This should fail",
"status": "pending",
},
)
assert response.status_code == 404, response.text
assert response.json() == snapshot({"detail": "Task not found"})
def test_openapi_schema(client: TestClient):
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == snapshot(
{
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/tasks/": {
"post": {
"summary": "Create Task",
"operationId": "create_task_tasks__post",
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/TaskCreate"
}
}
},
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Task"}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
},
"get": {
"summary": "Read Tasks",
"operationId": "read_tasks_tasks__get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Task"
},
"title": "Response Read Tasks Tasks Get",
}
}
},
}
},
},
},
"/tasks/{task_id}": {
"get": {
"summary": "Read Task",
"operationId": "read_task_tasks__task_id__get",
"parameters": [
{
"name": "task_id",
"in": "path",
"required": True,
"schema": {
"type": "string",
"format": "uuid",
"title": "Task Id",
},
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Task"}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
},
"put": {
"summary": "Update Task",
"operationId": "update_task_tasks__task_id__put",
"parameters": [
{
"name": "task_id",
"in": "path",
"required": True,
"schema": {
"type": "string",
"format": "uuid",
"title": "Task Id",
},
}
],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/TaskCreate"
}
}
},
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Task"}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
},
"delete": {
"summary": "Delete Task",
"operationId": "delete_task_tasks__task_id__delete",
"parameters": [
{
"name": "task_id",
"in": "path",
"required": True,
"schema": {
"type": "string",
"format": "uuid",
"title": "Task Id",
},
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
},
},
},
"components": {
"schemas": {
"HTTPValidationError": {
"properties": {
"detail": {
"items": {
"$ref": "#/components/schemas/ValidationError"
},
"type": "array",
"title": "Detail",
}
},
"type": "object",
"title": "HTTPValidationError",
},
"Task": {
"properties": {
"title": {"type": "string", "title": "Title"},
"description": IsDict(
{
"anyOf": [{"type": "string"}, {"type": "null"}],
"title": "Description",
}
)
| IsDict(
# TODO: remove when deprecating Pydantic v1
{"type": "string", "title": "Description"}
),
"status": {
"type": "string",
"default": "pending",
"title": "Status",
},
"id": {"type": "string", "format": "uuid", "title": "Id"},
},
"type": "object",
"required": ["title"],
"title": "Task",
},
"TaskCreate": {
"properties": {
"title": {"type": "string", "title": "Title"},
"description": IsDict(
{
"anyOf": [{"type": "string"}, {"type": "null"}],
"title": "Description",
}
)
| IsDict(
# TODO: remove when deprecating Pydantic v1
{"type": "string", "title": "Description"}
),
"status": {
"type": "string",
"default": "pending",
"title": "Status",
},
},
"type": "object",
"required": ["title"],
"title": "TaskCreate",
},
"ValidationError": {
"properties": {
"loc": {
"items": {
"anyOf": [{"type": "string"}, {"type": "integer"}]
},
"type": "array",
"title": "Location",
},
"msg": {"type": "string", "title": "Message"},
"type": {"type": "string", "title": "Error Type"},
},
"type": "object",
"required": ["loc", "msg", "type"],
"title": "ValidationError",
},
}
},
}
)