groceries/backend/main.py
lasse 0b42a74fe9 Minor version bump (1.x.0) is appropriate because:
 New functionality added (soft delete system)
 Backward compatible (existing features unchanged)
 Significant enhancement (complete temporal tracking system)
 API additions (new endpoints, parameters)
 UI enhancements (new components, visual indicators)
2025-05-30 09:49:26 +02:00

909 lines
36 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from sqlalchemy import text, func
from typing import List
import models, schemas
from database import engine, get_db
from version import __version__, __app_name__, __description__
# Create any database tables declared on the ORM metadata that do not exist yet.
models.Base.metadata.create_all(bind=engine)

# The application object; its title, description and version come from the
# project's version module.
app = FastAPI(
    version=__version__,
    description=__description__,
    title=__app_name__,
)

# CORS middleware for the React frontend: only the listed dev-server origins
# are accepted, with credentials and every method/header allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["http://localhost:3000", "http://localhost:5173"],  # React dev servers
)
def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
    """Assemble the API response for a shopping event, including its products.

    Products are read through the ``shopping_event_products`` association
    table so the per-event amount, price and discount accompany each product,
    together with its category and optional brand.

    Args:
        event: The ORM shopping event to serialise.
        db: Session used to run the product lookup.

    Returns:
        A ``ShoppingEventResponse`` holding the event fields, its shop and one
        ``ProductWithEventData`` per associated product row.
    """
    products_sql = text("""
SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
sep.amount, sep.price, sep.discount,
gc.id as category_id, gc.name as category_name,
gc.created_at as category_created_at, gc.updated_at as category_updated_at,
b.id as brand_id, b.name as brand_name,
b.created_at as brand_created_at, b.updated_at as brand_updated_at
FROM products p
JOIN shopping_event_products sep ON p.id = sep.product_id
JOIN grocery_categories gc ON p.category_id = gc.id
LEFT JOIN brands b ON p.brand_id = b.id
WHERE sep.shopping_event_id = :event_id
""")
    rows = db.execute(products_sql, {"event_id": event.id}).fetchall()

    def to_event_product(row):
        """Convert one joined result row into a ProductWithEventData."""
        category = schemas.GroceryCategory(
            id=row.category_id,
            name=row.category_name,
            created_at=row.category_created_at,
            updated_at=row.category_updated_at,
        )
        # Brands are LEFT JOINed, so a product without a brand has NULL brand columns.
        if row.brand_id is None:
            brand = None
        else:
            brand = schemas.Brand(
                id=row.brand_id,
                name=row.brand_name,
                created_at=row.brand_created_at,
                updated_at=row.brand_updated_at,
            )
        return schemas.ProductWithEventData(
            id=row.id,
            name=row.name,
            category=category,
            brand=brand,
            organic=row.organic,
            weight=row.weight,
            weight_unit=row.weight_unit,
            amount=row.amount,
            price=row.price,
            discount=row.discount,
        )

    purchased = [to_event_product(row) for row in rows]

    return schemas.ShoppingEventResponse(
        id=event.id,
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
        created_at=event.created_at,
        shop=event.shop,
        products=purchased,
    )
# Root endpoint
@app.get("/")
def read_root():
    """Return the application name, its version and a fixed backend label."""
    return dict(
        message=__app_name__,
        version=__version__,
        name="Groceries Tracker Backend",
    )
# Utility endpoints
@app.get("/current-date")
def get_current_date():
    """Return today's date in ISO format, for use as the default in valid_from fields."""
    import datetime as _dt

    today = _dt.date.today()
    return {"current_date": today.isoformat()}
