remove intermediate grocery table and add related_products feature

This commit is contained in:
2025-05-28 09:22:47 +02:00
parent 3ea5db4214
commit 112ea41e88
16 changed files with 1140 additions and 1532 deletions

View File

@@ -26,21 +26,18 @@ app.add_middleware(
def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
"""Build a shopping event response with products from the association table"""
# Get products with their event-specific data including grocery and brand information
# Get products with their event-specific data including category and brand information
product_data = db.execute(
text("""
SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
sep.amount, sep.price,
g.id as grocery_id, g.name as grocery_name,
g.created_at as grocery_created_at, g.updated_at as grocery_updated_at,
gc.id as category_id, gc.name as category_name,
gc.created_at as category_created_at, gc.updated_at as category_updated_at,
b.id as brand_id, b.name as brand_name,
b.created_at as brand_created_at, b.updated_at as brand_updated_at
FROM products p
JOIN shopping_event_products sep ON p.id = sep.product_id
JOIN groceries g ON p.grocery_id = g.id
JOIN grocery_categories gc ON g.category_id = gc.id
JOIN grocery_categories gc ON p.category_id = gc.id
LEFT JOIN brands b ON p.brand_id = b.id
WHERE sep.shopping_event_id = :event_id
"""),
@@ -57,15 +54,6 @@ def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> s
updated_at=row.category_updated_at
)
grocery = schemas.Grocery(
id=row.grocery_id,
name=row.grocery_name,
category_id=row.category_id,
created_at=row.grocery_created_at,
updated_at=row.grocery_updated_at,
category=category
)
brand = None
if row.brand_id is not None:
brand = schemas.Brand(
@@ -79,7 +67,7 @@ def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> s
schemas.ProductWithEventData(
id=row.id,
name=row.name,
grocery=grocery,
category=category,
brand=brand,
organic=row.organic,
weight=row.weight,
@@ -108,10 +96,10 @@ def read_root():
# Product endpoints
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
# Validate grocery exists
grocery = db.query(models.Grocery).filter(models.Grocery.id == product.grocery_id).first()
if grocery is None:
raise HTTPException(status_code=404, detail="Grocery not found")
# Validate category exists
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == product.category_id).first()
if category is None:
raise HTTPException(status_code=404, detail="Category not found")
# Validate brand exists if brand_id is provided
if product.brand_id is not None:
@@ -145,11 +133,11 @@ def update_product(product_id: int, product_update: schemas.ProductUpdate, db: S
update_data = product_update.dict(exclude_unset=True)
# Validate grocery exists if grocery_id is being updated
if 'grocery_id' in update_data:
grocery = db.query(models.Grocery).filter(models.Grocery.id == update_data['grocery_id']).first()
if grocery is None:
raise HTTPException(status_code=404, detail="Grocery not found")
# Validate category exists if category_id is being updated
if 'category_id' in update_data:
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == update_data['category_id']).first()
if category is None:
raise HTTPException(status_code=404, detail="Category not found")
# Validate brand exists if brand_id is being updated
if 'brand_id' in update_data and update_data['brand_id'] is not None:
@@ -382,83 +370,18 @@ def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
if category is None:
raise HTTPException(status_code=404, detail="Grocery category not found")
# Check if any groceries reference this category
groceries_with_category = db.query(models.Grocery).filter(models.Grocery.category_id == category_id).first()
if groceries_with_category:
# Check if any products reference this category
products_with_category = db.query(models.Product).filter(models.Product.category_id == category_id).first()
if products_with_category:
raise HTTPException(
status_code=400,
detail="Cannot delete category: groceries are still associated with this category"
detail="Cannot delete category: products are still associated with this category"
)
db.delete(category)
db.commit()
return {"message": "Grocery category deleted successfully"}
# Grocery endpoints
@app.post("/groceries/", response_model=schemas.Grocery)
def create_grocery(grocery: schemas.GroceryCreate, db: Session = Depends(get_db)):
# Validate category exists
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == grocery.category_id).first()
if category is None:
raise HTTPException(status_code=404, detail="Grocery category not found")
db_grocery = models.Grocery(**grocery.dict())
db.add(db_grocery)
db.commit()
db.refresh(db_grocery)
return db_grocery
@app.get("/groceries/", response_model=List[schemas.Grocery])
def read_groceries(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
groceries = db.query(models.Grocery).offset(skip).limit(limit).all()
return groceries
@app.get("/groceries/{grocery_id}", response_model=schemas.Grocery)
def read_grocery(grocery_id: int, db: Session = Depends(get_db)):
grocery = db.query(models.Grocery).filter(models.Grocery.id == grocery_id).first()
if grocery is None:
raise HTTPException(status_code=404, detail="Grocery not found")
return grocery
@app.put("/groceries/{grocery_id}", response_model=schemas.Grocery)
def update_grocery(grocery_id: int, grocery_update: schemas.GroceryUpdate, db: Session = Depends(get_db)):
grocery = db.query(models.Grocery).filter(models.Grocery.id == grocery_id).first()
if grocery is None:
raise HTTPException(status_code=404, detail="Grocery not found")
update_data = grocery_update.dict(exclude_unset=True)
# Validate category exists if category_id is being updated
if 'category_id' in update_data:
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == update_data['category_id']).first()
if category is None:
raise HTTPException(status_code=404, detail="Grocery category not found")
for field, value in update_data.items():
setattr(grocery, field, value)
db.commit()
db.refresh(grocery)
return grocery
@app.delete("/groceries/{grocery_id}")
def delete_grocery(grocery_id: int, db: Session = Depends(get_db)):
grocery = db.query(models.Grocery).filter(models.Grocery.id == grocery_id).first()
if grocery is None:
raise HTTPException(status_code=404, detail="Grocery not found")
# Check if any products reference this grocery
products_with_grocery = db.query(models.Product).filter(models.Product.grocery_id == grocery_id).first()
if products_with_grocery:
raise HTTPException(
status_code=400,
detail="Cannot delete grocery: products are still associated with this grocery"
)
db.delete(grocery)
db.commit()
return {"message": "Grocery deleted successfully"}
# Shopping Event endpoints
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
@@ -584,6 +507,171 @@ def get_shop_stats(db: Session = Depends(get_db)):
# This would need more complex SQL query - placeholder for now
return []
# Related Products endpoints
@app.post("/related-products/", response_model=schemas.RelatedProduct)
def create_related_product(related_product: schemas.RelatedProductCreate, db: Session = Depends(get_db)):
# Validate both products exist
product = db.query(models.Product).filter(models.Product.id == related_product.product_id).first()
if product is None:
raise HTTPException(status_code=404, detail="Product not found")
related = db.query(models.Product).filter(models.Product.id == related_product.related_product_id).first()
if related is None:
raise HTTPException(status_code=404, detail="Related product not found")
# Prevent self-referencing
if related_product.product_id == related_product.related_product_id:
raise HTTPException(status_code=400, detail="A product cannot be related to itself")
# Check if relationship already exists (in either direction)
existing = db.execute(
models.related_products.select().where(
((models.related_products.c.product_id == related_product.product_id) &
(models.related_products.c.related_product_id == related_product.related_product_id)) |
((models.related_products.c.product_id == related_product.related_product_id) &
(models.related_products.c.related_product_id == related_product.product_id))
)
).first()
if existing:
raise HTTPException(status_code=400, detail="Products are already related")
# Insert the relationship
result = db.execute(
models.related_products.insert().values(
product_id=related_product.product_id,
related_product_id=related_product.related_product_id,
relationship_type=related_product.relationship_type
)
)
db.commit()
# Get the created relationship
relationship_id = result.inserted_primary_key[0]
created_relationship = db.execute(
models.related_products.select().where(models.related_products.c.id == relationship_id)
).first()
return schemas.RelatedProduct(
id=created_relationship.id,
product_id=created_relationship.product_id,
related_product_id=created_relationship.related_product_id,
relationship_type=created_relationship.relationship_type,
created_at=created_relationship.created_at
)
@app.get("/related-products/", response_model=List[schemas.RelatedProduct])
def read_related_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
relationships = db.execute(
models.related_products.select().offset(skip).limit(limit)
).fetchall()
return [
schemas.RelatedProduct(
id=rel.id,
product_id=rel.product_id,
related_product_id=rel.related_product_id,
relationship_type=rel.relationship_type,
created_at=rel.created_at
)
for rel in relationships
]
@app.get("/related-products/product/{product_id}", response_model=List[schemas.Product])
def get_related_products_for_product(product_id: int, db: Session = Depends(get_db)):
# Validate product exists
product = db.query(models.Product).filter(models.Product.id == product_id).first()
if product is None:
raise HTTPException(status_code=404, detail="Product not found")
# Get related products (bidirectional)
related_product_ids = db.execute(
text("""
SELECT CASE
WHEN product_id = :product_id THEN related_product_id
ELSE product_id
END as related_id
FROM related_products
WHERE product_id = :product_id OR related_product_id = :product_id
"""),
{"product_id": product_id}
).fetchall()
if not related_product_ids:
return []
# Get the actual product objects
related_ids = [row.related_id for row in related_product_ids]
related_products = db.query(models.Product).filter(models.Product.id.in_(related_ids)).all()
return related_products
@app.get("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def read_related_product(relationship_id: int, db: Session = Depends(get_db)):
relationship = db.execute(
models.related_products.select().where(models.related_products.c.id == relationship_id)
).first()
if relationship is None:
raise HTTPException(status_code=404, detail="Related product relationship not found")
return schemas.RelatedProduct(
id=relationship.id,
product_id=relationship.product_id,
related_product_id=relationship.related_product_id,
relationship_type=relationship.relationship_type,
created_at=relationship.created_at
)
@app.put("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def update_related_product(relationship_id: int, update_data: schemas.RelatedProductUpdate, db: Session = Depends(get_db)):
# Check if relationship exists
existing = db.execute(
models.related_products.select().where(models.related_products.c.id == relationship_id)
).first()
if existing is None:
raise HTTPException(status_code=404, detail="Related product relationship not found")
# Update the relationship
db.execute(
models.related_products.update().where(models.related_products.c.id == relationship_id).values(
relationship_type=update_data.relationship_type
)
)
db.commit()
# Get the updated relationship
updated_relationship = db.execute(
models.related_products.select().where(models.related_products.c.id == relationship_id)
).first()
return schemas.RelatedProduct(
id=updated_relationship.id,
product_id=updated_relationship.product_id,
related_product_id=updated_relationship.related_product_id,
relationship_type=updated_relationship.relationship_type,
created_at=updated_relationship.created_at
)
@app.delete("/related-products/{relationship_id}")
def delete_related_product(relationship_id: int, db: Session = Depends(get_db)):
# Check if relationship exists
existing = db.execute(
models.related_products.select().where(models.related_products.c.id == relationship_id)
).first()
if existing is None:
raise HTTPException(status_code=404, detail="Related product relationship not found")
# Delete the relationship
db.execute(
models.related_products.delete().where(models.related_products.c.id == relationship_id)
)
db.commit()
return {"message": "Related product relationship deleted successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)