Remove item management feature from frontend and backend, and add a pending skeleton component.
All checks were successful
Deploy to Production / deploy (push) Successful in 1m34s

This commit is contained in:
魏风
2026-03-13 11:46:15 +08:00
parent ef93d4e5c2
commit 3c9a0343e9
19 changed files with 9 additions and 1069 deletions

View File

@@ -1,13 +1,12 @@
from fastapi import APIRouter
from app.api.routes import items, locations, login, private, users, utils
from app.api.routes import locations, login, private, users, utils
from app.core.config import settings
api_router = APIRouter()
api_router.include_router(login.router)
api_router.include_router(users.router)
api_router.include_router(utils.router)
api_router.include_router(items.router)
api_router.include_router(locations.router)

View File

@@ -1,112 +0,0 @@
import uuid
from typing import Any
from fastapi import APIRouter, HTTPException
from sqlmodel import col, func, select
from app.api.deps import CurrentUser, SessionDep
from app.models import Item, ItemCreate, ItemPublic, ItemsPublic, ItemUpdate, Message
router = APIRouter(prefix="/items", tags=["items"])
@router.get("/", response_model=ItemsPublic)
def read_items(
session: SessionDep, current_user: CurrentUser, skip: int = 0, limit: int = 100
) -> Any:
"""
Retrieve items.
"""
if current_user.is_superuser:
count_statement = select(func.count()).select_from(Item)
count = session.exec(count_statement).one()
statement = (
select(Item).order_by(col(Item.created_at).desc()).offset(skip).limit(limit)
)
items = session.exec(statement).all()
else:
count_statement = (
select(func.count())
.select_from(Item)
.where(Item.owner_id == current_user.id)
)
count = session.exec(count_statement).one()
statement = (
select(Item)
.where(Item.owner_id == current_user.id)
.order_by(col(Item.created_at).desc())
.offset(skip)
.limit(limit)
)
items = session.exec(statement).all()
return ItemsPublic(data=items, count=count)
@router.get("/{id}", response_model=ItemPublic)
def read_item(session: SessionDep, current_user: CurrentUser, id: uuid.UUID) -> Any:
"""
Get item by ID.
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=403, detail="Not enough permissions")
return item
@router.post("/", response_model=ItemPublic)
def create_item(
*, session: SessionDep, current_user: CurrentUser, item_in: ItemCreate
) -> Any:
"""
Create new item.
"""
item = Item.model_validate(item_in, update={"owner_id": current_user.id})
session.add(item)
session.commit()
session.refresh(item)
return item
@router.put("/{id}", response_model=ItemPublic)
def update_item(
*,
session: SessionDep,
current_user: CurrentUser,
id: uuid.UUID,
item_in: ItemUpdate,
) -> Any:
"""
Update an item.
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=403, detail="Not enough permissions")
update_dict = item_in.model_dump(exclude_unset=True)
item.sqlmodel_update(update_dict)
session.add(item)
session.commit()
session.refresh(item)
return item
@router.delete("/{id}")
def delete_item(
session: SessionDep, current_user: CurrentUser, id: uuid.UUID
) -> Message:
"""
Delete an item.
"""
item = session.get(Item, id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
if not current_user.is_superuser and (item.owner_id != current_user.id):
raise HTTPException(status_code=403, detail="Not enough permissions")
session.delete(item)
session.commit()
return Message(message="Item deleted successfully")

View File

@@ -13,7 +13,7 @@ from app.api.deps import (
from app.core.config import settings
from app.core.security import get_password_hash, verify_password
from app.models import (
Item,
Location,
Message,
UpdatePassword,
User,
@@ -224,7 +224,7 @@ def delete_user(
raise HTTPException(
status_code=403, detail="Super users are not allowed to delete themselves"
)
statement = delete(Item).where(col(Item.owner_id) == user_id)
statement = delete(Location).where(col(Location.owner_id) == user_id)
session.exec(statement)
session.delete(user)
session.commit()

View File

@@ -5,8 +5,6 @@ from sqlmodel import Session, select
from app.core.security import get_password_hash, verify_password
from app.models import (
Item,
ItemCreate,
Location,
LocationCreate,
User,
@@ -68,14 +66,6 @@ def authenticate(*, session: Session, email: str, password: str) -> User | None:
return db_user
def create_item(*, session: Session, item_in: ItemCreate, owner_id: uuid.UUID) -> Item:
db_item = Item.model_validate(item_in, update={"owner_id": owner_id})
session.add(db_item)
session.commit()
session.refresh(db_item)
return db_item
def create_location(*, session: Session, location_in: LocationCreate, owner_id: uuid.UUID) -> Location:
db_location = Location.model_validate(location_in, update={"owner_id": owner_id})
session.add(db_location)

View File

@@ -53,7 +53,6 @@ class User(UserBase, table=True):
default_factory=get_datetime_utc,
sa_type=DateTime(timezone=True), # type: ignore
)
items: list["Item"] = Relationship(back_populates="owner", cascade_delete=True)
locations: list["Location"] = Relationship(back_populates="owner", cascade_delete=True)
@@ -68,47 +67,6 @@ class UsersPublic(SQLModel):
count: int
# Shared properties
class ItemBase(SQLModel):
title: str = Field(min_length=1, max_length=255)
description: str | None = Field(default=None, max_length=255)
# Properties to receive on item creation
class ItemCreate(ItemBase):
pass
# Properties to receive on item update
class ItemUpdate(ItemBase):
title: str | None = Field(default=None, min_length=1, max_length=255) # type: ignore
# Database model, database table inferred from class name
class Item(ItemBase, table=True):
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
created_at: datetime | None = Field(
default_factory=get_datetime_utc,
sa_type=DateTime(timezone=True), # type: ignore
)
owner_id: uuid.UUID = Field(
foreign_key="user.id", nullable=False, ondelete="CASCADE"
)
owner: User | None = Relationship(back_populates="items")
# Properties to return via API, id is always required
class ItemPublic(ItemBase):
id: uuid.UUID
owner_id: uuid.UUID
created_at: datetime | None = None
class ItemsPublic(SQLModel):
data: list[ItemPublic]
count: int
# Shared properties
class LocationBase(SQLModel):
title: str = Field(min_length=1, max_length=255)

View File

@@ -1,164 +0,0 @@
import uuid
from fastapi.testclient import TestClient
from sqlmodel import Session
from app.core.config import settings
from tests.utils.item import create_random_item
def test_create_item(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"title": "Foo", "description": "Fighters"}
response = client.post(
f"{settings.API_V1_STR}/items/",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
assert "id" in content
assert "owner_id" in content
def test_read_item(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == item.title
assert content["description"] == item.description
assert content["id"] == str(item.id)
assert content["owner_id"] == str(item.owner_id)
def test_read_item_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
response = client.get(
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Item not found"
def test_read_item_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 403
content = response.json()
assert content["detail"] == "Not enough permissions"
def test_read_items(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
create_random_item(db)
create_random_item(db)
response = client.get(
f"{settings.API_V1_STR}/items/",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert len(content["data"]) >= 2
def test_update_item(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_item(db)
data = {"title": "Updated title", "description": "Updated description"}
response = client.put(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 200
content = response.json()
assert content["title"] == data["title"]
assert content["description"] == data["description"]
assert content["id"] == str(item.id)
assert content["owner_id"] == str(item.owner_id)
def test_update_item_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
data = {"title": "Updated title", "description": "Updated description"}
response = client.put(
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
headers=superuser_token_headers,
json=data,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Item not found"
def test_update_item_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_item(db)
data = {"title": "Updated title", "description": "Updated description"}
response = client.put(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
json=data,
)
assert response.status_code == 403
content = response.json()
assert content["detail"] == "Not enough permissions"
def test_delete_item(
client: TestClient, superuser_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_item(db)
response = client.delete(
f"{settings.API_V1_STR}/items/{item.id}",
headers=superuser_token_headers,
)
assert response.status_code == 200
content = response.json()
assert content["message"] == "Item deleted successfully"
def test_delete_item_not_found(
client: TestClient, superuser_token_headers: dict[str, str]
) -> None:
response = client.delete(
f"{settings.API_V1_STR}/items/{uuid.uuid4()}",
headers=superuser_token_headers,
)
assert response.status_code == 404
content = response.json()
assert content["detail"] == "Item not found"
def test_delete_item_not_enough_permissions(
client: TestClient, normal_user_token_headers: dict[str, str], db: Session
) -> None:
item = create_random_item(db)
response = client.delete(
f"{settings.API_V1_STR}/items/{item.id}",
headers=normal_user_token_headers,
)
assert response.status_code == 403
content = response.json()
assert content["detail"] == "Not enough permissions"

View File

@@ -7,7 +7,7 @@ from sqlmodel import Session, delete
from app.core.config import settings
from app.core.db import engine, init_db
from app.main import app
from app.models import Item, Location, User
from app.models import Location, User
from tests.utils.user import authentication_token_from_email
from tests.utils.utils import get_superuser_token_headers
@@ -17,8 +17,6 @@ def db() -> Generator[Session, None, None]:
with Session(engine) as session:
init_db(session)
yield session
statement = delete(Item)
session.execute(statement)
statement = delete(Location)
session.execute(statement)
statement = delete(User)

View File

@@ -1,16 +0,0 @@
from sqlmodel import Session
from app import crud
from app.models import Item, ItemCreate
from tests.utils.user import create_random_user
from tests.utils.utils import random_lower_string
def create_random_item(db: Session) -> Item:
user = create_random_user(db)
owner_id = user.id
assert owner_id is not None
title = random_lower_string()
description = random_lower_string()
item_in = ItemCreate(title=title, description=description)
return crud.create_item(session=db, item_in=item_in, owner_id=owner_id)