"""HTTP API for the groceries tracker backend.

Exposes CRUD endpoints for products (including historical versions), shops,
brands, brand/shop associations, grocery categories, shopping events and
related-product relationships.
"""

from datetime import date, datetime
from typing import List

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text, func
from sqlalchemy.orm import Session

import models
import schemas
from database import engine, get_db
from version import __version__, __app_name__, __description__

# Create database tables
models.Base.metadata.create_all(bind=engine)

app = FastAPI(
    title=__app_name__,
    description=__description__,
    version=__version__,
)

# CORS middleware for React frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:5173"],  # React dev servers
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Date format accepted by the date-based path parameters.
_DATE_FORMAT = "%Y-%m-%d"


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _get_or_404(db: Session, model, object_id, detail: str):
    """Return the row of ``model`` whose ``id`` equals ``object_id``.

    Raises:
        HTTPException: 404 with ``detail`` when no such row exists.
    """
    instance = db.query(model).filter(model.id == object_id).first()
    if instance is None:
        raise HTTPException(status_code=404, detail=detail)
    return instance


def _parse_iso_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a ``date``.

    Raises:
        HTTPException: 400 when ``value`` does not match the expected format.
    """
    try:
        return datetime.strptime(value, _DATE_FORMAT).date()
    except ValueError as err:
        raise HTTPException(
            status_code=400,
            detail="Invalid date format. Use YYYY-MM-DD format",
        ) from err


def _reject_future_valid_from(valid_from: date) -> None:
    """Raise a 400 error when ``valid_from`` lies after today."""
    if valid_from > date.today():
        raise HTTPException(status_code=400, detail="Valid from date cannot be in the future")


def _build_product_at_date(record, was_current: bool, db: Session) -> schemas.ProductAtDate:
    """Convert a product (or product-history) row into a ``ProductAtDate``.

    The category and brand are looked up in their current tables, so they
    reflect today's category/brand data rather than a historical snapshot.
    """
    category = db.query(models.GroceryCategory).filter(
        models.GroceryCategory.id == record.category_id
    ).first()

    brand = None
    if record.brand_id:
        brand = db.query(models.Brand).filter(models.Brand.id == record.brand_id).first()

    return schemas.ProductAtDate(
        id=record.id,
        name=record.name,
        category_id=record.category_id,
        category=category,
        brand_id=record.brand_id,
        brand=brand,
        organic=record.organic,
        weight=record.weight,
        weight_unit=record.weight_unit,
        valid_from=record.valid_from,
        valid_to=record.valid_to,
        deleted=record.deleted,
        was_current=was_current,
    )


def _ensure_products_exist(product_items, db: Session) -> None:
    """Raise a 404 for the first item whose ``product_id`` is unknown."""
    for item in product_items:
        _get_or_404(
            db,
            models.Product,
            item.product_id,
            f"Product with id {item.product_id} not found",
        )


def _insert_event_products(event_id: int, product_items, db: Session) -> None:
    """Insert one association-table row per item; the caller commits."""
    for item in product_items:
        db.execute(
            models.shopping_event_products.insert().values(
                shopping_event_id=event_id,
                product_id=item.product_id,
                amount=item.amount,
                price=item.price,
                discount=item.discount,
            )
        )


def _select_relationship(relationship_id: int, db: Session):
    """Return the ``related_products`` row with this id, or ``None``."""
    return db.execute(
        models.related_products.select().where(
            models.related_products.c.id == relationship_id
        )
    ).first()


def _related_product_from_row(row) -> schemas.RelatedProduct:
    """Map a ``related_products`` table row onto the response schema."""
    return schemas.RelatedProduct(
        id=row.id,
        product_id=row.product_id,
        related_product_id=row.related_product_id,
        relationship_type=row.relationship_type,
        created_at=row.created_at,
    )


def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
    """Build a shopping event response with products from the association table.

    Args:
        event: The shopping event to serialise.
        db: Session used to load the event's products.

    Returns:
        The event together with every associated product, each carrying its
        event-specific amount, price and discount plus category and
        (optional) brand details.
    """
    # Get products with their event-specific data including category and brand information
    product_data = db.execute(
        text("""
            SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
                   sep.amount, sep.price, sep.discount,
                   gc.id as category_id, gc.name as category_name,
                   gc.created_at as category_created_at,
                   gc.updated_at as category_updated_at,
                   b.id as brand_id, b.name as brand_name,
                   b.created_at as brand_created_at,
                   b.updated_at as brand_updated_at
            FROM products p
            JOIN shopping_event_products sep ON p.id = sep.product_id
            JOIN grocery_categories gc ON p.category_id = gc.id
            LEFT JOIN brands b ON p.brand_id = b.id
            WHERE sep.shopping_event_id = :event_id
        """),
        {"event_id": event.id},
    ).fetchall()

    # Convert to ProductWithEventData objects
    products_with_data = []
    for row in product_data:
        category = schemas.GroceryCategory(
            id=row.category_id,
            name=row.category_name,
            created_at=row.category_created_at,
            updated_at=row.category_updated_at,
        )

        # The brand comes from a LEFT JOIN, so it is absent for unbranded products.
        brand = None
        if row.brand_id is not None:
            brand = schemas.Brand(
                id=row.brand_id,
                name=row.brand_name,
                created_at=row.brand_created_at,
                updated_at=row.brand_updated_at,
            )

        products_with_data.append(
            schemas.ProductWithEventData(
                id=row.id,
                name=row.name,
                category=category,
                brand=brand,
                organic=row.organic,
                weight=row.weight,
                weight_unit=row.weight_unit,
                amount=row.amount,
                price=row.price,
                discount=row.discount,
            )
        )

    return schemas.ShoppingEventResponse(
        id=event.id,
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
        created_at=event.created_at,
        shop=event.shop,
        products=products_with_data,
    )


# Root endpoint
@app.get("/")
def read_root():
    """Return the application name and version."""
    return {"message": __app_name__, "version": __version__, "name": "Groceries Tracker Backend"}


# Utility endpoints
@app.get("/current-date")
def get_current_date():
    """Get current date for use as default in valid_from fields."""
    return {"current_date": date.today().isoformat()}


@app.get("/products/available-for-shopping/{shopping_date}", response_model=List[schemas.Product])
def get_products_available_for_shopping(shopping_date: str, db: Session = Depends(get_db)):
    """Get products that were available (not deleted) on a specific shopping date.

    Raises:
        HTTPException: 400 when ``shopping_date`` is not ``YYYY-MM-DD``.
    """
    target_date = _parse_iso_date(shopping_date)

    # Get products that were either:
    # 1. Never deleted (deleted=False)
    # 2. Deleted after the shopping date (valid_from > shopping_date for deleted=True products)
    products = db.query(models.Product).filter(
        (models.Product.deleted == False)
        | ((models.Product.deleted == True) & (models.Product.valid_from > target_date))
    ).all()
    return products


# Product endpoints
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
    """Create a product after validating its category, brand and valid_from.

    Raises:
        HTTPException: 404 when the category or brand does not exist;
            400 when ``valid_from`` is in the future.
    """
    # Validate category exists
    _get_or_404(db, models.GroceryCategory, product.category_id, "Category not found")

    # Validate brand exists if brand_id is provided
    if product.brand_id is not None:
        _get_or_404(db, models.Brand, product.brand_id, "Brand not found")

    # Validate valid_from date if provided
    if product.valid_from is not None:
        _reject_future_valid_from(product.valid_from)

    # Create product data
    product_data = product.dict(exclude={'valid_from'})
    db_product = models.Product(**product_data)

    # Set valid_from if provided, otherwise let database default handle it
    if product.valid_from is not None:
        db_product.valid_from = product.valid_from

    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    return db_product


@app.get("/products/", response_model=List[schemas.Product])
def read_products(skip: int = 0, limit: int = 100, show_deleted: bool = False, db: Session = Depends(get_db)):
    """List products with offset/limit paging, hiding deleted ones by default."""
    query = db.query(models.Product)
    if not show_deleted:
        query = query.filter(models.Product.deleted == False)
    products = query.offset(skip).limit(limit).all()
    return products


@app.get("/products/{product_id}", response_model=schemas.Product)
def read_product(product_id: int, db: Session = Depends(get_db)):
    """Return a single product, or 404 when it does not exist."""
    return _get_or_404(db, models.Product, product_id, "Product not found")


@app.get("/products/{product_id}/valid-from")
def get_product_valid_from(product_id: int, db: Session = Depends(get_db)):
    """Get the current valid_from date for a product (used for validation when editing)."""
    product = _get_or_404(db, models.Product, product_id, "Product not found")
    return {"valid_from": product.valid_from.isoformat()}


@app.put("/products/{product_id}", response_model=schemas.Product)
def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a product.

    Only fields explicitly present in the request are written. ``valid_from``
    is handled separately: when given it must not be in the future and must
    be strictly after the product's current ``valid_from``.

    Raises:
        HTTPException: 404 when the product, category or brand is missing;
            400 when ``valid_from`` fails validation.
    """
    product = _get_or_404(db, models.Product, product_id, "Product not found")

    update_data = product_update.dict(exclude_unset=True, exclude={'valid_from'})

    # Validate category exists if category_id is being updated
    if 'category_id' in update_data:
        _get_or_404(db, models.GroceryCategory, update_data['category_id'], "Category not found")

    # Validate brand exists if brand_id is being updated
    if 'brand_id' in update_data and update_data['brand_id'] is not None:
        _get_or_404(db, models.Brand, update_data['brand_id'], "Brand not found")

    # Validate valid_from date if provided
    if product_update.valid_from is not None:
        _reject_future_valid_from(product_update.valid_from)
        if product_update.valid_from <= product.valid_from:
            raise HTTPException(
                status_code=400,
                detail=f"Valid from date must be after the current product's valid from date ({product.valid_from})"
            )

    # Apply the updates. NOTE(review): the original author states that a
    # database trigger creates the history record; that trigger is not
    # visible in this file.
    for field, value in update_data.items():
        setattr(product, field, value)

    # Set valid_from if provided for manual versioning
    if product_update.valid_from is not None:
        product.valid_from = product_update.valid_from

    db.commit()
    db.refresh(product)
    return product


