677 lines
27 KiB
Python
677 lines
27 KiB
Python
from fastapi import FastAPI, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
from typing import List
|
|
import models, schemas
|
|
from database import engine, get_db
|
|
|
|
# Create database tables for the metadata attached to models.Base.
# NOTE: this runs as an import-time side effect of loading this module.
models.Base.metadata.create_all(bind=engine)
|
|
|
|
# Application instance; every route below is registered on it. The title,
# description and version are passed to FastAPI as API metadata.
app = FastAPI(
    title="Product Tracker API",
    description="API for tracking product prices and shopping events",
    version="1.0.0"
)
|
|
|
|
# CORS middleware for React frontend.
# Only the two localhost origins listed below are allowed; any other origin
# (for example a deployed frontend) has to be added to allow_origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:5173"],  # React dev servers
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
def build_shopping_event_response(event: models.ShoppingEvent, db: Session) -> schemas.ShoppingEventResponse:
    """Assemble the API response for one shopping event.

    The purchased products are read with raw SQL through the
    ``shopping_event_products`` association table so that the per-event
    ``amount`` and ``price`` are returned next to each product's own
    columns, its category and (when set) its brand.

    Args:
        event: The shopping event to serialise.
        db: Session used to run the product query.

    Returns:
        A ``ShoppingEventResponse`` carrying the event fields, its shop and
        one ``ProductWithEventData`` per associated product row.
    """
    products_query = text("""
        SELECT p.id, p.name, p.organic, p.weight, p.weight_unit,
               sep.amount, sep.price,
               gc.id as category_id, gc.name as category_name,
               gc.created_at as category_created_at, gc.updated_at as category_updated_at,
               b.id as brand_id, b.name as brand_name,
               b.created_at as brand_created_at, b.updated_at as brand_updated_at
        FROM products p
        JOIN shopping_event_products sep ON p.id = sep.product_id
        JOIN grocery_categories gc ON p.category_id = gc.id
        LEFT JOIN brands b ON p.brand_id = b.id
        WHERE sep.shopping_event_id = :event_id
    """)
    rows = db.execute(products_query, {"event_id": event.id}).fetchall()

    def to_product(row):
        """Convert one joined result row into a ProductWithEventData."""
        row_category = schemas.GroceryCategory(
            id=row.category_id,
            name=row.category_name,
            created_at=row.category_created_at,
            updated_at=row.category_updated_at,
        )

        # brands is LEFT JOINed, so brand_id is NULL for products without a brand.
        if row.brand_id is None:
            row_brand = None
        else:
            row_brand = schemas.Brand(
                id=row.brand_id,
                name=row.brand_name,
                created_at=row.brand_created_at,
                updated_at=row.brand_updated_at,
            )

        return schemas.ProductWithEventData(
            id=row.id,
            name=row.name,
            category=row_category,
            brand=row_brand,
            organic=row.organic,
            weight=row.weight,
            weight_unit=row.weight_unit,
            amount=row.amount,
            price=row.price,
        )

    return schemas.ShoppingEventResponse(
        id=event.id,
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes,
        created_at=event.created_at,
        shop=event.shop,
        products=[to_product(row) for row in rows],
    )
|
|
|
|
# Root endpoint
|
|
@app.get("/")
def read_root():
    """Return a small identification payload with the API name and version."""
    payload = {
        "message": "Product Tracker API",
        "version": "1.0.0",
    }
    return payload
|
|
|
|
# Product endpoints
|
|
@app.post("/products/", response_model=schemas.Product)
def create_product(product: schemas.ProductCreate, db: Session = Depends(get_db)):
    """Create a product after checking its category and optional brand exist.

    Raises:
        HTTPException: 404 when ``category_id``, or a non-null ``brand_id``,
            does not match an existing row.
    """
    category_row = (
        db.query(models.GroceryCategory)
        .filter(models.GroceryCategory.id == product.category_id)
        .first()
    )
    if category_row is None:
        raise HTTPException(status_code=404, detail="Category not found")

    # The brand is optional: only look it up when the client supplied one.
    if product.brand_id is not None:
        brand_row = (
            db.query(models.Brand)
            .filter(models.Brand.id == product.brand_id)
            .first()
        )
        if brand_row is None:
            raise HTTPException(status_code=404, detail="Brand not found")

    new_product = models.Product(**product.dict())
    db.add(new_product)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(new_product)
    return new_product
