from typing import List

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from sqlalchemy.orm import Session

import models
import schemas
from database import engine, get_db

# Create database tables
models.Base.metadata.create_all(bind=engine)

app = FastAPI(
    title="Product Tracker API",
    description="API for tracking product prices and shopping events",
    version="1.0.0"
)

# CORS middleware for React frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:5173"],  # React dev servers
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
    """Build a shopping event response with products from the association table.

    Args:
        event: The shopping event whose purchased products should be loaded.
        db: Session used to run the product lookup query.

    Returns:
        A ``ShoppingEventResponse`` carrying the event's own fields plus one
        ``ProductWithEventData`` entry per row joined through
        ``shopping_event_products``.
    """
    # Categories are inner-joined, so a product row without a matching
    # category is left out of the result; brands are left-joined and optional.
    product_data = db.execute(
        text("""
            SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
                   sep.amount, sep.price,
                   gc.id as category_id, gc.name as category_name,
                   gc.created_at as category_created_at, gc.updated_at as category_updated_at,
                   b.id as brand_id, b.name as brand_name,
                   b.created_at as brand_created_at, b.updated_at as brand_updated_at
            FROM products p
            JOIN shopping_event_products sep ON p.id = sep.product_id
            JOIN grocery_categories gc ON p.category_id = gc.id
            LEFT JOIN brands b ON p.brand_id = b.id
            WHERE sep.shopping_event_id = :event_id
        """),
        {"event_id": event.id}
    ).fetchall()

    # Convert to ProductWithEventData objects
    products_with_data = []
    for row in product_data:
        category = schemas.GroceryCategory(
            id=row.category_id,
            name=row.category_name,
            created_at=row.category_created_at,
            updated_at=row.category_updated_at
        )

        # The LEFT JOIN yields NULL brand columns for products without a brand.
        brand = None
        if row.brand_id is not None:
            brand = schemas.Brand(
                id=row.brand_id,
                name=row.brand_name,
                created_at=row.brand_created_at,
                updated_at=row.brand_updated_at
            )

        products_with_data.append(
            schemas.ProductWithEventData(
                id=row.id,
                name=row.name,
                category=category,
                brand=brand,
                organic=row.organic,
                weight=row.weight,
                weight_unit=row.weight_unit,
                amount=row.amount,
                price=row.price
            )
        )

    return schemas.ShoppingEventResponse(
        id=event.id,
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
        created_at=event.created_at,
        shop=event.shop,
        products=products_with_data
    )


# Root endpoint
@app.get("/")
def read_root():
    """Return the API name and version."""
    return {"message": "Product Tracker API", "version": "1.0.0"}


# Product endpoints
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
    """Create a product; responds 404 if its category or (optional) brand does not exist."""
    # Validate category exists
    category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == product.category_id).first()
    if category is None:
        raise HTTPException(status_code=404, detail="Category not found")

    # Validate brand exists if brand_id is provided
    if product.brand_id is not None:
        brand = db.query(models.Brand).filter(models.Brand.id == product.brand_id).first()
        if brand is None:
            raise HTTPException(status_code=404, detail="Brand not found")

    db_product = models.Product(**product.dict())
    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    return db_product