@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Soft-delete a product by flagging it as deleted as of today.

    Raises:
        HTTPException: 404 when the product does not exist; 400 when it is
            already marked as deleted.
    """
    product = _get_or_404(db, models.Product, product_id, "Product not found")

    if product.deleted:
        raise HTTPException(status_code=400, detail="Product is already deleted")

    # Simply mark as deleted and set valid_from to today.
    # NOTE(review): per the original comment a trigger creates the history
    # record automatically; the trigger is defined outside this file.
    product.deleted = True
    product.valid_from = date.today()
    product.updated_at = func.now()

    db.commit()
    return {"message": "Product deleted successfully"}


# Historical Product endpoints
@app.get("/products/{product_id}/history", response_model=List[schemas.ProductHistory])
def get_product_history(product_id: int, db: Session = Depends(get_db)):
    """Get all historical versions of a product, newest ``valid_from`` first."""
    # Check if product exists
    _get_or_404(db, models.Product, product_id, "Product not found")

    # Get history from history table
    history = db.query(models.ProductHistory).filter(
        models.ProductHistory.id == product_id
    ).order_by(models.ProductHistory.valid_from.desc()).all()
    return history


@app.get("/products/{product_id}/at/{date}", response_model=schemas.ProductAtDate)
def get_product_at_date(product_id: int, date: str, db: Session = Depends(get_db)):
    """Get product as it existed at a specific date - CRUCIAL for shopping events.

    The current products table is consulted first, then the history table.

    Args:
        product_id: Identifier of the product.
        date: Target date as a ``YYYY-MM-DD`` string.
        db: Database session.

    Raises:
        HTTPException: 400 for a malformed date; 404 when no version of the
            product covers that date.
    """
    target_date = _parse_iso_date(date)

    # First try current products table
    current_product = db.query(models.Product).filter(
        models.Product.id == product_id,
        models.Product.valid_from <= target_date,
        models.Product.valid_to >= target_date
    ).first()
    if current_product:
        return _build_product_at_date(current_product, True, db)

    # Try history table
    historical_product = db.query(models.ProductHistory).filter(
        models.ProductHistory.id == product_id,
        models.ProductHistory.valid_from <= target_date,
        models.ProductHistory.valid_to >= target_date
    ).first()
    if historical_product:
        # Related data might have changed too, but the current versions are used.
        return _build_product_at_date(historical_product, False, db)

    # Product didn't exist at that date
    raise HTTPException(
        status_code=404,
        detail=f"Product {product_id} did not exist on {date}"
    )


@app.get("/shopping-events/{event_id}/products-as-purchased", response_model=List[schemas.ProductAtPurchase])
def get_shopping_event_products_as_purchased(event_id: int, db: Session = Depends(get_db)):
    """Get products as they were when purchased - shows historical product data.

    Products for which no version covers the purchase date are skipped.

    Raises:
        HTTPException: 404 when the shopping event does not exist.
    """
    # Get the shopping event
    event = _get_or_404(db, models.ShoppingEvent, event_id, "Shopping event not found")

    # Get products from association table
    products_data = db.execute(
        text("""
            SELECT sep.product_id, sep.amount, sep.price, sep.discount
            FROM shopping_event_products sep
            WHERE sep.shopping_event_id = :event_id
        """),
        {"event_id": event_id},
    ).fetchall()

    result = []
    for product_data in products_data:
        # Extract just the date from the shopping event datetime
        purchase_date = event.date.date().strftime('%Y-%m-%d')
        try:
            # Get product as it was at the time of purchase
            product_at_purchase = get_product_at_date(
                product_data.product_id, purchase_date, db
            )
        except HTTPException:
            # Product didn't exist at purchase time (shouldn't happen, but handle gracefully)
            continue

        result.append(schemas.ProductAtPurchase(
            product=product_at_purchase,
            amount=product_data.amount,
            price=product_data.price,
            discount=product_data.discount,
        ))

    return result


# Shop endpoints
@app.post("/shops/", response_model=schemas.Shop)
def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
    """Create a shop from the request payload."""
    db_shop = models.Shop(**shop.dict())
    db.add(db_shop)
    db.commit()
    db.refresh(db_shop)
    return db_shop


@app.get("/shops/", response_model=List[schemas.Shop])
def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List shops with offset/limit paging."""
    shops = db.query(models.Shop).offset(skip).limit(limit).all()
    return shops


@app.get("/shops/{shop_id}", response_model=schemas.Shop)
def read_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return a single shop, or 404 when it does not exist."""
    return _get_or_404(db, models.Shop, shop_id, "Shop not found")


@app.put("/shops/{shop_id}", response_model=schemas.Shop)
def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
    """Apply the explicitly provided fields to a shop; 404 when it is missing."""
    shop = _get_or_404(db, models.Shop, shop_id, "Shop not found")

    update_data = shop_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(shop, field, value)

    db.commit()
    db.refresh(shop)
    return shop


@app.delete("/shops/{shop_id}")
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
    """Delete a shop; 404 when it does not exist."""
    shop = _get_or_404(db, models.Shop, shop_id, "Shop not found")

    db.delete(shop)
    db.commit()
    return {"message": "Shop deleted successfully"}


# Brand endpoints
@app.post("/brands/", response_model=schemas.Brand)
def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
    """Create a brand from the request payload."""
    db_brand = models.Brand(**brand.dict())
    db.add(db_brand)
    db.commit()
    db.refresh(db_brand)
    return db_brand


@app.get("/brands/", response_model=List[schemas.Brand])
def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List brands with offset/limit paging."""
    brands = db.query(models.Brand).offset(skip).limit(limit).all()
    return brands


@app.get("/brands/{brand_id}", response_model=schemas.Brand)
def read_brand(brand_id: int, db: Session = Depends(get_db)):
    """Return a single brand, or 404 when it does not exist."""
    return _get_or_404(db, models.Brand, brand_id, "Brand not found")


