520 lines
20 KiB
Python
520 lines
20 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
from typing import List
|
|
import models, schemas
|
|
from database import engine, get_db
|
|
|
|
# Create database tables
|
|
models.Base.metadata.create_all(bind=engine)
|
|
|
|
app = FastAPI(
|
|
title="Product Tracker API",
|
|
description="API for tracking product prices and shopping events",
|
|
version="1.0.0"
|
|
)
|
|
|
|
# CORS middleware for React frontend
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["http://localhost:3000", "http://localhost:5173"], # React dev servers
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
|
|
"""Build a shopping event response with products from the association table"""
|
|
# Get products with their event-specific data including grocery and brand information
|
|
product_data = db.execute(
|
|
text("""
|
|
SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
|
|
sep.amount, sep.price,
|
|
g.id as grocery_id, g.name as grocery_name,
|
|
g.created_at as grocery_created_at, g.updated_at as grocery_updated_at,
|
|
gc.id as category_id, gc.name as category_name,
|
|
gc.created_at as category_created_at, gc.updated_at as category_updated_at,
|
|
b.id as brand_id, b.name as brand_name,
|
|
b.created_at as brand_created_at, b.updated_at as brand_updated_at
|
|
FROM products p
|
|
JOIN shopping_event_products sep ON p.id = sep.product_id
|
|
JOIN groceries g ON p.grocery_id = g.id
|
|
JOIN grocery_categories gc ON g.category_id = gc.id
|
|
LEFT JOIN brands b ON p.brand_id = b.id
|
|
WHERE sep.shopping_event_id = :event_id
|
|
"""),
|
|
{"event_id": event.id}
|
|
).fetchall()
|
|
|
|
# Convert to ProductWithEventData objects
|
|
products_with_data = []
|
|
for row in product_data:
|
|
category = schemas.GroceryCategory(
|
|
id=row.category_id,
|
|
name=row.category_name,
|
|
created_at=row.category_created_at,
|
|
updated_at=row.category_updated_at
|
|
)
|
|
|
|
grocery = schemas.Grocery(
|
|
id=row.grocery_id,
|
|
name=row.grocery_name,
|
|
category_id=row.category_id,
|
|
created_at=row.grocery_created_at,
|
|
updated_at=row.grocery_updated_at,
|
|
category=category
|
|
)
|
|
|
|
brand = None
|
|
if row.brand_id is not None:
|
|
brand = schemas.Brand(
|
|
id=row.brand_id,
|
|
name=row.brand_name,
|
|
created_at=row.brand_created_at,
|
|
updated_at=row.brand_updated_at
|
|
)
|
|
|
|
products_with_data.append(
|
|
schemas.ProductWithEventData(
|
|
id=row.id,
|
|
name=row.name,
|
|
grocery=grocery,
|
|
brand=brand,
|
|
organic=row.organic,
|
|
weight=row.weight,
|
|
weight_unit=row.weight_unit,
|
|
amount=row.amount,
|
|
price=row.price
|
|
)
|
|
)
|
|
|
|
return schemas.ShoppingEventResponse(
|
|
id=event.id,
|
|
shop_id=event.shop_id,
|
|
date=event.date,
|
|
total_amount=event.total_amount,
|
|
notes=event.notes,
|
|
created_at=event.created_at,
|
|
shop=event.shop,
|
|
products=products_with_data
|
|
)
|
|
|
|
# Root endpoint
|
|
@app.get("/")
|
|
def read_root():
|
|
return {"message": "Product Tracker API", "version": "1.0.0"}
|
|
|
|
# Product endpoints
|
|
@app.post("/products/", response_model=schemas.Product)
|
|
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
|
|
# Validate grocery exists
|
|
grocery = db.query(models.Grocery).filter(models.Grocery.id == product.grocery_id).first()
|
|
if grocery is None:
|
|
raise HTTPException(status_code=404, detail="Grocery not found")
|
|
|
|
# Validate brand exists if brand_id is provided
|
|
if product.brand_id is not None:
|
|
brand = db.query(models.Brand).filter(models.Brand.id == product.brand_id).first()
|
|
if brand is None:
|
|
raise HTTPException(status_code=404, detail="Brand not found")
|
|
|
|
db_product = models.Product(**product.dict())
|
|
db.add(db_product)
|
|
db.commit()
|
|
db.refresh(db_product)
|
|
return db_product
|
|
|
|
@app.get("/products/", response_model=List[schemas.Product])
|
|
def read_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
products = db.query(models.Product).offset(skip).limit(limit).all()
|
|
return products
|
|
|
|
@app.get("/products/{product_id}", response_model=schemas.Product)
|
|
def read_product(product_id: int, db: Session = Depends(get_db)):
|
|
product = db.query(models.Product).filter(models.Product.id == product_id).first()
|
|
if product is None:
|
|
raise HTTPException(status_code=404, detail="Product not found")
|
|
return product
|
|
|
|
@app.put("/products/{product_id}", response_model=schemas.Product)
|
|
def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
|
|
product = db.query(models.Product).filter(models.Product.id == product_id).first()
|
|
if product is None:
|
|
raise HTTPException(status_code=404, detail="Product not found")
|
|
|
|
update_data = product_update.dict(exclude_unset=True)
|
|
|
|
# Validate grocery exists if grocery_id is being updated
|
|
if 'grocery_id' in update_data:
|
|
grocery = db.query(models.Grocery).filter(models.Grocery.id == update_data['grocery_id']).first()
|
|
if grocery is None:
|
|
raise HTTPException(status_code=404, detail="Grocery not found")
|
|
|
|
# Validate brand exists if brand_id is being updated
|
|
if 'brand_id' in update_data and update_data['brand_id'] is not None:
|
|
brand = db.query(models.Brand).filter(models.Brand.id == update_data['brand_id']).first()
|
|
if brand is None:
|
|
raise HTTPException(status_code=404, detail="Brand not found")
|
|
|
|
for field, value in update_data.items():
|
|
setattr(product, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(product)
|
|
return product
|
|
|
|
@app.delete("/products/{product_id}")
|
|
def delete_product(product_id: int, db: Session = Depends(get_db)):
|
|
product = db.query(models.Product).filter(models.Product.id == product_id).first()
|
|
if product is None:
|
|
raise HTTPException(status_code=404, detail="Product not found")
|
|
|
|
db.delete(product)
|
|
db.commit()
|
|
return {"message": "Product deleted successfully"}
|
|
|
|
# Shop endpoints
|
|
@app.post("/shops/", response_model=schemas.Shop)
|
|
def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
|
|
db_shop = models.Shop(**shop.dict())
|
|
db.add(db_shop)
|
|
db.commit()
|
|
db.refresh(db_shop)
|
|
return db_shop
|
|
|
|
@app.get("/shops/", response_model=List[schemas.Shop])
|
|
def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
shops = db.query(models.Shop).offset(skip).limit(limit).all()
|
|
return shops
|
|
|
|
@app.get("/shops/{shop_id}", response_model=schemas.Shop)
|
|
def read_shop(shop_id: int, db: Session = Depends(get_db)):
|
|
shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
|
|
if shop is None:
|
|
raise HTTPException(status_code=404, detail="Shop not found")
|
|
return shop
|
|
|
|
@app.put("/shops/{shop_id}", response_model=schemas.Shop)
|
|
def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
|
|
shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
|
|
if shop is None:
|
|
raise HTTPException(status_code=404, detail="Shop not found")
|
|
|
|
update_data = shop_update.dict(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(shop, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(shop)
|
|
return shop
|
|
|
|
@app.delete("/shops/{shop_id}")
|
|
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
|
|
shop = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
|
|
if shop is None:
|
|
raise HTTPException(status_code=404, detail="Shop not found")
|
|
|
|
db.delete(shop)
|
|
db.commit()
|
|
return {"message": "Shop deleted successfully"}
|
|
|
|
# Brand endpoints
|
|
@app.post("/brands/", response_model=schemas.Brand)
|
|
def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
|
|
db_brand = models.Brand(**brand.dict())
|
|
db.add(db_brand)
|
|
db.commit()
|
|
db.refresh(db_brand)
|
|
return db_brand
|
|
|
|
@app.get("/brands/", response_model=List[schemas.Brand])
|
|
def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
brands = db.query(models.Brand).offset(skip).limit(limit).all()
|
|
return brands
|
|
|
|
@app.get("/brands/{brand_id}", response_model=schemas.Brand)
|
|
def read_brand(brand_id: int, db: Session = Depends(get_db)):
|
|
brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
|
|
if brand is None:
|
|
raise HTTPException(status_code=404, detail="Brand not found")
|
|
return brand
|
|
|
|
@app.put("/brands/{brand_id}", response_model=schemas.Brand)
|
|
def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
|
|
brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
|
|
if brand is None:
|
|
raise HTTPException(status_code=404, detail="Brand not found")
|
|
|
|
update_data = brand_update.dict(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(brand, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(brand)
|
|
return brand
|
|
|
|
@app.delete("/brands/{brand_id}")
|
|
def delete_brand(brand_id: int, db: Session = Depends(get_db)):
|
|
brand = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
|
|
if brand is None:
|
|
raise HTTPException(status_code=404, detail="Brand not found")
|
|
|
|
# Check if any products reference this brand
|
|
products_with_brand = db.query(models.Product).filter(models.Product.brand_id == brand_id).first()
|
|
if products_with_brand:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot delete brand: products are still associated with this brand"
|
|
)
|
|
|
|
db.delete(brand)
|
|
db.commit()
|
|
return {"message": "Brand deleted successfully"}
|
|
|
|
# Grocery Category endpoints
|
|
@app.post("/grocery-categories/", response_model=schemas.GroceryCategory)
|
|
def create_grocery_category(category: schemas.GroceryCategoryCreate, db: Session = Depends(get_db)):
|
|
db_category = models.GroceryCategory(**category.dict())
|
|
db.add(db_category)
|
|
db.commit()
|
|
db.refresh(db_category)
|
|
return db_category
|
|
|
|
@app.get("/grocery-categories/", response_model=List[schemas.GroceryCategory])
|
|
def read_grocery_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
categories = db.query(models.GroceryCategory).offset(skip).limit(limit).all()
|
|
return categories
|
|
|
|
@app.get("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
|
|
def read_grocery_category(category_id: int, db: Session = Depends(get_db)):
|
|
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
|
|
if category is None:
|
|
raise HTTPException(status_code=404, detail="Grocery category not found")
|
|
return category
|
|
|
|
@app.put("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
|
|
def update_grocery_category(category_id: int, category_update: schemas.GroceryCategoryUpdate, db: Session = Depends(get_db)):
|
|
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
|
|
if category is None:
|
|
raise HTTPException(status_code=404, detail="Grocery category not found")
|
|
|
|
update_data = category_update.dict(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(category, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(category)
|
|
return category
|
|
|
|
@app.delete("/grocery-categories/{category_id}")
|
|
def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
|
|
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == category_id).first()
|
|
if category is None:
|
|
raise HTTPException(status_code=404, detail="Grocery category not found")
|
|
|
|
# Check if any groceries reference this category
|
|
groceries_with_category = db.query(models.Grocery).filter(models.Grocery.category_id == category_id).first()
|
|
if groceries_with_category:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot delete category: groceries are still associated with this category"
|
|
)
|
|
|
|
db.delete(category)
|
|
db.commit()
|
|
return {"message": "Grocery category deleted successfully"}
|
|
|
|
# Grocery endpoints
|
|
@app.post("/groceries/", response_model=schemas.Grocery)
|
|
def create_grocery(grocery: schemas.GroceryCreate, db: Session = Depends(get_db)):
|
|
# Validate category exists
|
|
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == grocery.category_id).first()
|
|
if category is None:
|
|
raise HTTPException(status_code=404, detail="Grocery category not found")
|
|
|
|
db_grocery = models.Grocery(**grocery.dict())
|
|
db.add(db_grocery)
|
|
db.commit()
|
|
db.refresh(db_grocery)
|
|
return db_grocery
|
|
|
|
@app.get("/groceries/", response_model=List[schemas.Grocery])
|
|
def read_groceries(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
groceries = db.query(models.Grocery).offset(skip).limit(limit).all()
|
|
return groceries
|
|
|
|
@app.get("/groceries/{grocery_id}", response_model=schemas.Grocery)
|
|
def read_grocery(grocery_id: int, db: Session = Depends(get_db)):
|
|
grocery = db.query(models.Grocery).filter(models.Grocery.id == grocery_id).first()
|
|
if grocery is None:
|
|
raise HTTPException(status_code=404, detail="Grocery not found")
|
|
return grocery
|
|
|
|
@app.put("/groceries/{grocery_id}", response_model=schemas.Grocery)
|
|
def update_grocery(grocery_id: int, grocery_update: schemas.GroceryUpdate, db: Session = Depends(get_db)):
|
|
grocery = db.query(models.Grocery).filter(models.Grocery.id == grocery_id).first()
|
|
if grocery is None:
|
|
raise HTTPException(status_code=404, detail="Grocery not found")
|
|
|
|
update_data = grocery_update.dict(exclude_unset=True)
|
|
|
|
# Validate category exists if category_id is being updated
|
|
if 'category_id' in update_data:
|
|
category = db.query(models.GroceryCategory).filter(models.GroceryCategory.id == update_data['category_id']).first()
|
|
if category is None:
|
|
raise HTTPException(status_code=404, detail="Grocery category not found")
|
|
|
|
for field, value in update_data.items():
|
|
setattr(grocery, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(grocery)
|
|
return grocery
|
|
|
|
@app.delete("/groceries/{grocery_id}")
|
|
def delete_grocery(grocery_id: int, db: Session = Depends(get_db)):
|
|
grocery = db.query(models.Grocery).filter(models.Grocery.id == grocery_id).first()
|
|
if grocery is None:
|
|
raise HTTPException(status_code=404, detail="Grocery not found")
|
|
|
|
# Check if any products reference this grocery
|
|
products_with_grocery = db.query(models.Product).filter(models.Product.grocery_id == grocery_id).first()
|
|
if products_with_grocery:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot delete grocery: products are still associated with this grocery"
|
|
)
|
|
|
|
db.delete(grocery)
|
|
db.commit()
|
|
return {"message": "Grocery deleted successfully"}
|
|
|
|
# Shopping Event endpoints
|
|
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
|
|
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
|
|
# Verify shop exists
|
|
shop = db.query(models.Shop).filter(models.Shop.id == event.shop_id).first()
|
|
if shop is None:
|
|
raise HTTPException(status_code=404, detail="Shop not found")
|
|
|
|
# Create shopping event
|
|
db_event = models.ShoppingEvent(
|
|
shop_id=event.shop_id,
|
|
date=event.date,
|
|
total_amount=event.total_amount,
|
|
notes=event.notes
|
|
)
|
|
db.add(db_event)
|
|
db.commit()
|
|
db.refresh(db_event)
|
|
|
|
# Add products to the event
|
|
for product_item in event.products:
|
|
product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
|
|
if product is None:
|
|
raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")
|
|
|
|
# Insert into association table
|
|
db.execute(
|
|
models.shopping_event_products.insert().values(
|
|
shopping_event_id=db_event.id,
|
|
product_id=product_item.product_id,
|
|
amount=product_item.amount,
|
|
price=product_item.price
|
|
)
|
|
)
|
|
|
|
db.commit()
|
|
db.refresh(db_event)
|
|
return build_shopping_event_response(db_event, db)
|
|
|
|
@app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
|
|
def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
|
|
events = db.query(models.ShoppingEvent).order_by(models.ShoppingEvent.created_at.desc()).offset(skip).limit(limit).all()
|
|
return [build_shopping_event_response(event, db) for event in events]
|
|
|
|
@app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
|
|
def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
|
|
event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
|
|
if event is None:
|
|
raise HTTPException(status_code=404, detail="Shopping event not found")
|
|
return build_shopping_event_response(event, db)
|
|
|
|
@app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
|
|
def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
|
|
# Get the existing event
|
|
event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
|
|
if event is None:
|
|
raise HTTPException(status_code=404, detail="Shopping event not found")
|
|
|
|
# Verify shop exists
|
|
shop = db.query(models.Shop).filter(models.Shop.id == event_update.shop_id).first()
|
|
if shop is None:
|
|
raise HTTPException(status_code=404, detail="Shop not found")
|
|
|
|
# Update the shopping event
|
|
event.shop_id = event_update.shop_id
|
|
event.date = event_update.date
|
|
event.total_amount = event_update.total_amount
|
|
event.notes = event_update.notes
|
|
|
|
# Remove existing product associations
|
|
db.execute(
|
|
models.shopping_event_products.delete().where(
|
|
models.shopping_event_products.c.shopping_event_id == event_id
|
|
)
|
|
)
|
|
|
|
# Add new product associations
|
|
for product_item in event_update.products:
|
|
product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
|
|
if product is None:
|
|
raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")
|
|
|
|
# Insert into association table
|
|
db.execute(
|
|
models.shopping_event_products.insert().values(
|
|
shopping_event_id=event_id,
|
|
product_id=product_item.product_id,
|
|
amount=product_item.amount,
|
|
price=product_item.price
|
|
)
|
|
)
|
|
|
|
db.commit()
|
|
db.refresh(event)
|
|
return build_shopping_event_response(event, db)
|
|
|
|
@app.delete("/shopping-events/{event_id}")
|
|
def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
|
|
event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
|
|
if event is None:
|
|
raise HTTPException(status_code=404, detail="Shopping event not found")
|
|
|
|
# Delete product associations first
|
|
db.execute(
|
|
models.shopping_event_products.delete().where(
|
|
models.shopping_event_products.c.shopping_event_id == event_id
|
|
)
|
|
)
|
|
|
|
# Delete the shopping event
|
|
db.delete(event)
|
|
db.commit()
|
|
return {"message": "Shopping event deleted successfully"}
|
|
|
|
# Statistics endpoints
|
|
@app.get("/stats/categories", response_model=List[schemas.CategoryStats])
|
|
def get_category_stats(db: Session = Depends(get_db)):
|
|
# This would need more complex SQL query - placeholder for now
|
|
return []
|
|
|
|
@app.get("/stats/shops", response_model=List[schemas.ShopStats])
|
|
def get_shop_stats(db: Session = Depends(get_db)):
|
|
# This would need more complex SQL query - placeholder for now
|
|
return []
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |