680 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			680 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from fastapi import FastAPI, Depends, HTTPException, status
 | |
| from fastapi.middleware.cors import CORSMiddleware
 | |
| from sqlalchemy.orm import Session
 | |
| from sqlalchemy import text
 | |
| from typing import List
 | |
| import models, schemas
 | |
| from database import engine, get_db
 | |
| 
 | |
| # Create database tables
 | |
| models.Base.metadata.create_all(bind=engine)
 | |
| 
 | |
| app = FastAPI(
 | |
|     title="Product Tracker API",
 | |
|     description="API for tracking product prices and shopping events",
 | |
|     version="1.0.0"
 | |
| )
 | |
| 
 | |
| # CORS middleware for React frontend
 | |
| app.add_middleware(
 | |
|     CORSMiddleware,
 | |
|     allow_origins=["http://localhost:3000", "http://localhost:5173"],  # React dev servers
 | |
|     allow_credentials=True,
 | |
|     allow_methods=["*"],
 | |
|     allow_headers=["*"],
 | |
| )
 | |
| 
 | |
| def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
 | |
|     """Build a shopping event response with products from the association table"""
 | |
|     # Get products with their event-specific data including category and brand information
 | |
|     product_data = db.execute(
 | |
|         text("""
 | |
|             SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
 | |
|                    sep.amount, sep.price, sep.discount,
 | |
|                    gc.id as category_id, gc.name as category_name,
 | |
|                    gc.created_at as category_created_at, gc.updated_at as category_updated_at,
 | |
|                    b.id as brand_id, b.name as brand_name,
 | |
|                    b.created_at as brand_created_at, b.updated_at as brand_updated_at
 | |
|             FROM products p
 | |
|             JOIN shopping_event_products sep ON p.id = sep.product_id
 | |
|             JOIN grocery_categories gc ON p.category_id = gc.id
 | |
|             LEFT JOIN brands b ON p.brand_id = b.id
 | |
|             WHERE sep.shopping_event_id = :event_id
 | |
|         """),
 | |
|         {"event_id": event.id}
 | |
|     ).fetchall()
 | |
|     
 | |
|     # Convert to ProductWithEventData objects
 | |
|     products_with_data = []
 | |
|     for row in product_data:
 | |
|         category = schemas.GroceryCategory(
 | |
|             id=row.category_id,
 | |
|             name=row.category_name,
 | |
|             created_at=row.category_created_at,
 | |
|             updated_at=row.category_updated_at
 | |
|         )
 | |
|         
 | |
|         brand = None
 | |
|         if row.brand_id is not None:
 | |
|             brand = schemas.Brand(
 | |
|                 id=row.brand_id,
 | |
|                 name=row.brand_name,
 | |
|                 created_at=row.brand_created_at,
 | |
|                 updated_at=row.brand_updated_at
 | |
|             )
 | |
|         
 | |
|         products_with_data.append(
 | |
|             schemas.ProductWithEventData(
 | |
|                 id=row.id,
 | |
|                 name=row.name,
 | |
|                 category=category,
 | |
|                 brand=brand,
 | |
|                 organic=row.organic,
 | |
|                 weight=row.weight,
 | |
|                 weight_unit=row.weight_unit,
 | |
|                 amount=row.amount,
 | |
|                 price=row.price,
 | |
|                 discount=row.discount
 | |
|             )
 | |
|         )
 | |
|     
 | |
|     return schemas.ShoppingEventResponse(
 | |
|         id=event.id,
 | |
|         shop_id=event.shop_id,
 | |
|         date=event.date,
 | |
|         total_amount=event.total_amount,
 | |
|         notes=event.notes,
 | |
|         created_at=event.created_at,
 | |
|         shop=event.shop,
 | |
|         products=products_with_data
 | |
|     )
 | |
| 
 | |
| # Root endpoint
 | |
| @app.get("/")
 | |
| def read_root():
 | |
|     return {"message": "Product Tracker API", "version": "1.0.0"}
 | |
| 
 | |
| # Product endpoints
 | |
| @app.post("/products/", response_model=schemas.Product)
 | |
| def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
 | |
|     # Validate category exists
 | |
|     category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == product.category_id).first()
 | |
|     if category is None:
 | |
|         raise HTTPException(status_code=404, detail="Category not found")
 | |
|     
 | |
|     # Validate brand exists if brand_id is provided
 | |
|     if product.brand_id is not None:
 | |
|         brand = db.query(models.Brand).filter(models.Brand.id == product.brand_id).first()
 | |
|         if brand is None:
 | |
|             raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     
 | |
|     db_product = models.Product(**product.dict())
 | |
|     db.add(db_product)
 | |
|     db.commit()
 | |
|     db.refresh(db_product)
 | |
|     return db_product
 | |
| 
 | |
| @app.get("/products/", response_model=List[schemas.Product])
 | |
| def read_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     products = db.query(models.Product).offset(skip).limit(limit).all()
 | |
|     return products
 | |
| 
 | |
| @app.get("/products/{product_id}", response_model=schemas.Product)
 | |
| def read_product(product_id: int, db: Session = Depends(get_db)):
 | |
|     product = db.query(models.Product).filter(models.Product.id == product_id).first()
 | |
|     if product is None:
 | |
|         raise HTTPException(status_code=404, detail="Product not found")
 | |
|     return product
 | |
| 
 | |
| @app.put("/products/{product_id}", response_model=schemas.Product)
 | |
| def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
 | |
|     product = db.query(models.Product).filter(models.Product.id == product_id).first()
 | |
|     if product is None:
 | |
|         raise HTTPException(status_code=404, detail="Product not found")
 | |
|     
 | |
|     update_data = product_update.dict(exclude_unset=True)
 | |
|     
 | |
|     # Validate category exists if category_id is being updated
 | |
|     if 'category_id' in update_data:
 | |
|         category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == update_data['category_id']).first()
 | |
|         if category is None:
 | |
|             raise HTTPException(status_code=404, detail="Category not found")
 | |
|     
 | |
|     # Validate brand exists if brand_id is being updated
 | |
|     if 'brand_id' in update_data and update_data['brand_id'] is not None:
 | |
|         brand = db.query(models.Brand).filter(models.Brand.id == update_data['brand_id']).first()
 | |
|         if brand is None:
 | |
|             raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     
 | |
|     for field, value in update_data.items():
 | |
|         setattr(product, field, value)
 | |
|     
 | |
|     db.commit()
 | |
|     db.refresh(product)
 | |
|     return product
 | |
| 
 | |
| @app.delete("/products/{product_id}")
 | |
| def delete_product(product_id: int, db: Session = Depends(get_db)):
 | |
|     product = db.query(models.Product).filter(models.Product.id == product_id).first()
 | |
|     if product is None:
 | |
|         raise HTTPException(status_code=404, detail="Product not found")
 | |
|     
 | |
|     db.delete(product)
 | |
|     db.commit()
 | |
|     return {"message": "Product deleted successfully"}
 | |
| 
 | |
| # Shop endpoints
 | |
| @app.post("/shops/", response_model=schemas.Shop)
 | |
| def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
 | |
|     db_shop = models.Shop(**shop.dict())
 | |
|     db.add(db_shop)
 | |
|     db.commit()
 | |
|     db.refresh(db_shop)
 | |
|     return db_shop
 | |
| 
 | |
| @app.get("/shops/", response_model=List[schemas.Shop])
 | |
| def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     shops = db.query(models.Shop).offset(skip).limit(limit).all()
 | |
|     return shops
 | |
| 
 | |
| @app.get("/shops/{shop_id}", response_model=schemas.Shop)
 | |
| def read_shop(shop_id: int, db: Session = Depends(get_db)):
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     return shop
 | |
| 
 | |
| @app.put("/shops/{shop_id}", response_model=schemas.Shop)
 | |
| def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     
 | |
|     update_data = shop_update.dict(exclude_unset=True)
 | |
|     for field, value in update_data.items():
 | |
|         setattr(shop, field, value)
 | |
|     
 | |
|     db.commit()
 | |
|     db.refresh(shop)
 | |
|     return shop
 | |
| 
 | |
| @app.delete("/shops/{shop_id}")
 | |
| def delete_shop(shop_id: int, db: Session = Depends(get_db)):
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     
 | |
|     db.delete(shop)
 | |
|     db.commit()
 | |
|     return {"message": "Shop deleted successfully"}
 | |
| 
 | |
| # Brand endpoints
 | |
| @app.post("/brands/", response_model=schemas.Brand)
 | |
| def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
 | |
|     db_brand = models.Brand(**brand.dict())
 | |
|     db.add(db_brand)
 | |
|     db.commit()
 | |
|     db.refresh(db_brand)
 | |
|     return db_brand
 | |
| 
 | |
| @app.get("/brands/", response_model=List[schemas.Brand])
 | |
| def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     brands = db.query(models.Brand).offset(skip).limit(limit).all()
 | |
|     return brands
 | |
| 
 | |
| @app.get("/brands/{brand_id}", response_model=schemas.Brand)
 | |
| def read_brand(brand_id: int, db: Session = Depends(get_db)):
 | |
|     brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
 | |
|     if brand is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     return brand
 | |
| 
 | |
| @app.put("/brands/{brand_id}", response_model=schemas.Brand)
 | |
| def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
 | |
|     brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
 | |
|     if brand is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     
 | |
|     update_data = brand_update.dict(exclude_unset=True)
 | |
|     for field, value in update_data.items():
 | |
|         setattr(brand, field, value)
 | |
|     
 | |
|     db.commit()
 | |
|     db.refresh(brand)
 | |
|     return brand
 | |
| 
 | |
| @app.delete("/brands/{brand_id}")
 | |
| def delete_brand(brand_id: int, db: Session = Depends(get_db)):
 | |
|     brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
 | |
|     if brand is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     
 | |
|     # Check if any products reference this brand
 | |
|     products_with_brand = db.query(models.Product).filter(models.Product.brand_id == brand_id).first()
 | |
|     if products_with_brand:
 | |
|         raise HTTPException(
 | |
|             status_code=400, 
 | |
|             detail="Cannot delete brand: products are still associated with this brand"
 | |
|         )
 | |
|     
 | |
|     db.delete(brand)
 | |
|     db.commit()
 | |
|     return {"message": "Brand deleted successfully"}
 | |
| 
 | |
| # BrandInShop endpoints
 | |
| @app.post("/brands-in-shops/", response_model=schemas.BrandInShop)
 | |
| def create_brand_in_shop(brand_in_shop: schemas.BrandInShopCreate, db: Session = Depends(get_db)):
 | |
|     # Validate shop exists
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == brand_in_shop.shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     
 | |
|     # Validate brand exists
 | |
|     brand = db.query(models.Brand).filter(models.Brand.id == brand_in_shop.brand_id).first()
 | |
|     if brand is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     
 | |
|     # Check if this combination already exists
 | |
|     existing = db.query(models.BrandInShop).filter(
 | |
|         models.BrandInShop.shop_id == brand_in_shop.shop_id,
 | |
|         models.BrandInShop.brand_id == brand_in_shop.brand_id
 | |
|     ).first()
 | |
|     if existing:
 | |
|         raise HTTPException(status_code=400, detail="This brand is already associated with this shop")
 | |
|     
 | |
|     db_brand_in_shop = models.BrandInShop(**brand_in_shop.dict())
 | |
|     db.add(db_brand_in_shop)
 | |
|     db.commit()
 | |
|     db.refresh(db_brand_in_shop)
 | |
|     return db_brand_in_shop
 | |
| 
 | |
| @app.get("/brands-in-shops/", response_model=List[schemas.BrandInShop])
 | |
| def read_brands_in_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     brands_in_shops = db.query(models.BrandInShop).offset(skip).limit(limit).all()
 | |
|     return brands_in_shops
 | |
| 
 | |
| @app.get("/brands-in-shops/shop/{shop_id}", response_model=List[schemas.BrandInShop])
 | |