@app.put("/brands/{brand_id}", response_model=schemas.Brand)
def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
    """Apply the explicitly provided fields to a brand; 404 when it is missing."""
    brand = _get_or_404(db, models.Brand, brand_id, "Brand not found")

    update_data = brand_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(brand, field, value)

    db.commit()
    db.refresh(brand)
    return brand


@app.delete("/brands/{brand_id}")
def delete_brand(brand_id: int, db: Session = Depends(get_db)):
    """Delete a brand unless a product still references it.

    Raises:
        HTTPException: 404 when the brand does not exist; 400 when at least
            one product still uses it.
    """
    brand = _get_or_404(db, models.Brand, brand_id, "Brand not found")

    # Check if any products reference this brand
    products_with_brand = db.query(models.Product).filter(models.Product.brand_id == brand_id).first()
    if products_with_brand:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete brand: products are still associated with this brand"
        )

    db.delete(brand)
    db.commit()
    return {"message": "Brand deleted successfully"}


# BrandInShop endpoints
@app.post("/brands-in-shops/", response_model=schemas.BrandInShop)
def create_brand_in_shop(brand_in_shop: schemas.BrandInShopCreate, db: Session = Depends(get_db)):
    """Associate a brand with a shop.

    Raises:
        HTTPException: 404 when the shop or brand does not exist; 400 when
            the pair is already associated.
    """
    # Validate shop exists
    _get_or_404(db, models.Shop, brand_in_shop.shop_id, "Shop not found")

    # Validate brand exists
    _get_or_404(db, models.Brand, brand_in_shop.brand_id, "Brand not found")

    # Check if this combination already exists
    existing = db.query(models.BrandInShop).filter(
        models.BrandInShop.shop_id == brand_in_shop.shop_id,
        models.BrandInShop.brand_id == brand_in_shop.brand_id
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="This brand is already associated with this shop")

    db_brand_in_shop = models.BrandInShop(**brand_in_shop.dict())
    db.add(db_brand_in_shop)
    db.commit()
    db.refresh(db_brand_in_shop)
    return db_brand_in_shop


