delete unused scripts

This commit is contained in:
lasse 2025-04-18 22:41:42 +02:00
parent 828e0a5c4b
commit 1671e169f8
5 changed files with 3 additions and 386 deletions

4
.gitignore vendored
View File

@ -1,3 +1,5 @@
./venv
data/*
!data/.gitkeep
!data/.gitkeep
working_times.db

View File

@ -2,9 +2,6 @@
This project analyzes working time data from a CSV file by:
1. Importing the data into a DuckDB database
2. Transforming the data for analysis
3. Generating reports based on the data
## Setup
### Dependencies
@ -29,42 +26,3 @@ This will:
- Create a DuckDB database file `working_times.db`
- Import the CSV data into a table
- Add an import timestamp to each record
### 2. Transform Data
Run the transformation script to create analytical views:
```bash
python3 transform_data.py
```
This will:
- Create summary tables with aggregated data
- Convert hours to days (using 8 hours = 1 day conversion)
- Add transformation timestamps
### 3. Analyze Data
Run the analysis script to generate reports:
```bash
python3 analyze_data.py
```
This will produce:
- Overall time summary
- Top projects by hours
- Busiest days
- Day distribution analysis
- Project-activity combinations
## Data Structure
The analysis uses the following tables:
- `working_times`: Raw imported data
- `working_times_summary`: Per-day, per-project aggregation
- `project_summary`: Total time per project
- `daily_summary`: Total time per day
Each derived table includes timestamps for data lineage tracking.

View File

@ -1,157 +0,0 @@
import duckdb
import pandas as pd
from datetime import datetime
# Open the DuckDB database produced by the import/transform steps.
# Any failure to connect aborts the report immediately.
try:
    con = duckdb.connect('working_times.db')
    print("Connected to working_times.db")
except Exception as e:
    print(f"Error connecting to database: {e}")
    exit(1)

# Wall-clock timestamp stamped onto the report header and footer.
# NOTE(review): naive local time, not timezone-aware — confirm that is intended.
analysis_timestamp = datetime.now()
def format_hours(hours):
    """Render an hour count as a string like '12.34h' (two decimals)."""
    return "{:.2f}h".format(hours)
def format_days(days):
    """Render a day count as a string like '1.50d' (two decimals)."""
    return "{:.2f}d".format(days)
# Overall date span of the data set; daily_summary holds one row per date.
date_range = con.execute("""
    SELECT MIN(date) AS start_date, MAX(date) AS end_date
    FROM daily_summary
""").fetchone()
start_date = date_range[0]
end_date = date_range[1]

# Lineage timestamps written by the transform step (most recent of each).
transform_info = con.execute("""
    SELECT
        MAX(transform_timestamp) AS transform_timestamp,
        MAX(source_import_timestamp) AS source_import_timestamp
    FROM daily_summary
""").fetchone()
transform_timestamp = transform_info[0]
source_import_timestamp = transform_info[1]

# Report header: date range plus analysis/transform/import timestamps
# so the output can be traced back to a specific data snapshot.
print("\n" + "="*60)
print(f"WORKING TIME ANALYSIS: {start_date} to {end_date}")
print(f"ANALYSIS TIMESTAMP: {analysis_timestamp}")
print(f"DATA TRANSFORMATION: {transform_timestamp}")
print(f"DATA IMPORT: {source_import_timestamp}")
print("="*60)

# Grand totals across all projects. total_days was derived by the transform
# step using the 8 hours = 1 day convention.
totals = con.execute("""
    SELECT SUM(total_hours) AS total_hours, SUM(total_days) AS total_days
    FROM project_summary
""").fetchone()
total_hours = totals[0]
total_days = totals[1]

# Count only days with booked time; guards the average against division by zero.
num_working_days = con.execute("SELECT COUNT(*) FROM daily_summary WHERE total_hours > 0").fetchone()[0]
avg_hours_per_day = total_hours / num_working_days if num_working_days > 0 else 0

print(f"\nTOTAL HOURS: {format_hours(total_hours)}")
print(f"TOTAL DAYS: {format_days(total_days)}")
print(f"WORKING DAYS: {num_working_days}")
print(f"AVG HOURS PER WORKING DAY: {format_hours(avg_hours_per_day)}")

# Section: top 5 projects ranked by total booked hours.
top_projects = con.execute("""
    SELECT project_name, total_hours, total_days, days_worked
    FROM project_summary
    ORDER BY total_hours DESC
    LIMIT 5
""").fetchall()

print("\n" + "-"*60)
print("TOP 5 PROJECTS BY HOURS")
print("-"*60)
for i, (project, hours, days, worked_days) in enumerate(top_projects, 1):
    percent = (hours / total_hours) * 100
    print(f"{i}. {project}")
    print(f" {format_hours(hours)} ({percent:.1f}% of total) / {format_days(days)}")
    # NOTE(review): when worked_days == 0 the fallback prints the bare int 0
    # instead of a formatted hours string — inconsistent but harmless.
    print(f" Across {worked_days} days, daily average: {format_hours(hours/worked_days) if worked_days > 0 else 0}")

# Section: top 5 busiest days by total hours.
busiest_days = con.execute("""
    SELECT date, total_hours, project_count
    FROM daily_summary
    WHERE total_hours > 0
    ORDER BY total_hours DESC
    LIMIT 5
""").fetchall()

print("\n" + "-"*60)
print("TOP 5 BUSIEST DAYS")
print("-"*60)
for i, (date, hours, project_count) in enumerate(busiest_days, 1):
    # Convert hours to the day-equivalent using the same 8h = 1d convention.
    day_equivalent = hours / 8
    print(f"{i}. {date}: {format_hours(hours)} ({format_days(day_equivalent)}) across {project_count} projects")

# Section: histogram of working days bucketed by hours worked. The outer
# CASE orders the buckets logically rather than alphabetically.
day_distribution = con.execute("""
    SELECT
        CASE
            WHEN total_hours <= 4 THEN '0-4 hours'
            WHEN total_hours <= 6 THEN '4-6 hours'
            WHEN total_hours <= 8 THEN '6-8 hours'
            WHEN total_hours <= 10 THEN '8-10 hours'
            ELSE '10+ hours'
        END AS hour_range,
        COUNT(*) as day_count
    FROM daily_summary
    WHERE total_hours > 0
    GROUP BY hour_range
    ORDER BY
        CASE
            WHEN hour_range = '0-4 hours' THEN 1
            WHEN hour_range = '4-6 hours' THEN 2
            WHEN hour_range = '6-8 hours' THEN 3
            WHEN hour_range = '8-10 hours' THEN 4
            ELSE 5
        END
""").fetchall()

print("\n" + "-"*60)
print("DAY DISTRIBUTION")
print("-"*60)
for hour_range, day_count in day_distribution:
    percent = (day_count / num_working_days) * 100
    print(f"{hour_range}: {day_count} days ({percent:.1f}%)")

# Section: top 10 project/activity pairings by total hours.
project_activity_combo = con.execute("""
    SELECT
        project_name,
        activity_type,
        SUM(total_hours) as hours,
        SUM(total_days) as days
    FROM working_times_summary
    GROUP BY project_name, activity_type
    ORDER BY hours DESC
    LIMIT 10
""").fetchall()

print("\n" + "-"*60)
print("TOP 10 PROJECT-ACTIVITY COMBINATIONS")
print("-"*60)
for project, activity, hours, days in project_activity_combo:
    percent = (hours / total_hours) * 100
    print(f"{project} - {activity}: {format_hours(hours)} ({format_days(days)}, {percent:.1f}%)")

# Report footer, repeating the analysis timestamp for traceability.
print("\n" + "="*60)
print(f"END OF ANALYSIS - Generated at {analysis_timestamp}")
print("="*60)

# Release the DuckDB file handle so later steps can reopen the database.
con.close()

View File

@ -1,68 +0,0 @@
#!/usr/bin/env python3
"""
Working Time Analysis - Complete Workflow
This script runs all three steps of the analysis in sequence:
1. Import the data
2. Transform the data
3. Generate analysis reports
"""
import os
import sys
import subprocess
import time
def run_step(script_name, step_desc):
    """Execute one pipeline script in a child Python interpreter.

    Prints a step banner, runs *script_name* with the current interpreter,
    and echoes the child's stdout. On a non-zero exit status the child's
    stderr is printed instead.

    Returns True on success, False on failure.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"STEP: {step_desc}")
    print(f"{banner}")
    try:
        completed = subprocess.run(
            [sys.executable, script_name],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as err:
        # check=True raised: surface the child's stderr for debugging.
        print(f"ERROR in {script_name}:")
        print(err.stderr)
        return False
    print(completed.stdout)
    return True
def main():
    """Run the full import -> transform -> analyze pipeline in order.

    Returns True when every step succeeds; False as soon as a step fails
    or when the expected CSV input file is missing.
    """
    csv_file = 'data/lawi-2025-04-01-2025-04-30-2025-04-17.csv'
    if not os.path.exists(csv_file):
        print(f"Error: CSV file not found at {csv_file}")
        return False

    steps = [
        ('import_data.py', 'IMPORTING DATA'),
        ('transform_data.py', 'TRANSFORMING DATA'),
        ('analyze_data.py', 'ANALYZING DATA'),
    ]
    for index, (script, description) in enumerate(steps):
        if index:
            # Short pause between steps so the database file lock held by
            # the previous subprocess is released before the next starts.
            time.sleep(1)
        if not run_step(script, description):
            return False

    print("\n" + "="*60)
    print("ANALYSIS COMPLETE")
    print("="*60)
    return True


if __name__ == "__main__":
    # Exit status mirrors the pipeline outcome (0 = success, 1 = failure).
    success = main()
    sys.exit(0 if success else 1)

View File

@ -1,118 +0,0 @@
import duckdb
import pandas as pd
import os
import time
import datetime
# Connect to working_times.db with retry and linear backoff, in case a
# previous pipeline step still holds the DuckDB file lock.
max_retries = 5
retry_count = 0
connected = False

print("Trying to connect to working_times.db...")
while not connected and retry_count < max_retries:
    try:
        # NOTE(review): an earlier comment here mentioned read-only access,
        # but the connection is read-write — this script creates tables below.
        con = duckdb.connect('working_times.db')
        connected = True
        print("Connected to working_times.db")
    except Exception as e:
        retry_count += 1
        print(f"Connection attempt {retry_count} failed: {e}")
        if retry_count < max_retries:
            # Wait one second longer after each failed attempt (1s, 2s, ...).
            print(f"Retrying in {retry_count} seconds...")
            time.sleep(retry_count)
        else:
            print("Maximum retries reached. Exiting.")
            exit(1)
print("Transforming data...")
# Get the transformation timestamp
transform_timestamp = datetime.datetime.now()
print(f"Transform timestamp: {transform_timestamp}")
# Create a new table with transformed data
# This query will:
# 1. Extract date and project information
# 2. Calculate total hours per project per day
# 3. Format the data in a more analytical friendly way
con.execute("""
DROP TABLE IF EXISTS working_times_summary;
CREATE TABLE working_times_summary AS
SELECT
Datum AS date,
Projektname AS project_name,
"Leistungsart (Bezeichnung)" AS activity_type,
SUM("Zeit [h]") AS total_hours,
SUM("Zeit [h]"/8) AS total_days,
'{transform_timestamp}' AS transform_timestamp
FROM working_times
GROUP BY date, project_name, activity_type
ORDER BY date, project_name, activity_type;
""".format(transform_timestamp=transform_timestamp))
# Create a table with project totals
con.execute("""
DROP TABLE IF EXISTS project_summary;
CREATE TABLE project_summary AS
SELECT
Projektname AS project_name,
SUM("Zeit [h]") AS total_hours,
SUM("Zeit [h]"/8) AS total_days,
COUNT(DISTINCT Datum) AS days_worked,
MAX(import_timestamp) AS source_import_timestamp,
'{transform_timestamp}' AS transform_timestamp
FROM working_times
GROUP BY project_name
ORDER BY total_hours DESC;
""".format(transform_timestamp=transform_timestamp))
# Create a table with daily totals
con.execute("""
DROP TABLE IF EXISTS daily_summary;
CREATE TABLE daily_summary AS
SELECT
Datum AS date,
SUM("Zeit [h]") AS total_hours,
COUNT(*) AS entry_count,
COUNT(DISTINCT Projektname) AS project_count,
MAX(import_timestamp) AS source_import_timestamp,
'{transform_timestamp}' AS transform_timestamp
FROM working_times
GROUP BY date
ORDER BY date;
""".format(transform_timestamp=transform_timestamp))
# Sanity check: confirm each derived table was created and populated.
summary_count = con.execute("SELECT COUNT(*) FROM working_times_summary").fetchone()[0]
project_count = con.execute("SELECT COUNT(*) FROM project_summary").fetchone()[0]
daily_count = con.execute("SELECT COUNT(*) FROM daily_summary").fetchone()[0]
print(f"Successfully created {summary_count} records in working_times_summary table.")
print(f"Successfully created {project_count} records in project_summary table.")
print(f"Successfully created {daily_count} records in daily_summary table.")

# Spot-check: first five rows of the per-day/per-project summary.
print("\nSample of working_times_summary table:")
summary_sample = con.execute("SELECT date, project_name, activity_type, total_hours, total_days, transform_timestamp FROM working_times_summary LIMIT 5").fetchall()
for row in summary_sample:
    print(row)

# Spot-check: top five projects (project_summary is already ordered by hours).
print("\nProject summary (top 5 by hours):")
project_sample = con.execute("SELECT project_name, total_hours, total_days, days_worked, transform_timestamp FROM project_summary LIMIT 5").fetchall()
for row in project_sample:
    print(row)

# Grand totals across all projects for a final console summary.
total_hours = con.execute("SELECT SUM(total_hours) FROM project_summary").fetchone()[0]
total_days = con.execute("SELECT SUM(total_days) FROM project_summary").fetchone()[0]
print(f"\nTotal hours worked: {total_hours:.2f}")
print(f"Total days worked: {total_days:.2f}")

# Release the DuckDB file handle so the next pipeline step can connect.
con.close()
print("\nData transformation complete.")