diff --git a/docs/en/docs/tutorial/nosql-databases.md b/docs/en/docs/tutorial/nosql-databases.md new file mode 100644 index 000000000..0c2e79ad5 --- /dev/null +++ b/docs/en/docs/tutorial/nosql-databases.md @@ -0,0 +1,341 @@ +# NoSQL Databases { #nosql-databases } + +**FastAPI** doesn't require you to use a SQL (relational) database. + +Here we'll see an example using Apache Cassandra, a popular distributed NoSQL database. + +We'll also show how ScyllaDB, a Cassandra-compatible database, works using the exact same code. + +/// tip + +FastAPI doesn't force you to use any specific database. This tutorial demonstrates Cassandra/ScyllaDB, but you can use other databases with their respective Python drivers. + +/// + +## Install Dependencies { #install-dependencies } + +First, make sure you create your [virtual environment](../virtual-environments.md){.internal-link target=_blank}, activate it, and then install the Cassandra driver: + +
+
+## Use ScyllaDB Instead { #use-scylladb-instead }
+
+**ScyllaDB is Cassandra-compatible**, so you can use it with the same code.
+
+To use ScyllaDB instead of Cassandra, change the **hostname** in the connection:
+
+```python
+# Cassandra version (tutorial001.py)
+class CassandraConnection:
+ def __init__(self, hosts=["cassandra"], port=9042):
+ # ... rest of the code stays the same
+```
+
+Change to:
+
+```python
+# ScyllaDB version (tutorial001_scylla.py)
+class ScyllaDBConnection:
+ def __init__(self, hosts=["scylladb"], port=9042):
+ # ... rest of the code stays the same
+```
+
+That's it! Everything else is **identical**:
+
+* ✅ Same Cassandra driver (`cassandra-driver`)
+* ✅ Same CQL queries
+* ✅ Same data models
+* ✅ Same API endpoints
+* ✅ Same behavior
+
+The complete ScyllaDB version is in `docs_src/nosql_databases/tutorial001_scylla.py` - the only difference is the hostname.
+
+## Production Considerations { #production-considerations }
+
+### Connection Pooling { #connection-pooling }
+
+The Cassandra driver automatically manages connection pools. For production, configure:
+
+```python
+from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
+from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
+
+profile = ExecutionProfile(
+ load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
+ request_timeout=15
+)
+
+cluster = Cluster(
+ hosts=['node1', 'node2', 'node3'],
+ execution_profiles={EXEC_PROFILE_DEFAULT: profile}
+)
+```
+
+### Consistency Levels { #consistency-levels }
+
+Cassandra allows tuning consistency per query:
+
+```python
+from cassandra.query import SimpleStatement
+from cassandra import ConsistencyLevel
+
+query = SimpleStatement(
+ "SELECT * FROM tasks WHERE id = %s",
+ consistency_level=ConsistencyLevel.QUORUM
+)
+session.execute(query, (task_id,))
+```
+
+### Error Handling { #error-handling }
+
+Add proper error handling for production:
+
+```python
+from cassandra.cluster import NoHostAvailable
+from cassandra import OperationTimedOut
+
+try:
+ session.execute(query)
+except NoHostAvailable:
+ raise HTTPException(status_code=503, detail="Database unavailable")
+except OperationTimedOut:
+ raise HTTPException(status_code=504, detail="Query timeout")
+```
+
+### Schema Migrations { #schema-migrations }
+
+For production, use migration tools:
+
+* cassandra-migrate
+* Custom CQL scripts with version tracking
+* Application-level schema management
+
+## Learn More { #learn-more }
+
+This is a quick introduction. For more advanced topics, see:
+
+* Cassandra Documentation
+* ScyllaDB Documentation
+* DataStax Python Driver Documentation
+
+## Recap { #recap }
+
+FastAPI works great with NoSQL databases like Cassandra and ScyllaDB:
+
+* ✅ Use standard Python drivers
+* ✅ Leverage Pydantic for data validation
+* ✅ Get automatic API documentation
+* ✅ Enjoy type safety and editor support
+* ✅ Switch between compatible databases with minimal changes
+
+FastAPI works well with NoSQL databases, providing the same benefits as with SQL databases.
diff --git a/docs/en/mkdocs.yml b/docs/en/mkdocs.yml
index 60d2f977e..1e7a63d71 100644
--- a/docs/en/mkdocs.yml
+++ b/docs/en/mkdocs.yml
@@ -153,6 +153,7 @@ nav:
- tutorial/middleware.md
- tutorial/cors.md
- tutorial/sql-databases.md
+ - tutorial/nosql-databases.md
- tutorial/bigger-applications.md
- tutorial/background-tasks.md
- tutorial/metadata.md
diff --git a/docs_src/nosql_databases/tutorial001.py b/docs_src/nosql_databases/tutorial001.py
new file mode 100644
index 000000000..66369d54e
--- /dev/null
+++ b/docs_src/nosql_databases/tutorial001.py
@@ -0,0 +1,147 @@
+from contextlib import asynccontextmanager
+from typing import List, Union
+from uuid import UUID, uuid4
+
+from cassandra.cluster import Cluster
+from fastapi import Depends, FastAPI, HTTPException
+from pydantic import BaseModel, Field
+
+
+class TaskBase(BaseModel):
+ title: str
+ description: Union[str, None] = None
+ status: str = "pending"
+
+
+class TaskCreate(TaskBase):
+ pass
+
+
+class Task(TaskBase):
+ id: UUID = Field(default_factory=uuid4)
+
+
+class CassandraConnection:
+ def __init__(self, hosts=None, port=9042):
+ self.cluster = Cluster(hosts or ["cassandra"], port=port)
+ self.session = None
+ self.keyspace = "task_manager"
+
+ def connect(self):
+ self.session = self.cluster.connect()
+ self.create_keyspace()
+ self.session.set_keyspace(self.keyspace)
+ self.create_table()
+
+ def create_keyspace(self):
+ self.session.execute(
+ f"""
+ CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
+ WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
+ """
+ )
+
+ def create_table(self):
+ self.session.execute(
+ """
+ CREATE TABLE IF NOT EXISTS tasks (
+ id uuid PRIMARY KEY,
+ title text,
+ description text,
+ status text,
+ created_at timestamp,
+ updated_at timestamp
+ )
+ """
+ )
+
+ def close(self):
+ if self.session:
+ self.session.shutdown()
+ if self.cluster:
+ self.cluster.shutdown()
+
+
+db = CassandraConnection()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ # Startup
+ db.connect()
+ yield
+ # Shutdown
+ db.close()
+
+
+app = FastAPI(lifespan=lifespan)
+
+
+def get_db():
+ return db.session
+
+
+@app.post("/tasks/", response_model=Task)
+def create_task(task: TaskCreate, session=Depends(get_db)):
+ task_id = uuid4()
+ query = """
+ INSERT INTO tasks (id, title, description, status, created_at, updated_at)
+ VALUES (%s, %s, %s, %s, toTimestamp(now()), toTimestamp(now()))
+ """
+ session.execute(query, (task_id, task.title, task.description, task.status))
+ return Task(
+ id=task_id, title=task.title, description=task.description, status=task.status
+ )
+
+
+@app.get("/tasks/", response_model=List[Task])
+def read_tasks(session=Depends(get_db)):
+ query = "SELECT id, title, description, status FROM tasks"
+ rows = session.execute(query)
+ return [
+ Task(id=row.id, title=row.title, description=row.description, status=row.status)
+ for row in rows
+ ]
+
+
+@app.get("/tasks/{task_id}", response_model=Task)
+def read_task(task_id: UUID, session=Depends(get_db)):
+ query = "SELECT id, title, description, status FROM tasks WHERE id = %s"
+ row = session.execute(query, (task_id,)).one()
+ if not row:
+ raise HTTPException(status_code=404, detail="Task not found")
+ return Task(
+ id=row.id, title=row.title, description=row.description, status=row.status
+ )
+
+
+@app.put("/tasks/{task_id}", response_model=Task)
+def update_task(task_id: UUID, task: TaskCreate, session=Depends(get_db)):
+ # Check if task exists
+ check_query = "SELECT id FROM tasks WHERE id = %s"
+ existing = session.execute(check_query, (task_id,)).one()
+ if not existing:
+ raise HTTPException(status_code=404, detail="Task not found")
+
+ update_query = """
+ UPDATE tasks
+ SET title = %s, description = %s, status = %s, updated_at = toTimestamp(now())
+ WHERE id = %s
+ """
+ session.execute(update_query, (task.title, task.description, task.status, task_id))
+ return Task(
+ id=task_id, title=task.title, description=task.description, status=task.status
+ )
+
+
+@app.delete("/tasks/{task_id}")
+def delete_task(task_id: UUID, session=Depends(get_db)):
+ # Check if task exists
+ check_query = "SELECT id FROM tasks WHERE id = %s"
+ existing = session.execute(check_query, (task_id,)).one()
+ if not existing:
+ raise HTTPException(status_code=404, detail="Task not found")
+
+ delete_query = "DELETE FROM tasks WHERE id = %s"
+ session.execute(delete_query, (task_id,))
+ return {"ok": True}
diff --git a/docs_src/nosql_databases/tutorial001_scylla.py b/docs_src/nosql_databases/tutorial001_scylla.py
new file mode 100644
index 000000000..cc8808b65
--- /dev/null
+++ b/docs_src/nosql_databases/tutorial001_scylla.py
@@ -0,0 +1,147 @@
+from contextlib import asynccontextmanager
+from typing import List, Union
+from uuid import UUID, uuid4
+
+from cassandra.cluster import Cluster
+from fastapi import Depends, FastAPI, HTTPException
+from pydantic import BaseModel, Field
+
+
+class TaskBase(BaseModel):
+ title: str
+ description: Union[str, None] = None
+ status: str = "pending"
+
+
+class TaskCreate(TaskBase):
+ pass
+
+
+class Task(TaskBase):
+ id: UUID = Field(default_factory=uuid4)
+
+
+class ScyllaDBConnection:
+ def __init__(self, hosts=None, port=9042):
+ self.cluster = Cluster(hosts or ["scylladb"], port=port)
+ self.session = None
+ self.keyspace = "task_manager"
+
+ def connect(self):
+ self.session = self.cluster.connect()
+ self.create_keyspace()
+ self.session.set_keyspace(self.keyspace)
+ self.create_table()
+
+ def create_keyspace(self):
+ self.session.execute(
+ f"""
+ CREATE KEYSPACE IF NOT EXISTS {self.keyspace}
+ WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
+ """
+ )
+
+ def create_table(self):
+ self.session.execute(
+ """
+ CREATE TABLE IF NOT EXISTS tasks (
+ id uuid PRIMARY KEY,
+ title text,
+ description text,
+ status text,
+ created_at timestamp,
+ updated_at timestamp
+ )
+ """
+ )
+
+ def close(self):
+ if self.session:
+ self.session.shutdown()
+ if self.cluster:
+ self.cluster.shutdown()
+
+
+db = ScyllaDBConnection()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ # Startup
+ db.connect()
+ yield
+ # Shutdown
+ db.close()
+
+
+app = FastAPI(lifespan=lifespan)
+
+
+def get_db():
+ return db.session
+
+
+@app.post("/tasks/", response_model=Task)
+def create_task(task: TaskCreate, session=Depends(get_db)):
+ task_id = uuid4()
+ query = """
+ INSERT INTO tasks (id, title, description, status, created_at, updated_at)
+ VALUES (%s, %s, %s, %s, toTimestamp(now()), toTimestamp(now()))
+ """
+ session.execute(query, (task_id, task.title, task.description, task.status))
+ return Task(
+ id=task_id, title=task.title, description=task.description, status=task.status
+ )
+
+
+@app.get("/tasks/", response_model=List[Task])
+def read_tasks(session=Depends(get_db)):
+ query = "SELECT id, title, description, status FROM tasks"
+ rows = session.execute(query)
+ return [
+ Task(id=row.id, title=row.title, description=row.description, status=row.status)
+ for row in rows
+ ]
+
+
+@app.get("/tasks/{task_id}", response_model=Task)
+def read_task(task_id: UUID, session=Depends(get_db)):
+ query = "SELECT id, title, description, status FROM tasks WHERE id = %s"
+ row = session.execute(query, (task_id,)).one()
+ if not row:
+ raise HTTPException(status_code=404, detail="Task not found")
+ return Task(
+ id=row.id, title=row.title, description=row.description, status=row.status
+ )
+
+
+@app.put("/tasks/{task_id}", response_model=Task)
+def update_task(task_id: UUID, task: TaskCreate, session=Depends(get_db)):
+ # Check if task exists
+ check_query = "SELECT id FROM tasks WHERE id = %s"
+ existing = session.execute(check_query, (task_id,)).one()
+ if not existing:
+ raise HTTPException(status_code=404, detail="Task not found")
+
+ update_query = """
+ UPDATE tasks
+ SET title = %s, description = %s, status = %s, updated_at = toTimestamp(now())
+ WHERE id = %s
+ """
+ session.execute(update_query, (task.title, task.description, task.status, task_id))
+ return Task(
+ id=task_id, title=task.title, description=task.description, status=task.status
+ )
+
+
+@app.delete("/tasks/{task_id}")
+def delete_task(task_id: UUID, session=Depends(get_db)):
+ # Check if task exists
+ check_query = "SELECT id FROM tasks WHERE id = %s"
+ existing = session.execute(check_query, (task_id,)).one()
+ if not existing:
+ raise HTTPException(status_code=404, detail="Task not found")
+
+ delete_query = "DELETE FROM tasks WHERE id = %s"
+ session.execute(delete_query, (task_id,))
+ return {"ok": True}
diff --git a/tests/test_tutorial/test_nosql_databases/__init__.py b/tests/test_tutorial/test_nosql_databases/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/test_tutorial/test_nosql_databases/conftest.py b/tests/test_tutorial/test_nosql_databases/conftest.py
new file mode 100644
index 000000000..0c5157d2b
--- /dev/null
+++ b/tests/test_tutorial/test_nosql_databases/conftest.py
@@ -0,0 +1,15 @@
+# Mock cassandra module to avoid Python 3.14+ import issues
+# (asyncore removed in 3.12, cassandra-driver needs event loop)
+import sys
+from types import ModuleType
+from unittest.mock import MagicMock
+
+mock_cassandra = ModuleType("cassandra")
+mock_cassandra_cluster = ModuleType("cassandra.cluster")
+mock_cassandra_cluster.Cluster = MagicMock
+
+# Set cluster attribute on cassandra module
+mock_cassandra.cluster = mock_cassandra_cluster
+
+sys.modules["cassandra"] = mock_cassandra
+sys.modules["cassandra.cluster"] = mock_cassandra_cluster
diff --git a/tests/test_tutorial/test_nosql_databases/test_tutorial001.py b/tests/test_tutorial/test_nosql_databases/test_tutorial001.py
new file mode 100644
index 000000000..442c1b6ce
--- /dev/null
+++ b/tests/test_tutorial/test_nosql_databases/test_tutorial001.py
@@ -0,0 +1,467 @@
+import importlib
+from typing import Any, List
+from unittest.mock import MagicMock, patch
+from uuid import UUID
+
+import pytest
+from dirty_equals import IsDict, IsUUID
+from fastapi.testclient import TestClient
+from inline_snapshot import snapshot
+
+
+class MockRow:
+ def __init__(self, id: UUID, title: str, description: str, status: str):
+ self.id = id
+ self.title = title
+ self.description = description
+ self.status = status
+
+
+class MockResult:
+ def __init__(self, rows: List[MockRow]):
+ self._rows = rows
+ self._iter = iter(rows)
+
+ def __iter__(self):
+ return self._iter
+
+ def one(self):
+ if self._rows:
+ return self._rows[0]
+ return None
+
+
+@pytest.fixture(
+ name="client",
+ params=["tutorial001", "tutorial001_scylla"],
+)
+def get_client(request: pytest.FixtureRequest):
+ mock_session = MagicMock()
+ mock_cluster_instance = MagicMock()
+ mock_cluster_instance.connect.return_value = mock_session
+
+ tasks_store: dict[UUID, dict[str, Any]] = {}
+
+ with patch("cassandra.cluster.Cluster") as mock_cluster:
+ mock_cluster.return_value = mock_cluster_instance
+
+ def mock_execute(query: str, params: tuple = ()):
+ if "DELETE FROM tasks" in query:
+ task_id = params[0]
+ if task_id in tasks_store:
+ del tasks_store[task_id]
+ return None
+
+ if "UPDATE tasks" in query:
+ title, description, status, task_id = params
+ if task_id in tasks_store:
+ tasks_store[task_id].update(
+ {"title": title, "description": description, "status": status}
+ )
+ return None
+
+ if "INSERT INTO tasks" in query:
+ task_id, title, description, status = params
+ tasks_store[task_id] = {
+ "id": task_id,
+ "title": title,
+ "description": description,
+ "status": status,
+ }
+ return None
+
+ if "SELECT" in query and "WHERE" not in query:
+ rows = [
+ MockRow(
+ id=task["id"],
+ title=task["title"],
+ description=task["description"],
+ status=task["status"],
+ )
+ for task in tasks_store.values()
+ ]
+ return MockResult(rows)
+
+ if "SELECT" in query and "WHERE id = %s" in query:
+ task_id = params[0]
+ if task_id in tasks_store:
+ task = tasks_store[task_id]
+ return MockResult(
+ [
+ MockRow(
+ id=task["id"],
+ title=task["title"],
+ description=task["description"],
+ status=task["status"],
+ )
+ ]
+ )
+ return MockResult([])
+
+ return None
+
+ mock_session.execute.side_effect = mock_execute
+
+ mod = importlib.import_module(f"docs_src.nosql_databases.{request.param}")
+ importlib.reload(mod)
+
+ with TestClient(mod.app) as c:
+ yield c
+
+
+def test_crud_app(client: TestClient):
+ response = client.get("/tasks/")
+ assert response.status_code == 200, response.text
+ assert response.json() == []
+
+ response = client.post(
+ "/tasks/",
+ json={
+ "title": "Buy groceries",
+ "description": "Milk, eggs, bread",
+ "status": "pending",
+ },
+ )
+ assert response.status_code == 200, response.text
+ data = response.json()
+ assert "id" in data
+ assert data["title"] == "Buy groceries"
+ assert data["description"] == "Milk, eggs, bread"
+ assert data["status"] == "pending"
+
+ task_id = data["id"]
+ response = client.get(f"/tasks/{task_id}")
+ assert response.status_code == 200, response.text
+ assert response.json() == snapshot(
+ {
+ "id": IsUUID(4),
+ "title": "Buy groceries",
+ "description": "Milk, eggs, bread",
+ "status": "pending",
+ }
+ )
+
+ response = client.post(
+ "/tasks/",
+ json={
+ "title": "Walk the dog",
+ "description": "In the park",
+ "status": "pending",
+ },
+ )
+ assert response.status_code == 200, response.text
+
+ response = client.post(
+ "/tasks/",
+ json={"title": "Write code", "description": None, "status": "in_progress"},
+ )
+ assert response.status_code == 200, response.text
+
+ response = client.get("/tasks/")
+ assert response.status_code == 200, response.text
+ assert len(response.json()) == 3
+
+ response = client.put(
+ f"/tasks/{task_id}",
+ json={
+ "title": "Buy groceries (Updated)",
+ "description": "Milk, eggs, bread, cheese",
+ "status": "completed",
+ },
+ )
+ assert response.status_code == 200, response.text
+ assert response.json() == snapshot(
+ {
+ "id": IsUUID(4),
+ "title": "Buy groceries (Updated)",
+ "description": "Milk, eggs, bread, cheese",
+ "status": "completed",
+ }
+ )
+
+ response = client.delete(f"/tasks/{task_id}")
+ assert response.status_code == 200, response.text
+ assert response.json() == snapshot({"ok": True})
+
+ response = client.get(f"/tasks/{task_id}")
+ assert response.status_code == 404, response.text
+
+ response = client.delete(f"/tasks/{task_id}")
+ assert response.status_code == 404, response.text
+ assert response.json() == snapshot({"detail": "Task not found"})
+
+ response = client.put(
+ f"/tasks/{task_id}",
+ json={
+ "title": "Updated non-existent task",
+ "description": "This should fail",
+ "status": "pending",
+ },
+ )
+ assert response.status_code == 404, response.text
+ assert response.json() == snapshot({"detail": "Task not found"})
+
+
+def test_openapi_schema(client: TestClient):
+ response = client.get("/openapi.json")
+ assert response.status_code == 200, response.text
+ assert response.json() == snapshot(
+ {
+ "openapi": "3.1.0",
+ "info": {"title": "FastAPI", "version": "0.1.0"},
+ "paths": {
+ "/tasks/": {
+ "post": {
+ "summary": "Create Task",
+ "operationId": "create_task_tasks__post",
+ "requestBody": {
+ "required": True,
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/TaskCreate"
+ }
+ }
+ },
+ },
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {"$ref": "#/components/schemas/Task"}
+ }
+ },
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ },
+ },
+ },
+ },
+ "get": {
+ "summary": "Read Tasks",
+ "operationId": "read_tasks_tasks__get",
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "type": "array",
+ "items": {
+ "$ref": "#/components/schemas/Task"
+ },
+ "title": "Response Read Tasks Tasks Get",
+ }
+ }
+ },
+ }
+ },
+ },
+ },
+ "/tasks/{task_id}": {
+ "get": {
+ "summary": "Read Task",
+ "operationId": "read_task_tasks__task_id__get",
+ "parameters": [
+ {
+ "name": "task_id",
+ "in": "path",
+ "required": True,
+ "schema": {
+ "type": "string",
+ "format": "uuid",
+ "title": "Task Id",
+ },
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {"$ref": "#/components/schemas/Task"}
+ }
+ },
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ },
+ },
+ },
+ },
+ "put": {
+ "summary": "Update Task",
+ "operationId": "update_task_tasks__task_id__put",
+ "parameters": [
+ {
+ "name": "task_id",
+ "in": "path",
+ "required": True,
+ "schema": {
+ "type": "string",
+ "format": "uuid",
+ "title": "Task Id",
+ },
+ }
+ ],
+ "requestBody": {
+ "required": True,
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/TaskCreate"
+ }
+ }
+ },
+ },
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {"$ref": "#/components/schemas/Task"}
+ }
+ },
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ },
+ },
+ },
+ },
+ "delete": {
+ "summary": "Delete Task",
+ "operationId": "delete_task_tasks__task_id__delete",
+ "parameters": [
+ {
+ "name": "task_id",
+ "in": "path",
+ "required": True,
+ "schema": {
+ "type": "string",
+ "format": "uuid",
+ "title": "Task Id",
+ },
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {"application/json": {"schema": {}}},
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ },
+ },
+ },
+ },
+ },
+ },
+ "components": {
+ "schemas": {
+ "HTTPValidationError": {
+ "properties": {
+ "detail": {
+ "items": {
+ "$ref": "#/components/schemas/ValidationError"
+ },
+ "type": "array",
+ "title": "Detail",
+ }
+ },
+ "type": "object",
+ "title": "HTTPValidationError",
+ },
+ "Task": {
+ "properties": {
+ "title": {"type": "string", "title": "Title"},
+ "description": IsDict(
+ {
+ "anyOf": [{"type": "string"}, {"type": "null"}],
+ "title": "Description",
+ }
+ )
+ | IsDict(
+ # TODO: remove when deprecating Pydantic v1
+ {"type": "string", "title": "Description"}
+ ),
+ "status": {
+ "type": "string",
+ "default": "pending",
+ "title": "Status",
+ },
+ "id": {"type": "string", "format": "uuid", "title": "Id"},
+ },
+ "type": "object",
+ "required": ["title"],
+ "title": "Task",
+ },
+ "TaskCreate": {
+ "properties": {
+ "title": {"type": "string", "title": "Title"},
+ "description": IsDict(
+ {
+ "anyOf": [{"type": "string"}, {"type": "null"}],
+ "title": "Description",
+ }
+ )
+ | IsDict(
+ # TODO: remove when deprecating Pydantic v1
+ {"type": "string", "title": "Description"}
+ ),
+ "status": {
+ "type": "string",
+ "default": "pending",
+ "title": "Status",
+ },
+ },
+ "type": "object",
+ "required": ["title"],
+ "title": "TaskCreate",
+ },
+ "ValidationError": {
+ "properties": {
+ "loc": {
+ "items": {
+ "anyOf": [{"type": "string"}, {"type": "integer"}]
+ },
+ "type": "array",
+ "title": "Location",
+ },
+ "msg": {"type": "string", "title": "Message"},
+ "type": {"type": "string", "title": "Error Type"},
+ },
+ "type": "object",
+ "required": ["loc", "msg", "type"],
+ "title": "ValidationError",
+ },
+ }
+ },
+ }
+ )