@app.get("/brands-in-shops/", response_model=List[schemas.BrandInShop])
def read_brands_in_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List brand/shop associations with offset/limit paging."""
    brands_in_shops = db.query(models.BrandInShop).offset(skip).limit(limit).all()
    return brands_in_shops


@app.get("/brands-in-shops/shop/{shop_id}", response_model=List[schemas.BrandInShop])
def read_brands_in_shop(shop_id: int, db: Session = Depends(get_db)):
    """List the brand associations of one shop; 404 when the shop is missing."""
    # Validate shop exists
    _get_or_404(db, models.Shop, shop_id, "Shop not found")

    brands_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.shop_id == shop_id).all()
    return brands_in_shop


@app.get("/brands-in-shops/brand/{brand_id}", response_model=List[schemas.BrandInShop])
def read_shops_with_brand(brand_id: int, db: Session = Depends(get_db)):
    """List the shop associations of one brand; 404 when the brand is missing."""
    # Validate brand exists
    _get_or_404(db, models.Brand, brand_id, "Brand not found")

    shops_with_brand = db.query(models.BrandInShop).filter(models.BrandInShop.brand_id == brand_id).all()
    return shops_with_brand


@app.get("/brands-in-shops/{brand_in_shop_id}", response_model=schemas.BrandInShop)
def read_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Return one brand/shop association, or 404 when it does not exist."""
    return _get_or_404(
        db, models.BrandInShop, brand_in_shop_id, "Brand in shop association not found"
    )


@app.delete("/brands-in-shops/{brand_in_shop_id}")
def delete_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Delete one brand/shop association; 404 when it does not exist."""
    brand_in_shop = _get_or_404(
        db, models.BrandInShop, brand_in_shop_id, "Brand in shop association not found"
    )

    db.delete(brand_in_shop)
    db.commit()
    return {"message": "Brand in shop association deleted successfully"}


# Grocery Category endpoints
@app.post("/grocery-categories/", response_model=schemas.GroceryCategory)
def create_grocery_category(category: schemas.GroceryCategoryCreate, db: Session = Depends(get_db)):
    """Create a grocery category from the request payload."""
    db_category = models.GroceryCategory(**category.dict())
    db.add(db_category)
    db.commit()
    db.refresh(db_category)
    return db_category


@app.get("/grocery-categories/", response_model=List[schemas.GroceryCategory])
def read_grocery_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List grocery categories with offset/limit paging."""
    categories = db.query(models.GroceryCategory).offset(skip).limit(limit).all()
    return categories