|
|
|
|
@app.get("/products/", response_model=List[schemas.Product])
def read_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of products.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.

    Rows are ordered by primary key so that ``skip``/``limit`` walk a stable
    sequence. Without an ORDER BY the database may return rows in any order,
    so consecutive pages could repeat or miss rows.
    """
    return (
        db.query(models.Product)
        .order_by(models.Product.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@app.get("/products/{product_id}", response_model=schemas.Product)
def read_product(product_id: int, db: Session = Depends(get_db)):
    """Return the product with the given id.

    Raises:
        HTTPException: 404 when no product has that id.
    """
    by_id = db.query(models.Product).filter(models.Product.id == product_id)
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return found
|
|
|
|
@app.put("/products/{product_id}", response_model=schemas.Product)
def update_product(product_id: int, product_update: schemas.ProductUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a product.

    Only fields the client actually sent are written (``exclude_unset``).
    A new ``category_id`` or non-null ``brand_id`` must reference an
    existing row.

    Raises:
        HTTPException: 404 when the product, the new category or the new
            brand does not exist.
    """
    target = (
        db.query(models.Product)
        .filter(models.Product.id == product_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Product not found")

    changes = product_update.dict(exclude_unset=True)

    # A category change is validated whenever the key was sent at all.
    if 'category_id' in changes:
        new_category = (
            db.query(models.GroceryCategory)
            .filter(models.GroceryCategory.id == changes['category_id'])
            .first()
        )
        if new_category is None:
            raise HTTPException(status_code=404, detail="Category not found")

    # A brand change is validated only when it is set to a real id;
    # sending null (clearing the brand) skips the lookup.
    new_brand_id = changes.get('brand_id')
    if new_brand_id is not None:
        new_brand = (
            db.query(models.Brand)
            .filter(models.Brand.id == new_brand_id)
            .first()
        )
        if new_brand is None:
            raise HTTPException(status_code=404, detail="Brand not found")

    for attr_name, attr_value in changes.items():
        setattr(target, attr_name, attr_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@app.delete("/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Delete the product with the given id.

    Raises:
        HTTPException: 404 when no product has that id.
    """
    doomed = (
        db.query(models.Product)
        .filter(models.Product.id == product_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Product not found")

    db.delete(doomed)
    db.commit()
    return {"message": "Product deleted successfully"}
|
|
|
|
# Shop endpoints
|
|
@app.post("/shops/", response_model=schemas.Shop)
def create_shop(shop: schemas.ShopCreate, db: Session = Depends(get_db)):
    """Persist a new shop built from the request body and return it."""
    shop_fields = shop.dict()
    new_shop = models.Shop(**shop_fields)
    db.add(new_shop)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(new_shop)
    return new_shop
|
|
|
|
@app.get("/shops/", response_model=List[schemas.Shop])
def read_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of shops.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.

    Rows are ordered by primary key so that ``skip``/``limit`` walk a stable
    sequence. Without an ORDER BY the database may return rows in any order,
    so consecutive pages could repeat or miss rows.
    """
    return (
        db.query(models.Shop)
        .order_by(models.Shop.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@app.get("/shops/{shop_id}", response_model=schemas.Shop)
def read_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return the shop with the given id.

    Raises:
        HTTPException: 404 when no shop has that id.
    """
    by_id = db.query(models.Shop).filter(models.Shop.id == shop_id)
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail="Shop not found")
    return found
|
|
|
|
@app.put("/shops/{shop_id}", response_model=schemas.Shop)
def update_shop(shop_id: int, shop_update: schemas.ShopUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a shop.

    Only fields the client actually sent are written (``exclude_unset``).

    Raises:
        HTTPException: 404 when no shop has that id.
    """
    target = (
        db.query(models.Shop)
        .filter(models.Shop.id == shop_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    changes = shop_update.dict(exclude_unset=True)
    for attr_name, attr_value in changes.items():
        setattr(target, attr_name, attr_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@app.delete("/shops/{shop_id}")
def delete_shop(shop_id: int, db: Session = Depends(get_db)):
    """Delete the shop with the given id.

    Raises:
        HTTPException: 404 when no shop has that id.
    """
    doomed = (
        db.query(models.Shop)
        .filter(models.Shop.id == shop_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    db.delete(doomed)
    db.commit()
    return {"message": "Shop deleted successfully"}
|
|
|
|
# Brand endpoints
|
|
@app.post("/brands/", response_model=schemas.Brand)
def create_brand(brand: schemas.BrandCreate, db: Session = Depends(get_db)):
    """Persist a new brand built from the request body and return it."""
    brand_fields = brand.dict()
    new_brand = models.Brand(**brand_fields)
    db.add(new_brand)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(new_brand)
    return new_brand
|
|
|
|
@app.get("/brands/", response_model=List[schemas.Brand])
def read_brands(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of brands.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.

    Rows are ordered by primary key so that ``skip``/``limit`` walk a stable
    sequence. Without an ORDER BY the database may return rows in any order,
    so consecutive pages could repeat or miss rows.
    """
    return (
        db.query(models.Brand)
        .order_by(models.Brand.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@app.get("/brands/{brand_id}", response_model=schemas.Brand)
def read_brand(brand_id: int, db: Session = Depends(get_db)):
    """Return the brand with the given id.

    Raises:
        HTTPException: 404 when no brand has that id.
    """
    by_id = db.query(models.Brand).filter(models.Brand.id == brand_id)
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail="Brand not found")
    return found
|
|
|
|
@app.put("/brands/{brand_id}", response_model=schemas.Brand)
def update_brand(brand_id: int, brand_update: schemas.BrandUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a brand.

    Only fields the client actually sent are written (``exclude_unset``).

    Raises:
        HTTPException: 404 when no brand has that id.
    """
    target = (
        db.query(models.Brand)
        .filter(models.Brand.id == brand_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    changes = brand_update.dict(exclude_unset=True)
    for attr_name, attr_value in changes.items():
        setattr(target, attr_name, attr_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@app.delete("/brands/{brand_id}")
def delete_brand(brand_id: int, db: Session = Depends(get_db)):
    """Delete a brand that no product references.

    Raises:
        HTTPException: 404 when no brand has that id; 400 when at least one
            product still has this ``brand_id``.
    """
    doomed = (
        db.query(models.Brand)
        .filter(models.Brand.id == brand_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # One referencing product is enough to block the delete.
    referencing_product = (
        db.query(models.Product)
        .filter(models.Product.brand_id == brand_id)
        .first()
    )
    if referencing_product:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete brand: products are still associated with this brand",
        )

    db.delete(doomed)
    db.commit()
    return {"message": "Brand deleted successfully"}
|
|
|
|
# BrandInShop endpoints
|
|
@app.post("/brands-in-shops/", response_model=schemas.BrandInShop)
def create_brand_in_shop(brand_in_shop: schemas.BrandInShopCreate, db: Session = Depends(get_db)):
    """Associate a brand with a shop.

    Raises:
        HTTPException: 404 when the shop or the brand does not exist; 400
            when the same shop/brand pair is already stored.
    """
    wanted_shop_id = brand_in_shop.shop_id
    wanted_brand_id = brand_in_shop.brand_id

    shop_row = db.query(models.Shop).filter(models.Shop.id == wanted_shop_id).first()
    if shop_row is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    brand_row = db.query(models.Brand).filter(models.Brand.id == wanted_brand_id).first()
    if brand_row is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    # Reject a duplicate of an existing shop/brand pair.
    duplicate = (
        db.query(models.BrandInShop)
        .filter(
            models.BrandInShop.shop_id == wanted_shop_id,
            models.BrandInShop.brand_id == wanted_brand_id,
        )
        .first()
    )
    if duplicate:
        raise HTTPException(status_code=400, detail="This brand is already associated with this shop")

    association = models.BrandInShop(**brand_in_shop.dict())
    db.add(association)
    db.commit()
    db.refresh(association)
    return association
|
|
|
|
@app.get("/brands-in-shops/", response_model=List[schemas.BrandInShop])
def read_brands_in_shops(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of brand/shop associations.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.

    Rows are ordered by primary key so that ``skip``/``limit`` walk a stable
    sequence. Without an ORDER BY the database may return rows in any order,
    so consecutive pages could repeat or miss rows.
    """
    return (
        db.query(models.BrandInShop)
        .order_by(models.BrandInShop.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@app.get("/brands-in-shops/shop/{shop_id}", response_model=List[schemas.BrandInShop])
def read_brands_in_shop(shop_id: int, db: Session = Depends(get_db)):
    """Return every brand association stored for one shop.

    Raises:
        HTTPException: 404 when no shop has that id.
    """
    shop_row = db.query(models.Shop).filter(models.Shop.id == shop_id).first()
    if shop_row is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    associations = db.query(models.BrandInShop).filter(
        models.BrandInShop.shop_id == shop_id
    )
    return associations.all()
|
|
|
|
@app.get("/brands-in-shops/brand/{brand_id}", response_model=List[schemas.BrandInShop])
def read_shops_with_brand(brand_id: int, db: Session = Depends(get_db)):
    """Return every shop association stored for one brand.

    Raises:
        HTTPException: 404 when no brand has that id.
    """
    brand_row = db.query(models.Brand).filter(models.Brand.id == brand_id).first()
    if brand_row is None:
        raise HTTPException(status_code=404, detail="Brand not found")

    associations = db.query(models.BrandInShop).filter(
        models.BrandInShop.brand_id == brand_id
    )
    return associations.all()
|
|
|
|
@app.get("/brands-in-shops/{brand_in_shop_id}", response_model=schemas.BrandInShop)
def read_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Return one brand/shop association by its own id.

    Raises:
        HTTPException: 404 when no association has that id.
    """
    by_id = db.query(models.BrandInShop).filter(
        models.BrandInShop.id == brand_in_shop_id
    )
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail="Brand in shop association not found")
    return found
|
|
|
|
@app.delete("/brands-in-shops/{brand_in_shop_id}")
def delete_brand_in_shop(brand_in_shop_id: int, db: Session = Depends(get_db)):
    """Delete one brand/shop association by its own id.

    Raises:
        HTTPException: 404 when no association has that id.
    """
    doomed = (
        db.query(models.BrandInShop)
        .filter(models.BrandInShop.id == brand_in_shop_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Brand in shop association not found")

    db.delete(doomed)
    db.commit()
    return {"message": "Brand in shop association deleted successfully"}
|
|
|
|
# Grocery Category endpoints
|
|
@app.post("/grocery-categories/", response_model=schemas.GroceryCategory)
def create_grocery_category(category: schemas.GroceryCategoryCreate, db: Session = Depends(get_db)):
    """Persist a new grocery category built from the request body and return it."""
    category_fields = category.dict()
    new_category = models.GroceryCategory(**category_fields)
    db.add(new_category)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(new_category)
    return new_category
|
|
|
|
@app.get("/grocery-categories/", response_model=List[schemas.GroceryCategory])
def read_grocery_categories(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of grocery categories.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.

    Rows are ordered by primary key so that ``skip``/``limit`` walk a stable
    sequence. Without an ORDER BY the database may return rows in any order,
    so consecutive pages could repeat or miss rows.
    """
    return (
        db.query(models.GroceryCategory)
        .order_by(models.GroceryCategory.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
@app.get("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def read_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Return the grocery category with the given id.

    Raises:
        HTTPException: 404 when no category has that id.
    """
    by_id = db.query(models.GroceryCategory).filter(
        models.GroceryCategory.id == category_id
    )
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")
    return found
|
|
|
|
@app.put("/grocery-categories/{category_id}", response_model=schemas.GroceryCategory)
def update_grocery_category(category_id: int, category_update: schemas.GroceryCategoryUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to a grocery category.

    Only fields the client actually sent are written (``exclude_unset``).

    Raises:
        HTTPException: 404 when no category has that id.
    """
    target = (
        db.query(models.GroceryCategory)
        .filter(models.GroceryCategory.id == category_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")

    changes = category_update.dict(exclude_unset=True)
    for attr_name, attr_value in changes.items():
        setattr(target, attr_name, attr_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@app.delete("/grocery-categories/{category_id}")
def delete_grocery_category(category_id: int, db: Session = Depends(get_db)):
    """Delete a grocery category that no product references.

    Raises:
        HTTPException: 404 when no category has that id; 400 when at least
            one product still has this ``category_id``.
    """
    doomed = (
        db.query(models.GroceryCategory)
        .filter(models.GroceryCategory.id == category_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Grocery category not found")

    # One referencing product is enough to block the delete.
    referencing_product = (
        db.query(models.Product)
        .filter(models.Product.category_id == category_id)
        .first()
    )
    if referencing_product:
        raise HTTPException(
            status_code=400,
            detail="Cannot delete category: products are still associated with this category",
        )

    db.delete(doomed)
    db.commit()
    return {"message": "Grocery category deleted successfully"}
|
|
|
|
# Shopping Event endpoints
|
|
@app.post("/shopping-events/", response_model=schemas.ShoppingEventResponse)
def create_shopping_event(event: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Create a shopping event together with its product line items.

    The event row and its association rows are written in one transaction.
    All referenced ids are validated before anything is written, so a
    request naming an unknown product is rejected without leaving a
    committed event that has no products.

    Raises:
        HTTPException: 404 when the shop, or any referenced product, does
            not exist.
    """
    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Verify every product exists before writing anything
    for product_item in event.products:
        product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
        if product is None:
            raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")

    # Create shopping event
    db_event = models.ShoppingEvent(
        shop_id=event.shop_id,
        date=event.date,
        total_amount=event.total_amount,
        notes=event.notes
    )
    db.add(db_event)
    # flush() sends the INSERT so db_event.id is populated, without committing.
    db.flush()

    # Add products to the event via the association table
    for product_item in event.products:
        db.execute(
            models.shopping_event_products.insert().values(
                shopping_event_id=db_event.id,
                product_id=product_item.product_id,
                amount=product_item.amount,
                price=product_item.price
            )
        )

    # Single commit: the event and its line items are persisted together.
    db.commit()
    db.refresh(db_event)
    return build_shopping_event_response(db_event, db)
|
|
|
|
@app.get("/shopping-events/", response_model=List[schemas.ShoppingEventResponse])
def read_shopping_events(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of shopping events, newest ``created_at`` first.

    Each event is expanded with ``build_shopping_event_response``, which
    runs one additional product query per event.
    """
    newest_first = db.query(models.ShoppingEvent).order_by(
        models.ShoppingEvent.created_at.desc()
    )
    page = newest_first.offset(skip).limit(limit).all()

    responses = []
    for shopping_event in page:
        responses.append(build_shopping_event_response(shopping_event, db))
    return responses
|
|
|
|
@app.get("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def read_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Return one shopping event, including its products.

    Raises:
        HTTPException: 404 when no shopping event has that id.
    """
    by_id = db.query(models.ShoppingEvent).filter(
        models.ShoppingEvent.id == event_id
    )
    found = by_id.first()
    if found is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")
    return build_shopping_event_response(found, db)
|
|
|
|
@app.put("/shopping-events/{event_id}", response_model=schemas.ShoppingEventResponse)
def update_shopping_event(event_id: int, event_update: schemas.ShoppingEventCreate, db: Session = Depends(get_db)):
    """Replace a shopping event and its product line items.

    The event's scalar fields are overwritten and its association rows are
    deleted and re-created from the request. All referenced ids are
    validated before the session is touched, so a rejected request does not
    depend on session cleanup to undo a half-applied update.

    Raises:
        HTTPException: 404 when the event, the shop, or any referenced
            product does not exist.
    """
    # Get the existing event
    event = db.query(models.ShoppingEvent).filter(models.ShoppingEvent.id == event_id).first()
    if event is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Verify shop exists
    shop = db.query(models.Shop).filter(models.Shop.id == event_update.shop_id).first()
    if shop is None:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Verify every product exists before changing anything
    for product_item in event_update.products:
        product = db.query(models.Product).filter(models.Product.id == product_item.product_id).first()
        if product is None:
            raise HTTPException(status_code=404, detail=f"Product with id {product_item.product_id} not found")

    # Update the shopping event
    event.shop_id = event_update.shop_id
    event.date = event_update.date
    event.total_amount = event_update.total_amount
    event.notes = event_update.notes

    # Remove existing product associations
    db.execute(
        models.shopping_event_products.delete().where(
            models.shopping_event_products.c.shopping_event_id == event_id
        )
    )

    # Add new product associations
    for product_item in event_update.products:
        db.execute(
            models.shopping_event_products.insert().values(
                shopping_event_id=event_id,
                product_id=product_item.product_id,
                amount=product_item.amount,
                price=product_item.price
            )
        )

    # Single commit: field changes and the new line items land together.
    db.commit()
    db.refresh(event)
    return build_shopping_event_response(event, db)
|
|
|
|
@app.delete("/shopping-events/{event_id}")
def delete_shopping_event(event_id: int, db: Session = Depends(get_db)):
    """Delete a shopping event and its product associations.

    Raises:
        HTTPException: 404 when no shopping event has that id.
    """
    doomed = (
        db.query(models.ShoppingEvent)
        .filter(models.ShoppingEvent.id == event_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Shopping event not found")

    # Association rows are removed before the event row they point at.
    association_table = models.shopping_event_products
    remove_associations = association_table.delete().where(
        association_table.c.shopping_event_id == event_id
    )
    db.execute(remove_associations)

    db.delete(doomed)
    db.commit()
    return {"message": "Shopping event deleted successfully"}
|
|
|
|
# Statistics endpoints
|
|
@app.get("/stats/categories", response_model=List[schemas.CategoryStats])
def get_category_stats(db: Session = Depends(get_db)):
    """Placeholder for per-category statistics; currently always returns an empty list."""
    # This would need more complex SQL query - placeholder for now
    # (the injected db session is not used yet)
    return []
|
|
|
|
@app.get("/stats/shops", response_model=List[schemas.ShopStats])
def get_shop_stats(db: Session = Depends(get_db)):
    """Placeholder for per-shop statistics; currently always returns an empty list."""
    # This would need more complex SQL query - placeholder for now
    # (the injected db session is not used yet)
    return []
|
|
|
|
# Related Products endpoints
|
|
@app.post("/related-products/", response_model=schemas.RelatedProduct)
def create_related_product(related_product: schemas.RelatedProductCreate, db: Session = Depends(get_db)):
    """Create a relationship between two distinct, existing products.

    A pair counts as already related regardless of which product is stored
    in ``product_id`` and which in ``related_product_id``.

    Raises:
        HTTPException: 404 when either product does not exist; 400 when
            both ids are equal or the pair is already related.
    """
    source_id = related_product.product_id
    target_id = related_product.related_product_id
    table = models.related_products

    source = db.query(models.Product).filter(models.Product.id == source_id).first()
    if source is None:
        raise HTTPException(status_code=404, detail="Product not found")

    target = db.query(models.Product).filter(models.Product.id == target_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Related product not found")

    if source_id == target_id:
        raise HTTPException(status_code=400, detail="A product cannot be related to itself")

    # Look for the pair stored in either direction.
    forward = (table.c.product_id == source_id) & (table.c.related_product_id == target_id)
    backward = (table.c.product_id == target_id) & (table.c.related_product_id == source_id)
    duplicate = db.execute(table.select().where(forward | backward)).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="Products are already related")

    insert_stmt = table.insert().values(
        product_id=source_id,
        related_product_id=target_id,
        relationship_type=related_product.relationship_type,
    )
    insert_result = db.execute(insert_stmt)
    db.commit()

    # Read the stored row back by its new primary key before responding.
    new_id = insert_result.inserted_primary_key[0]
    stored = db.execute(table.select().where(table.c.id == new_id)).first()

    return schemas.RelatedProduct(
        id=stored.id,
        product_id=stored.product_id,
        related_product_id=stored.related_product_id,
        relationship_type=stored.relationship_type,
        created_at=stored.created_at,
    )
|
|
|
|
@app.get("/related-products/", response_model=List[schemas.RelatedProduct])
def read_related_products(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of product relationships.

    Args:
        skip: Number of rows to skip from the start of the listing.
        limit: Maximum number of rows to return.

    Rows are ordered by primary key so that ``skip``/``limit`` walk a stable
    sequence. Without an ORDER BY the database may return rows in any order,
    so consecutive pages could repeat or miss rows.
    """
    relationships = db.execute(
        models.related_products.select()
        .order_by(models.related_products.c.id)
        .offset(skip)
        .limit(limit)
    ).fetchall()

    return [
        schemas.RelatedProduct(
            id=rel.id,
            product_id=rel.product_id,
            related_product_id=rel.related_product_id,
            relationship_type=rel.relationship_type,
            created_at=rel.created_at
        )
        for rel in relationships
    ]
|
|
|
|
@app.get("/related-products/product/{product_id}", response_model=List[schemas.Product])
def get_related_products_for_product(product_id: int, db: Session = Depends(get_db)):
    """Return the products related to the given product, in either direction.

    Raises:
        HTTPException: 404 when no product has that id.
    """
    anchor = db.query(models.Product).filter(models.Product.id == product_id).first()
    if anchor is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # For each relationship row touching this product, pick the id on the
    # opposite side of the pair.
    other_side_query = text("""
        SELECT CASE
            WHEN product_id = :product_id THEN related_product_id
            ELSE product_id
        END as related_id
        FROM related_products
        WHERE product_id = :product_id OR related_product_id = :product_id
    """)
    rows = db.execute(other_side_query, {"product_id": product_id}).fetchall()
    if not rows:
        return []

    other_ids = []
    for row in rows:
        other_ids.append(row.related_id)

    matches = db.query(models.Product).filter(models.Product.id.in_(other_ids))
    return matches.all()
|
|
|
|
@app.get("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def read_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Return one product relationship by its own id.

    Raises:
        HTTPException: 404 when no relationship has that id.
    """
    table = models.related_products
    lookup = table.select().where(table.c.id == relationship_id)
    row = db.execute(lookup).first()
    if row is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    return schemas.RelatedProduct(
        id=row.id,
        product_id=row.product_id,
        related_product_id=row.related_product_id,
        relationship_type=row.relationship_type,
        created_at=row.created_at,
    )
|
|
|
|
@app.put("/related-products/{relationship_id}", response_model=schemas.RelatedProduct)
def update_related_product(relationship_id: int, update_data: schemas.RelatedProductUpdate, db: Session = Depends(get_db)):
    """Change the ``relationship_type`` of an existing product relationship.

    Raises:
        HTTPException: 404 when no relationship has that id.
    """
    table = models.related_products
    lookup = table.select().where(table.c.id == relationship_id)

    current = db.execute(lookup).first()
    if current is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    change = (
        table.update()
        .where(table.c.id == relationship_id)
        .values(relationship_type=update_data.relationship_type)
    )
    db.execute(change)
    db.commit()

    # Read the row again so the response shows the stored values.
    refreshed = db.execute(lookup).first()

    return schemas.RelatedProduct(
        id=refreshed.id,
        product_id=refreshed.product_id,
        related_product_id=refreshed.related_product_id,
        relationship_type=refreshed.relationship_type,
        created_at=refreshed.created_at,
    )
|
|
|
|
@app.delete("/related-products/{relationship_id}")
def delete_related_product(relationship_id: int, db: Session = Depends(get_db)):
    """Delete one product relationship by its own id.

    Raises:
        HTTPException: 404 when no relationship has that id.
    """
    table = models.related_products
    matches_id = table.c.id == relationship_id

    current = db.execute(table.select().where(matches_id)).first()
    if current is None:
        raise HTTPException(status_code=404, detail="Related product relationship not found")

    db.execute(table.delete().where(matches_id))
    db.commit()

    return {"message": "Related product relationship deleted successfully"}
|
|
|
|
if __name__ == "__main__":
    # Started directly as a script: serve the app with uvicorn.
    # host "0.0.0.0" listens on all network interfaces; port is 8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)