from __future__ import annotations
"""
Project Manager for ToDoWrite Utilities
This module provides centralized project utility methods that
replace individual scripts.
It separates core functionality from AI-specific features.
"""
import contextlib
import importlib.util
import json
import logging
import shutil
from pathlib import Path
from textwrap import dedent
from typing import cast

from typing_extensions import TypedDict
class OptimizationParams(TypedDict, total=False):
    """Keyword parameters accepted by the token-optimization functions.

    Every key is optional (``total=False``); callers pass only the
    fields they need.
    """

    # Text to optimize; optimize_token_usage returns None when absent.
    text: str
    # Optional surrounding context for the optimization.
    context: str
    # Target model name.
    model: str
    # Upper bound on tokens for the optimized output.
    max_tokens: int
class OptimizationResult(TypedDict, total=False):
    """Result mapping produced by the token-optimization functions.

    Every key is optional (``total=False``); which keys appear depends
    on whether optimization ran and succeeded.
    """

    # Whether an optimization pass was performed.
    optimized: bool
    # Error description when optimization could not run.
    error: str
    # Tokens saved by the optimization.
    tokens_saved: int
    # Token count before optimization.
    original_tokens: int
    # Token count after optimization.
    optimized_tokens: int
    # Ratio of optimized to original token counts.
    optimization_ratio: float
    # Human-readable descriptions of the strategies applied.
    strategies_used: list[str]
class ProjectManager:
    """Centralized project management and utility methods.

    Provides schema-consistency checks, ToDoWrite integration setup
    (PostgreSQL via Docker Compose, or SQLite), project scaffolding,
    and basic project validation.
    """

    def __init__(self) -> None:
        # Per-user cache directory; created eagerly so later writes
        # never have to worry about it missing.
        self.cache_dir = Path.home() / ".ToDoWrite_cache"
        self.cache_dir.mkdir(exist_ok=True)
        self.logger = logging.getLogger(__name__)

    # ===== Core Project Utilities =====

    @staticmethod
    def _load_json_schema(path: Path) -> dict[str, object]:
        """Load a JSON schema file.

        Returns an empty dict when the file is missing, is not valid
        JSON, or does not contain a JSON object.  (The original code
        `cast`-ed the raw value with an import that had been removed,
        causing a NameError; the cast was a no-op anyway.)
        """
        try:
            with open(path) as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}
        return data if isinstance(data, dict) else {}

    def check_deprecated_schema(self) -> bool:
        """
        Check that the deprecated schema doesn't have unintended changes.

        Returns:
            True if the check passes, False if there are issues.
        """
        primary_path = Path("ToDoWrite/schemas/ToDoWrite.schema.json")
        deprecated_path = Path("configs/schemas/ToDoWrite.schema.json")

        primary_content = self._load_json_schema(primary_path)
        deprecated_content = self._load_json_schema(deprecated_path)

        if not deprecated_content:
            print(
                "INFO: Deprecated schema not found (may have been cleaned up)"
            )
            return True
        if not primary_content:
            print("❌ Primary schema not found!")
            return False

        # The deprecated copy must keep its DEPRECATED marker so nobody
        # mistakes it for the canonical schema.
        deprecated_title = deprecated_content.get("title", "")
        if "DEPRECATED" not in deprecated_title:
            print("❌ Deprecated schema title missing DEPRECATED marker!")
            print(f"Found: {deprecated_title}")
            return False

        def get_core_properties(
            schema: dict[str, object],
        ) -> dict[str, object]:
            """Project a schema onto the fields that must stay in sync."""
            return {
                "required": schema.get("required", []),
                "properties": schema.get("properties", {}),
                "type": schema.get("type", "object"),
            }

        primary_core = get_core_properties(primary_content)
        deprecated_core = get_core_properties(deprecated_content)
        if primary_core != deprecated_core:
            print(
                "❌ Deprecated schema has different core properties than "
                "primary!"
            )
            print("This suggests someone modified the deprecated schema.")
            print(
                "All schema changes should be made to the "
                "primary schema location."
            )
            return False

        print("✅ Deprecated schema check passed")
        print(f"Deprecated schema title: {deprecated_title}")
        print("Core properties match primary schema")
        return True

    def check_schema_changes(self) -> bool:
        """
        Check if schema changes are in the correct location.

        Returns:
            True if the check passes, False if there are issues.
        """
        primary_schema = Path("ToDoWrite/schemas/ToDoWrite.schema.json")
        deprecated_schema = Path("configs/schemas/ToDoWrite.schema.json")

        if not primary_schema.exists():
            print("ERROR: Primary schema file not found!")
            print(f"Expected location: {primary_schema}")
            print("All schema changes must be made in the package location.")
            return False

        primary_data = self._load_json_schema(primary_schema)
        deprecated_data = self._load_json_schema(deprecated_schema)

        # The deprecated copy should never drift ahead of the primary.
        # str() guards against a non-string "title" value crashing
        # .replace() with an AttributeError.
        if deprecated_data:
            primary_title = str(primary_data.get("title", "")).replace(
                " (DEPRECATED)", ""
            )
            deprecated_title = str(deprecated_data.get("title", "")).replace(
                " (DEPRECATED)", ""
            )
            if (
                primary_title
                and deprecated_title
                and primary_title != deprecated_title
            ):
                print(
                    "WARNING: Deprecated schema has different content "
                    "than primary schema!"
                )
                print(
                    "This may indicate changes were made in the "
                    "wrong location."
                )
                print(f"Primary: {primary_title}")
                print(f"Deprecated: {deprecated_title}")
                return False

        print("✅ Schema location check passed")
        print(f"Primary schema: {primary_schema}")
        if deprecated_schema.exists():
            print(
                f"Deprecated schema: {deprecated_schema} "
                "(should not be modified)"
            )
        return True

    def setup_integration(
        self, project_path: str | Path, db_type: str = "postgres"
    ) -> bool:
        """
        Set up ToDoWrite integration in a project.

        Args:
            project_path: Path to the project directory
            db_type: Database type ('postgres', 'sqlite'); any value
                other than 'postgres' is treated as SQLite.

        Returns:
            True if setup was successful, False otherwise
        """
        project_path = Path(project_path)
        if not project_path.exists():
            print(f"❌ Project path does not exist: {project_path}")
            return False

        ToDoWrite_dir = project_path / ".ToDoWrite"
        ToDoWrite_dir.mkdir(exist_ok=True)
        print(f"🚀 Setting up ToDoWrite integration in {project_path}")

        if db_type == "postgres":
            if not self._setup_postgres_docker(project_path):
                return False
        else:
            if not self._setup_sqlite(project_path):
                return False

        # Create configuration template
        if not self._create_config_template(project_path, db_type):
            return False

        print("✅ ToDoWrite integration setup complete!")
        print(f"📁 Configuration created in: {ToDoWrite_dir}")
        print(f"📄 Database type: {db_type}")
        return True

    def init_database_sql(self) -> str:
        """
        Return PostgreSQL initialization SQL as string.

        This provides the SQL content that would be in the init script.
        """
        return dedent(
            """
            -- ToDoWrite PostgreSQL Initialization Script
            -- This script sets up the ToDoWrite database with proper
            -- permissions and extensions
            -- Create additional users if needed
            -- CREATE USER ToDoWrite_readonly WITH PASSWORD 'readonly_password';
            -- Grant permissions
            GRANT CONNECT ON DATABASE ToDoWrite TO ToDoWrite;
            GRANT USAGE ON SCHEMA public TO ToDoWrite;
            GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ToDoWrite;
            GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO ToDoWrite;
            -- Enable extensions if needed
            -- CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
            -- CREATE EXTENSION IF NOT EXISTS "pg_trgm";
            -- Create indexes for better performance (will be created by
            -- SQLAlchemy migrations)
            -- This file can be extended with additional setup as needed
            -- Set default database settings for ToDoWrite
            ALTER DATABASE ToDoWrite SET timezone TO 'UTC';
            ALTER DATABASE ToDoWrite SET log_statement TO 'all';
            """
        ).strip()

    def create_project_structure(self, project_path: str | Path) -> bool:
        """
        Create a basic ToDoWrite project structure.

        Args:
            project_path: Path where to create the structure

        Returns:
            True if structure was created successfully
        """
        project_path = Path(project_path)
        try:
            # Create basic directory structure
            directories = [
                project_path / "configs" / "schemas",
                project_path / "docs",
                project_path / "scripts",
                project_path / ".ToDoWrite",
            ]
            for directory in directories:
                directory.mkdir(parents=True, exist_ok=True)

            # Create example files (skipped when they already exist)
            self._create_readme(project_path)
            self._create_gitignore(project_path)

            print(f"✅ Project structure created at: {project_path}")
            return True
        except Exception as e:
            # Best-effort scaffolding: report and signal failure rather
            # than raising to the caller.
            print(f"❌ Failed to create project structure: {e}")
            return False

    def validate_project_setup(
        self, project_path: str | Path
    ) -> dict[str, object]:
        """
        Validate that a project is properly set up for ToDoWrite.

        Args:
            project_path: Path to the project directory.

        Returns:
            Dictionary with keys ``valid`` (bool), ``issues``,
            ``recommendations`` and ``found_files`` (lists of str).
        """
        project_path = Path(project_path)
        issues: list[str] = []
        recommendations: list[str] = []
        found_files: list[str] = []

        if not project_path.exists():
            return {
                "valid": False,
                "issues": ["Project directory does not exist"],
                "recommendations": recommendations,
                "found_files": found_files,
            }

        # Optional-but-recommended files; absence is only a recommendation.
        key_files = [
            "ToDoWrite/schemas/ToDoWrite.schema.json",
            "pyproject.toml",
            "requirements.txt",
            ".env.ToDoWrite",
        ]
        for file_path in key_files:
            if (project_path / file_path).exists():
                found_files.append(file_path)
            else:
                recommendations.append(
                    f"Optional file missing: {file_path}"
                )

        # The package itself must be importable for the setup to be valid.
        if importlib.util.find_spec("ToDoWrite") is not None:
            found_files.append("ToDoWrite package accessible")
        else:
            issues.append("ToDoWrite package not importable")

        return {
            "valid": not issues,
            "issues": issues,
            "recommendations": recommendations,
            "found_files": found_files,
        }

    # ===== Internal Helper Methods =====

    def _setup_postgres_docker(self, project_path: Path) -> bool:
        """Set up PostgreSQL using Docker Compose.

        Copies the repository's test compose file when present,
        otherwise writes a built-in template.
        """
        template_path = (
            Path(__file__).parent.parent.parent.parent
            / "tests"
            / "docker-compose.yml"
        )
        target_path = project_path / "docker-compose.ToDoWrite.yml"
        if template_path.exists():
            shutil.copy2(template_path, target_path)
            print(f"✅ Created {target_path}")
        else:
            self._create_docker_compose_template(target_path)
        return True

    def _setup_sqlite(self, project_path: Path) -> bool:
        """Set up SQLite configuration (.env.ToDoWrite file)."""
        env_path = project_path / ".env.ToDoWrite"
        env_content = dedent(
            """
            # ToDoWrite SQLite Configuration
            TODOWRITE_DATABASE_URL=sqlite:///todowrite.db
            """
        ).strip()
        with open(env_path, "w") as f:
            f.write(env_content)
        print(f"✅ Created {env_path}")
        return True

    def _create_config_template(
        self, project_path: Path, db_type: str
    ) -> bool:
        """Create configuration template in .ToDoWrite/config.yaml.

        Assumes the .ToDoWrite directory already exists (created by
        setup_integration).
        """
        # Resolve the DB URL up front instead of embedding a multi-line
        # conditional inside the f-string replacement field.
        if db_type == "postgres":
            db_url = (
                "postgresql://ToDoWrite:ToDoWrite_dev_password@"
                "localhost:5432/ToDoWrite"
            )
        else:
            db_url = "sqlite:///todowrite.db"
        config_content = dedent(
            f"""
            # ToDoWrite Configuration
            # Database Configuration
            TODOWRITE_DATABASE_URL={db_url}
            # Optional: Log level
            LOG_LEVEL=INFO
            # Optional: Storage preference (yaml_only, db_only, both)
            TODOWRITE_STORAGE_PREFERENCE=both
            # Schema validation
            ToDoWrite_VALIDATE_SCHEMA=true
            # Database migration
            ToDoWrite_AUTO_MIGRATE=true
            """
        ).strip()
        config_path = project_path / ".ToDoWrite" / "config.yaml"
        with open(config_path, "w") as f:
            f.write(config_content)
        print(f"✅ Created configuration template: {config_path}")
        return True

    def _create_docker_compose_template(self, target_path: Path) -> None:
        """Create Docker Compose template for PostgreSQL."""
        content = dedent(
            """
            version: '3.9'
            services:
              postgres:
                image: postgres:15
                environment:
                  POSTGRES_DB: ToDoWrite
                  POSTGRES_USER: ToDoWrite
                  POSTGRES_PASSWORD: ToDoWrite_dev_password
                ports:
                  - "5432:5432"
                volumes:
                  - postgres_data:/var/lib/postgresql/data
                healthcheck:
                  test: ["CMD-SHELL", "pg_isready -U ToDoWrite"]
                  interval: 10s
                  timeout: 5s
                  retries: 5
            volumes:
              postgres_data:
            """
        ).strip()
        with open(target_path, "w") as f:
            f.write(content)

    def _create_readme(self, project_path: Path) -> None:
        """Create a basic README file (no-op when one already exists)."""
        # The doubled backslash emits a literal "\" so the README shows a
        # proper bash line continuation instead of silently merging the
        # two lines inside the Python string literal.
        content = dedent(
            f"""
            # {project_path.name}
            ## ToDoWrite Integration
            This project is integrated with ToDoWrite for hierarchical
            task management.
            ### Setup Instructions
            1. Ensure Docker is installed if using PostgreSQL
            2. Run `setup-integration` command
            3. Start the database and run migrations
            ### Getting Started
            ```bash
            # Initialize ToDoWrite
            python -m ToDoWrite init
            # Create your first goal
            python -m ToDoWrite create --id GOAL-PROJECT-VISION --layer Goal \\
            --title "Project Vision"
            ```
            """
        ).strip()
        readme_path = project_path / "README.md"
        if not readme_path.exists():
            with open(readme_path, "w") as f:
                f.write(content)

    def _create_gitignore(self, project_path: Path) -> None:
        """Create .gitignore template (no-op when one already exists)."""
        content = dedent(
            """
            # Python
            __pycache__/
            *.py[cod]
            *$py.class
            *.so
            .Python
            build/
            develop-eggs/
            dist/
            downloads/
            eggs/
            .eggs/
            lib/
            lib64/
            parts/
            sdist/
            var/
            wheels/
            pip-wheel-metadata/
            share/python-wheels/
            *.egg-info/
            .installed.cfg
            *.egg
            # Virtual environments
            .venv/
            venv/
            env/
            ENV/
            env.bak/
            venv.bak/
            # ToDoWrite
            .ToDoWrite/
            *.db
            *.sqlite
            *.sqlite3
            # IDE
            .vscode/
            .idea/
            *.swp
            *.swo
            # OS
            .DS_Store
            Thumbs.db
            # Logs
            *.log
            log/
            logs/
            """
        ).strip()
        gitignore_path = project_path / ".gitignore"
        if not gitignore_path.exists():
            with open(gitignore_path, "w") as f:
                f.write(content)