@app.get("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def read_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Return a single grocery category, or 404 when it does not exist."""
    return _get_or_404(db, models.GroceryCategory, category_id, "Grocery category not found")


@app.put("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def update_grocery_category(category_id: int, category_update: schemas.GroceryCategoryUpdate, db: Session = Depends(get_db)):
    """Apply the explicitly provided fields to a category; 404 when it is missing."""
    category = _get_or_404(db, models.GroceryCategory, category_id, "Grocery category not found")

    update_data = category_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(category, field, value)

    db.commit()
    db.refresh(category)
    return category


@app.delete("/grocery-categories/{category_id}")
def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Delete a grocery category unless a product still references it.

    Raises:
        HTTPException: 404 when the category does not exist; 400 when at
            least one product still uses it.
    """
    category = _get_or_404(db, models.GroceryCategory, category_id, "Grocery category not found")

    # Check if any products reference this category
    products_with_category = db.query(models.Product).filter(models.Product.category_id == category_id).first()
    if products_with_category:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete category: products are still associated with this category"
        )

    db.delete(category)
    db.commit()
    return {"message": "Grocery category deleted successfully"}


# Shopping Event endpoints
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Create a shopping event together with its product associations.

    The shop and every referenced product are validated before anything is
    written, and the event plus its association rows are committed in a
    single transaction, so a failed request never leaves a partial event.

    Raises:
        HTTPException: 404 when the shop or any product does not exist.
    """
    # Verify shop exists
    _get_or_404(db, models.Shop, event.shop_id, "Shop not found")

    # Verify every product exists before writing anything
    _ensure_products_exist(event.products, db)

    # Create shopping event
    db_event = models.ShoppingEvent(
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
    )
    db.add(db_event)
    # Flush (not commit) so the new event gets its primary key while the
    # transaction stays open for the association rows.
    db.flush()

    # Add products to the event
    _insert_event_products(db_event.id, event.products, db)

    db.commit()
    db.refresh(db_event)
    return build_shopping_event_response(db_event, db)


@app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List shopping events, most recently created first, with paging."""
    events = (
        db.query(models.ShoppingEvent)
        .order_by(models.ShoppingEvent.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    return [build_shopping_event_response(event, db) for event in events]


@app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Return one shopping event with its products; 404 when it is missing."""
    event = _get_or_404(db, models.ShoppingEvent, event_id, "Shopping event not found")
    return build_shopping_event_response(event, db)


@app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Replace a shopping event's fields and its full set of products.

    All referenced rows are validated before the event is modified, so a
    rejected request leaves the stored event untouched.

    Raises:
        HTTPException: 404 when the event, the shop or any product is missing.
    """
    # Get the existing event
    event = _get_or_404(db, models.ShoppingEvent, event_id, "Shopping event not found")

    # Verify shop exists
    _get_or_404(db, models.Shop, event_update.shop_id, "Shop not found")

    # Verify every product exists before changing anything
    _ensure_products_exist(event_update.products, db)

    # Update the shopping event
    event.shop_id = event_update.shop_id
    event.date = event_update.date
    event.total_amount = event_update.total_amount
    event.notes = event_update.notes

    # Remove existing product associations
    db.execute(
        models.shopping_event_products.delete().where(
            models.shopping_event_products.c.shopping_event_id == event_id
        )
    )

    # Add new product associations
    _insert_event_products(event_id, event_update.products, db)

    db.commit()
    db.refresh(event)
    return build_shopping_event_response(event, db)


@app.delete("/shopping-events/{event_id}")
def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Delete a shopping event and its product associations; 404 when missing."""
    event = _get_or_404(db, models.ShoppingEvent, event_id, "Shopping event not found")

    # Delete product associations first
    db.execute(
        models.shopping_event_products.delete().where(
            models.shopping_event_products.c.shopping_event_id == event_id
        )
    )

    # Delete the shopping event
    db.delete(event)
    db.commit()
    return {"message": "Shopping event deleted successfully"}


# Statistics endpoints
@app.get("/stats/categories", response_model=List[schemas.CategoryStats])
def get_category_stats(db: Session = Depends(get_db)):
    """Placeholder: category statistics are not implemented; returns an empty list."""
    # This would need more complex SQL query - placeholder for now
    return []


@app.get("/stats/shops", response_model=List[schemas.ShopStats])
def get_shop_stats(db: Session = Depends(get_db)):
    """Placeholder: shop statistics are not implemented; returns an empty list."""
    # This would need more complex SQL query - placeholder for now
    return []


# Related Products endpoints
@app.post("/related-products/", response_model=schemas.RelatedProduct)
def create_related_product(related_product: schemas.RelatedProductCreate, db: Session = Depends(get_db)):
    """Create a relationship between two distinct products.

    A relationship counts as existing regardless of which of the two
    products is stored in which column.

    Raises:
        HTTPException: 404 when either product does not exist; 400 when the
            two ids are equal or the products are already related.
    """
    # Validate both products exist
    _get_or_404(db, models.Product, related_product.product_id, "Product not found")
    _get_or_404(db, models.Product, related_product.related_product_id, "Related product not found")

    # Prevent self-referencing
    if related_product.product_id == related_product.related_product_id:
        raise HTTPException(status_code=400, detail="A product cannot be related to itself")

    # Check if relationship already exists (in either direction)
    table = models.related_products
    existing = db.execute(
        table.select().where(
            ((table.c.product_id == related_product.product_id)
             & (table.c.related_product_id == related_product.related_product_id))
            | ((table.c.product_id == related_product.related_product_id)
               & (table.c.related_product_id == related_product.product_id))
        )
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Products are already related")

    # Insert the relationship
    result = db.execute(
        table.insert().values(
            product_id=related_product.product_id,
            related_product_id=related_product.related_product_id,
            relationship_type=related_product.relationship_type,
        )
    )
    db.commit()

    # Get the created relationship
    relationship_id = result.inserted_primary_key[0]
    created_relationship = _select_relationship(relationship_id, db)
    return _related_product_from_row(created_relationship)


@app.get("/related-products/", response_model=List[schemas.RelatedProduct])
def read_related_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List product relationships with offset/limit paging."""
    relationships = db.execute(
        models.related_products.select().offset(skip).limit(limit)
    ).fetchall()
    return [_related_product_from_row(rel) for rel in relationships]


@app.get("/related-products/product/{product_id}", response_model=List[schemas.Product])
def get_related_products_for_product(product_id: int, db: Session = Depends(get_db)):
    """Return every product related to ``product_id``, in either direction.

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    # Validate product exists
    _get_or_404(db, models.Product, product_id, "Product not found")

    # Get related products (bidirectional)
    related_product_ids = db.execute(
        text("""
            SELECT CASE
                       WHEN product_id = :product_id THEN related_product_id
                       ELSE product_id
                   END as related_id
            FROM related_products
            WHERE product_id = :product_id OR related_product_id = :product_id
        """),
        {"product_id": product_id},
    ).fetchall()

    if not related_product_ids:
        return []

    # Get the actual product objects
    related_ids = [row.related_id for row in related_product_ids]
    related_products = db.query(models.Product).filter(models.Product.id.in_(related_ids)).all()
    return related_products


@app.get("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def read_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Return one product relationship, or 404 when it does not exist."""
    relationship = _select_relationship(relationship_id, db)
    if relationship is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")
    return _related_product_from_row(relationship)


@app.put("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def update_related_product(relationship_id: int, update_data: schemas.RelatedProductUpdate, db: Session = Depends(get_db)):
    """Change the ``relationship_type`` of an existing relationship.

    Raises:
        HTTPException: 404 when the relationship does not exist.
    """
    # Check if relationship exists
    existing = _select_relationship(relationship_id, db)
    if existing is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    # Update the relationship
    db.execute(
        models.related_products.update()
        .where(models.related_products.c.id == relationship_id)
        .values(relationship_type=update_data.relationship_type)
    )
    db.commit()

    # Get the updated relationship
    updated_relationship = _select_relationship(relationship_id, db)
    return _related_product_from_row(updated_relationship)


@app.delete("/related-products/{relationship_id}")
def delete_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Delete one product relationship; 404 when it does not exist."""
    # Check if relationship exists
    existing = _select_relationship(relationship_id, db)
    if existing is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    # Delete the relationship
    db.execute(
        models.related_products.delete().where(models.related_products.c.id == relationship_id)
    )
    db.commit()
    return {"message": "Related product relationship deleted successfully"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)