@app.get("/products/", response_model=List[schemas.Product])
def read_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List products ordered by id, skipping ``skip`` rows and returning at most ``limit``."""
    # Without an explicit ordering, offset/limit pages may overlap or skip rows.
    products = (
        db.query(models.Product)
        .order_by(models.Product.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return products


@app.get("/products/{product_id}", response_model=schemas.Product)
def read_product(product_id: int, db: Session = Depends(get_db)):
    """Return one product by id; responds 404 if it does not exist."""
    product = db.query(models.Product).filter(models.Product.id == product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return product


@app.put("/products/{product_id}", response_model=schemas.Product)
def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
    """Update the supplied fields of a product.

    Responds 404 if the product, a newly supplied category, or a newly
    supplied non-null brand does not exist.
    """
    product = db.query(models.Product).filter(models.Product.id == product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # Only fields explicitly present in the request body are applied.
    update_data = product_update.dict(exclude_unset=True)

    # Validate category exists if category_id is being updated
    if 'category_id' in update_data:
        category = db.query(models.GroceryCategory).filter(
            models.GroceryCategory.id == update_data['category_id']
        ).first()
        if category is None:
            raise HTTPException(status_code=404, detail="Category not found")

    # Validate brand exists if brand_id is being updated
    if 'brand_id' in update_data and update_data['brand_id'] is not None:
        brand = db.query(models.Brand).filter(models.Brand.id == update_data['brand_id']).first()
        if brand is None:
            raise HTTPException(status_code=404, detail="Brand not found")

    for field, value in update_data.items():
        setattr(product, field, value)

    db.commit()
    db.refresh(product)
    return product


@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Delete a product by id; responds 404 if it does not exist."""
    product = db.query(models.Product).filter(models.Product.id == product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    db.delete(product)
    db.commit()
    return {"message": "Product deleted successfully"}


# Shop endpoints
@app.post("/shops/", response_model=schemas.Shop)
def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
    """Create a shop from the request body and return the stored row."""
    db_shop = models.Shop(**shop.dict())
    db.add(db_shop)
    db.commit()
    db.refresh(db_shop)
    return db_shop


@app.get("/shops/", response_model=List[schemas.Shop])
def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List shops ordered by id, skipping ``skip`` rows and returning at most ``limit``."""
    # Without an explicit ordering, offset/limit pages may overlap or skip rows.
    shops = (
        db.query(models.Shop)
        .order_by(models.Shop.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return shops


@app.get("/shops/{shop_id}", response_model=schemas.Shop)
def read_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return one shop by id; responds 404 if it does not exist."""
    shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")
    return shop


@app.put("/shops/{shop_id}", response_model=schemas.Shop)
def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
    """Update the supplied fields of a shop; responds 404 if it does not exist."""
    shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Only fields explicitly present in the request body are applied.
    update_data = shop_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(shop, field, value)

    db.commit()
    db.refresh(shop)
    return shop


@app.delete("/shops/{shop_id}")
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
    """Delete a shop by id; responds 404 if it does not exist."""
    shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    db.delete(shop)
    db.commit()
    return {"message": "Shop deleted successfully"}


# Brand endpoints
@app.post("/brands/", response_model=schemas.Brand)
def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
    """Create a brand from the request body and return the stored row."""
    db_brand = models.Brand(**brand.dict())
    db.add(db_brand)
    db.commit()
    db.refresh(db_brand)
    return db_brand


@app.get("/brands/", response_model=List[schemas.Brand])
def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List brands ordered by id, skipping ``skip`` rows and returning at most ``limit``."""
    # Without an explicit ordering, offset/limit pages may overlap or skip rows.
    brands = (
        db.query(models.Brand)
        .order_by(models.Brand.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return brands


@app.get("/brands/{brand_id}", response_model=schemas.Brand)
def read_brand(brand_id: int, db: Session = Depends(get_db)):
    """Return one brand by id; responds 404 if it does not exist."""
    brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if brand is None:
        raise HTTPException(status_code=404, detail="Brand not found")
    return brand


@app.put("/brands/{brand_id}", response_model=schemas.Brand)
def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
    """Update the supplied fields of a brand; responds 404 if it does not exist."""
    brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if brand is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # Only fields explicitly present in the request body are applied.
    update_data = brand_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(brand, field, value)

    db.commit()
    db.refresh(brand)
    return brand


@app.delete("/brands/{brand_id}")
def delete_brand(brand_id: int, db: Session = Depends(get_db)):
    """Delete a brand by id.

    Responds 404 if the brand does not exist and 400 while any product still
    references it.
    """
    brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if brand is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # Check if any products reference this brand
    products_with_brand = db.query(models.Product).filter(models.Product.brand_id == brand_id).first()
    if products_with_brand:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete brand: products are still associated with this brand"
        )

    db.delete(brand)
    db.commit()
    return {"message": "Brand deleted successfully"}


# BrandInShop endpoints
@app.post("/brands-in-shops/", response_model=schemas.BrandInShop)
def create_brand_in_shop(brand_in_shop: schemas.BrandInShopCreate, db: Session = Depends(get_db)):
    """Associate a brand with a shop.

    Responds 404 if the shop or brand does not exist and 400 if the pair is
    already associated.
    """
    # Validate shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == brand_in_shop.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Validate brand exists
    brand = db.query(models.Brand).filter(models.Brand.id == brand_in_shop.brand_id).first()
    if brand is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # Check if this combination already exists
    existing = db.query(models.BrandInShop).filter(
        models.BrandInShop.shop_id == brand_in_shop.shop_id,
        models.BrandInShop.brand_id == brand_in_shop.brand_id
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="This brand is already associated with this shop")

    db_brand_in_shop = models.BrandInShop(**brand_in_shop.dict())
    db.add(db_brand_in_shop)
    db.commit()
    db.refresh(db_brand_in_shop)
    return db_brand_in_shop


@app.get("/brands-in-shops/", response_model=List[schemas.BrandInShop])
def read_brands_in_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List brand/shop associations ordered by id, skipping ``skip`` rows and returning at most ``limit``."""
    # Without an explicit ordering, offset/limit pages may overlap or skip rows.
    brands_in_shops = (
        db.query(models.BrandInShop)
        .order_by(models.BrandInShop.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return brands_in_shops
@app.get("/brands-in-shops/shop/{shop_id}", response_model=List[schemas.BrandInShop])
def read_brands_in_shop(shop_id: int, db: Session = Depends(get_db)):
    """List the brand associations of one shop; responds 404 if the shop does not exist."""
    # Validate shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    brands_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.shop_id == shop_id).all()
    return brands_in_shop


@app.get("/brands-in-shops/brand/{brand_id}", response_model=List[schemas.BrandInShop])
def read_shops_with_brand(brand_id: int, db: Session = Depends(get_db)):
    """List the shop associations of one brand; responds 404 if the brand does not exist."""
    # Validate brand exists
    brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if brand is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    shops_with_brand = db.query(models.BrandInShop).filter(models.BrandInShop.brand_id == brand_id).all()
    return shops_with_brand


@app.get("/brands-in-shops/{brand_in_shop_id}", response_model=schemas.BrandInShop)
def read_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Return one brand/shop association by id; responds 404 if it does not exist."""
    brand_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.id == brand_in_shop_id).first()
    if brand_in_shop is None:
        raise HTTPException(status_code=404, detail="Brand in shop association not found")
    return brand_in_shop


@app.delete("/brands-in-shops/{brand_in_shop_id}")
def delete_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Delete one brand/shop association by id; responds 404 if it does not exist."""
    brand_in_shop = db.query(models.BrandInShop).filter(models.BrandInShop.id == brand_in_shop_id).first()
    if brand_in_shop is None:
        raise HTTPException(status_code=404, detail="Brand in shop association not found")

    db.delete(brand_in_shop)
    db.commit()
    return {"message": "Brand in shop association deleted successfully"}


# Grocery Category endpoints
@app.post("/grocery-categories/", response_model=schemas.GroceryCategory)
def create_grocery_category(category: schemas.GroceryCategoryCreate, db: Session = Depends(get_db)):
    """Create a grocery category from the request body and return the stored row."""
    db_category = models.GroceryCategory(**category.dict())
    db.add(db_category)
    db.commit()
    db.refresh(db_category)
    return db_category


@app.get("/grocery-categories/", response_model=List[schemas.GroceryCategory])
def read_grocery_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List grocery categories ordered by id, skipping ``skip`` rows and returning at most ``limit``."""
    # Without an explicit ordering, offset/limit pages may overlap or skip rows.
    categories = (
        db.query(models.GroceryCategory)
        .order_by(models.GroceryCategory.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return categories


@app.get("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def read_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Return one grocery category by id; responds 404 if it does not exist."""
    category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
    if category is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")
    return category


@app.put("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def update_grocery_category(category_id: int, category_update: schemas.GroceryCategoryUpdate, db: Session = Depends(get_db)):
    """Update the supplied fields of a grocery category; responds 404 if it does not exist."""
    category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
    if category is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")

    # Only fields explicitly present in the request body are applied.
    update_data = category_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(category, field, value)

    db.commit()
    db.refresh(category)
    return category


@app.delete("/grocery-categories/{category_id}")
def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Delete a grocery category by id.

    Responds 404 if the category does not exist and 400 while any product
    still references it.
    """
    category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
    if category is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")

    # Check if any products reference this category
    products_with_category = db.query(models.Product).filter(models.Product.category_id == category_id).first()
    if products_with_category:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete category: products are still associated with this category"
        )

    db.delete(category)
    db.commit()
    return {"message": "Grocery category deleted successfully"}


# Shopping Event endpoints
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Create a shopping event together with its product line items.

    The event row and all of its product associations are written in a single
    transaction. Responds 404 if the shop or any referenced product does not
    exist; in that case nothing is stored.
    """
    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Validate every referenced product before writing anything, so a missing
    # product cannot leave behind an event that has no line items.
    for product_item in event.products:
        product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
        if product is None:
            raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")

    # Create shopping event
    db_event = models.ShoppingEvent(
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes
    )

    try:
        db.add(db_event)
        # Flush (not commit) so db_event.id is available for the association
        # rows while the whole write stays in one transaction.
        db.flush()

        # Add products to the event
        for product_item in event.products:
            # Insert into association table
            db.execute(
                models.shopping_event_products.insert().values(
                    shopping_event_id=db_event.id,
                    product_id=product_item.product_id,
                    amount=product_item.amount,
                    price=product_item.price
                )
            )

        db.commit()
    except Exception:
        # Undo the partially written event before surfacing the error.
        db.rollback()
        raise

    db.refresh(db_event)
    return build_shopping_event_response(db_event, db)


@app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List shopping events, newest first, skipping ``skip`` rows and returning at most ``limit``."""
    # The id tie-breaker keeps pages stable when several events share the same
    # created_at value.
    events = (
        db.query(models.ShoppingEvent)
        .order_by(models.ShoppingEvent.created_at.desc(), models.ShoppingEvent.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
    # NOTE(review): each event triggers its own product query in
    # build_shopping_event_response; consider batching if pages grow large.
    return [build_shopping_event_response(event, db) for event in events]


@app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Return one shopping event with its products; responds 404 if it does not exist."""
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")
    return build_shopping_event_response(event, db)
@app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Replace a shopping event's fields and its product line items.

    Responds 404 if the event, the shop, or any referenced product does not
    exist; in that case the stored event is left untouched.
    """
    # Get the existing event
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event_update.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Validate every referenced product before changing anything, so a missing
    # product cannot interrupt the update after the old associations are gone.
    for product_item in event_update.products:
        product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
        if product is None:
            raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")

    try:
        # Update the shopping event
        event.shop_id = event_update.shop_id
        event.date = event_update.date
        event.total_amount = event_update.total_amount
        event.notes = event_update.notes

        # Remove existing product associations
        db.execute(
            models.shopping_event_products.delete().where(
                models.shopping_event_products.c.shopping_event_id == event_id
            )
        )

        # Add new product associations
        for product_item in event_update.products:
            # Insert into association table
            db.execute(
                models.shopping_event_products.insert().values(
                    shopping_event_id=event_id,
                    product_id=product_item.product_id,
                    amount=product_item.amount,
                    price=product_item.price
                )
            )

        db.commit()
    except Exception:
        # Discard the half-applied replacement before surfacing the error.
        db.rollback()
        raise

    db.refresh(event)
    return build_shopping_event_response(event, db)


@app.delete("/shopping-events/{event_id}")
def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Delete a shopping event and its product associations; responds 404 if it does not exist."""
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Delete product associations first
    db.execute(
        models.shopping_event_products.delete().where(
            models.shopping_event_products.c.shopping_event_id == event_id
        )
    )

    # Delete the shopping event
    db.delete(event)
    db.commit()
    return {"message": "Shopping event deleted successfully"}


# Statistics endpoints
@app.get("/stats/categories", response_model=List[schemas.CategoryStats])
def get_category_stats(db: Session = Depends(get_db)):
    """Placeholder for per-category statistics; currently always returns an empty list."""
    # This would need more complex SQL query - placeholder for now
    return []


@app.get("/stats/shops", response_model=List[schemas.ShopStats])
def get_shop_stats(db: Session = Depends(get_db)):
    """Placeholder for per-shop statistics; currently always returns an empty list."""
    # This would need more complex SQL query - placeholder for now
    return []


# Related Products endpoints
@app.post("/related-products/", response_model=schemas.RelatedProduct)
def create_related_product(related_product: schemas.RelatedProductCreate, db: Session = Depends(get_db)):
    """Create a relationship between two distinct products.

    Responds 404 if either product does not exist, and 400 if both ids are the
    same or the pair is already related in either direction.
    """
    # Validate both products exist
    product = db.query(models.Product).filter(models.Product.id == related_product.product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    related = db.query(models.Product).filter(models.Product.id == related_product.related_product_id).first()
    if related is None:
        raise HTTPException(status_code=404, detail="Related product not found")

    # Prevent self-referencing
    if related_product.product_id == related_product.related_product_id:
        raise HTTPException(status_code=400, detail="A product cannot be related to itself")

    # Check if relationship already exists (in either direction)
    columns = models.related_products.c
    same_direction = (
        (columns.product_id == related_product.product_id)
        & (columns.related_product_id == related_product.related_product_id)
    )
    opposite_direction = (
        (columns.product_id == related_product.related_product_id)
        & (columns.related_product_id == related_product.product_id)
    )
    existing = db.execute(
        models.related_products.select().where(same_direction | opposite_direction)
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="Products are already related")

    # Insert the relationship
    result = db.execute(
        models.related_products.insert().values(
            product_id=related_product.product_id,
            related_product_id=related_product.related_product_id,
            relationship_type=related_product.relationship_type
        )
    )
    db.commit()

    # Get the created relationship
    relationship_id = result.inserted_primary_key[0]
    created_relationship = db.execute(
        models.related_products.select().where(models.related_products.c.id == relationship_id)
    ).first()

    return schemas.RelatedProduct(
        id=created_relationship.id,
        product_id=created_relationship.product_id,
        related_product_id=created_relationship.related_product_id,
        relationship_type=created_relationship.relationship_type,
        created_at=created_relationship.created_at
    )


@app.get("/related-products/", response_model=List[schemas.RelatedProduct])
def read_related_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List product relationships ordered by id, skipping ``skip`` rows and returning at most ``limit``."""
    # Without an explicit ordering, offset/limit pages may overlap or skip rows.
    relationships = db.execute(
        models.related_products.select()
        .order_by(models.related_products.c.id)
        .offset(skip)
        .limit(limit)
    ).fetchall()

    return [
        schemas.RelatedProduct(
            id=rel.id,
            product_id=rel.product_id,
            related_product_id=rel.related_product_id,
            relationship_type=rel.relationship_type,
            created_at=rel.created_at
        )
        for rel in relationships
    ]


@app.get("/related-products/product/{product_id}", response_model=List[schemas.Product])
def get_related_products_for_product(product_id: int, db: Session = Depends(get_db)):
    """Return the products related to ``product_id`` in either direction; responds 404 if it does not exist."""
    # Validate product exists
    product = db.query(models.Product).filter(models.Product.id == product_id).first()
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # Get related products (bidirectional): the CASE picks whichever side of
    # the relationship row is not the requested product.
    related_product_ids = db.execute(
        text("""
            SELECT CASE
                       WHEN product_id = :product_id THEN related_product_id
                       ELSE product_id
                   END as related_id
            FROM related_products
            WHERE product_id = :product_id OR related_product_id = :product_id
        """),
        {"product_id": product_id}
    ).fetchall()

    if not related_product_ids:
        return []

    # Get the actual product objects
    related_ids = [row.related_id for row in related_product_ids]
    related_products = db.query(models.Product).filter(models.Product.id.in_(related_ids)).all()
    return related_products


@app.get("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def read_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Return one product relationship by id; responds 404 if it does not exist."""
    relationship = db.execute(
        models.related_products.select().where(models.related_products.c.id == relationship_id)
    ).first()

    if relationship is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    return schemas.RelatedProduct(
        id=relationship.id,
        product_id=relationship.product_id,
        related_product_id=relationship.related_product_id,
        relationship_type=relationship.relationship_type,
        created_at=relationship.created_at
    )


@app.put("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def update_related_product(relationship_id: int, update_data: schemas.RelatedProductUpdate, db: Session = Depends(get_db)):
    """Set the ``relationship_type`` of a product relationship; responds 404 if it does not exist."""
    # Check if relationship exists
    existing = db.execute(
        models.related_products.select().where(models.related_products.c.id == relationship_id)
    ).first()

    if existing is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    # Update the relationship
    db.execute(
        models.related_products.update()
        .where(models.related_products.c.id == relationship_id)
        .values(relationship_type=update_data.relationship_type)
    )
    db.commit()

    # Get the updated relationship
    updated_relationship = db.execute(
        models.related_products.select().where(models.related_products.c.id == relationship_id)
    ).first()

    return schemas.RelatedProduct(
        id=updated_relationship.id,
        product_id=updated_relationship.product_id,
        related_product_id=updated_relationship.related_product_id,
        relationship_type=updated_relationship.relationship_type,
        created_at=updated_relationship.created_at
    )


@app.delete("/related-products/{relationship_id}")
def delete_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Delete one product relationship by id; responds 404 if it does not exist."""
    # Check if relationship exists
    existing = db.execute(
        models.related_products.select().where(models.related_products.c.id == relationship_id)
    ).first()

    if existing is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    # Delete the relationship
    db.execute(
        models.related_products.delete().where(models.related_products.c.id == relationship_id)
    )
    db.commit()

    return {"message": "Related product relationship deleted successfully"}


if __name__ == "__main__":
    import uvicorn

    # Run the app directly, listening on all interfaces on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)