add Brand Management

This commit is contained in:
2025-05-26 20:44:15 +02:00
parent d27871160e
commit 25c09dfecc
11 changed files with 548 additions and 21 deletions

View File

@@ -26,32 +26,45 @@ app.add_middleware(
def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
"""Build a shopping event response with products from the association table"""
# Get products with their event-specific data
# Get products with their event-specific data including brand information
product_data = db.execute(
text("""
SELECT p.id, p.name, p.category, p.organic, p.weight, p.weight_unit,
sep.amount, sep.price
sep.amount, sep.price, b.id as brand_id, b.name as brand_name,
b.created_at as brand_created_at, b.updated_at as brand_updated_at
FROM products p
JOIN shopping_event_products sep ON p.id = sep.product_id
LEFT JOIN brands b ON p.brand_id = b.id
WHERE sep.shopping_event_id = :event_id
"""),
{"event_id": event.id}
).fetchall()
# Convert to ProductWithEventData objects
products_with_data = [
schemas.ProductWithEventData(
id=row.id,
name=row.name,
category=row.category,
organic=row.organic,
weight=row.weight,
weight_unit=row.weight_unit,
amount=row.amount,
price=row.price
products_with_data = []
for row in product_data:
brand = None
if row.brand_id is not None:
brand = schemas.Brand(
id=row.brand_id,
name=row.brand_name,
created_at=row.brand_created_at,
updated_at=row.brand_updated_at
)
products_with_data.append(
schemas.ProductWithEventData(
id=row.id,
name=row.name,
category=row.category,
brand=brand,
organic=row.organic,
weight=row.weight,
weight_unit=row.weight_unit,
amount=row.amount,
price=row.price
)
)
for row in product_data
]
return schemas.ShoppingEventResponse(
id=event.id,
@@ -72,6 +85,12 @@ def read_root():
# Product endpoints
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
# Validate brand exists if brand_id is provided
if product.brand_id is not None:
brand = db.query(models.Brand).filter(models.Brand.id == product.brand_id).first()
if brand is None:
raise HTTPException(status_code=404, detail="Brand not found")
db_product = models.Product(**product.dict())
db.add(db_product)
db.commit()
@@ -96,7 +115,13 @@ def update_product(product_id: int, product_update: schemas.ProductUpdate, db: S
if product is None:
raise HTTPException(status_code=404, detail="Product not found")
# Validate brand exists if brand_id is being updated
update_data = product_update.dict(exclude_unset=True)
if 'brand_id' in update_data and update_data['brand_id'] is not None:
brand = db.query(models.Brand).filter(models.Brand.id == update_data['brand_id']).first()
if brand is None:
raise HTTPException(status_code=404, detail="Brand not found")
for field, value in update_data.items():
setattr(product, field, value)
@@ -159,6 +184,59 @@ def delete_shop(shop_id: int, db: Session = Depends(get_db)):
db.commit()
return {"message": "Shop deleted successfully"}
# Brand endpoints
@app.post("/brands/", response_model=schemas.Brand)
def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
db_brand = models.Brand(**brand.dict())
db.add(db_brand)
db.commit()
db.refresh(db_brand)
return db_brand
@app.get("/brands/", response_model=List[schemas.Brand])
def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
brands = db.query(models.Brand).offset(skip).limit(limit).all()
return brands
@app.get("/brands/{brand_id}", response_model=schemas.Brand)
def read_brand(brand_id: int, db: Session = Depends(get_db)):
brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
if brand is None:
raise HTTPException(status_code=404, detail="Brand not found")
return brand
@app.put("/brands/{brand_id}", response_model=schemas.Brand)
def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
if brand is None:
raise HTTPException(status_code=404, detail="Brand not found")
update_data = brand_update.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(brand, field, value)
db.commit()
db.refresh(brand)
return brand
@app.delete("/brands/{brand_id}")
def delete_brand(brand_id: int, db: Session = Depends(get_db)):
brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
if brand is None:
raise HTTPException(status_code=404, detail="Brand not found")
# Check if any products reference this brand
products_with_brand = db.query(models.Product).filter(models.Product.brand_id == brand_id).first()
if products_with_brand:
raise HTTPException(
status_code=400,
detail="Cannot delete brand: products are still associated with this brand"
)
db.delete(brand)
db.commit()
return {"message": "Brand deleted successfully"}
# Shopping Event endpoints
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):