@app.get("/products/available-for-shopping/{shopping_date}", response_model=List[schemas.Product])
def get_products_available_for_shopping(shopping_date: str, db: Session = Depends(get_db)):
    """List the products that were available (not deleted) on a shopping date.

    Args:
        shopping_date: Date in ``YYYY-MM-DD`` format.
        db: Database session.

    Returns:
        Products that are not flagged deleted, plus deleted products whose
        ``valid_from`` is later than the shopping date.

    Raises:
        HTTPException: 400 when ``shopping_date`` cannot be parsed.
    """
    import datetime as _dt

    try:
        target_date = _dt.datetime.strptime(shopping_date, '%Y-%m-%d').date()
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD format")

    # `== False` / `== True` build SQL expressions here; they are not Python truth tests.
    never_deleted = models.Product.deleted == False
    deleted_afterwards = (models.Product.deleted == True) & (models.Product.valid_from > target_date)

    return db.query(models.Product).filter(never_deleted | deleted_afterwards).all()
# Product endpoints
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
    """Create a product after validating its category, brand and valid_from.

    Raises:
        HTTPException: 404 when the category or the (optional) brand does not
            exist; 400 when ``valid_from`` lies in the future.
    """
    from datetime import date

    category_lookup = db.query(models.GroceryCategory).filter(
        models.GroceryCategory.id == product.category_id
    )
    if category_lookup.first() is None:
        raise HTTPException(status_code=404, detail="Category not found")

    # The brand is optional; it is only checked when one was supplied.
    if product.brand_id is not None:
        brand_lookup = db.query(models.Brand).filter(models.Brand.id == product.brand_id)
        if brand_lookup.first() is None:
            raise HTTPException(status_code=404, detail="Brand not found")

    requested_valid_from = product.valid_from
    if requested_valid_from is not None and requested_valid_from > date.today():
        raise HTTPException(status_code=400, detail="Valid from date cannot be in the future")

    # valid_from is left out of the constructor arguments and assigned only
    # when the caller supplied one; otherwise the column is left unset.
    new_product = models.Product(**product.dict(exclude={'valid_from'}))
    if requested_valid_from is not None:
        new_product.valid_from = requested_valid_from

    db.add(new_product)
    db.commit()
    db.refresh(new_product)
    return new_product
@app.get("/products/", response_model=List[schemas.Product])
def read_products(skip: int = 0, limit: int = 100, show_deleted: bool = False, db: Session = Depends(get_db)):
    """Return one page of products, hiding deleted ones unless ``show_deleted`` is set."""
    all_products = db.query(models.Product)
    if show_deleted:
        selected = all_products
    else:
        selected = all_products.filter(models.Product.deleted == False)
    return selected.offset(skip).limit(limit).all()