| def read_brands_in_shop(shop_id: int, db: Session = Depends(get_db)):
 | |
|     # Validate shop exists
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     
 | |
|     brands_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.shop_id == shop_id).all()
 | |
|     return brands_in_shop
 | |
| 
 | |
| @app.get("/brands-in-shops/brand/{brand_id}", response_model=List[schemas.BrandInShop])
 | |
| def read_shops_with_brand(brand_id: int, db: Session = Depends(get_db)):
 | |
|     # Validate brand exists
 | |
|     brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
 | |
|     if brand is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand not found")
 | |
|     
 | |
|     shops_with_brand = db.query(models.BrandInShop).filter(models.BrandInShop.brand_id == brand_id).all()
 | |
|     return shops_with_brand
 | |
| 
 | |
| @app.get("/brands-in-shops/{brand_in_shop_id}", response_model=schemas.BrandInShop)
 | |
| def read_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
 | |
|     brand_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.id == brand_in_shop_id).first()
 | |
|     if brand_in_shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand in shop association not found")
 | |
|     return brand_in_shop
 | |
| 
 | |
| @app.delete("/brands-in-shops/{brand_in_shop_id}")
 | |
| def delete_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
 | |
|     brand_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.id == brand_in_shop_id).first()
 | |
|     if brand_in_shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Brand in shop association not found")
 | |
|     
 | |
|     db.delete(brand_in_shop)
 | |
|     db.commit()
 | |
|     return {"message": "Brand in shop association deleted successfully"}
 | |
| 
 | |
| # Grocery Category endpoints
 | |
| @app.post("/grocery-categories/", response_model=schemas.GroceryCategory)
 | |
| def create_grocery_category(category: schemas.GroceryCategoryCreate, db: Session = Depends(get_db)):
 | |
|     db_category = models.GroceryCategory(**category.dict())
 | |
|     db.add(db_category)
 | |
|     db.commit()
 | |
|     db.refresh(db_category)
 | |
|     return db_category
 | |
| 
 | |
| @app.get("/grocery-categories/", response_model=List[schemas.GroceryCategory])
 | |
| def read_grocery_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     categories = db.query(models.GroceryCategory).offset(skip).limit(limit).all()
 | |
|     return categories
 | |
| 
 | |
