mirror of https://github.com/tiangolo/fastapi.git
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
This commit is contained in:
parent
71c441e471
commit
c5e20f6e60
|
|
@ -6,9 +6,10 @@ Example: Basic QUERY method usage in FastAPI.
|
|||
This example demonstrates how to use the QUERY HTTP method for simple queries.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
|
@ -22,19 +23,18 @@ class SimpleQuery(BaseModel):
|
|||
def search_items(query: SimpleQuery):
|
||||
"""
|
||||
Search for items using the QUERY method.
|
||||
|
||||
|
||||
The QUERY method allows sending complex search parameters in the request body
|
||||
instead of URL parameters, making it ideal for complex queries.
|
||||
"""
|
||||
# Simulate search logic
|
||||
results = [
|
||||
f"Item {i}: {query.search_term}"
|
||||
for i in range(1, min(query.limit + 1, 6))
|
||||
f"Item {i}: {query.search_term}" for i in range(1, min(query.limit + 1, 6))
|
||||
]
|
||||
|
||||
|
||||
return {
|
||||
"query": query.search_term,
|
||||
"limit": query.limit,
|
||||
"results": results,
|
||||
"total_found": len(results)
|
||||
"total_found": len(results),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,9 +7,10 @@ This example demonstrates the power of the QUERY method for complex data filteri
|
|||
and field selection, similar to GraphQL but using standard HTTP.
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
|
@ -24,7 +25,7 @@ sample_data = [
|
|||
"id": 101,
|
||||
"name": "Dr. Smith",
|
||||
"email": "smith@university.edu",
|
||||
"bio": "Mathematics professor with 20 years experience"
|
||||
"bio": "Mathematics professor with 20 years experience",
|
||||
},
|
||||
"topics": [
|
||||
{
|
||||
|
|
@ -34,8 +35,8 @@ sample_data = [
|
|||
"lessons": 15,
|
||||
"exercises": [
|
||||
{"id": 1, "title": "Linear Equations", "points": 10},
|
||||
{"id": 2, "title": "Quadratic Equations", "points": 15}
|
||||
]
|
||||
{"id": 2, "title": "Quadratic Equations", "points": 15},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
|
|
@ -44,13 +45,13 @@ sample_data = [
|
|||
"lessons": 20,
|
||||
"exercises": [
|
||||
{"id": 3, "title": "Derivatives", "points": 20},
|
||||
{"id": 4, "title": "Integrals", "points": 25}
|
||||
]
|
||||
}
|
||||
{"id": 4, "title": "Integrals", "points": 25},
|
||||
],
|
||||
},
|
||||
],
|
||||
"tags": ["mathematics", "algebra", "calculus"],
|
||||
"rating": 4.8,
|
||||
"enrolled_students": 245
|
||||
"enrolled_students": 245,
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
|
|
@ -60,7 +61,7 @@ sample_data = [
|
|||
"id": 102,
|
||||
"name": "Dr. Johnson",
|
||||
"email": "johnson@university.edu",
|
||||
"bio": "Physics professor specializing in quantum mechanics"
|
||||
"bio": "Physics professor specializing in quantum mechanics",
|
||||
},
|
||||
"topics": [
|
||||
{
|
||||
|
|
@ -70,19 +71,20 @@ sample_data = [
|
|||
"lessons": 25,
|
||||
"exercises": [
|
||||
{"id": 5, "title": "Wave Functions", "points": 30},
|
||||
{"id": 6, "title": "Uncertainty Principle", "points": 35}
|
||||
]
|
||||
{"id": 6, "title": "Uncertainty Principle", "points": 35},
|
||||
],
|
||||
}
|
||||
],
|
||||
"tags": ["physics", "quantum", "mechanics"],
|
||||
"rating": 4.9,
|
||||
"enrolled_students": 156
|
||||
}
|
||||
"enrolled_students": 156,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
class FieldSelector(BaseModel):
|
||||
"""Define which fields to include in the response."""
|
||||
|
||||
course_fields: Optional[List[str]] = None
|
||||
instructor_fields: Optional[List[str]] = None
|
||||
topic_fields: Optional[List[str]] = None
|
||||
|
|
@ -91,6 +93,7 @@ class FieldSelector(BaseModel):
|
|||
|
||||
class QueryFilter(BaseModel):
|
||||
"""Define filters for the query."""
|
||||
|
||||
min_rating: Optional[float] = None
|
||||
max_rating: Optional[float] = None
|
||||
difficulty: Optional[str] = None
|
||||
|
|
@ -100,13 +103,16 @@ class QueryFilter(BaseModel):
|
|||
|
||||
class CourseQuery(BaseModel):
|
||||
"""Complete query schema for course data."""
|
||||
|
||||
fields: Optional[FieldSelector] = None
|
||||
filters: Optional[QueryFilter] = None
|
||||
limit: Optional[int] = 10
|
||||
offset: Optional[int] = 0
|
||||
|
||||
|
||||
def filter_object(obj: Dict[str, Any], allowed_fields: Optional[List[str]] = None) -> Dict[str, Any]:
|
||||
def filter_object(
|
||||
obj: Dict[str, Any], allowed_fields: Optional[List[str]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Filter an object to only include specified fields."""
|
||||
if allowed_fields is None:
|
||||
return obj
|
||||
|
|
@ -117,67 +123,86 @@ def apply_filters(data: List[Dict], filters: Optional[QueryFilter]) -> List[Dict
|
|||
"""Apply filters to the data."""
|
||||
if not filters:
|
||||
return data
|
||||
|
||||
|
||||
filtered_data = data.copy()
|
||||
|
||||
|
||||
if filters.min_rating is not None:
|
||||
filtered_data = [item for item in filtered_data if item.get("rating", 0) >= filters.min_rating]
|
||||
|
||||
filtered_data = [
|
||||
item
|
||||
for item in filtered_data
|
||||
if item.get("rating", 0) >= filters.min_rating
|
||||
]
|
||||
|
||||
if filters.max_rating is not None:
|
||||
filtered_data = [item for item in filtered_data if item.get("rating", 0) <= filters.max_rating]
|
||||
|
||||
filtered_data = [
|
||||
item
|
||||
for item in filtered_data
|
||||
if item.get("rating", 0) <= filters.max_rating
|
||||
]
|
||||
|
||||
if filters.difficulty:
|
||||
filtered_data = [
|
||||
item for item in filtered_data
|
||||
if any(topic.get("difficulty") == filters.difficulty for topic in item.get("topics", []))
|
||||
item
|
||||
for item in filtered_data
|
||||
if any(
|
||||
topic.get("difficulty") == filters.difficulty
|
||||
for topic in item.get("topics", [])
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
if filters.tags:
|
||||
filtered_data = [
|
||||
item for item in filtered_data
|
||||
item
|
||||
for item in filtered_data
|
||||
if any(tag in item.get("tags", []) for tag in filters.tags)
|
||||
]
|
||||
|
||||
|
||||
if filters.instructor_name:
|
||||
filtered_data = [
|
||||
item for item in filtered_data
|
||||
if filters.instructor_name.lower() in item.get("instructor", {}).get("name", "").lower()
|
||||
item
|
||||
for item in filtered_data
|
||||
if filters.instructor_name.lower()
|
||||
in item.get("instructor", {}).get("name", "").lower()
|
||||
]
|
||||
|
||||
|
||||
return filtered_data
|
||||
|
||||
|
||||
def apply_field_selection(data: List[Dict], fields: Optional[FieldSelector]) -> List[Dict]:
|
||||
def apply_field_selection(
|
||||
data: List[Dict], fields: Optional[FieldSelector]
|
||||
) -> List[Dict]:
|
||||
"""Apply field selection to shape the response."""
|
||||
if not fields:
|
||||
return data
|
||||
|
||||
|
||||
result = []
|
||||
for item in data:
|
||||
filtered_item = filter_object(item, fields.course_fields)
|
||||
|
||||
|
||||
# Filter instructor fields
|
||||
if "instructor" in item and fields.instructor_fields:
|
||||
filtered_item["instructor"] = filter_object(item["instructor"], fields.instructor_fields)
|
||||
|
||||
filtered_item["instructor"] = filter_object(
|
||||
item["instructor"], fields.instructor_fields
|
||||
)
|
||||
|
||||
# Filter topic fields
|
||||
if "topics" in item and fields.topic_fields:
|
||||
filtered_topics = []
|
||||
for topic in item["topics"]:
|
||||
filtered_topic = filter_object(topic, fields.topic_fields)
|
||||
|
||||
|
||||
# Filter exercise fields if requested
|
||||
if "exercises" in topic and fields.exercise_fields:
|
||||
filtered_topic["exercises"] = [
|
||||
filter_object(exercise, fields.exercise_fields)
|
||||
for exercise in topic["exercises"]
|
||||
]
|
||||
|
||||
|
||||
filtered_topics.append(filtered_topic)
|
||||
filtered_item["topics"] = filtered_topics
|
||||
|
||||
|
||||
result.append(filtered_item)
|
||||
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
|
@ -185,13 +210,13 @@ def apply_field_selection(data: List[Dict], fields: Optional[FieldSelector]) ->
|
|||
def query_courses(query: CourseQuery):
|
||||
"""
|
||||
Query courses with complex filtering and field selection.
|
||||
|
||||
|
||||
This endpoint demonstrates the power of the QUERY method:
|
||||
- Send complex query parameters in the request body
|
||||
- Filter data based on multiple criteria
|
||||
- Select only the fields you need (like GraphQL)
|
||||
- Avoid URL length limitations
|
||||
|
||||
|
||||
Example query:
|
||||
{
|
||||
"fields": {
|
||||
|
|
@ -209,27 +234,27 @@ def query_courses(query: CourseQuery):
|
|||
"""
|
||||
# Start with all data
|
||||
filtered_data = sample_data.copy()
|
||||
|
||||
|
||||
# Apply filters
|
||||
if query.filters:
|
||||
filtered_data = apply_filters(filtered_data, query.filters)
|
||||
|
||||
|
||||
# Apply pagination
|
||||
offset = query.offset or 0
|
||||
limit = query.limit or 10
|
||||
paginated_data = filtered_data[offset:offset + limit]
|
||||
|
||||
paginated_data = filtered_data[offset : offset + limit]
|
||||
|
||||
# Apply field selection
|
||||
if query.fields:
|
||||
paginated_data = apply_field_selection(paginated_data, query.fields)
|
||||
|
||||
|
||||
return {
|
||||
"query": query.model_dump(),
|
||||
"total_results": len(filtered_data),
|
||||
"returned_results": len(paginated_data),
|
||||
"offset": offset,
|
||||
"limit": limit,
|
||||
"data": paginated_data
|
||||
"data": paginated_data,
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -245,12 +270,9 @@ def read_root():
|
|||
"fields": {
|
||||
"course_fields": ["id", "name", "rating"],
|
||||
"instructor_fields": ["name"],
|
||||
"topic_fields": ["title", "difficulty"]
|
||||
"topic_fields": ["title", "difficulty"],
|
||||
},
|
||||
"filters": {
|
||||
"min_rating": 4.5,
|
||||
"tags": ["mathematics"]
|
||||
},
|
||||
"limit": 5
|
||||
}
|
||||
"filters": {"min_rating": 4.5, "tags": ["mathematics"]},
|
||||
"limit": 5,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,9 +10,10 @@ This example directly addresses the use case described in the GitHub issue:
|
|||
- Provides GraphQL-like flexibility with standard HTTP
|
||||
"""
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
|
@ -69,13 +70,13 @@ sample_subject = {
|
|||
"number_of_clicks": 1,
|
||||
"number_of_votes": 1,
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
]
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -84,13 +85,16 @@ class ArbitrarySchema(BaseModel):
|
|||
The schema that clients send to specify exactly what they want in the response.
|
||||
This is the key innovation - clients can request any combination of fields.
|
||||
"""
|
||||
|
||||
# Root level fields to include
|
||||
root_fields: Optional[List[str]] = None
|
||||
|
||||
|
||||
# Nested object field specifications
|
||||
tags: Optional[Dict[str, Any]] = None # {"fields": ["id", "name"], "limit": 5}
|
||||
topics: Optional[Dict[str, Any]] = None # {"fields": [...], "limit": 10, "posts": {...}}
|
||||
|
||||
topics: Optional[Dict[str, Any]] = (
|
||||
None # {"fields": [...], "limit": 10, "posts": {...}}
|
||||
)
|
||||
|
||||
# Global limits
|
||||
max_depth: Optional[int] = None
|
||||
|
||||
|
|
@ -101,7 +105,7 @@ def filter_by_schema(data: Dict[str, Any], schema: ArbitrarySchema) -> Dict[str,
|
|||
This allows clients to specify exactly what fields they want at each level.
|
||||
"""
|
||||
result = {}
|
||||
|
||||
|
||||
# Handle root fields
|
||||
if schema.root_fields:
|
||||
for field in schema.root_fields:
|
||||
|
|
@ -110,38 +114,42 @@ def filter_by_schema(data: Dict[str, Any], schema: ArbitrarySchema) -> Dict[str,
|
|||
else:
|
||||
# If no root fields specified, include basic fields
|
||||
result = {k: v for k, v in data.items() if k in ["id", "name"]}
|
||||
|
||||
|
||||
# Handle tags
|
||||
if schema.tags and "tags" in data:
|
||||
tags_config = schema.tags
|
||||
tags_data = data["tags"]
|
||||
|
||||
|
||||
# Apply limit if specified
|
||||
if "limit" in tags_config:
|
||||
tags_data = tags_data[:tags_config["limit"]]
|
||||
|
||||
tags_data = tags_data[: tags_config["limit"]]
|
||||
|
||||
# Filter fields if specified
|
||||
if "fields" in tags_config:
|
||||
tags_data = [
|
||||
{field: tag.get(field) for field in tags_config["fields"] if field in tag}
|
||||
{
|
||||
field: tag.get(field)
|
||||
for field in tags_config["fields"]
|
||||
if field in tag
|
||||
}
|
||||
for tag in tags_data
|
||||
]
|
||||
|
||||
|
||||
result["tags"] = tags_data
|
||||
|
||||
|
||||
# Handle topics (more complex nesting)
|
||||
if schema.topics and "topics" in data:
|
||||
topics_config = schema.topics
|
||||
topics_data = data["topics"]
|
||||
|
||||
|
||||
# Apply limit if specified
|
||||
if "limit" in topics_config:
|
||||
topics_data = topics_data[:topics_config["limit"]]
|
||||
|
||||
topics_data = topics_data[: topics_config["limit"]]
|
||||
|
||||
processed_topics = []
|
||||
for topic in topics_data:
|
||||
processed_topic = {}
|
||||
|
||||
|
||||
# Filter topic fields
|
||||
if "fields" in topics_config:
|
||||
for field in topics_config["fields"]:
|
||||
|
|
@ -149,78 +157,96 @@ def filter_by_schema(data: Dict[str, Any], schema: ArbitrarySchema) -> Dict[str,
|
|||
processed_topic[field] = topic[field]
|
||||
else:
|
||||
# Default topic fields
|
||||
processed_topic = {k: v for k, v in topic.items() if k in ["id", "name"]}
|
||||
|
||||
processed_topic = {
|
||||
k: v for k, v in topic.items() if k in ["id", "name"]
|
||||
}
|
||||
|
||||
# Handle posts within topics
|
||||
if "posts" in topics_config and "posts" in topic:
|
||||
posts_config = topics_config["posts"]
|
||||
posts_data = topic["posts"]
|
||||
|
||||
|
||||
if "limit" in posts_config:
|
||||
posts_data = posts_data[:posts_config["limit"]]
|
||||
|
||||
posts_data = posts_data[: posts_config["limit"]]
|
||||
|
||||
processed_posts = []
|
||||
for post in posts_data:
|
||||
processed_post = {}
|
||||
|
||||
|
||||
# Filter post fields
|
||||
if "fields" in posts_config:
|
||||
for field in posts_config["fields"]:
|
||||
if field in post:
|
||||
processed_post[field] = post[field]
|
||||
else:
|
||||
processed_post = {k: v for k, v in post.items() if k in ["id", "title"]}
|
||||
|
||||
processed_post = {
|
||||
k: v for k, v in post.items() if k in ["id", "title"]
|
||||
}
|
||||
|
||||
# Handle answers within posts
|
||||
if "answers" in posts_config and "answers" in post:
|
||||
answers_config = posts_config["answers"]
|
||||
answers_data = post["answers"]
|
||||
|
||||
|
||||
if "limit" in answers_config:
|
||||
answers_data = answers_data[:answers_config["limit"]]
|
||||
|
||||
answers_data = answers_data[: answers_config["limit"]]
|
||||
|
||||
processed_answers = []
|
||||
for answer in answers_data:
|
||||
processed_answer = {}
|
||||
|
||||
|
||||
if "fields" in answers_config:
|
||||
for field in answers_config["fields"]:
|
||||
if field in answer:
|
||||
processed_answer[field] = answer[field]
|
||||
else:
|
||||
processed_answer = {k: v for k, v in answer.items() if k in ["id", "content"]}
|
||||
|
||||
processed_answer = {
|
||||
k: v
|
||||
for k, v in answer.items()
|
||||
if k in ["id", "content"]
|
||||
}
|
||||
|
||||
# Handle comments within answers
|
||||
if "comments" in answers_config and "comments" in answer:
|
||||
comments_config = answers_config["comments"]
|
||||
comments_data = answer["comments"]
|
||||
|
||||
|
||||
if "limit" in comments_config:
|
||||
comments_data = comments_data[:comments_config["limit"]]
|
||||
|
||||
comments_data = comments_data[
|
||||
: comments_config["limit"]
|
||||
]
|
||||
|
||||
if "fields" in comments_config:
|
||||
processed_answer["comments"] = [
|
||||
{field: comment.get(field) for field in comments_config["fields"] if field in comment}
|
||||
{
|
||||
field: comment.get(field)
|
||||
for field in comments_config["fields"]
|
||||
if field in comment
|
||||
}
|
||||
for comment in comments_data
|
||||
]
|
||||
else:
|
||||
processed_answer["comments"] = [
|
||||
{k: v for k, v in comment.items() if k in ["id", "content"]}
|
||||
{
|
||||
k: v
|
||||
for k, v in comment.items()
|
||||
if k in ["id", "content"]
|
||||
}
|
||||
for comment in comments_data
|
||||
]
|
||||
|
||||
|
||||
processed_answers.append(processed_answer)
|
||||
|
||||
|
||||
processed_post["answers"] = processed_answers
|
||||
|
||||
|
||||
processed_posts.append(processed_post)
|
||||
|
||||
|
||||
processed_topic["posts"] = processed_posts
|
||||
|
||||
|
||||
processed_topics.append(processed_topic)
|
||||
|
||||
|
||||
result["topics"] = processed_topics
|
||||
|
||||
|
||||
return result
|
||||
|
||||
|
||||
|
|
@ -228,21 +254,21 @@ def filter_by_schema(data: Dict[str, Any], schema: ArbitrarySchema) -> Dict[str,
|
|||
def query_subjects(schema: ArbitrarySchema):
|
||||
"""
|
||||
Query subjects with an arbitrary schema - exactly as requested in the GitHub issue.
|
||||
|
||||
|
||||
This endpoint allows clients to specify exactly what fields they want
|
||||
at each level of the nested data structure, avoiding the need for:
|
||||
- Multiple API endpoints for different data combinations
|
||||
- Long URL parameters
|
||||
- Over-fetching of data
|
||||
|
||||
|
||||
Example schemas:
|
||||
|
||||
|
||||
1. Minimal data:
|
||||
{
|
||||
"root_fields": ["id", "name"],
|
||||
"tags": {"fields": ["id", "name"], "limit": 1}
|
||||
}
|
||||
|
||||
|
||||
2. Detailed topics only:
|
||||
{
|
||||
"root_fields": ["id", "name"],
|
||||
|
|
@ -255,7 +281,7 @@ def query_subjects(schema: ArbitrarySchema):
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
3. Full nested structure:
|
||||
{
|
||||
"topics": {
|
||||
|
|
@ -275,11 +301,11 @@ def query_subjects(schema: ArbitrarySchema):
|
|||
"""
|
||||
# Apply the schema to filter the response
|
||||
filtered_data = filter_by_schema(sample_subject, schema)
|
||||
|
||||
|
||||
return {
|
||||
"message": "Successfully queried subjects using arbitrary schema",
|
||||
"schema_used": schema.model_dump(),
|
||||
"data": filtered_data
|
||||
"data": filtered_data,
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -293,14 +319,14 @@ def get_root():
|
|||
"issue_url": "https://github.com/tiangolo/fastapi/issues/...",
|
||||
"status": "✅ IMPLEMENTED",
|
||||
"usage": {
|
||||
"endpoint": "/query/subjects",
|
||||
"endpoint": "/query/subjects",
|
||||
"method": "QUERY",
|
||||
"description": "Send arbitrary schema in request body to get exactly the data you need"
|
||||
"description": "Send arbitrary schema in request body to get exactly the data you need",
|
||||
},
|
||||
"examples": {
|
||||
"minimal": {
|
||||
"root_fields": ["id", "name"],
|
||||
"tags": {"fields": ["id", "name"], "limit": 1}
|
||||
"tags": {"fields": ["id", "name"], "limit": 1},
|
||||
},
|
||||
"detailed": {
|
||||
"root_fields": ["id", "name"],
|
||||
|
|
@ -308,14 +334,11 @@ def get_root():
|
|||
"fields": ["id", "name"],
|
||||
"posts": {
|
||||
"fields": ["id", "title"],
|
||||
"answers": {
|
||||
"fields": ["id", "content"],
|
||||
"limit": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"answers": {"fields": ["id", "content"], "limit": 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -7,10 +7,11 @@ This test file follows the FastAPI test patterns and should be compatible
|
|||
with the existing test suite.
|
||||
"""
|
||||
|
||||
from fastapi import FastAPI, Depends
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import Depends, FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
def test_query_method_basic():
|
||||
|
|
|
|||
Loading…
Reference in New Issue