@app.get("/products/{product_id}", response_model=schemas.Product)
def read_product(product_id: int, db: Session = Depends(get_db)):
    """Return the product with the given id, or respond 404 when it does not exist."""
    found = db.query(models.Product).filter(models.Product.id == product_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Product not found")
@app.get("/products/{product_id}/valid-from")
def get_product_valid_from(product_id: int, db: Session = Depends(get_db)):
    """Return the product's current valid_from date in ISO format (used for validation when editing).

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    lookup = db.query(models.Product).filter(models.Product.id == product_id)
    product = lookup.first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    valid_from_iso = product.valid_from.isoformat()
    return {"valid_from": valid_from_iso}
@app.put("/products/{product_id}", response_model=schemas.Product)
def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a product, optionally with an explicit valid_from.

    Only fields explicitly present in the request are written. ``valid_from``
    is handled separately from the other fields so it can be validated
    against today's date and the product's current ``valid_from``.

    Args:
        product_id: Id of the product to update.
        product_update: Fields to change; unset fields are left untouched.
        db: Database session.

    Returns:
        The refreshed product.

    Raises:
        HTTPException: 404 when the product, the new category or the new brand
            does not exist; 400 when ``valid_from`` is in the future or not
            strictly after the product's current ``valid_from``.
    """
    product = db.query(models.Product).filter(models.Product.id == product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    update_data = product_update.dict(exclude_unset=True, exclude={'valid_from'})

    # Validate category exists if category_id is being updated
    if 'category_id' in update_data:
        category = db.query(models.GroceryCategory).filter(
            models.GroceryCategory.id == update_data['category_id']
        ).first()
        if category is None:
            raise HTTPException(status_code=404, detail="Category not found")

    # Validate brand exists if brand_id is being updated (None clears the brand)
    if 'brand_id' in update_data and update_data['brand_id'] is not None:
        brand = db.query(models.Brand).filter(models.Brand.id == update_data['brand_id']).first()
        if brand is None:
            raise HTTPException(status_code=404, detail="Brand not found")

    # Validate valid_from date if provided
    if product_update.valid_from is not None:
        from datetime import date
        if product_update.valid_from > date.today():
            raise HTTPException(status_code=400, detail="Valid from date cannot be in the future")
        if product_update.valid_from <= product.valid_from:
            raise HTTPException(
                status_code=400,
                detail=f"Valid from date must be after the current product's valid from date ({product.valid_from})"
            )

    # Apply the updates. Per the original author's note, a database trigger is
    # expected to create the history record (the trigger is not visible here).
    for field, value in update_data.items():
        setattr(product, field, value)

    # Set valid_from if provided for manual versioning
    if product_update.valid_from is not None:
        product.valid_from = product_update.valid_from

    db.commit()
    db.refresh(product)
    return product
@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Soft-delete a product by flagging it deleted as of today.

    Raises:
        HTTPException: 404 when the product does not exist; 400 when it is
            already flagged as deleted.
    """
    from datetime import date

    target = db.query(models.Product).filter(models.Product.id == product_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Product not found")
    if target.deleted:
        raise HTTPException(status_code=400, detail="Product is already deleted")

    # The row is kept: it is only marked deleted with valid_from set to today.
    # According to the original comment, a database trigger then writes the
    # history record (the trigger itself is not visible in this file).
    target.deleted = True
    target.valid_from = date.today()
    target.updated_at = func.now()
    db.commit()

    return {"message": "Product deleted successfully"}
# Historical Product endpoints
@app.get("/products/{product_id}/history", response_model=List[schemas.ProductHistory])
def get_product_history(product_id: int, db: Session = Depends(get_db)):
    """Return every historical version of a product, newest valid_from first.

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    # The product must exist in the current table before its history is read.
    exists_query = db.query(models.Product).filter(models.Product.id == product_id)
    if exists_query.first() is None:
        raise HTTPException(status_code=404, detail="Product not found")

    versions = db.query(models.ProductHistory).filter(models.ProductHistory.id == product_id)
    newest_first = versions.order_by(models.ProductHistory.valid_from.desc())
    return newest_first.all()
def _product_at_date_from_record(record, was_current: bool, db: Session) -> schemas.ProductAtDate:
    """Build a ProductAtDate from a current or historical product record.

    The category and brand are looked up as they are now; they may have
    changed since the record's validity period.
    """
    category = db.query(models.GroceryCategory).filter(
        models.GroceryCategory.id == record.category_id
    ).first()

    brand = None
    if record.brand_id:
        brand = db.query(models.Brand).filter(
            models.Brand.id == record.brand_id
        ).first()

    return schemas.ProductAtDate(
        id=record.id,
        name=record.name,
        category_id=record.category_id,
        category=category,
        brand_id=record.brand_id,
        brand=brand,
        organic=record.organic,
        weight=record.weight,
        weight_unit=record.weight_unit,
        valid_from=record.valid_from,
        valid_to=record.valid_to,
        deleted=record.deleted,
        was_current=was_current
    )


@app.get("/products/{product_id}/at/{date}", response_model=schemas.ProductAtDate)
def get_product_at_date(product_id: int, date: str, db: Session = Depends(get_db)):
    """Get product as it existed at a specific date - CRUCIAL for shopping events.

    The current products table is checked first; if its row does not cover
    the date, the history table is searched for a version whose
    ``valid_from``/``valid_to`` range contains it.

    Args:
        product_id: Id of the product.
        date: Target date in ``YYYY-MM-DD`` format.
        db: Database session.

    Returns:
        A ``ProductAtDate`` with ``was_current`` set to True when the data
        came from the current table and False when it came from history.

    Raises:
        HTTPException: 400 when ``date`` cannot be parsed; 404 when no
            current or historical version covers the date.
    """
    from datetime import datetime

    try:
        # Parse the date string (accept YYYY-MM-DD format)
        target_date = datetime.strptime(date, '%Y-%m-%d').date()
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD format")

    # First try current products table
    current_product = db.query(models.Product).filter(
        models.Product.id == product_id,
        models.Product.valid_from <= target_date,
        models.Product.valid_to >= target_date
    ).first()
    if current_product:
        return _product_at_date_from_record(current_product, True, db)

    # Try history table
    historical_product = db.query(models.ProductHistory).filter(
        models.ProductHistory.id == product_id,
        models.ProductHistory.valid_from <= target_date,
        models.ProductHistory.valid_to >= target_date
    ).first()
    if historical_product:
        return _product_at_date_from_record(historical_product, False, db)

    # Product didn't exist at that date
    raise HTTPException(
        status_code=404,
        detail=f"Product {product_id} did not exist on {date}"
    )
@app.get("/shopping-events/{event_id}/products-as-purchased", response_model=List[schemas.ProductAtPurchase])
def get_shopping_event_products_as_purchased(event_id: int, db: Session = Depends(get_db)):
    """Get products as they were when purchased - shows historical product data.

    Args:
        event_id: Id of the shopping event.
        db: Database session.

    Returns:
        One ``ProductAtPurchase`` per associated product for which a version
        valid on the purchase date exists; products without such a version
        are skipped.

    Raises:
        HTTPException: 404 when the shopping event does not exist.
    """
    # Get the shopping event
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Get products from association table
    products_data = db.execute(
        text("""
SELECT sep.product_id, sep.amount, sep.price, sep.discount
FROM shopping_event_products sep
WHERE sep.shopping_event_id = :event_id
"""),
        {"event_id": event_id}
    ).fetchall()
    if not products_data:
        return []

    # The purchase date is the same for every product, so format it once:
    # extract just the date from the shopping event datetime.
    purchase_date = event.date.date().strftime('%Y-%m-%d')

    result = []
    for product_data in products_data:
        try:
            # Only this lookup can raise the HTTPException handled below.
            product_at_purchase = get_product_at_date(
                product_data.product_id,
                purchase_date,
                db
            )
        except HTTPException:
            # Product didn't exist at purchase time (shouldn't happen, but handle gracefully)
            continue
        result.append(schemas.ProductAtPurchase(
            product=product_at_purchase,
            amount=product_data.amount,
            price=product_data.price,
            discount=product_data.discount
        ))
    return result
# Shop endpoints
@app.post("/shops/", response_model=schemas.Shop)
def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
    """Persist a new shop built from the request fields and return it refreshed."""
    shop_fields = shop.dict()
    new_shop = models.Shop(**shop_fields)
    db.add(new_shop)
    db.commit()
    db.refresh(new_shop)
    return new_shop
@app.get("/shops/", response_model=List[schemas.Shop])
def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of shops, selected by ``skip`` and ``limit``."""
    page = db.query(models.Shop).offset(skip).limit(limit)
    return page.all()
@app.get("/shops/{shop_id}", response_model=schemas.Shop)
def read_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return the shop with the given id, or respond 404 when it does not exist."""
    found = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Shop not found")
@app.put("/shops/{shop_id}", response_model=schemas.Shop)
def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
    """Apply the explicitly provided fields to a shop and return it refreshed.

    Raises:
        HTTPException: 404 when the shop does not exist.
    """
    target = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # exclude_unset keeps fields the client did not send from being overwritten.
    changes = shop_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/shops/{shop_id}")
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
    """Delete a shop and confirm with a message; respond 404 when it does not exist."""
    doomed = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    db.delete(doomed)
    db.commit()
    confirmation = {"message": "Shop deleted successfully"}
    return confirmation
# Brand endpoints
@app.post("/brands/", response_model=schemas.Brand)
def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
    """Persist a new brand built from the request fields and return it refreshed."""
    brand_fields = brand.dict()
    new_brand = models.Brand(**brand_fields)
    db.add(new_brand)
    db.commit()
    db.refresh(new_brand)
    return new_brand
@app.get("/brands/", response_model=List[schemas.Brand])
def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of brands, selected by ``skip`` and ``limit``."""
    page = db.query(models.Brand).offset(skip).limit(limit)
    return page.all()
@app.get("/brands/{brand_id}", response_model=schemas.Brand)
def read_brand(brand_id: int, db: Session = Depends(get_db)):
    """Return the brand with the given id, or respond 404 when it does not exist."""
    found = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Brand not found")
@app.put("/brands/{brand_id}", response_model=schemas.Brand)
def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
    """Apply the explicitly provided fields to a brand and return it refreshed.

    Raises:
        HTTPException: 404 when the brand does not exist.
    """
    target = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # exclude_unset keeps fields the client did not send from being overwritten.
    changes = brand_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/brands/{brand_id}")
def delete_brand(brand_id: int, db: Session = Depends(get_db)):
    """Delete a brand that no product references.

    Raises:
        HTTPException: 404 when the brand does not exist; 400 when at least
            one product still points at it.
    """
    doomed = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # One referencing product is enough to refuse the delete.
    referencing_product = (
        db.query(models.Product)
        .filter(models.Product.brand_id == brand_id)
        .first()
    )
    if referencing_product:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete brand: products are still associated with this brand",
        )

    db.delete(doomed)
    db.commit()
    return {"message": "Brand deleted successfully"}
# BrandInShop endpoints
@app.post("/brands-in-shops/", response_model=schemas.BrandInShop)
def create_brand_in_shop(brand_in_shop: schemas.BrandInShopCreate, db: Session = Depends(get_db)):
    """Associate a brand with a shop, refusing duplicates.

    Raises:
        HTTPException: 404 when the shop or brand does not exist; 400 when
            the same shop/brand pair is already stored.
    """
    shop_id = brand_in_shop.shop_id
    brand_id = brand_in_shop.brand_id

    if db.query(models.Shop).filter(models.Shop.id == shop_id).first() is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    if db.query(models.Brand).filter(models.Brand.id == brand_id).first() is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # Refuse to store the same shop/brand pair twice.
    duplicate = db.query(models.BrandInShop).filter(
        models.BrandInShop.shop_id == shop_id,
        models.BrandInShop.brand_id == brand_id,
    ).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="This brand is already associated with this shop")

    association = models.BrandInShop(**brand_in_shop.dict())
    db.add(association)
    db.commit()
    db.refresh(association)
    return association
@app.get("/brands-in-shops/", response_model=List[schemas.BrandInShop])
def read_brands_in_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of brand/shop associations, selected by ``skip`` and ``limit``."""
    page = db.query(models.BrandInShop).offset(skip).limit(limit)
    return page.all()
@app.get("/brands-in-shops/shop/{shop_id}", response_model=List[schemas.BrandInShop])
def read_brands_in_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return every brand association of one shop; respond 404 when the shop does not exist."""
    shop_lookup = db.query(models.Shop).filter(models.Shop.id == shop_id)
    if shop_lookup.first() is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    associations = db.query(models.BrandInShop).filter(models.BrandInShop.shop_id == shop_id)
    return associations.all()
@app.get("/brands-in-shops/brand/{brand_id}", response_model=List[schemas.BrandInShop])
def read_shops_with_brand(brand_id: int, db: Session = Depends(get_db)):
    """Return every shop association of one brand; respond 404 when the brand does not exist."""
    brand_lookup = db.query(models.Brand).filter(models.Brand.id == brand_id)
    if brand_lookup.first() is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    associations = db.query(models.BrandInShop).filter(models.BrandInShop.brand_id == brand_id)
    return associations.all()
@app.get("/brands-in-shops/{brand_in_shop_id}", response_model=schemas.BrandInShop)
def read_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Return one brand/shop association by id, or respond 404 when it does not exist."""
    found = (
        db.query(models.BrandInShop)
        .filter(models.BrandInShop.id == brand_in_shop_id)
        .first()
    )
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Brand in shop association not found")
@app.delete("/brands-in-shops/{brand_in_shop_id}")
def delete_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Delete one brand/shop association; respond 404 when it does not exist."""
    doomed = (
        db.query(models.BrandInShop)
        .filter(models.BrandInShop.id == brand_in_shop_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Brand in shop association not found")

    db.delete(doomed)
    db.commit()
    confirmation = {"message": "Brand in shop association deleted successfully"}
    return confirmation
# Grocery Category endpoints
@app.post("/grocery-categories/", response_model=schemas.GroceryCategory)
def create_grocery_category(category: schemas.GroceryCategoryCreate, db: Session = Depends(get_db)):
    """Persist a new grocery category built from the request fields and return it refreshed."""
    category_fields = category.dict()
    new_category = models.GroceryCategory(**category_fields)
    db.add(new_category)
    db.commit()
    db.refresh(new_category)
    return new_category
@app.get("/grocery-categories/", response_model=List[schemas.GroceryCategory])
def read_grocery_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of grocery categories, selected by ``skip`` and ``limit``."""
    page = db.query(models.GroceryCategory).offset(skip).limit(limit)
    return page.all()
@app.get("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def read_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Return the grocery category with the given id, or respond 404 when it does not exist."""
    found = (
        db.query(models.GroceryCategory)
        .filter(models.GroceryCategory.id == category_id)
        .first()
    )
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Grocery category not found")
@app.put("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def update_grocery_category(category_id: int, category_update: schemas.GroceryCategoryUpdate, db: Session = Depends(get_db)):
    """Apply the explicitly provided fields to a grocery category and return it refreshed.

    Raises:
        HTTPException: 404 when the category does not exist.
    """
    target = (
        db.query(models.GroceryCategory)
        .filter(models.GroceryCategory.id == category_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")

    # exclude_unset keeps fields the client did not send from being overwritten.
    changes = category_update.dict(exclude_unset=True)
    for attr_name in changes:
        setattr(target, attr_name, changes[attr_name])

    db.commit()
    db.refresh(target)
    return target
@app.delete("/grocery-categories/{category_id}")
def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Delete a grocery category that no product references.

    Raises:
        HTTPException: 404 when the category does not exist; 400 when at
            least one product still points at it.
    """
    doomed = (
        db.query(models.GroceryCategory)
        .filter(models.GroceryCategory.id == category_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")

    # One referencing product is enough to refuse the delete.
    referencing_product = (
        db.query(models.Product)
        .filter(models.Product.category_id == category_id)
        .first()
    )
    if referencing_product:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete category: products are still associated with this category",
        )

    db.delete(doomed)
    db.commit()
    return {"message": "Grocery category deleted successfully"}
# Shopping Event endpoints
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Create a shopping event together with its product associations.

    All referenced products are validated before anything is written, and the
    event plus its association rows are committed in a single transaction.
    A request that names a missing product therefore no longer leaves an
    already-committed event without its products behind.

    Args:
        event: Shop id, date, total, notes and the purchased products.
        db: Database session.

    Returns:
        The created event as a ``ShoppingEventResponse``.

    Raises:
        HTTPException: 404 when the shop or any referenced product does not
            exist.
    """
    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Verify every product exists before writing, so a 404 cannot occur
    # after part of the request has already been persisted.
    for product_item in event.products:
        product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
        if product is None:
            raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")

    # Create shopping event
    db_event = models.ShoppingEvent(
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes
    )
    db.add(db_event)
    # flush() sends the INSERT so db_event.id is populated, but keeps the
    # transaction open; the event and its products are committed together.
    db.flush()

    # Add products to the event via the association table
    for product_item in event.products:
        db.execute(
            models.shopping_event_products.insert().values(
                shopping_event_id=db_event.id,
                product_id=product_item.product_id,
                amount=product_item.amount,
                price=product_item.price,
                discount=product_item.discount
            )
        )

    db.commit()
    db.refresh(db_event)
    return build_shopping_event_response(db_event, db)
@app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of shopping events, most recently created first, each with its products."""
    newest_first = db.query(models.ShoppingEvent).order_by(models.ShoppingEvent.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    responses = []
    for shopping_event in page:
        responses.append(build_shopping_event_response(shopping_event, db))
    return responses
@app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Return one shopping event with its products, or respond 404 when it does not exist."""
    found = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if found is not None:
        return build_shopping_event_response(found, db)
    raise HTTPException(status_code=404, detail="Shopping event not found")
@app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Replace a shopping event's fields and its full set of product associations.

    The shop and every referenced product are validated before the event is
    touched, so a 404 is raised with no pending changes in the session.

    Args:
        event_id: Id of the event to replace.
        event_update: New shop id, date, total, notes and products.
        db: Database session.

    Returns:
        The updated event as a ``ShoppingEventResponse``.

    Raises:
        HTTPException: 404 when the event, the shop or any referenced product
            does not exist.
    """
    # Get the existing event
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event_update.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Verify every product exists before mutating anything, so a failure
    # cannot leave the old associations deleted and only some new ones added.
    for product_item in event_update.products:
        product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
        if product is None:
            raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")

    # Update the shopping event
    event.shop_id = event_update.shop_id
    event.date = event_update.date
    event.total_amount = event_update.total_amount
    event.notes = event_update.notes

    # Remove existing product associations
    db.execute(
        models.shopping_event_products.delete().where(
            models.shopping_event_products.c.shopping_event_id == event_id
        )
    )

    # Add new product associations
    for product_item in event_update.products:
        db.execute(
            models.shopping_event_products.insert().values(
                shopping_event_id=event_id,
                product_id=product_item.product_id,
                amount=product_item.amount,
                price=product_item.price,
                discount=product_item.discount
            )
        )

    db.commit()
    db.refresh(event)
    return build_shopping_event_response(event, db)
@app.delete("/shopping-events/{event_id}")
def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Delete a shopping event and its product associations.

    Raises:
        HTTPException: 404 when the shopping event does not exist.
    """
    doomed = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Association rows are removed before the event row they reference.
    associations = models.shopping_event_products
    remove_associations = associations.delete().where(associations.c.shopping_event_id == event_id)
    db.execute(remove_associations)

    db.delete(doomed)
    db.commit()
    return {"message": "Shopping event deleted successfully"}
# Statistics endpoints
@app.get("/stats/categories", response_model=List[schemas.CategoryStats])
def get_category_stats(db: Session = Depends(get_db)):
    """Return category statistics; currently a placeholder that always yields an empty list."""
    # Placeholder: the real implementation would need a more complex SQL query.
    no_stats_yet = []
    return no_stats_yet
@app.get("/stats/shops", response_model=List[schemas.ShopStats])
def get_shop_stats(db: Session = Depends(get_db)):
    """Return shop statistics; currently a placeholder that always yields an empty list."""
    # Placeholder: the real implementation would need a more complex SQL query.
    no_stats_yet = []
    return no_stats_yet
# Related Products endpoints
@app.post("/related-products/", response_model=schemas.RelatedProduct)
def create_related_product(related_product: schemas.RelatedProductCreate, db: Session = Depends(get_db)):
    """Create a relationship between two distinct, existing products.

    Raises:
        HTTPException: 404 when either product does not exist; 400 when both
            ids are the same or the pair is already related in either
            direction.
    """
    source_id = related_product.product_id
    target_id = related_product.related_product_id
    links = models.related_products

    if db.query(models.Product).filter(models.Product.id == source_id).first() is None:
        raise HTTPException(status_code=404, detail="Product not found")

    if db.query(models.Product).filter(models.Product.id == target_id).first() is None:
        raise HTTPException(status_code=404, detail="Related product not found")

    if source_id == target_id:
        raise HTTPException(status_code=400, detail="A product cannot be related to itself")

    # A link counts as existing no matter which product is stored on which side.
    forward = (links.c.product_id == source_id) & (links.c.related_product_id == target_id)
    backward = (links.c.product_id == target_id) & (links.c.related_product_id == source_id)
    already_linked = db.execute(links.select().where(forward | backward)).first()
    if already_linked:
        raise HTTPException(status_code=400, detail="Products are already related")

    insert_stmt = links.insert().values(
        product_id=source_id,
        related_product_id=target_id,
        relationship_type=related_product.relationship_type,
    )
    insert_result = db.execute(insert_stmt)
    db.commit()

    # Read the stored row back and build the response from it.
    new_id = insert_result.inserted_primary_key[0]
    stored = db.execute(links.select().where(links.c.id == new_id)).first()

    return schemas.RelatedProduct(
        id=stored.id,
        product_id=stored.product_id,
        related_product_id=stored.related_product_id,
        relationship_type=stored.relationship_type,
        created_at=stored.created_at,
    )
@app.get("/related-products/", response_model=List[schemas.RelatedProduct])
def read_related_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of product relationships, selected by ``skip`` and ``limit``."""
    page_stmt = models.related_products.select().offset(skip).limit(limit)
    rows = db.execute(page_stmt).fetchall()

    relationships = []
    for row in rows:
        relationships.append(
            schemas.RelatedProduct(
                id=row.id,
                product_id=row.product_id,
                related_product_id=row.related_product_id,
                relationship_type=row.relationship_type,
                created_at=row.created_at,
            )
        )
    return relationships
@app.get("/related-products/product/{product_id}", response_model=List[schemas.Product])
def get_related_products_for_product(product_id: int, db: Session = Depends(get_db)):
    """Return the products related to a product, following links in both directions.

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    anchor_lookup = db.query(models.Product).filter(models.Product.id == product_id)
    if anchor_lookup.first() is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # For every link that touches the product, select the id on the other side.
    other_side_sql = text("""
SELECT CASE
WHEN product_id = :product_id THEN related_product_id
ELSE product_id
END as related_id
FROM related_products
WHERE product_id = :product_id OR related_product_id = :product_id
""")
    link_rows = db.execute(other_side_sql, {"product_id": product_id}).fetchall()
    if not link_rows:
        return []

    related_ids = []
    for link_row in link_rows:
        related_ids.append(link_row.related_id)

    matching = db.query(models.Product).filter(models.Product.id.in_(related_ids))
    return matching.all()
@app.get("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def read_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Return one product relationship by id, or respond 404 when it does not exist."""
    links = models.related_products
    row = db.execute(links.select().where(links.c.id == relationship_id)).first()
    if row is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    return schemas.RelatedProduct(
        id=row.id,
        product_id=row.product_id,
        related_product_id=row.related_product_id,
        relationship_type=row.relationship_type,
        created_at=row.created_at,
    )
@app.put("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def update_related_product(relationship_id: int, update_data: schemas.RelatedProductUpdate, db: Session = Depends(get_db)):
    """Change the relationship_type of an existing product relationship.

    Raises:
        HTTPException: 404 when the relationship does not exist.
    """
    links = models.related_products
    by_id = links.c.id == relationship_id

    if db.execute(links.select().where(by_id)).first() is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    # Only relationship_type is written by this endpoint.
    change_type = links.update().where(by_id).values(
        relationship_type=update_data.relationship_type
    )
    db.execute(change_type)
    db.commit()

    # Read the row again so the response reflects what is stored after the commit.
    refreshed = db.execute(links.select().where(by_id)).first()
    return schemas.RelatedProduct(
        id=refreshed.id,
        product_id=refreshed.product_id,
        related_product_id=refreshed.related_product_id,
        relationship_type=refreshed.relationship_type,
        created_at=refreshed.created_at,
    )
@app.delete("/related-products/{relationship_id}")
def delete_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Delete one product relationship; respond 404 when it does not exist."""
    links = models.related_products
    by_id = links.c.id == relationship_id

    if db.execute(links.select().where(by_id)).first() is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    db.execute(links.delete().where(by_id))
    db.commit()
    confirmation = {"message": "Related product relationship deleted successfully"}
    return confirmation
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")