| @app.get("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
 | |
| def read_grocery_category(category_id: int, db: Session = Depends(get_db)):
 | |
|     category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
 | |
|     if category is None:
 | |
|         raise HTTPException(status_code=404, detail="Grocery category not found")
 | |
|     return category
 | |
| 
 | |
| @app.put("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
 | |
| def update_grocery_category(category_id: int, category_update: schemas.GroceryCategoryUpdate, db: Session = Depends(get_db)):
 | |
|     category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
 | |
|     if category is None:
 | |
|         raise HTTPException(status_code=404, detail="Grocery category not found")
 | |
|     
 | |
|     update_data = category_update.dict(exclude_unset=True)
 | |
|     for field, value in update_data.items():
 | |
|         setattr(category, field, value)
 | |
|     
 | |
|     db.commit()
 | |
|     db.refresh(category)
 | |
|     return category
 | |
| 
 | |
| @app.delete("/grocery-categories/{category_id}")
 | |
| def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
 | |
|     category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
 | |
|     if category is None:
 | |
|         raise HTTPException(status_code=404, detail="Grocery category not found")
 | |
|     
 | |
|     # Check if any products reference this category
 | |
|     products_with_category = db.query(models.Product).filter(models.Product.category_id == category_id).first()
 | |
|     if products_with_category:
 | |
|         raise HTTPException(
 | |
|             status_code=400, 
 | |
|             detail="Cannot delete category: products are still associated with this category"
 | |
|         )
 | |
|     
 | |
|     db.delete(category)
 | |
|     db.commit()
 | |
|     return {"message": "Grocery category deleted successfully"}
 | |
| 
 | |
| # Shopping Event endpoints
 | |
| @app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
 | |
| def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
 | |
|     # Verify shop exists
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == event.shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     
 | |
|     # Create shopping event
 | |
|     db_event = models.ShoppingEvent(
 | |
|         shop_id=event.shop_id,
 | |
|         date=event.date,
 | |
|         total_amount=event.total_amount,
 | |
|         notes=event.notes
 | |
|     )
 | |
|     db.add(db_event)
 | |
|     db.commit()
 | |
|     db.refresh(db_event)
 | |
|     
 | |
|     # Add products to the event
 | |
|     for product_item in event.products:
 | |
|         product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
 | |
|         if product is None:
 | |
|             raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")
 | |
|         
 | |
|         # Insert into association table
 | |
|         db.execute(
 | |
|             models.shopping_event_products.insert().values(
 | |
|                 shopping_event_id=db_event.id,
 | |
|                 product_id=product_item.product_id,
 | |
|                 amount=product_item.amount,
 | |
|                 price=product_item.price,
 | |
|                 discount=product_item.discount
 | |
|             )
 | |
|         )
 | |
|     
 | |
|     db.commit()
 | |
|     db.refresh(db_event)
 | |
|     return build_shopping_event_response(db_event, db)
 | |
| 
 | |
| @app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
 | |
| def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     events = db.query(models.ShoppingEvent).order_by(models.ShoppingEvent.created_at.desc()).offset(skip).limit(limit).all()
 | |
|     return [build_shopping_event_response(event, db) for event in events]
 | |
| 
 | |
| @app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
 | |
| def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
 | |
|     event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
 | |
|     if event is None:
 | |
|         raise HTTPException(status_code=404, detail="Shopping event not found")
 | |
|     return build_shopping_event_response(event, db)
 | |
| 
 | |
| @app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
 | |
| def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
 | |
|     # Get the existing event
 | |
|     event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
 | |
|     if event is None:
 | |
|         raise HTTPException(status_code=404, detail="Shopping event not found")
 | |
|     
 | |
|     # Verify shop exists
 | |
|     shop = db.query(models.Shop).filter(models.Shop.id == event_update.shop_id).first()
 | |
|     if shop is None:
 | |
|         raise HTTPException(status_code=404, detail="Shop not found")
 | |
|     
 | |
|     # Update the shopping event
 | |
|     event.shop_id = event_update.shop_id
 | |
|     event.date = event_update.date
 | |
|     event.total_amount = event_update.total_amount
 | |
|     event.notes = event_update.notes
 | |
|     
 | |
|     # Remove existing product associations
 | |
|     db.execute(
 | |
|         models.shopping_event_products.delete().where(
 | |
|             models.shopping_event_products.c.shopping_event_id == event_id
 | |
|         )
 | |
|     )
 | |
|     
 | |
|     # Add new product associations
 | |
|     for product_item in event_update.products:
 | |
|         product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
 | |
|         if product is None:
 | |
|             raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")
 | |
|         
 | |
|         # Insert into association table
 | |
|         db.execute(
 | |
|             models.shopping_event_products.insert().values(
 | |
|                 shopping_event_id=event_id,
 | |
|                 product_id=product_item.product_id,
 | |
|                 amount=product_item.amount,
 | |
|                 price=product_item.price,
 | |
|                 discount=product_item.discount
 | |
|             )
 | |
|         )
 | |
|     
 | |
|     db.commit()
 | |
|     db.refresh(event)
 | |
|     return build_shopping_event_response(event, db)
 | |
| 
 | |
| @app.delete("/shopping-events/{event_id}")
 | |
| def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
 | |
|     event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
 | |
|     if event is None:
 | |
|         raise HTTPException(status_code=404, detail="Shopping event not found")
 | |
|     
 | |
|     # Delete product associations first
 | |
|     db.execute(
 | |
|         models.shopping_event_products.delete().where(
 | |
|             models.shopping_event_products.c.shopping_event_id == event_id
 | |
|         )
 | |
|     )
 | |
|     
 | |
|     # Delete the shopping event
 | |
|     db.delete(event)
 | |
|     db.commit()
 | |
|     return {"message": "Shopping event deleted successfully"}
 | |
| 
 | |
| # Statistics endpoints
 | |
| @app.get("/stats/categories", response_model=List[schemas.CategoryStats])
 | |
| def get_category_stats(db: Session = Depends(get_db)):
 | |
|     # This would need more complex SQL query - placeholder for now
 | |
|     return []
 | |
| 
 | |
| @app.get("/stats/shops", response_model=List[schemas.ShopStats])
 | |
| def get_shop_stats(db: Session = Depends(get_db)):
 | |
|     # This would need more complex SQL query - placeholder for now
 | |
|     return []
 | |
| 
 | |
| # Related Products endpoints
 | |
| @app.post("/related-products/", response_model=schemas.RelatedProduct)
 | |
| def create_related_product(related_product: schemas.RelatedProductCreate, db: Session = Depends(get_db)):
 | |
|     # Validate both products exist
 | |
|     product = db.query(models.Product).filter(models.Product.id == related_product.product_id).first()
 | |
|     if product is None:
 | |
|         raise HTTPException(status_code=404, detail="Product not found")
 | |
|     
 | |
|     related = db.query(models.Product).filter(models.Product.id == related_product.related_product_id).first()
 | |
|     if related is None:
 | |
|         raise HTTPException(status_code=404, detail="Related product not found")
 | |
|     
 | |
|     # Prevent self-referencing
 | |
|     if related_product.product_id == related_product.related_product_id:
 | |
|         raise HTTPException(status_code=400, detail="A product cannot be related to itself")
 | |
|     
 | |
|     # Check if relationship already exists (in either direction)
 | |
|     existing = db.execute(
 | |
|         models.related_products.select().where(
 | |
|             ((models.related_products.c.product_id == related_product.product_id) & 
 | |
|              (models.related_products.c.related_product_id == related_product.related_product_id)) |
 | |
|             ((models.related_products.c.product_id == related_product.related_product_id) & 
 | |
|              (models.related_products.c.related_product_id == related_product.product_id))
 | |
|         )
 | |
|     ).first()
 | |
|     
 | |
|     if existing:
 | |
|         raise HTTPException(status_code=400, detail="Products are already related")
 | |
|     
 | |
|     # Insert the relationship
 | |
|     result = db.execute(
 | |
|         models.related_products.insert().values(
 | |
|             product_id=related_product.product_id,
 | |
|             related_product_id=related_product.related_product_id,
 | |
|             relationship_type=related_product.relationship_type
 | |
|         )
 | |
|     )
 | |
|     db.commit()
 | |
|     
 | |
|     # Get the created relationship
 | |
|     relationship_id = result.inserted_primary_key[0]
 | |
|     created_relationship = db.execute(
 | |
|         models.related_products.select().where(models.related_products.c.id == relationship_id)
 | |
|     ).first()
 | |
|     
 | |
|     return schemas.RelatedProduct(
 | |
|         id=created_relationship.id,
 | |
|         product_id=created_relationship.product_id,
 | |
|         related_product_id=created_relationship.related_product_id,
 | |
|         relationship_type=created_relationship.relationship_type,
 | |
|         created_at=created_relationship.created_at
 | |
|     )
 | |
| 
 | |
| @app.get("/related-products/", response_model=List[schemas.RelatedProduct])
 | |
| def read_related_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
 | |
|     relationships = db.execute(
 | |
|         models.related_products.select().offset(skip).limit(limit)
 | |
|     ).fetchall()
 | |
|     
 | |
|     return [
 | |
|         schemas.RelatedProduct(
 | |
|             id=rel.id,
 | |
|             product_id=rel.product_id,
 | |
|             related_product_id=rel.related_product_id,
 | |
|             relationship_type=rel.relationship_type,
 | |
|             created_at=rel.created_at
 | |
|         )
 | |
|         for rel in relationships
 | |
|     ]
 | |
| 
 | |
| @app.get("/related-products/product/{product_id}", response_model=List[schemas.Product])
 | |
| def get_related_products_for_product(product_id: int, db: Session = Depends(get_db)):
 | |
|     # Validate product exists
 | |
|     product = db.query(models.Product).filter(models.Product.id == product_id).first()
 | |
|     if product is None:
 | |
|         raise HTTPException(status_code=404, detail="Product not found")
 | |
|     
 | |
|     # Get related products (bidirectional)
 | |
|     related_product_ids = db.execute(
 | |
|         text("""
 | |
|             SELECT CASE 
 | |
|                 WHEN product_id = :product_id THEN related_product_id
 | |
|                 ELSE product_id
 | |
|             END as related_id
 | |
|             FROM related_products 
 | |
|             WHERE product_id = :product_id OR related_product_id = :product_id
 | |
|         """),
 | |
|         {"product_id": product_id}
 | |
|     ).fetchall()
 | |
|     
 | |
|     if not related_product_ids:
 | |
|         return []
 | |
|     
 | |
|     # Get the actual product objects
 | |
|     related_ids = [row.related_id for row in related_product_ids]
 | |
|     related_products = db.query(models.Product).filter(models.Product.id.in_(related_ids)).all()
 | |
|     
 | |
|     return related_products
 | |
| 
 | |
| @app.get("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
 | |
| def read_related_product(relationship_id: int, db: Session = Depends(get_db)):
 | |
|     relationship = db.execute(
 | |
|         models.related_products.select().where(models.related_products.c.id == relationship_id)
 | |
|     ).first()
 | |
|     
 | |
|     if relationship is None:
 | |
|         raise HTTPException(status_code=404, detail="Related product relationship not found")
 | |
|     
 | |
|     return schemas.RelatedProduct(
 | |
|         id=relationship.id,
 | |
|         product_id=relationship.product_id,
 | |
|         related_product_id=relationship.related_product_id,
 | |
|         relationship_type=relationship.relationship_type,
 | |
|         created_at=relationship.created_at
 | |
|     )
 | |
| 
 | |
| @app.put("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
 | |
| def update_related_product(relationship_id: int, update_data: schemas.RelatedProductUpdate, db: Session = Depends(get_db)):
 | |
|     # Check if relationship exists
 | |
|     existing = db.execute(
 | |
|         models.related_products.select().where(models.related_products.c.id == relationship_id)
 | |
|     ).first()
 | |
|     
 | |
|     if existing is None:
 | |
|         raise HTTPException(status_code=404, detail="Related product relationship not found")
 | |
|     
 | |
|     # Update the relationship
 | |
|     db.execute(
 | |
|         models.related_products.update().where(models.related_products.c.id == relationship_id).values(
 | |
|             relationship_type=update_data.relationship_type
 | |
|         )
 | |
|     )
 | |
|     db.commit()
 | |
|     
 | |
|     # Get the updated relationship
 | |
|     updated_relationship = db.execute(
 | |
|         models.related_products.select().where(models.related_products.c.id == relationship_id)
 | |
|     ).first()
 | |
|     
 | |
|     return schemas.RelatedProduct(
 | |
|         id=updated_relationship.id,
 | |
|         product_id=updated_relationship.product_id,
 | |
|         related_product_id=updated_relationship.related_product_id,
 | |
|         relationship_type=updated_relationship.relationship_type,
 | |
|         created_at=updated_relationship.created_at
 | |
|     )
 | |
| 
 | |
| @app.delete("/related-products/{relationship_id}")
 | |
| def delete_related_product(relationship_id: int, db: Session = Depends(get_db)):
 | |
|     # Check if relationship exists
 | |
|     existing = db.execute(
 | |
|         models.related_products.select().where(models.related_products.c.id == relationship_id)
 | |
|     ).first()
 | |
|     
 | |
|     if existing is None:
 | |
|         raise HTTPException(status_code=404, detail="Related product relationship not found")
 | |
|     
 | |
|     # Delete the relationship
 | |
|     db.execute(
 | |
|         models.related_products.delete().where(models.related_products.c.id == relationship_id)
 | |
|     )
 | |
|     db.commit()
 | |
|     
 | |
|     return {"message": "Related product relationship deleted successfully"}
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     import uvicorn
 | |
|     uvicorn.run(app, host="0.0.0.0", port=8000)  |