ChatBotRPG - Code Examples and Best Practices

Analysis Date: 2026-01-18 Language: Python 3.10+ Type Hints: Full type annotations


Overview

This document contains production-ready code examples inferred from ChatBotRPG’s design patterns and Discord discussions. All examples include:

  • Full type hints
  • Error handling
  • Documentation
  • Real-world usage examples

Best Practice 1: Anti-Hallucination Constraint System

Implementation

from typing import Dict, Any, Tuple, List, Optional
from dataclasses import dataclass
 
@dataclass
class ValidationResult:
    """Result of action validation"""
    valid: bool
    reason: str
    suggested_narration: Optional[str] = None
 
 
class ConstraintValidator:
    """Validates LLM outputs against game rules to prevent hallucinations"""
 
    MAGIC_KEYWORDS = ["cast", "spell", "magic", "summon", "teleport",
                      "enchant", "charm", "hex", "curse_spell"]
    MAGIC_CLASSES = ["mage", "wizard", "sorcerer", "warlock", "cleric",
                     "druid", "paladin"]
 
    def __init__(self, character_sheet: Dict[str, Any],
                 world_state: Dict[str, Any]):
        self.character_sheet = character_sheet
        self.world_state = world_state
 
    def validate_action(self, action: str, method: str) -> ValidationResult:
        """
        Validate if action is possible given character abilities
 
        Args:
            action: The action being attempted (e.g., "attack", "cast spell")
            method: How the action is being performed (e.g., "sword", "fireball")
 
        Returns:
            ValidationResult with validity status and reason
        """
 
        # Check 1: Character has required ability
        if self._is_magical_action(action, method):
            if not self._has_magic_ability():
                return ValidationResult(
                    valid=False,
                    reason="You're not a spellcaster",
                    suggested_narration="You attempt to channel magical energy, but nothing happens. You lack the training to cast spells."
                )
 
        # Check 2: Required items present
        required_item = self._get_required_item(action, method)
        if required_item and not self._has_item(required_item):
            return ValidationResult(
                valid=False,
                reason=f"You don't have {required_item}",
                suggested_narration=f"You reach for your {required_item}, but it's not there. You can't perform this action without it."
            )
 
        # Check 3: Physics validation
        physics_check = self._physics_check(action, method)
        if not physics_check.valid:
            return physics_check
 
        # Check 4: Location constraints
        location_check = self._location_check(action)
        if not location_check.valid:
            return location_check
 
        return ValidationResult(valid=True, reason="Action allowed")
 
    def _is_magical_action(self, action: str, method: str) -> bool:
        """Check if action or method involves magic"""
        text = f"{action} {method}".lower()
        return any(keyword in text for keyword in self.MAGIC_KEYWORDS)
 
    def _has_magic_ability(self) -> bool:
        """Check if character can cast spells"""
        char_class = self.character_sheet.get("class", "").lower()
        return char_class in self.MAGIC_CLASSES
 
    def _has_item(self, item: str) -> bool:
        """Check if item is in character's inventory"""
        inventory = self.character_sheet.get("inventory", [])
        item_lower = item.lower()
        return any(item_lower in inv_item.lower() for inv_item in inventory)
 
    def _get_required_item(self, action: str, method: str) -> Optional[str]:
        """Map actions/methods to required items"""
        action_items = {
            "lockpick": "lockpicks",
            "unlock": "key",
            "shoot": "bow",
            "fire arrow": "bow",
            "drink potion": "potion",
            "read": "book",
            "light": "torch",
            "write": "quill"
        }
 
        text = f"{action} {method}".lower()
        for key, item in action_items.items():
            if key in text:
                return item
        return None
 
    def _physics_check(self, action: str, method: str) -> ValidationResult:
        """Validate against setting physics"""
 
        text = f"{action} {method}".lower()
 
        # Check for impossible physical actions
        impossible_actions = [
            ("fly", "flight", "You can't fly without wings or magic"),
            ("teleport", "teleportation", "You can't teleport without magic"),
            ("lift 500", None, "That's far too heavy to lift"),
            ("jump through wall", None, "You can't jump through solid walls"),
            ("walk through", None, "You can't walk through solid objects"),
            ("phase", None, "You can't pass through matter"),
            ("become invisible", "invisibility", "You don't have invisibility powers")
        ]
 
        for trigger, ability, reason in impossible_actions:
            if trigger in text:
                # Check if character has the required ability
                if ability and self._has_ability(ability):
                    continue  # Action is valid due to special ability
 
                return ValidationResult(
                    valid=False,
                    reason=reason,
                    suggested_narration=f"You attempt to {action}, but it's physically impossible. {reason}."
                )
 
        return ValidationResult(valid=True, reason="Physics check passed")
 
    def _location_check(self, action: str) -> ValidationResult:
        """Validate action is appropriate for current location"""
 
        location = self.world_state.get("player_location", "")
        action_lower = action.lower()
 
        # Location-specific restrictions
        restrictions = {
            "underwater": ["light fire", "shoot bow"],
            "in_air": ["pick up", "open door"],
            "crowded_tavern": ["practice swordplay", "shoot bow"]
        }
 
        for location_type, forbidden_actions in restrictions.items():
            if location_type in location.lower():
                for forbidden in forbidden_actions:
                    if forbidden in action_lower:
                        return ValidationResult(
                            valid=False,
                            reason=f"Can't {action} while {location_type}",
                            suggested_narration=f"You can't do that here. The environment doesn't allow it."
                        )
 
        return ValidationResult(valid=True, reason="Location check passed")
 
    def _has_ability(self, ability: str) -> bool:
        """Check if character has a special ability"""
        abilities = self.character_sheet.get("abilities", [])
        return ability.lower() in [a.lower() for a in abilities]
 
 
# Usage Example
def example_usage():
    """Demonstrate constraint validation"""
 
    # Character setup
    character = {
        "class": "Warrior",
        "inventory": ["iron_sword", "leather_armor", "10_gold"],
        "abilities": ["basic_swordplay", "shield_bash"]
    }
 
    world = {
        "player_location": "golden_oak_inn",
        "npcs": ["bartender", "guard"],
        "time": "evening"
    }
 
    validator = ConstraintValidator(character, world)
 
    # Test 1: Impossible action (casting spells as warrior)
    result = validator.validate_action("cast fireball", "arcane gesture")
    print(f"Test 1 - Magic as warrior:")
    print(f"  Valid: {result.valid}")
    print(f"  Reason: {result.reason}")
    print(f"  Narration: {result.suggested_narration}\n")
    # Output:
    # Valid: False
    # Reason: You're not a spellcaster
    # Narration: You attempt to channel magical energy...
 
    # Test 2: Missing required item
    result = validator.validate_action("lockpick the door", "carefully")
    print(f"Test 2 - Missing lockpicks:")
    print(f"  Valid: {result.valid}")
    print(f"  Reason: {result.reason}\n")
    # Output:
    # Valid: False
    # Reason: You don't have lockpicks
 
    # Test 3: Valid action
    result = validator.validate_action("attack guard", "with sword")
    print(f"Test 3 - Valid attack:")
    print(f"  Valid: {result.valid}")
    print(f"  Reason: {result.reason}\n")
    # Output:
    # Valid: True
    # Reason: Action allowed
 
    # Test 4: Physics violation
    result = validator.validate_action("fly to the ceiling", "flapping arms")
    print(f"Test 4 - Physics violation:")
    print(f"  Valid: {result.valid}")
    print(f"  Reason: {result.reason}")
    print(f"  Narration: {result.suggested_narration}\n")
    # Output:
    # Valid: False
    # Reason: You can't fly without wings or magic
 
 
if __name__ == "__main__":
    example_usage()

Integration with LLM Pipeline

def process_player_action_with_validation(
    player_input: str,
    character_sheet: Dict[str, Any],
    world_state: Dict[str, Any],
    llm_client: Any
) -> str:
    """
    Process player action with constraint validation before narration
 
    Args:
        player_input: Natural language player input
        character_sheet: Player's character data
        world_state: Current world state
        llm_client: LLM client for narration
 
    Returns:
        Narrated response
    """
 
    # Step 1: Extract intent from natural language
    intent = extract_intent(player_input, llm_client)
 
    # Step 2: Validate action against constraints
    validator = ConstraintValidator(character_sheet, world_state)
    validation = validator.validate_action(intent['action'], intent['method'])
 
    # Step 3: Generate narration
    if validation.valid:
        # Action allowed - narrate success/failure based on dice rolls, etc.
        event = execute_action(intent, world_state)
        return generate_narration(event, world_state, llm_client)
    else:
        # Action invalid - use suggested narration or generate failure
        if validation.suggested_narration:
            return validation.suggested_narration
        else:
            return generate_failure_narration(intent, validation.reason, llm_client)

Best Practice 2: 170-Token Narration Limiter

Implementation

import re
from typing import Optional, Dict, Any
from dataclasses import dataclass
 
 
@dataclass
class NarrationConfig:
    """Configuration for narration generation"""
    max_tokens: int = 170
    target_sentences: int = 2
    temperature: float = 0.7
    enforce_complete_sentences: bool = True
 
 
class NarrationLimiter:
    """Enforces 170-token sweet spot for responsive narration"""
 
    def __init__(self, llm_client: Any, config: Optional[NarrationConfig] = None):
        self.llm = llm_client
        self.config = config or NarrationConfig()
 
    def generate_narration(self, ndl: str, context: Dict[str, Any],
                           override_max_tokens: Optional[int] = None) -> str:
        """
        Generate token-limited narration
 
        Args:
            ndl: Natural Description Language event string
            context: Game context (location, time, NPCs, etc.)
            override_max_tokens: Optional override for max tokens
 
        Returns:
            Formatted narration string
        """
        max_tokens = override_max_tokens or self.config.max_tokens
 
        # Build prompt with explicit length constraint
        prompt = self._build_prompt(ndl, context, max_tokens)
 
        # API-level token limit
        response = self.llm.complete(
            prompt,
            temperature=self.config.temperature,
            max_tokens=max_tokens
        )
 
        # Post-processing: Enforce sentence count and completeness
        if self.config.enforce_complete_sentences:
            cleaned = self._enforce_sentence_limit(response)
        else:
            cleaned = response.strip()
 
        return cleaned
 
    def _build_prompt(self, ndl: str, context: Dict[str, Any],
                      max_tokens: int) -> str:
        """Build prompt with length instructions"""
 
        location = context.get('location', 'Unknown location')
        time = context.get('time', 'Unknown time')
        weather = context.get('weather', '')
 
        weather_note = f"\nWeather: {weather}" if weather else ""
 
        return f"""You are a game narrator. Convert events to natural prose.
 
Setting: {location}
Time: {time}{weather_note}
 
Events: {ndl}
 
Write EXACTLY {self.config.target_sentences}-3 sentences. Be concise and punchy.
Maximum length: {max_tokens} tokens.
Use present tense and second person ("You swing...").
"""
 
    def _enforce_sentence_limit(self, text: str) -> str:
        """
        Post-process to enforce sentence count
 
        Args:
            text: Raw LLM output
 
        Returns:
            Cleaned text with proper sentence boundaries
        """
        # Split on sentence boundaries (. ! ?)
        sentence_pattern = r'(?<=[.!?])\s+'
        sentences = re.split(sentence_pattern, text.strip())
 
        # Keep first 2-3 sentences
        max_sentences = self.config.target_sentences + 1  # Allow up to 3
        if len(sentences) > max_sentences:
            sentences = sentences[:max_sentences]
 
        # Handle incomplete final sentence (common with token limits)
        # If last sentence doesn't end with punctuation, remove it
        if sentences and sentences[-1]:
            last_char = sentences[-1].strip()[-1] if sentences[-1].strip() else ''
            if last_char not in '.!?':
                sentences = sentences[:-1]
 
        # Join sentences
        result = ' '.join(s.strip() for s in sentences if s.strip())
 
        return result
 
    def generate_dialogue(self, speaker: str, listener: str,
                          topic: str, context: Dict[str, Any],
                          max_tokens: int = 50) -> str:
        """
        Generate short dialogue (even more constrained than narration)
 
        Args:
            speaker: Character speaking
            listener: Character being spoken to
            topic: What the dialogue is about
            context: Game context including NPC personality
            max_tokens: Token limit for dialogue (default 50)
 
        Returns:
            Dialogue string
        """
        personality = context.get('npcs', {}).get(speaker, {}).get('personality', 'neutral')
        mood = context.get('npcs', {}).get(speaker, {}).get('mood', 'neutral')
 
        prompt = f"""Character {speaker} speaks to {listener} about {topic}.
 
Personality: {personality}
Mood: {mood}
 
Write a single line of dialogue (1-2 sentences max). Match personality and mood.
Include quotation marks. No narration, just dialogue.
"""
 
        response = self.llm.complete(
            prompt,
            temperature=0.9,  # Higher for creative dialogue
            max_tokens=max_tokens
        )
 
        # Extract dialogue from quotes if present
        dialogue_match = re.search(r'"([^"]+)"', response)
        if dialogue_match:
            return f'"{dialogue_match.group(1)}"'
 
        return response.strip()
 
    def generate_combat_narration(self, attacker: str, defender: str,
                                   attack_type: str, hit: bool,
                                   damage: int, context: Dict[str, Any]) -> str:
        """
        Generate combat narration (optimized for fast-paced action)
 
        Args:
            attacker: Who is attacking
            defender: Who is being attacked
            attack_type: Type of attack (sword, spell, etc.)
            hit: Whether attack hit or missed
            damage: Damage dealt (0 if miss)
            context: Game context
 
        Returns:
            Combat narration string
        """
        result = "hit" if hit else "miss"
        damage_str = f" ({damage} damage)" if hit else " (Miss)"
 
        prompt = f"""Combat turn in fantasy RPG:
 
Attacker: {attacker}
Defender: {defender}
Attack: {attack_type}
Result: {result}
 
Write 2 sentences describing this combat turn.
Be dynamic and engaging. Include sensory details (sounds, visuals).
End with: {damage_str}
"""
 
        response = self.llm.complete(
            prompt,
            temperature=0.6,  # Moderate creativity
            max_tokens=120    # Slightly shorter than normal narration
        )
 
        # Ensure damage/miss indicator is present
        if damage_str not in response:
            response = response.strip() + f" {damage_str}"
 
        return self._enforce_sentence_limit(response)
 
 
# Usage Example
def example_usage():
    """Demonstrate narration limiting"""
 
    class MockLLM:
        """Mock LLM for testing"""
        def complete(self, prompt: str, temperature: float, max_tokens: int) -> str:
            # Simulate different response lengths
            if "combat" in prompt.lower():
                return ("You swing your sword at the guard with all your might. "
                        "The blade connects with a sharp clang, biting deep into his shoulder. "
                        "He staggers back, blood seeping through his armor.")
            else:
                return ("You enter the Golden Oak Inn. The warm firelight flickers across "
                        "worn wooden tables. The smell of roasted meat and ale fills the air. "
                        "A bard strums a lute in the corner. Patrons laugh and talk loudly. "
                        "The bartender waves you over.")
 
    limiter = NarrationLimiter(MockLLM())
 
    # Example 1: Regular narration
    ndl = 'do($player, "enter")->location($inn)'
    context = {
        "location": "Golden Oak Inn",
        "time": "Evening",
        "weather": "Rainy outside"
    }
 
    narration = limiter.generate_narration(ndl, context)
    print("Example 1 - Regular narration:")
    print(f"  {narration}\n")
    # Output: First 2-3 sentences only
 
    # Example 2: Dialogue
    dialogue = limiter.generate_dialogue(
        "Bartender",
        "Player",
        "drinks",
        context={
            "npcs": {
                "Bartender": {
                    "personality": "gruff but fair",
                    "mood": "tired"
                }
            }
        }
    )
    print("Example 2 - Dialogue:")
    print(f"  {dialogue}\n")
 
    # Example 3: Combat
    combat = limiter.generate_combat_narration(
        "You",
        "Guard",
        "sword slash",
        hit=True,
        damage=12,
        context={}
    )
    print("Example 3 - Combat:")
    print(f"  {combat}\n")
 
 
if __name__ == "__main__":
    example_usage()

Cost Comparison

def calculate_cost_savings():
    """Demonstrate cost savings from 170-token limit"""
 
    # Pricing (Gemini 2.5 Flash Lite Preview, Jan 2026)
    cost_per_1k_output_tokens = 0.001  # $0.001 per 1K tokens
 
    # Scenarios
    scenarios = {
        "No limit (avg 500 tokens)": 500,
        "300-token limit": 300,
        "170-token limit": 170
    }
 
    turns_per_session = 200  # Average game session
 
    print("Cost Analysis for 200-turn game session:\n")
    for scenario, tokens_per_turn in scenarios.items():
        total_tokens = tokens_per_turn * turns_per_session
        cost = (total_tokens / 1000) * cost_per_1k_output_tokens
        print(f"{scenario}:")
        print(f"  Tokens per turn: {tokens_per_turn}")
        print(f"  Total tokens: {total_tokens:,}")
        print(f"  Cost: ${cost:.4f}\n")
 
    # Savings
    no_limit_cost = (500 * turns_per_session / 1000) * cost_per_1k_output_tokens
    with_limit_cost = (170 * turns_per_session / 1000) * cost_per_1k_output_tokens
    savings = no_limit_cost - with_limit_cost
    savings_pct = (savings / no_limit_cost) * 100
 
    print(f"Savings with 170-token limit:")
    print(f"  Absolute: ${savings:.4f}")
    print(f"  Percentage: {savings_pct:.1f}%")
 
 
# Output:
# Cost Analysis for 200-turn game session:
#
# No limit (avg 500 tokens):
#   Tokens per turn: 500
#   Total tokens: 100,000
#   Cost: $0.1000
#
# 300-token limit:
#   Tokens per turn: 300
#   Total tokens: 60,000
#   Cost: $0.0600
#
# 170-token limit:
#   Tokens per turn: 170
#   Total tokens: 34,000
#   Cost: $0.0340
#
# Savings with 170-token limit:
#   Absolute: $0.0660
#   Percentage: 66.0%

Best Practice 3: Three-Tier Persistence

Implementation

import json
import sqlite3
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Any, Optional
from datetime import datetime
from pathlib import Path
 
 
@dataclass
class WorldState:
    """Persistent world data (.world file)"""
    world_id: str
    name: str
    setting: str  # "fantasy", "sci-fi", etc.
    locations: Dict[str, Any] = field(default_factory=dict)
    npcs: Dict[str, Any] = field(default_factory=dict)
    items: Dict[str, Any] = field(default_factory=dict)
    rules: List[Dict[str, Any]] = field(default_factory=list)
    templates: Dict[str, Any] = field(default_factory=dict)
    lore: List[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    version: str = "1.0"
 
 
@dataclass
class PlaythroughState:
    """Current game session (.save file)"""
    save_id: str
    world_id: str
    player_state: Dict[str, Any] = field(default_factory=dict)
    location_states: Dict[str, Any] = field(default_factory=dict)
    npc_states: Dict[str, Any] = field(default_factory=dict)
    quest_progress: Dict[str, Any] = field(default_factory=dict)
    inventory: List[str] = field(default_factory=list)
    game_time: int = 0  # In-game time (minutes since start)
    flags: Dict[str, bool] = field(default_factory=dict)
    turn_count: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    last_played: datetime = field(default_factory=datetime.now)
 
 
class SessionState:
    """Runtime memory (not persisted)"""
 
    def __init__(self, max_context_window: int = 10):
        self.context_window: List[Dict[str, str]] = []
        self.max_context_window = max_context_window
        self.pending_events: List[str] = []
        self.cached_descriptions: Dict[str, str] = {}
        self.llm_temperature: float = 0.7
        self.current_scene: Optional[str] = None
 
    def add_to_context(self, event: Dict[str, str]):
        """Add event to context window with automatic cleanup"""
        self.context_window.append(event)
 
        # Keep only last N turns
        if len(self.context_window) > self.max_context_window:
            self.context_window.pop(0)
 
    def get_context(self) -> List[Dict[str, str]]:
        """Get current context window"""
        return self.context_window.copy()
 
    def clear_context(self):
        """Clear context window (e.g., when changing scenes)"""
        self.context_window.clear()
 
 
class PersistenceManager:
    """Manages three-tier persistence system"""
 
    def __init__(self, db_directory: str = "./saves"):
        self.db_directory = Path(db_directory)
        self.db_directory.mkdir(exist_ok=True)
        self.session = SessionState()
        self._ensure_schema()
 
    def _get_db_path(self, filename: str) -> Path:
        """Get full path for database file"""
        return self.db_directory / filename
 
    def _ensure_schema(self):
        """Ensure database schema exists for world/save files"""
        # Schema is created per-file, not globally
        pass
 
    def _init_world_db(self, db_path: Path):
        """Initialize world database schema"""
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS world_meta (
                world_id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                setting TEXT NOT NULL,
                version TEXT NOT NULL,
                created_at TIMESTAMP NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS locations (
                location_id TEXT PRIMARY KEY,
                data TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS npcs (
                npc_id TEXT PRIMARY KEY,
                data TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS items (
                item_id TEXT PRIMARY KEY,
                data TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS rules (
                rule_id TEXT PRIMARY KEY,
                data TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS lore (
                lore_id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT NOT NULL
            )
        """)
 
        conn.commit()
        conn.close()
 
    def _init_save_db(self, db_path: Path):
        """Initialize save database schema"""
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS save_meta (
                save_id TEXT PRIMARY KEY,
                world_id TEXT NOT NULL,
                created_at TIMESTAMP NOT NULL,
                last_played TIMESTAMP NOT NULL,
                turn_count INTEGER NOT NULL,
                game_time INTEGER NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS player_state (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS location_states (
                location_id TEXT PRIMARY KEY,
                state TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS npc_states (
                npc_id TEXT PRIMARY KEY,
                state TEXT NOT NULL
            )
        """)
 
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS flags (
                flag_name TEXT PRIMARY KEY,
                value INTEGER NOT NULL
            )
        """)
 
        conn.commit()
        conn.close()
 
    def save_world(self, world: WorldState) -> Path:
        """
        Persist world state to .world file
 
        Args:
            world: WorldState to persist
 
        Returns:
            Path to created .world file
        """
        db_path = self._get_db_path(f"{world.world_id}.world")
        self._init_world_db(db_path)
 
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
 
        # Save metadata
        cursor.execute("""
            INSERT OR REPLACE INTO world_meta
            (world_id, name, setting, version, created_at)
            VALUES (?, ?, ?, ?, ?)
        """, (world.world_id, world.name, world.setting,
              world.version, world.created_at))
 
        # Save locations
        for loc_id, loc_data in world.locations.items():
            cursor.execute("""
                INSERT OR REPLACE INTO locations (location_id, data)
                VALUES (?, ?)
            """, (loc_id, json.dumps(loc_data)))
 
        # Save NPCs
        for npc_id, npc_data in world.npcs.items():
            cursor.execute("""
                INSERT OR REPLACE INTO npcs (npc_id, data)
                VALUES (?, ?)
            """, (npc_id, json.dumps(npc_data)))
 
        # Save items
        for item_id, item_data in world.items.items():
            cursor.execute("""
                INSERT OR REPLACE INTO items (item_id, data)
                VALUES (?, ?)
            """, (item_id, json.dumps(item_data)))
 
        # Save rules
        for rule in world.rules:
            cursor.execute("""
                INSERT OR REPLACE INTO rules (rule_id, data)
                VALUES (?, ?)
            """, (rule['rule_id'], json.dumps(rule)))
 
        # Save lore
        cursor.execute("DELETE FROM lore")  # Clear existing
        for lore_entry in world.lore:
            cursor.execute("""
                INSERT INTO lore (content) VALUES (?)
            """, (lore_entry,))
 
        conn.commit()
        conn.close()
 
        print(f"World saved to: {db_path}")
        return db_path
 
    def load_world(self, world_id: str) -> WorldState:
        """
        Load world state from .world file
 
        Args:
            world_id: ID of world to load
 
        Returns:
            WorldState object
 
        Raises:
            FileNotFoundError: If world file doesn't exist
        """
        db_path = self._get_db_path(f"{world_id}.world")
        if not db_path.exists():
            raise FileNotFoundError(f"World file not found: {db_path}")
 
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
 
        # Load metadata
        cursor.execute("SELECT * FROM world_meta WHERE world_id = ?", (world_id,))
        meta = cursor.fetchone()
        if not meta:
            raise ValueError(f"World {world_id} not found in database")
 
        # Load locations
        locations = {}
        cursor.execute("SELECT location_id, data FROM locations")
        for loc_id, data in cursor.fetchall():
            locations[loc_id] = json.loads(data)
 
        # Load NPCs
        npcs = {}
        cursor.execute("SELECT npc_id, data FROM npcs")
        for npc_id, data in cursor.fetchall():
            npcs[npc_id] = json.loads(data)
 
        # Load items
        items = {}
        cursor.execute("SELECT item_id, data FROM items")
        for item_id, data in cursor.fetchall():
            items[item_id] = json.loads(data)
 
        # Load rules
        rules = []
        cursor.execute("SELECT data FROM rules")
        for (data,) in cursor.fetchall():
            rules.append(json.loads(data))
 
        # Load lore
        lore = []
        cursor.execute("SELECT content FROM lore")
        for (content,) in cursor.fetchall():
            lore.append(content)
 
        conn.close()
 
        return WorldState(
            world_id=meta[0],
            name=meta[1],
            setting=meta[2],
            version=meta[3],
            created_at=datetime.fromisoformat(meta[4]) if isinstance(meta[4], str) else meta[4],
            locations=locations,
            npcs=npcs,
            items=items,
            rules=rules,
            lore=lore
        )
 
    def save_playthrough(self, playthrough: PlaythroughState) -> Path:
        """
        Persist playthrough state to .save file
 
        Args:
            playthrough: PlaythroughState to persist
 
        Returns:
            Path to created .save file
        """
        playthrough.last_played = datetime.now()
 
        db_path = self._get_db_path(f"{playthrough.save_id}.save")
        self._init_save_db(db_path)
 
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
 
        # Save metadata
        cursor.execute("""
            INSERT OR REPLACE INTO save_meta
            (save_id, world_id, created_at, last_played, turn_count, game_time)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (playthrough.save_id, playthrough.world_id,
              playthrough.created_at, playthrough.last_played,
              playthrough.turn_count, playthrough.game_time))
 
        # Save player state
        cursor.execute("DELETE FROM player_state")
        for key, value in playthrough.player_state.items():
            cursor.execute("""
                INSERT INTO player_state (key, value) VALUES (?, ?)
            """, (key, json.dumps(value)))
 
        # Save location states
        for loc_id, state in playthrough.location_states.items():
            cursor.execute("""
                INSERT OR REPLACE INTO location_states (location_id, state)
                VALUES (?, ?)
            """, (loc_id, json.dumps(state)))
 
        # Save NPC states
        for npc_id, state in playthrough.npc_states.items():
            cursor.execute("""
                INSERT OR REPLACE INTO npc_states (npc_id, state)
                VALUES (?, ?)
            """, (npc_id, json.dumps(state)))
 
        # Save flags
        for flag_name, value in playthrough.flags.items():
            cursor.execute("""
                INSERT OR REPLACE INTO flags (flag_name, value)
                VALUES (?, ?)
            """, (flag_name, 1 if value else 0))
 
        conn.commit()
        conn.close()
 
        print(f"Save file created: {db_path}")
        return db_path
 
    def load_playthrough(self, save_id: str) -> PlaythroughState:
        """
        Load playthrough state from .save file
 
        Args:
            save_id: ID of save to load
 
        Returns:
            PlaythroughState object
 
        Raises:
            FileNotFoundError: If save file doesn't exist
        """
        db_path = self._get_db_path(f"{save_id}.save")
        if not db_path.exists():
            raise FileNotFoundError(f"Save file not found: {db_path}")
 
        conn = sqlite3.connect(str(db_path))
        cursor = conn.cursor()
 
        # Load metadata
        cursor.execute("SELECT * FROM save_meta WHERE save_id = ?", (save_id,))
        meta = cursor.fetchone()
        if not meta:
            raise ValueError(f"Save {save_id} not found in database")
 
        # Load player state
        player_state = {}
        cursor.execute("SELECT key, value FROM player_state")
        for key, value in cursor.fetchall():
            player_state[key] = json.loads(value)
 
        # Load location states
        location_states = {}
        cursor.execute("SELECT location_id, state FROM location_states")
        for loc_id, state in cursor.fetchall():
            location_states[loc_id] = json.loads(state)
 
        # Load NPC states
        npc_states = {}
        cursor.execute("SELECT npc_id, state FROM npc_states")
        for npc_id, state in cursor.fetchall():
            npc_states[npc_id] = json.loads(state)
 
        # Load flags
        flags = {}
        cursor.execute("SELECT flag_name, value FROM flags")
        for flag_name, value in cursor.fetchall():
            flags[flag_name] = bool(value)
 
        conn.close()
 
        return PlaythroughState(
            save_id=meta[0],
            world_id=meta[1],
            created_at=datetime.fromisoformat(meta[2]) if isinstance(meta[2], str) else meta[2],
            last_played=datetime.fromisoformat(meta[3]) if isinstance(meta[3], str) else meta[3],
            turn_count=meta[4],
            game_time=meta[5],
            player_state=player_state,
            location_states=location_states,
            npc_states=npc_states,
            flags=flags
        )
 
    def update_session(self, event: Dict[str, str]):
        """
        Update runtime session state (not persisted)
 
        Args:
            event: Event to add to context window
        """
        self.session.add_to_context(event)
 
    def get_session_context(self) -> List[Dict[str, str]]:
        """Get current session context window"""
        return self.session.get_context()
 
    def export_world_for_distribution(self, world_id: str, output_dir: str = "./exports"):
        """
        Export .world file for distribution (copy to output directory)
 
        Args:
            world_id: ID of world to export
            output_dir: Directory to export to
        """
        source = self._get_db_path(f"{world_id}.world")
        if not source.exists():
            raise FileNotFoundError(f"World file not found: {source}")
 
        output_path = Path(output_dir)
        output_path.mkdir(exist_ok=True)
 
        dest = output_path / f"{world_id}.world"
        import shutil
        shutil.copy2(source, dest)
 
        print(f"World exported to: {dest}")
        return dest
 
 
# Usage Example
def example_usage():
    """Demonstrate three-tier persistence"""
 
    pm = PersistenceManager()
 
    # Tier 1: Create world state (.world file)
    world = WorldState(
        world_id="fantasy_001",
        name="The Forgotten Realm",
        setting="high_fantasy",
        locations={
            "golden_oak_inn": {
                "name": "Golden Oak Inn",
                "description": "A cozy tavern with warm firelight",
                "connections": {
                    "market": {"travel_time": 10},
                    "castle": {"travel_time": 30}
                },
                "npcs": ["bartender"]
            }
        },
        npcs={
            "bartender": {
                "name": "Old Tom",
                "personality": "gruff but fair",
                "schedule": {
                    "8:00-22:00": "golden_oak_inn",
                    "22:00-8:00": "upstairs_room"
                }
            }
        },
        items={
            "health_potion": {
                "name": "Health Potion",
                "effect": "heal",
                "value": 20
            }
        },
        rules=[
            {
                "rule_id": "midnight_ghost",
                "name": "Midnight Haunting",
                "conditions": [{"type": "time", "value": "00:00"}],
                "actions": [{"type": "spawn_npc", "npc": "ghost"}]
            }
        ],
        lore=[
            "Long ago, dragons ruled these lands.",
            "The ancient ruins hold forgotten secrets."
        ]
    )
 
    world_path = pm.save_world(world)
    print(f"World saved: {world_path}\n")
 
    # Tier 2: Create playthrough state (.save file)
    playthrough = PlaythroughState(
        save_id="playthrough_001",
        world_id="fantasy_001",
        player_state={
            "name": "Adventurer",
            "hp": 100,
            "max_hp": 100,
            "level": 1,
            "location": "golden_oak_inn"
        },
        npc_states={
            "bartender": {
                "location": "golden_oak_inn",
                "mood": "neutral",
                "met_player": False
            }
        },
        inventory=["iron_sword", "leather_armor"],
        flags={"started_quest": False}
    )
 
    save_path = pm.save_playthrough(playthrough)
    print(f"Save created: {save_path}\n")
 
    # Tier 3: Update session state (runtime only)
    pm.update_session({
        "turn": 1,
        "player_input": "I enter the tavern",
        "narration": "You push open the heavy wooden door..."
    })
    pm.update_session({
        "turn": 2,
        "player_input": "I talk to the bartender",
        "narration": "The bartender looks up and nods..."
    })
 
    context = pm.get_session_context()
    print(f"Session context: {len(context)} events in memory\n")
 
    # Load world back
    loaded_world = pm.load_world("fantasy_001")
    print(f"Loaded world: {loaded_world.name}")
    print(f"  Locations: {len(loaded_world.locations)}")
    print(f"  NPCs: {len(loaded_world.npcs)}")
    print(f"  Rules: {len(loaded_world.rules)}\n")
 
    # Load save back
    loaded_save = pm.load_playthrough("playthrough_001")
    print(f"Loaded save: {loaded_save.save_id}")
    print(f"  Player HP: {loaded_save.player_state['hp']}/{loaded_save.player_state['max_hp']}")
    print(f"  Inventory: {loaded_save.inventory}")
 
    # Export for distribution
    export_path = pm.export_world_for_distribution("fantasy_001")
    print(f"\nExported for distribution: {export_path}")
 
 
if __name__ == "__main__":
    example_usage()

Best Practice 4: Visual Rule Engine

Implementation

from dataclasses import dataclass, field
from typing import List, Dict, Any, Callable, Optional
from enum import Enum
import json
 
 
class TriggerType(Enum):
    """Types of trigger conditions"""
    TIME = "time"
    LOCATION = "location"
    ACTION = "action"
    FLAG = "flag"
    ITEM = "item"
    NPC_STATE = "npc_state"
    STAT = "stat"
 
 
class Operator(Enum):
    """Comparison operators"""
    EQUAL = "=="
    NOT_EQUAL = "!="
    GREATER = ">"
    LESS = "<"
    GREATER_EQUAL = ">="
    LESS_EQUAL = "<="
    CONTAINS = "contains"
    NOT_CONTAINS = "not_contains"
 
 
@dataclass
class Condition:
    """A single condition to check"""
    trigger_type: TriggerType
    operator: Operator
    value: Any
    description: str = ""  # Human-readable description
 
    def evaluate(self, game_state: Dict[str, Any]) -> bool:
        """
        Evaluate condition against game state
 
        Args:
            game_state: Current game state dictionary
 
        Returns:
            True if condition is met, False otherwise
        """
        if self.trigger_type == TriggerType.TIME:
            current_time = game_state.get("time", 0)
            return self._compare(current_time, self.value, self.operator)
 
        elif self.trigger_type == TriggerType.LOCATION:
            player_location = game_state.get("player_location", "")
            return self._compare(player_location, self.value, self.operator)
 
        elif self.trigger_type == TriggerType.FLAG:
            flag_value = game_state.get("flags", {}).get(self.value, False)
            return flag_value if self.operator == Operator.EQUAL else not flag_value
 
        elif self.trigger_type == TriggerType.ITEM:
            inventory = game_state.get("inventory", [])
            has_item = self.value in inventory
            return has_item if self.operator == Operator.CONTAINS else not has_item
 
        elif self.trigger_type == TriggerType.NPC_STATE:
            # Value format: "npc_id.property"
            npc_id, prop = self.value.split(".", 1) if "." in self.value else (self.value, "present")
            npc_states = game_state.get("npc_states", {})
            npc_state = npc_states.get(npc_id, {})
            actual_value = npc_state.get(prop, False)
            return self._compare(actual_value, True, self.operator)
 
        elif self.trigger_type == TriggerType.STAT:
            # Value format: "stat_name:threshold"
            stat_name, threshold = self.value.split(":", 1) if ":" in self.value else (self.value, 0)
            player_stats = game_state.get("player_stats", {})
            actual_value = player_stats.get(stat_name, 0)
            return self._compare(actual_value, int(threshold), self.operator)
 
        return False
 
    def _compare(self, actual: Any, expected: Any, operator: Operator) -> bool:
        """Compare values based on operator"""
        if operator == Operator.EQUAL:
            return actual == expected
        elif operator == Operator.NOT_EQUAL:
            return actual != expected
        elif operator == Operator.GREATER:
            return actual > expected
        elif operator == Operator.LESS:
            return actual < expected
        elif operator == Operator.GREATER_EQUAL:
            return actual >= expected
        elif operator == Operator.LESS_EQUAL:
            return actual <= expected
        elif operator == Operator.CONTAINS:
            return expected in str(actual)
        elif operator == Operator.NOT_CONTAINS:
            return expected not in str(actual)
        return False
 
 
@dataclass
class Action:
    """An action to execute when conditions met"""
    action_type: str
    parameters: Dict[str, Any]
    description: str = ""  # Human-readable description
 
    def execute(self, game_state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute action and return state changes
 
        Args:
            game_state: Current game state
 
        Returns:
            Dictionary of state changes to apply
        """
        if self.action_type == "set_flag":
            return {"flags": {self.parameters["flag"]: self.parameters.get("value", True)}}
 
        elif self.action_type == "spawn_npc":
            npc_id = self.parameters["npc_id"]
            location = self.parameters["location"]
            return {"npc_states": {npc_id: {"location": location, "spawned": True}}}
 
        elif self.action_type == "trigger_event":
            return {"events": [self.parameters["event_id"]]}
 
        elif self.action_type == "modify_stat":
            stat = self.parameters["stat"]
            value = self.parameters["value"]
            operation = self.parameters.get("operation", "set")  # set, add, subtract
 
            if operation == "add":
                return {"player_stats": {stat: game_state.get("player_stats", {}).get(stat, 0) + value}}
            elif operation == "subtract":
                return {"player_stats": {stat: game_state.get("player_stats", {}).get(stat, 0) - value}}
            else:  # set
                return {"player_stats": {stat: value}}
 
        elif self.action_type == "give_item":
            return {"inventory_add": [self.parameters["item_id"]]}
 
        elif self.action_type == "remove_item":
            return {"inventory_remove": [self.parameters["item_id"]]}
 
        elif self.action_type == "display_message":
            return {"messages": [self.parameters["message"]]}
 
        elif self.action_type == "start_quest":
            quest_id = self.parameters["quest_id"]
            return {"quests": {quest_id: {"status": "active", "step": 0}}}
 
        return {}
 
 
@dataclass
class Rule:
    """A complete rule with conditions and actions (StarCraft-inspired)"""
    rule_id: str
    name: str
    conditions: List[Condition] = field(default_factory=list)
    actions: List[Action] = field(default_factory=list)
    condition_logic: str = "AND"  # AND or OR
    enabled: bool = True
    frequency: str = "once"  # once, always, per_turn
    priority: int = 0  # Higher priority rules execute first
    description: str = ""  # Human-readable description
 
    def evaluate(self, game_state: Dict[str, Any]) -> bool:
        """
        Check if all conditions are met
 
        Args:
            game_state: Current game state
 
        Returns:
            True if conditions met, False otherwise
        """
        if not self.enabled:
            return False
 
        if not self.conditions:
            return True  # No conditions = always true
 
        results = [c.evaluate(game_state) for c in self.conditions]
 
        if self.condition_logic == "AND":
            return all(results)
        else:  # OR
            return any(results)
 
    def execute(self, game_state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute all actions and return state changes
 
        Args:
            game_state: Current game state
 
        Returns:
            Dictionary of state changes
        """
        state_changes: Dict[str, Any] = {}
 
        for action in self.actions:
            changes = action.execute(game_state)
 
            # Merge changes
            for key, value in changes.items():
                if key in state_changes:
                    if isinstance(value, dict):
                        state_changes[key].update(value)
                    elif isinstance(value, list):
                        state_changes[key].extend(value)
                    else:
                        state_changes[key] = value
                else:
                    state_changes[key] = value
 
        return state_changes
 
 
class RuleEngine:
    """Manages and executes rules (StarCraft trigger system inspired)"""
 
    def __init__(self):
        self.rules: List[Rule] = []
        self.executed_once: set = set()  # Track "once" rules
        self.execution_history: List[Dict[str, Any]] = []
 
    def add_rule(self, rule: Rule):
        """Add rule to engine"""
        self.rules.append(rule)
        # Sort by priority (higher first)
        self.rules.sort(key=lambda r: r.priority, reverse=True)
 
    def remove_rule(self, rule_id: str):
        """Remove rule from engine"""
        self.rules = [r for r in self.rules if r.rule_id != rule_id]
 
    def process_turn(self, game_state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Process all rules for current turn
 
        Args:
            game_state: Current game state
 
        Returns:
            List of state change dictionaries
        """
        state_changes = []
 
        for rule in self.rules:
            if rule.evaluate(game_state):
                # Check frequency
                if rule.frequency == "once":
                    if rule.rule_id in self.executed_once:
                        continue
                    self.executed_once.add(rule.rule_id)
 
                # Execute and collect changes
                changes = rule.execute(game_state)
                if changes:
                    execution_record = {
                        "rule_id": rule.rule_id,
                        "rule_name": rule.name,
                        "changes": changes
                    }
                    state_changes.append(execution_record)
                    self.execution_history.append(execution_record)
 
        return state_changes
 
    def export_visual_format(self, rule: Rule) -> Dict[str, Any]:
        """
        Export rule in visual editor format (JSON)
 
        Args:
            rule: Rule to export
 
        Returns:
            Dictionary suitable for visual editor
        """
        return {
            "id": rule.rule_id,
            "name": rule.name,
            "description": rule.description,
            "enabled": rule.enabled,
            "priority": rule.priority,
            "frequency": rule.frequency,
            "condition_logic": rule.condition_logic,
            "conditions": [
                {
                    "type": c.trigger_type.value,
                    "operator": c.operator.value,
                    "value": c.value,
                    "description": c.description
                }
                for c in rule.conditions
            ],
            "actions": [
                {
                    "type": a.action_type,
                    "params": a.parameters,
                    "description": a.description
                }
                for a in rule.actions
            ]
        }
 
    def import_visual_format(self, data: Dict[str, Any]) -> Rule:
        """
        Import rule from visual editor format
 
        Args:
            data: Dictionary from visual editor
 
        Returns:
            Rule object
        """
        conditions = [
            Condition(
                trigger_type=TriggerType(c["type"]),
                operator=Operator(c["operator"]),
                value=c["value"],
                description=c.get("description", "")
            )
            for c in data.get("conditions", [])
        ]
 
        actions = [
            Action(
                action_type=a["type"],
                parameters=a["params"],
                description=a.get("description", "")
            )
            for a in data.get("actions", [])
        ]
 
        return Rule(
            rule_id=data["id"],
            name=data["name"],
            conditions=conditions,
            actions=actions,
            condition_logic=data.get("condition_logic", "AND"),
            enabled=data.get("enabled", True),
            frequency=data.get("frequency", "once"),
            priority=data.get("priority", 0),
            description=data.get("description", "")
        )
 
    def save_to_file(self, filename: str):
        """Save all rules to JSON file"""
        data = [self.export_visual_format(rule) for rule in self.rules]
        with open(filename, 'w') as f:
            json.dump(data, f, indent=2)
 
    def load_from_file(self, filename: str):
        """Load rules from JSON file"""
        with open(filename, 'r') as f:
            data = json.load(f)
 
        self.rules.clear()
        for rule_data in data:
            rule = self.import_visual_format(rule_data)
            self.add_rule(rule)
 
 
# Usage Example
def example_usage():
    """Demonstrate visual rule engine"""
 
    engine = RuleEngine()
 
    # Rule 1: Midnight Haunting
    midnight_rule = Rule(
        rule_id="midnight_ghost",
        name="Midnight Haunting",
        description="Spawn ghost at midnight if player is in haunted mansion",
        conditions=[
            Condition(
                trigger_type=TriggerType.TIME,
                operator=Operator.EQUAL,
                value=0,  # Midnight (00:00)
                description="Time is midnight"
            ),
            Condition(
                trigger_type=TriggerType.LOCATION,
                operator=Operator.EQUAL,
                value="haunted_mansion",
                description="Player is in haunted mansion"
            ),
            Condition(
                trigger_type=TriggerType.FLAG,
                operator=Operator.EQUAL,
                value="ghost_defeated",
                description="Ghost hasn't been defeated yet"
            )
        ],
        actions=[
            Action(
                action_type="spawn_npc",
                parameters={"npc_id": "ghost", "location": "haunted_mansion"},
                description="Spawn ghost NPC"
            ),
            Action(
                action_type="set_flag",
                parameters={"flag": "haunting_started", "value": True},
                description="Mark haunting as started"
            ),
            Action(
                action_type="display_message",
                parameters={"message": "A chill runs down your spine as a ghostly figure materializes before you."},
                description="Display atmospheric message"
            )
        ],
        frequency="once",
        priority=10
    )
 
    engine.add_rule(midnight_rule)
 
    # Rule 2: Reward for Defeating Ghost
    reward_rule = Rule(
        rule_id="ghost_reward",
        name="Reward for Defeating Ghost",
        description="Give player magic sword after defeating ghost",
        conditions=[
            Condition(
                trigger_type=TriggerType.FLAG,
                operator=Operator.EQUAL,
                value="ghost_defeated",
                description="Ghost has been defeated"
            )
        ],
        actions=[
            Action(
                action_type="give_item",
                parameters={"item_id": "magic_sword"},
                description="Give magic sword to player"
            ),
            Action(
                action_type="start_quest",
                parameters={"quest_id": "cleanse_mansion"},
                description="Start quest to cleanse the mansion"
            ),
            Action(
                action_type="display_message",
                parameters={"message": "The ghost dissipates, leaving behind a glowing sword."},
                description="Display reward message"
            )
        ],
        frequency="once",
        priority=5
    )
 
    engine.add_rule(reward_rule)
 
    # Rule 3: Low Health Warning
    health_warning = Rule(
        rule_id="low_health_warning",
        name="Low Health Warning",
        description="Warn player when health drops below 25%",
        conditions=[
            Condition(
                trigger_type=TriggerType.STAT,
                operator=Operator.LESS,
                value="hp:25",
                description="HP less than 25"
            )
        ],
        actions=[
            Action(
                action_type="display_message",
                parameters={"message": "You're badly wounded! Find healing soon!"},
                description="Display warning"
            )
        ],
        frequency="per_turn",
        priority=100  # High priority (safety warning)
    )
 
    engine.add_rule(health_warning)
 
    # Test with game state
    game_state = {
        "time": 0,  # Midnight
        "player_location": "haunted_mansion",
        "flags": {"ghost_defeated": False},
        "inventory": [],
        "player_stats": {"hp": 100}
    }
 
    print("=== Turn 1: Entering haunted mansion at midnight ===")
    changes = engine.process_turn(game_state)
    for change in changes:
        print(f"Rule triggered: {change['rule_name']}")
        print(f"Changes: {change['changes']}\n")
 
    # Export rule to visual format
    print("=== Visual Editor Format ===")
    visual_format = engine.export_visual_format(midnight_rule)
    print(json.dumps(visual_format, indent=2))
 
    # Save to file
    engine.save_to_file("rules.json")
    print("\nRules saved to rules.json")
 
 
if __name__ == "__main__":
    example_usage()

Cross-References

Pattern Documentation

Implementation Details


Tags

code-examples best-practices python type-hints production-ready anti-hallucination token-limiting persistence rule-engine starcraft-triggers


Next: Production Lessons

See 05-Production-Lessons for key insights and lessons learned from real-world ChatBotRPG usage.