# ===== AI-Related Methods (Internal, Optional) =====
class _AIOptimizationManager:
    """Internal AI optimization features - not exposed to users
    without AI access.

    NOTE: all "token counts" produced here are cheap character-based
    approximations, not real tokenizer output.
    """

    def __init__(self) -> None:
        # Shares the cache location used by ProjectManager.
        self.cache_dir = Path.home() / ".ToDoWrite_cache"
        self.cache_dir.mkdir(exist_ok=True)
        self._ai_available = self._check_ai_availability()
        self.logger = logging.getLogger(__name__)

    def _check_ai_availability(self) -> bool:
        """Check if AI components are available.

        Requires BOTH the anthropic and openai packages to be importable.
        """
        anthropic_available = importlib.util.find_spec("anthropic") is not None
        openai_available = importlib.util.find_spec("openai") is not None
        return anthropic_available and openai_available

    def _get_token_counts(self, text: str) -> dict[str, int]:
        """Get approximate token counts using available AI providers.

        Returns a provider-name -> count mapping; empty when AI
        dependencies are unavailable.
        """
        token_counts: dict[str, int] = {}
        if self._ai_available:
            with contextlib.suppress(ImportError):
                # OpenAI fallback when tiktoken is not available.
                # Rough estimate: 1 token ≈ 4 chars.
                token_counts["openai"] = len(text) // 4
            try:
                import anthropic  # type: ignore

                # Approximation: character length of the prompt-wrapped
                # text, not a real tokenizer count.
                encoder = anthropic.HUMAN_PROMPT + text + anthropic.AI_PROMPT
                token_counts["anthropic"] = len(encoder)
            except ImportError:
                # Anthropic not available, skip its count.
                pass
            except Exception as e:
                # Log the specific error but don't crash.
                self.logger.debug(f"Anthropic token counting failed: {e}")
        return token_counts

    def optimize_token_usage(
        self, goal: str, **kwargs: OptimizationParams
    ) -> OptimizationResult | None:
        """
        Internal AI token optimization.
        Only works if AI dependencies are available.

        Returns None when no usable "text" keyword argument is supplied.
        """
        # BUGFIX: pop (not get) "text" so it is not forwarded twice.
        # _optimize_token_usage_internal takes text positionally, and
        # leaving it in **kwargs raised
        # TypeError: got multiple values for argument 'text'.
        text = kwargs.pop("text", "")
        if not isinstance(text, str) or not text:
            return None
        return self._optimize_token_usage_internal(goal, text, **kwargs)

    def ensure_token_sage(self) -> bool:
        """Internal check for token-sage availability."""
        return self._ai_available

    def _validate_optimization_input(
        self, text: str
    ) -> dict[str, object] | None:
        """Return an error result for empty input, otherwise None."""
        if not text:
            return {
                "optimized": False,
                "error": "No text provided for optimization",
                "tokens_saved": 0,
            }
        return None

    def _apply_whitespace_optimization(
        self, text: str
    ) -> tuple[str, list[str]]:
        """Collapse every run of whitespace into a single space."""
        original_len = len(text)
        optimized_text = " ".join(text.split())
        strategies: list[str] = []
        if len(optimized_text) < original_len:
            savings = original_len - len(optimized_text)
            strategies.append(
                f"Removed {savings} redundant whitespace characters"
            )
        return optimized_text, strategies

    def _apply_phrase_optimization(self, text: str) -> tuple[str, list[str]]:
        """Replace verbose stock phrases with shorter equivalents."""
        replacements = {
            "in order to": "to",
            "due to the fact that": "because",
            "in the event that": "if",
            "at this point in time": "now",
            "for the purpose of": "for",
        }
        optimized_text = text
        strategies: list[str] = []
        for phrase, replacement in replacements.items():
            if phrase in optimized_text:
                count = optimized_text.count(phrase)
                optimized_text = optimized_text.replace(phrase, replacement)
                strategies.append(
                    f"Replaced {count} instances of '{phrase}' with '{replacement}'"
                )
        return optimized_text, strategies

    def _apply_line_trimming_optimization(
        self, text: str
    ) -> tuple[str, list[str]]:
        """Drop empty lines, '#' comment lines and lines of <= 5 chars."""
        lines = text.split("\n")
        optimized_lines = []
        for line in lines:
            line = line.strip()
            if line and not line.startswith("#") and len(line) > 5:
                optimized_lines.append(line)
        strategies: list[str] = []
        if len(optimized_lines) < len(lines):
            strategies.append(
                f"Removed {len(lines) - len(optimized_lines)} empty/comment lines"
            )
        return "\n".join(optimized_lines), strategies

    def _calculate_token_savings(
        self, original_counts: dict, optimized_counts: dict
    ) -> dict[str, int]:
        """Per-provider positive savings (original minus optimized)."""
        token_savings: dict[str, int] = {}
        for provider, original_count in original_counts.items():
            optimized_count = optimized_counts.get(provider, original_count)
            savings = original_count - optimized_count
            if savings > 0:
                token_savings[provider] = savings
        return token_savings

    def _optimize_token_usage_internal(
        self, goal: str, text: str, **kwargs: OptimizationParams
    ) -> OptimizationResult | None:
        """
        Internal method for token optimization.
        Analyzes text and provides optimization suggestions.
        Only works if AI dependencies are available.
        """
        if not self._ai_available:
            return None

        validation_error = self._validate_optimization_input(text)
        if validation_error:
            return validation_error

        original_counts = self._get_token_counts(text)
        if not original_counts:
            return {
                "optimized": False,
                "error": "Could not count tokens",
                "tokens_saved": 0,
            }

        # Apply the three text-level strategies in sequence, each one
        # operating on the previous one's output.
        optimized_text = text
        all_strategies: list[str] = []
        optimizations_applied: list[str] = []

        optimized_text, strategies = self._apply_whitespace_optimization(
            optimized_text
        )
        all_strategies.extend(strategies)
        if strategies:
            optimizations_applied.append("whitespace_optimization")

        optimized_text, strategies = self._apply_phrase_optimization(
            optimized_text
        )
        all_strategies.extend(strategies)
        if strategies:
            optimizations_applied.append("phrase_optimization")

        optimized_text, strategies = self._apply_line_trimming_optimization(
            optimized_text
        )
        all_strategies.extend(strategies)
        if strategies:
            optimizations_applied.append("line_trimming")

        optimized_counts = self._get_token_counts(optimized_text)
        token_savings = self._calculate_token_savings(
            original_counts, optimized_counts
        )

        success = len(token_savings) > 0
        # NOTE(review): *_tokens / tokens_saved hold per-provider dicts
        # here even though OptimizationResult declares ints — kept as-is
        # for backward compatibility with existing consumers.
        result = {
            "optimized": True,
            "original_tokens": original_counts,
            "optimized_tokens": optimized_counts,
            "tokens_saved": token_savings,
            "optimizations_applied": optimizations_applied,
            "optimization_strategies": all_strategies
            if success
            else ["Text is already well-optimized"],
            "method": "text_preprocessing",
            "success": success,
        }
        if not success:
            result["message"] = "No significant optimizations found"
        return result
