289 lines
11 KiB
Python
289 lines
11 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
from typing import List
|
|
import models, schemas
|
|
from database import engine, get_db
|
|
|
|
# Create any tables declared on the models' metadata that do not exist yet.
models.Base.metadata.create_all(bind=engine)

# The FastAPI application object that every route below is registered on.
app = FastAPI(
    title="Product Tracker API",
    version="1.0.0",
    description="API for tracking product prices and shopping events",
)

# CORS: let the React dev servers (ports 3000 and 5173) call this API
# from the browser, with credentials and any method or header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:5173"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
|
|
|
|
def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
    """Assemble the API response object for a single shopping event.

    The event's products are read by joining ``products`` with the
    ``shopping_event_products`` association table, so each product carries
    the ``amount`` and ``price`` recorded for this particular event.

    Args:
        event: The shopping event to convert.
        db: Session used to run the product lookup.

    Returns:
        A ``schemas.ShoppingEventResponse`` holding the event's own fields,
        its shop and the list of products with their event-specific data.
    """
    # Columns selected below; each one maps 1:1 onto a ProductWithEventData field.
    columns = (
        "id",
        "name",
        "category",
        "organic",
        "weight",
        "weight_unit",
        "amount",
        "price",
    )

    statement = text("""
        SELECT p.id, p.name, p.category, p.organic, p.weight, p.weight_unit,
        sep.amount, sep.price
        FROM products p
        JOIN shopping_event_products sep ON p.id = sep.product_id
        WHERE sep.shopping_event_id = :event_id
    """)
    rows = db.execute(statement, {"event_id": event.id}).fetchall()

    # Turn every result row into a ProductWithEventData object.
    products = []
    for row in rows:
        values = {column: getattr(row, column) for column in columns}
        products.append(schemas.ProductWithEventData(**values))

    return schemas.ShoppingEventResponse(
        id=event.id,
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
        created_at=event.created_at,
        shop=event.shop,
        products=products,
    )
|
|
|
|
# Root endpoint
|
|
@app.get("/")
def read_root():
    """Return a small payload naming the API and its version."""
    payload = {"message": "Product Tracker API", "version": "1.0.0"}
    return payload
|
|
|
|
# Product endpoints
|
|
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
    """Store a new product built from the request body and return it."""
    attributes = product.dict()
    new_product = models.Product(**attributes)

    db.add(new_product)
    db.commit()
    # Reload the row after the commit so the returned object reflects what was stored.
    db.refresh(new_product)
    return new_product
|
|
|
|
@app.get("/products/", response_model=List[schemas.Product])
def read_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of products.

    Args:
        skip: Number of rows to skip from the start of the result.
        limit: Maximum number of rows to return.
        db: Request-scoped database session.

    Returns:
        The products on the requested page, ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order, so successive pages could overlap or skip rows. Ordering by the
    # primary key keeps pagination stable.
    products = (
        db.query(models.Product)
        .order_by(models.Product.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return products
|
|
|
|
@app.get("/products/{product_id}", response_model=schemas.Product)
def read_product(product_id: int, db: Session = Depends(get_db)):
    """Return the product with the given id, or respond 404 if there is none."""
    match = (
        db.query(models.Product)
        .filter(models.Product.id == product_id)
        .first()
    )
    if match is not None:
        return match
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Product not found")
|
|
|
|
@app.put("/products/{product_id}", response_model=schemas.Product)
def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
    """Apply the fields present in the request body to an existing product.

    Responds 404 when no product has the given id. Only fields the client
    actually sent are written (``exclude_unset=True``).
    """
    existing = (
        db.query(models.Product)
        .filter(models.Product.id == product_id)
        .first()
    )
    if existing is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Product not found")

    changes = product_update.dict(exclude_unset=True)
    for attribute in changes:
        setattr(existing, attribute, changes[attribute])

    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Delete the product with the given id, or respond 404 if there is none."""
    target = (
        db.query(models.Product)
        .filter(models.Product.id == product_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Product not found")

    db.delete(target)
    db.commit()

    confirmation = {"message": "Product deleted successfully"}
    return confirmation
|
|
|
|
# Shop endpoints
|
|
@app.post("/shops/", response_model=schemas.Shop)
def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
    """Store a new shop built from the request body and return it."""
    attributes = shop.dict()
    new_shop = models.Shop(**attributes)

    db.add(new_shop)
    db.commit()
    # Reload the row after the commit so the returned object reflects what was stored.
    db.refresh(new_shop)
    return new_shop
|
|
|
|
@app.get("/shops/", response_model=List[schemas.Shop])
def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of shops.

    Args:
        skip: Number of rows to skip from the start of the result.
        limit: Maximum number of rows to return.
        db: Request-scoped database session.

    Returns:
        The shops on the requested page, ordered by id.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any
    # order, so successive pages could overlap or skip rows. Ordering by the
    # primary key keeps pagination stable.
    shops = (
        db.query(models.Shop)
        .order_by(models.Shop.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
    return shops
|
|
|
|
@app.get("/shops/{shop_id}", response_model=schemas.Shop)
def read_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return the shop with the given id, or respond 404 if there is none."""
    match = (
        db.query(models.Shop)
        .filter(models.Shop.id == shop_id)
        .first()
    )
    if match is not None:
        return match
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Shop not found")
|
|
|
|
@app.put("/shops/{shop_id}", response_model=schemas.Shop)
def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
    """Apply the fields present in the request body to an existing shop.

    Responds 404 when no shop has the given id. Only fields the client
    actually sent are written (``exclude_unset=True``).
    """
    existing = (
        db.query(models.Shop)
        .filter(models.Shop.id == shop_id)
        .first()
    )
    if existing is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Shop not found")

    changes = shop_update.dict(exclude_unset=True)
    for attribute in changes:
        setattr(existing, attribute, changes[attribute])

    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
@app.delete("/shops/{shop_id}")
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
    """Delete the shop with the given id, or respond 404 if there is none."""
    target = (
        db.query(models.Shop)
        .filter(models.Shop.id == shop_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Shop not found")

    db.delete(target)
    db.commit()

    confirmation = {"message": "Shop deleted successfully"}
    return confirmation
|
|
|
|
# Shopping Event endpoints
|
|
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Create a shopping event together with its product associations.

    All referenced rows are validated before anything is written, and the
    event plus its associations are committed in a single transaction. An
    unknown product id therefore no longer leaves a half-created event
    behind.

    Args:
        event: Request body with the shop id, event fields and product items.
        db: Request-scoped database session.

    Returns:
        The stored event as a ``schemas.ShoppingEventResponse``.

    Raises:
        HTTPException: 404 if the shop or any referenced product does not exist.
    """
    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Verify every referenced product exists *before* writing anything.
    # One IN query replaces a lookup per item.
    requested_ids = [item.product_id for item in event.products]
    if requested_ids:
        found_ids = {
            row[0]
            for row in db.query(models.Product.id)
            .filter(models.Product.id.in_(requested_ids))
            .all()
        }
        # Report the first missing id in request order.
        for product_id in requested_ids:
            if product_id not in found_ids:
                raise HTTPException(status_code=404, detail=f"Product with id {product_id} not found")

    # Create shopping event
    db_event = models.ShoppingEvent(
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
    )
    db.add(db_event)

    try:
        # flush() sends the INSERT so db_event.id is populated, but keeps
        # the transaction open until the associations are written too.
        db.flush()

        # Add products to the event through the association table
        for product_item in event.products:
            db.execute(
                models.shopping_event_products.insert().values(
                    shopping_event_id=db_event.id,
                    product_id=product_item.product_id,
                    amount=product_item.amount,
                    price=product_item.price,
                )
            )

        db.commit()
    except Exception:
        # Undo the partial write, then let the original error propagate.
        db.rollback()
        raise

    db.refresh(db_event)
    return build_shopping_event_response(db_event, db)
|
|
|
|
@app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of shopping events, most recently created first."""
    newest_first = models.ShoppingEvent.created_at.desc()
    page = (
        db.query(models.ShoppingEvent)
        .order_by(newest_first)
        .offset(skip)
        .limit(limit)
        .all()
    )

    # Each event is expanded with its products by the shared response builder.
    responses = []
    for item in page:
        responses.append(build_shopping_event_response(item, db))
    return responses
|
|
|
|
@app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Return the shopping event with the given id, or respond 404 if there is none."""
    match = (
        db.query(models.ShoppingEvent)
        .filter(models.ShoppingEvent.id == event_id)
        .first()
    )
    if match is not None:
        return build_shopping_event_response(match, db)
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Shopping event not found")
|
|
|
|
@app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Replace a shopping event's fields and its product associations.

    The event, shop and all referenced products are validated before the
    session is modified, and the write is rolled back explicitly on error.
    A rejected request therefore never depends on how the session is torn
    down to keep the existing associations intact.

    Args:
        event_id: Id of the shopping event to update.
        event_update: Request body with the new shop id, event fields and product items.
        db: Request-scoped database session.

    Returns:
        The updated event as a ``schemas.ShoppingEventResponse``.

    Raises:
        HTTPException: 404 if the event, the shop or any referenced product
            does not exist.
    """
    # Get the existing event
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event_update.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Verify every referenced product exists *before* touching the event or
    # its associations. One IN query replaces a lookup per item.
    requested_ids = [item.product_id for item in event_update.products]
    if requested_ids:
        found_ids = {
            row[0]
            for row in db.query(models.Product.id)
            .filter(models.Product.id.in_(requested_ids))
            .all()
        }
        # Report the first missing id in request order.
        for product_id in requested_ids:
            if product_id not in found_ids:
                raise HTTPException(status_code=404, detail=f"Product with id {product_id} not found")

    try:
        # Update the shopping event
        event.shop_id = event_update.shop_id
        event.date = event_update.date
        event.total_amount = event_update.total_amount
        event.notes = event_update.notes

        # Remove existing product associations
        db.execute(
            models.shopping_event_products.delete().where(
                models.shopping_event_products.c.shopping_event_id == event_id
            )
        )

        # Add new product associations
        for product_item in event_update.products:
            db.execute(
                models.shopping_event_products.insert().values(
                    shopping_event_id=event_id,
                    product_id=product_item.product_id,
                    amount=product_item.amount,
                    price=product_item.price,
                )
            )

        db.commit()
    except Exception:
        # Undo the partial write, then let the original error propagate.
        db.rollback()
        raise

    db.refresh(event)
    return build_shopping_event_response(event, db)
|
|
|
|
@app.delete("/shopping-events/{event_id}")
def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Delete a shopping event and its product associations.

    Responds 404 when no shopping event has the given id.
    """
    target = (
        db.query(models.ShoppingEvent)
        .filter(models.ShoppingEvent.id == event_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Shopping event not found")

    # The association rows go first, then the event row itself.
    associations = models.shopping_event_products
    remove_links = associations.delete().where(
        associations.c.shopping_event_id == event_id
    )
    db.execute(remove_links)

    db.delete(target)
    db.commit()

    confirmation = {"message": "Shopping event deleted successfully"}
    return confirmation
|
|
|
|
# Statistics endpoints
|
|
@app.get("/stats/categories", response_model=List[schemas.CategoryStats])
def get_category_stats(db: Session = Depends(get_db)):
    """Placeholder for per-category statistics; currently always returns an empty list."""
    # Not implemented yet: the real figures need a more complex SQL query.
    stats: List[schemas.CategoryStats] = []
    return stats
|
|
|
|
@app.get("/stats/shops", response_model=List[schemas.ShopStats])
def get_shop_stats(db: Session = Depends(get_db)):
    """Placeholder for per-shop statistics; currently always returns an empty list."""
    # Not implemented yet: the real figures need a more complex SQL query.
    stats: List[schemas.ShopStats] = []
    return stats
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on port 8000, all interfaces.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")