编写后端

This commit is contained in:
2026-03-28 22:18:43 +08:00
parent f2528fbc87
commit f5d26949c4
63 changed files with 1841 additions and 5 deletions

View File

@@ -0,0 +1 @@
# CRUD module

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,72 @@
"""
分类 CRUD 操作
"""
from typing import Optional, List
from app.models.category import Category
class CategoryCRUD:
"""分类 CRUD 操作类"""
@staticmethod
async def get_by_id(category_id: str) -> Optional[Category]:
"""根据 ID 获取分类"""
return await Category.filter(id=category_id).first()
@staticmethod
async def get_by_slug(slug: str) -> Optional[Category]:
"""根据 slug 获取分类"""
return await Category.filter(slug=slug).first()
@staticmethod
async def get_all() -> List[Category]:
"""获取所有分类"""
return await Category.all()
@staticmethod
async def create(name: str, slug: str, description: Optional[str] = None) -> Category:
"""创建分类"""
category = await Category.create(
name=name,
slug=slug,
description=description
)
return category
@staticmethod
async def update(category_id: str, **kwargs) -> Optional[Category]:
"""更新分类"""
category = await Category.filter(id=category_id).first()
if category:
for key, value in kwargs.items():
if value is not None and hasattr(category, key):
setattr(category, key, value)
await category.save()
return category
@staticmethod
async def delete(category_id: str) -> bool:
"""删除分类"""
category = await Category.filter(id=category_id).first()
if category:
await category.delete()
return True
return False
@staticmethod
async def get_with_post_count() -> List[dict]:
"""获取分类及其文章数量"""
categories = await Category.all().prefetch_related("posts")
result = []
for cat in categories:
result.append({
"id": str(cat.id),
"name": cat.name,
"slug": cat.slug,
"description": cat.description,
"post_count": len(cat.posts) if cat.posts else 0
})
return result
category_crud = CategoryCRUD()

View File

@@ -0,0 +1,91 @@
"""
评论 CRUD 操作
"""
from typing import Optional, List, Tuple
from app.models.comment import Comment
from app.models.post import Post
class CommentCRUD:
"""评论 CRUD 操作类"""
@staticmethod
async def get_by_id(comment_id: str) -> Optional[Comment]:
"""根据 ID 获取评论"""
return await Comment.filter(id=comment_id).first()
@staticmethod
async def get_by_post(
post_id: str,
page: int = 1,
page_size: int = 20,
approved_only: bool = True
) -> Tuple[List[Comment], int]:
"""获取文章的所有评论(树形结构)"""
query = Comment.filter(post_id=post_id)
if approved_only:
query = query.filter(is_approved=True)
total = await query.count()
comments = await query \
.prefetch_related("author", "replies__author") \
.filter(parent_id=None) \
.offset((page - 1) * page_size) \
.limit(page_size) \
.order_by("created_at")
return comments, total
@staticmethod
async def create(
content: str,
author_id: str,
post_id: str,
parent_id: Optional[str] = None,
is_approved: bool = True
) -> Comment:
"""创建评论"""
comment = await Comment.create(
content=content,
author_id=author_id,
post_id=post_id,
parent_id=parent_id,
is_approved=is_approved
)
# 更新文章的评论数
await Post.filter(id=post_id).update(comment_count=Post.comment_count + 1)
return comment
@staticmethod
async def update(comment_id: str, **kwargs) -> Optional[Comment]:
"""更新评论"""
comment = await Comment.filter(id=comment_id).first()
if comment:
for key, value in kwargs.items():
if value is not None and hasattr(comment, key):
setattr(comment, key, value)
await comment.save()
return comment
@staticmethod
async def delete(comment_id: str) -> bool:
"""删除评论"""
comment = await Comment.filter(id=comment_id).first()
if comment:
post_id = comment.post_id
await comment.delete()
# 更新文章的评论数
await Post.filter(id=post_id).update(comment_count=Post.comment_count - 1)
return True
return False
@staticmethod
async def approve(comment_id: str) -> Optional[Comment]:
"""审核通过评论"""
return await CommentCRUD.update(comment_id, is_approved=True)
comment_crud = CommentCRUD()

163
backend/app/crud/post.py Normal file
View File

@@ -0,0 +1,163 @@
"""
文章 CRUD 操作
"""
from datetime import datetime
from typing import Optional, List, Tuple
from app.models.post import Post, PostTag
from app.models.user import User
from app.models.category import Category
from app.models.tag import Tag
class PostCRUD:
"""文章 CRUD 操作类"""
@staticmethod
async def get_by_id(post_id: str) -> Optional[Post]:
"""根据 ID 获取文章"""
return await Post.filter(id=post_id).first()
@staticmethod
async def get_by_slug(slug: str) -> Optional[Post]:
"""根据 slug 获取文章"""
return await Post.filter(slug=slug).first()
@staticmethod
async def get_all(
page: int = 1,
page_size: int = 10,
status: str = "published",
category_id: Optional[str] = None,
tag_id: Optional[str] = None
) -> Tuple[List[Post], int]:
"""获取文章列表(分页)"""
query = Post.all()
if status:
query = query.filter(status=status)
if category_id:
query = query.filter(category_id=category_id)
if tag_id:
query = query.filter(tags__id=tag_id)
total = await query.count()
posts = await query \
.prefetch_related("author", "category", "tags") \
.offset((page - 1) * page_size) \
.limit(page_size) \
.order_by("-created_at")
return posts, total
@staticmethod
async def get_by_author(
author_id: str,
page: int = 1,
page_size: int = 10
) -> Tuple[List[Post], int]:
"""获取指定作者的文章列表"""
query = Post.filter(author_id=author_id)
total = await query.count()
posts = await query \
.prefetch_related("author", "category", "tags") \
.offset((page - 1) * page_size) \
.limit(page_size) \
.order_by("-created_at")
return posts, total
@staticmethod
async def create(
title: str,
slug: str,
content: str,
author_id: str,
summary: Optional[str] = None,
cover_image: Optional[str] = None,
category_id: Optional[str] = None,
tag_ids: Optional[List[str]] = None,
status: str = "draft",
meta_title: Optional[str] = None,
meta_description: Optional[str] = None
) -> Post:
"""创建文章"""
post = await Post.create(
title=title,
slug=slug,
content=content,
author_id=author_id,
summary=summary,
cover_image=cover_image,
category_id=category_id,
status=status,
meta_title=meta_title,
meta_description=meta_description
)
# 添加标签
if tag_ids:
for tag_id in tag_ids:
await PostTag.create(post_id=post.id, tag_id=tag_id)
return post
@staticmethod
async def update(post_id: str, **kwargs) -> Optional[Post]:
"""更新文章"""
post = await Post.filter(id=post_id).first()
if not post:
return None
# 处理标签更新
if "tag_ids" in kwargs:
tag_ids = kwargs.pop("tag_ids")
# 删除旧标签关联
await PostTag.filter(post_id=post_id).delete()
# 添加新标签关联
for tag_id in tag_ids:
await PostTag.create(post_id=post_id, tag_id=tag_id)
# 处理发布状态更新
if kwargs.get("status") == "published" and not post.published_at:
kwargs["published_at"] = datetime.utcnow()
for key, value in kwargs.items():
if value is not None and hasattr(post, key):
setattr(post, key, value)
await post.save()
return post
@staticmethod
async def delete(post_id: str) -> bool:
"""删除文章"""
post = await Post.filter(id=post_id).first()
if post:
await post.delete()
return True
return False
@staticmethod
async def increment_view_count(post_id: str) -> None:
"""增加浏览量"""
await Post.filter(id=post_id).update(view_count=Post.view_count + 1)
@staticmethod
async def get_hot_posts(limit: int = 10) -> List[Post]:
"""获取热门文章(按浏览量排序)"""
return await Post.filter(status="published") \
.prefetch_related("author", "category") \
.order_by("-view_count") \
.limit(limit)
@staticmethod
async def get_recent_posts(limit: int = 10) -> List[Post]:
"""获取最新文章"""
return await Post.filter(status="published") \
.prefetch_related("author", "category", "tags") \
.order_by("-created_at") \
.limit(limit)
post_crud = PostCRUD()

66
backend/app/crud/tag.py Normal file
View File

@@ -0,0 +1,66 @@
"""
标签 CRUD 操作
"""
from typing import Optional, List
from app.models.tag import Tag
class TagCRUD:
"""标签 CRUD 操作类"""
@staticmethod
async def get_by_id(tag_id: str) -> Optional[Tag]:
"""根据 ID 获取标签"""
return await Tag.filter(id=tag_id).first()
@staticmethod
async def get_by_slug(slug: str) -> Optional[Tag]:
"""根据 slug 获取标签"""
return await Tag.filter(slug=slug).first()
@staticmethod
async def get_by_name(name: str) -> Optional[Tag]:
"""根据名称获取标签"""
return await Tag.filter(name=name).first()
@staticmethod
async def get_all() -> List[Tag]:
"""获取所有标签"""
return await Tag.all()
@staticmethod
async def create(name: str, slug: str) -> Tag:
"""创建标签"""
tag = await Tag.create(name=name, slug=slug)
return tag
@staticmethod
async def update(tag_id: str, **kwargs) -> Optional[Tag]:
"""更新标签"""
tag = await Tag.filter(id=tag_id).first()
if tag:
for key, value in kwargs.items():
if value is not None and hasattr(tag, key):
setattr(tag, key, value)
await tag.save()
return tag
@staticmethod
async def delete(tag_id: str) -> bool:
"""删除标签"""
tag = await Tag.filter(id=tag_id).first()
if tag:
await tag.delete()
return True
return False
@staticmethod
async def get_or_create(name: str, slug: str) -> Tag:
"""获取或创建标签"""
tag = await TagCRUD.get_by_slug(slug)
if tag:
return tag
return await TagCRUD.create(name, slug)
tag_crud = TagCRUD()

75
backend/app/crud/user.py Normal file
View File

@@ -0,0 +1,75 @@
"""
用户 CRUD 操作
"""
from typing import Optional
from app.models.user import User
from app.core.security import get_password_hash, verify_password
class UserCRUD:
"""用户 CRUD 操作类"""
@staticmethod
async def get_by_id(user_id: str) -> Optional[User]:
"""根据 ID 获取用户"""
return await User.filter(id=user_id).first()
@staticmethod
async def get_by_username(username: str) -> Optional[User]:
"""根据用户名获取用户"""
return await User.filter(username=username).first()
@staticmethod
async def get_by_email(email: str) -> Optional[User]:
"""根据邮箱获取用户"""
return await User.filter(email=email).first()
@staticmethod
async def get_by_username_or_email(username_or_email: str) -> Optional[User]:
"""根据用户名或邮箱获取用户"""
return await User.filter(
username=username_or_email
).first() or await User.filter(
email=username_or_email
).first()
@staticmethod
async def create(username: str, email: str, password: str) -> User:
"""创建用户"""
password_hash = get_password_hash(password)
user = await User.create(
username=username,
email=email,
password_hash=password_hash
)
return user
@staticmethod
async def update(user_id: str, **kwargs) -> Optional[User]:
"""更新用户"""
user = await User.filter(id=user_id).first()
if user:
for key, value in kwargs.items():
if value is not None and hasattr(user, key):
setattr(user, key, value)
await user.save()
return user
@staticmethod
async def authenticate(username_or_email: str, password: str) -> Optional[User]:
"""验证用户登录"""
user = await UserCRUD.get_by_username_or_email(username_or_email)
if not user:
return None
if not verify_password(password, user.password_hash):
return None
return user
@staticmethod
async def is_superuser(user_id: str) -> bool:
"""检查用户是否为超级用户"""
user = await User.filter(id=user_id).first()
return user.is_superuser if user else False
user_crud = UserCRUD()