# Module-level singletons backing the public API functions below.
# NOTE: instantiating them at import time creates ~/.ToDoWrite_cache as a
# side effect (see each class's __init__).
_project_manager = ProjectManager()
_ai_manager = _AIOptimizationManager()
# ===== Public API Functions =====
def check_deprecated_schema() -> bool:
    """Verify the deprecated schema copy has not been modified.

    Thin module-level wrapper delegating to the shared ProjectManager
    instance.
    """
    return _project_manager.check_deprecated_schema()
def check_schema_changes() -> bool:
    """Verify schema edits were made in the primary (package) location.

    Thin module-level wrapper delegating to the shared ProjectManager
    instance.
    """
    return _project_manager.check_schema_changes()
def setup_integration(
    project_path: str | Path, db_type: str = "postgres"
) -> bool:
    """Set up ToDoWrite integration in a project.

    Args:
        project_path: Project directory; annotation widened to
            ``str | Path`` to match the underlying manager method.
        db_type: Database type, ``"postgres"`` (default) or ``"sqlite"``.

    Returns:
        True if setup succeeded.
    """
    return _project_manager.setup_integration(project_path, db_type)
def create_project_structure(project_path: str | Path) -> bool:
    """Create a basic ToDoWrite project structure.

    Args:
        project_path: Target directory; annotation widened to
            ``str | Path`` to match the underlying manager method.

    Returns:
        True if the structure was created successfully.
    """
    return _project_manager.create_project_structure(project_path)
def validate_project_setup(project_path: str | Path) -> dict[str, object]:
    """Validate that a project is properly set up for ToDoWrite.

    Args:
        project_path: Project directory; annotation widened to
            ``str | Path`` to match the underlying manager method.

    Returns:
        Dict with ``valid``, ``issues``, ``recommendations`` and
        ``found_files`` keys.
    """
    return _project_manager.validate_project_setup(project_path)
def init_database_sql() -> str:
    """Return the PostgreSQL initialization SQL script.

    Thin module-level wrapper delegating to the shared ProjectManager
    instance.
    """
    return _project_manager.init_database_sql()
# Public AI optimization functions
def optimize_token_usage(
    goal: str, **kwargs: OptimizationParams
) -> OptimizationResult | None:
    """Optimize the token footprint of a piece of text.

    Delegates to the internal AI manager; yields None when the optional
    AI dependencies are not installed.
    """
    return _ai_manager.optimize_token_usage(goal, **kwargs)
def ensure_token_sage() -> bool:
    """Report whether token-sage functionality can be used.

    True only when the optional AI dependencies are importable.
    """
    return _ai_manager.ensure_token_sage()