Advanced Template Plugin

Advanced · ~1,180 lines · 2-4 hours

DVP CMS is a truth distillation system for AI-generated content. Plugins are evidence suppliers: they verify facts, pull live data, and make content more trustworthy over time.

Plugin ID: canonical-template
Version: 1.0.0
Dependencies: httpx
Hooks: ContentLifecycleHooks, AuthorityHooks

This is the GOLD STANDARD reference implementation for DVP CMS plugins. Every pattern, convention, and best practice is demonstrated here with exhaustive documentation. When in doubt, this file is the source of truth. Study this file before writing your own plugin. Copy patterns from here. When you see "DON'T DO THIS", take it seriously - those are real mistakes that cause real bugs in production.

LLM-Friendly: This template is designed to work with AI coding assistants. Point your AI at this file and ask it to build plugins following these patterns.

Features

All Hooks Implemented: Both filter and action hooks, with full docs
Authority Integration: dvp_authority_changed hook for feedback
Async Event Handling: CONTENT_CREATED and CONTENT_UPDATED
Cache Eviction: Oldest-first batch eviction (by insertion time) when the cache is full
Configuration Validation: Type-safe config parsing with defaults
Exhaustive Documentation: Every method fully documented
"DON'T DO THIS" Warnings: Common mistakes called out inline
Architecture Diagrams: ASCII art showing plugin structure
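
The difference between filter hooks and action hooks listed above can be illustrated with a minimal standalone sketch. The runner functions `run_filter_hooks` and `run_action_hooks` are hypothetical names invented for this illustration; they are not the DVP CMS hook system, whose dispatch logic is not shown on this page. The sketch only assumes what the source states: a filter hook must return the content, and an action hook needs no return value.

```python
from typing import Callable

Content = dict[str, object]


def run_filter_hooks(hooks: list[Callable], content: Content):
    """Hypothetical runner: each filter receives the previous filter's return value."""
    for hook in hooks:
        content = hook(content)
    return content


def run_action_hooks(hooks: list[Callable], message: str) -> None:
    """Hypothetical runner: actions run for side effects; return values are ignored."""
    for hook in hooks:
        hook(message)


def tag_processed(content: Content) -> Content:
    content["processed"] = True
    return content  # a filter hands the (possibly modified) content back


def forgets_return(content: Content):
    content["broken"] = True  # no return statement: the caller receives None


seen: list[str] = []
good = run_filter_hooks([tag_processed], {"title": "Hello"})
bad = run_filter_hooks([forgets_return], {"title": "Hello"})
run_action_hooks([seen.append], "authority changed")

print(good)  # the dictionary, with "processed" added
print(bad)   # None: the content was lost
print(seen)
```

Under these assumptions, a filter that forgets its `return` silently replaces the content with `None` for everything downstream, which is the mistake the "DON'T DO THIS" warning in `dvp_before_content_create` describes.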

Source Code

Due to its comprehensive nature (~1,180 lines), this file includes extensive inline documentation.

plugin.py (1,177 lines)
"""
Canonical Template Plugin for DVP CMS.
================================================================================

PURPOSE OF THIS FILE
--------------------
This is the GOLD STANDARD reference implementation for DVP CMS plugins. Every
pattern, convention, and best practice is demonstrated here with exhaustive
documentation. When in doubt, this file is the source of truth.

Study this file before writing your own plugin. Copy patterns from here. When
you see "DON'T DO THIS", take it seriously - those are real mistakes that cause
real bugs in production.

WHAT THIS PLUGIN DOES
---------------------
This template plugin demonstrates a "content enrichment" pattern that:
1. Intercepts content before creation (filter hook)
2. Fetches external data via API (with caching)
3. Submits evidence to the authority system (via events)
4. Logs significant authority changes (action hook)

You can use this as a starting point for plugins that:
- Integrate with external APIs
- Cache responses for performance
- Influence content authority
- React to system events

PLUGIN ARCHITECTURE
-------------------
DVP CMS plugins follow a layered architecture:

    ┌─────────────────────────────────────────────────┐
    │                 DVP CMS Kernel                  │
    │  ┌─────────────┐  ┌─────────────┐              │
    │  │  EventBus   │  │  Authority  │              │
    │  │  (pub/sub)  │  │   System    │              │
    │  └──────┬──────┘  └──────┬──────┘              │
    │         │                │                      │
    │  ┌──────┴────────────────┴──────┐              │
    │  │         Hook System          │              │
    │  │  (filter hooks, action hooks) │              │
    │  └──────────────┬───────────────┘              │
    └─────────────────┼───────────────────────────────┘
                      │
    ┌─────────────────┼───────────────────────────────┐
    │                 │  YOUR PLUGIN                  │
    │  ┌──────────────┴──────────────┐               │
    │  │   Plugin Base Class         │               │
    │  │   - lifecycle management    │               │
    │  │   - logging                 │               │
    │  │   - event subscription      │               │
    │  └──────────────┬──────────────┘               │
    │                 │                               │
    │  ┌──────────────┴──────────────┐               │
    │  │   Hook Protocol Mixins      │               │
    │  │   - ContentLifecycleHooks   │               │
    │  │   - AuthorityHooks          │               │
    │  └──────────────┬──────────────┘               │
    │                 │                               │
    │  ┌──────────────┴──────────────┐               │
    │  │   Your Implementation       │               │
    │  │   - business logic          │               │
    │  │   - API integrations        │               │
    │  │   - data processing         │               │
    │  └─────────────────────────────┘               │
    └─────────────────────────────────────────────────┘

Example (run in an async context, e.g. the `python -m asyncio` REPL):
    >>> from dvp_cms.kernel.event_bus import EventBus
    >>>
    >>> event_bus = EventBus()
    >>> plugin = CanonicalTemplatePlugin(event_bus)
    >>> await plugin.initialize({
    ...     "api_key": "your-api-key",
    ...     "cache_ttl": 3600,
    ...     "enabled_features": ["enrichment", "validation"],
    ... })

USAGE NOTES
-----------
- Copy this entire directory as a starting point
- Delete the sections you don't need
- Keep the structure and naming conventions
- Add tests for your implementation (see Testing Plugins guide)
"""

# =============================================================================
# IMPORTS
# =============================================================================
#
# Import grouping follows PEP 8; sorting within each group is a project
# convention, not a PEP 8 requirement:
# 1. __future__ imports (always first)
# 2. Standard library imports (alphabetical)
# 3. Third-party imports (alphabetical)
# 4. Local/project imports (alphabetical)

from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta  # datetime.UTC requires Python 3.11+
from typing import TYPE_CHECKING

import httpx

from dvp_cms.kernel.event import Event, EventType
from dvp_cms.kernel.plugin import Plugin
from dvp_cms.plugins.hookspec import AuthorityHooks, ContentLifecycleHooks

if TYPE_CHECKING:
    from dvp_cms.kernel.event_bus import EventBus


# =============================================================================
# DATA CLASSES
# =============================================================================
#
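# Use dataclasses for structured data. They provide:
# - Automatic __init__, __repr__, __eq__
# - Type annotations that static checkers can verify (not enforced at runtime)
# - Immutability with frozen=True (shallow: a dict field can still be mutated)
# - Memory efficiency with slots=True (Python 3.10+)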


@dataclass(frozen=True, slots=True)
class APIResponseData:
    """Raw response data from the enrichment API.

    This dataclass holds only the data returned by the API, without any
    request-specific context like content_id. This separation allows us
    to cache API responses by input text hash and reuse them across
    different content items with identical text.
    """

    enrichment_data: dict[str, object]
    confidence: float
    source: str


@dataclass(frozen=True, slots=True)
class EnrichmentResult:
    """Result of content enrichment, combining API data with request context."""

    content_id: str
    enrichment_data: dict[str, object]
    confidence: float
    source: str
    fetched_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    @classmethod
    def from_api_response(
        cls,
        content_id: str,
        api_data: APIResponseData,
        fetched_at: datetime | None = None,
    ) -> EnrichmentResult:
        """Create an EnrichmentResult from cached API data."""
        return cls(
            content_id=content_id,
            enrichment_data=api_data.enrichment_data,
            confidence=api_data.confidence,
            source=api_data.source,
            fetched_at=fetched_at or datetime.now(UTC),
        )


@dataclass(frozen=True, slots=True)
class CacheEntry:
    """A cached API response with expiration tracking."""

    data: APIResponseData
    cached_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    def is_expired(self, ttl_seconds: int) -> bool:
        """Check if this cache entry has expired."""
        age = datetime.now(UTC) - self.cached_at
        return age > timedelta(seconds=ttl_seconds)


# =============================================================================
# CONSTANTS
# =============================================================================
#
# Constants are organized into classes for namespacing.
#
# DON'T DO THIS: Magic numbers in code
# ------------------------------------
# WRONG:
#     response = await client.get(url, timeout=30)  # What is 30?
#
# CORRECT:
#     response = await client.get(url, timeout=APIConfig.TIMEOUT_SECONDS)


class APIConfig:
    """Configuration for external API calls."""
    BASE_URL = "https://api.example.com/v1/enrich"
    TIMEOUT_SECONDS = 30.0


class CacheConfig:
    """Configuration for the response cache."""
    DEFAULT_TTL_SECONDS = 3600
    MAX_SIZE = 1000
    EVICTION_BATCH_SIZE = 100


class ContentFields:
    """Content dictionary field names."""
    TEXT_FIELDS = ("body", "content", "text", "description", "summary")
    TITLE_FIELD = "title"


class EvidenceConfig:
    """Configuration for evidence submission."""
    SOURCE_TYPE = "ENRICHMENT_API"


class AuthorityConfig:
    """Authority change thresholds."""
    SIGNIFICANT_DELTA = 3


# =============================================================================
# PLUGIN IMPLEMENTATION
# =============================================================================


class CanonicalTemplatePlugin(Plugin, ContentLifecycleHooks, AuthorityHooks):
    """The canonical reference implementation for DVP CMS plugins.

    This plugin demonstrates every pattern and best practice for DVP CMS
    plugin development. Use it as a template for your own plugins.

    Configuration:
        api_key: API key for the enrichment service (required for live data).
        api_url: Override the default API URL.
        cache_ttl: Cache time-to-live in seconds. Default: 3600.
        enabled_features: List of features to enable. Default: ["enrichment"].
    """

    plugin_id = "canonical-template"
    plugin_version = "1.0.0"
    plugin_name = "Canonical Template Plugin"
    plugin_description = "Reference implementation demonstrating all plugin patterns"
    plugin_author = "DVP CMS Team"
    plugin_capabilities = ["enrichment", "evidence_provider", "template"]

    def __init__(self, event_bus: EventBus) -> None:
        """Initialize the plugin with required dependencies.

        This method sets up instance variables but does NOT perform any I/O
        or external calls. Heavy initialization should happen in `initialize()`.
        """
        super().__init__(event_bus)

        # Configuration (set in initialize)
        self._api_key: str = ""
        self._api_url: str = APIConfig.BASE_URL
        self._cache_ttl: int = CacheConfig.DEFAULT_TTL_SECONDS
        self._enabled_features: list[str] = []

        # Runtime State
        self._cache: dict[str, CacheEntry] = {}
        self._http_client: httpx.AsyncClient | None = None

        # ---------------------------------------------------------------------
        # DON'T DO THIS: Performing I/O in __init__
        # ---------------------------------------------------------------------
        #
        # WRONG - Don't do network calls or file I/O in __init__:
        #
        #     self._http_client = httpx.AsyncClient()  # Created too early!
        #     self._config = self._load_config_file()   # File I/O in __init__!
        #
        # WHY: __init__ should be fast and never fail due to external factors.
        # ---------------------------------------------------------------------

    async def initialize(self, config: dict[str, object]) -> None:
        """Initialize the plugin with configuration and start resources."""
        # Configuration Parsing
        self._api_key = str(config.get("api_key", ""))
        self._api_url = str(config.get("api_url", APIConfig.BASE_URL))
        self._cache_ttl = int(config.get("cache_ttl", CacheConfig.DEFAULT_TTL_SECONDS))

        features = config.get("enabled_features")
        if isinstance(features, list):
            self._enabled_features = [str(f) for f in features]
        else:
            self._enabled_features = ["enrichment"]

        # Resource Creation
        self._http_client = httpx.AsyncClient(
            timeout=APIConfig.TIMEOUT_SECONDS,
            follow_redirects=True,
        )

        # Event Subscriptions
        await self.subscribe(EventType.CONTENT_CREATED, self._handle_content_created)
        await self.subscribe(EventType.CONTENT_UPDATED, self._handle_content_updated)

        if not self._api_key:
            self._logger.warning(
                "No API key configured; plugin will operate in mock mode"
            )

        self._logger.info(
            "Initialized with cache_ttl=%ds, features=%s",
            self._cache_ttl,
            self._enabled_features,
        )

    async def shutdown(self) -> None:
        """Shut down the plugin and release all resources."""
        if self._http_client is not None:
            await self._http_client.aclose()
            self._http_client = None

        self._cache.clear()
        await self.unsubscribe_all()
        self._logger.info("Shutdown complete")

    # =========================================================================
    # EVENT HANDLERS
    # =========================================================================

    async def _handle_content_created(self, event: Event) -> None:
        """Handle content creation events."""
        content = event.data.get("content")
        if not isinstance(content, dict):
            return

        if "enrichment" not in self._enabled_features:
            return

        result = await self._enrich_content(event.aggregate_id, content)
        if result is not None:
            await self._submit_enrichment_evidence(result)

    async def _handle_content_updated(self, event: Event) -> None:
        """Handle content update events."""
        content = event.data.get("new_content")
        if content is None:
            content = event.data.get("content")

        if not isinstance(content, dict):
            return

        if "enrichment" not in self._enabled_features:
            return

        result = await self._enrich_content(event.aggregate_id, content)
        if result is not None:
            await self._submit_enrichment_evidence(result)

    # =========================================================================
    # BUSINESS LOGIC
    # =========================================================================

    async def enrich(self, content_id: str, text: str) -> EnrichmentResult | None:
        """Enrich text content via the enrichment API."""
        if not text.strip():
            return None

        cache_key = self._generate_cache_key(text)

        cached_entry = self._get_cached_entry(cache_key)
        if cached_entry is not None:
            self._logger.debug("Cache hit for content %s", content_id)
            return EnrichmentResult.from_api_response(
                content_id=content_id,
                api_data=cached_entry.data,
                fetched_at=cached_entry.cached_at,
            )

        api_data = await self._fetch_api_data(text)
        if api_data is None:
            return None

        self._set_cached(cache_key, api_data)

        return EnrichmentResult.from_api_response(
            content_id=content_id,
            api_data=api_data,
        )

    # =========================================================================
    # HOOK IMPLEMENTATIONS
    # =========================================================================

    def dvp_before_content_create(
        self,
        content: dict[str, object],
        metadata: dict[str, object] | None = None,
    ) -> dict[str, object]:
        """Called synchronously before content is created.

        This is a FILTER HOOK - you MUST return the content dictionary.

        Warning:
            This hook is SYNCHRONOUS. Do not perform async operations here.
        """
        # ---------------------------------------------------------------
        # DON'T DO THIS: Forgetting to return content
        # ---------------------------------------------------------------
        #
        # WRONG - Content is lost!
        #
        #     def dvp_before_content_create(self, content, metadata=None):
        #         content["processed"] = True
        #         # Forgot to return! Content will be None!
        #
        # ---------------------------------------------------------------

        if "validation" in self._enabled_features:
            content["_template_processed"] = True
            content["_template_version"] = self.plugin_version

        title = content.get(ContentFields.TITLE_FIELD, "Untitled")
        self._logger.debug("Processing content before creation: %s", title)

        return content  # IMPORTANT: Always return the content!

    def dvp_authority_changed(
        self,
        aggregate_id: str,
        aggregate_type: str,
        plugin_name: str,
        old_authority: int,
        new_authority: int,
        reason: str,
        evidence: dict[str, object],
    ) -> None:
        """Called when content authority changes.

        This is an ACTION HOOK - no return value needed.
        """
        delta = abs(new_authority - old_authority)
        if delta < AuthorityConfig.SIGNIFICANT_DELTA:
            return

        direction = "increased" if new_authority > old_authority else "decreased"

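        # 10 is the "major change" threshold. In your own plugin, give it a
        # name on AuthorityConfig (see "DON'T DO THIS: Magic numbers" above).
        if delta >= 10: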
            self._logger.warning(
                "Major authority change: %s %s from %d to %d (reason: %s)",
                aggregate_id,
                direction,
                old_authority,
                new_authority,
                reason,
            )
        else:
            self._logger.info(
                "Authority %s for %s: %d -> %d",
                direction,
                aggregate_id,
                old_authority,
                new_authority,
            )

    # =========================================================================
    # PRIVATE HELPER METHODS
    # =========================================================================

    async def _enrich_content(
        self,
        content_id: str,
        content: dict[str, object],
    ) -> EnrichmentResult | None:
        """Extract text from content and enrich it."""
        text = self._extract_text(content)
        if not text:
            return None
        return await self.enrich(content_id, text)

    def _extract_text(self, content: dict[str, object]) -> str:
        """Extract text from content fields."""
        parts: list[str] = []
        for field_name in ContentFields.TEXT_FIELDS:
            value = content.get(field_name)
            if isinstance(value, str) and value.strip():
                parts.append(value.strip())
        return " ".join(parts)

    async def _fetch_api_data(self, text: str) -> APIResponseData | None:
        """Fetch data from the API or return mock data."""
        if not self._api_key:
            return self._create_mock_api_data()

        try:
            return await self._call_enrichment_api(text)
        except httpx.TimeoutException:
            self._logger.warning("API timeout")
            return None
        except httpx.HTTPStatusError as e:
            self._logger.warning("API HTTP error %d", e.response.status_code)
            return None
        except httpx.RequestError as e:
            self._logger.warning("API request error: %s", e)
            return None

    async def _call_enrichment_api(self, text: str) -> APIResponseData:
        """Make the actual API call."""
        if self._http_client is None:
            raise RuntimeError("HTTP client not initialized")

        # In a real plugin, this would call the actual API
        self._logger.debug("Would call API: %s", self._api_url)
        return self._create_mock_api_data()

    def _create_mock_api_data(self) -> APIResponseData:
        """Create mock API response data for testing."""
        return APIResponseData(
            enrichment_data={
                "sentiment": "neutral",
                "entities": [],
                "topics": ["general"],
            },
            confidence=0.75,
            source="Mock Data",
        )

    async def _submit_enrichment_evidence(self, result: EnrichmentResult) -> None:
        """Submit enrichment result as evidence to the authority system."""
        await self.emit_event(
            EventType.EVIDENCE_ADDED,
            {
                "content_id": result.content_id,
                "claim": f"Content enrichment for {result.content_id}",
                "source_type": EvidenceConfig.SOURCE_TYPE,
                "supports_claim": True,
                "confidence": result.confidence,
                "evidence_data": {
                    "enrichment": result.enrichment_data,
                    "source": result.source,
                    "fetched_at": result.fetched_at.isoformat(),
                },
            },
            {"source_plugin": self.plugin_id},
        )

        self._logger.info(
            "Submitted enrichment evidence for content %s (confidence: %.2f)",
            result.content_id,
            result.confidence,
        )

    # =========================================================================
    # CACHING METHODS
    # =========================================================================

    def _generate_cache_key(self, text: str) -> str:
        """Generate a cache key from text using SHA256."""
        return hashlib.sha256(text.encode()).hexdigest()

    def _get_cached_entry(self, key: str) -> CacheEntry | None:
        """Get cached entry if valid."""
        entry = self._cache.get(key)
        if entry is None:
            return None

        if entry.is_expired(self._cache_ttl):
            del self._cache[key]
            return None

        return entry

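    def _set_cached(self, key: str, api_data: APIResponseData) -> None:
        """Store API response data in cache with eviction if needed."""
        # Overwriting an existing key does not grow the cache, so only evict
        # when a new key would push it past MAX_SIZE.
        if key not in self._cache and len(self._cache) >= CacheConfig.MAX_SIZE:
            self._evict_oldest(CacheConfig.EVICTION_BATCH_SIZE)
        self._cache[key] = CacheEntry(data=api_data)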

    def _evict_oldest(self, count: int) -> None:
        """Evict up to `count` entries with the oldest `cached_at` timestamps."""
        sorted_keys = sorted(
            self._cache.keys(),
            key=lambda k: self._cache[k].cached_at,
        )
        evicted = sorted_keys[:count]
        for key in evicted:
            del self._cache[key]
        # Log the number actually removed, which may be fewer than `count`.
        self._logger.debug("Evicted %d cache entries", len(evicted))
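
The caching methods above order entries by `cached_at`, so eviction is by insertion time rather than by last access. The following standalone sketch shows that behavior with deliberately tiny limits. `SmallCache` and `Entry` are hypothetical stand-ins written for this illustration, not part of plugin.py, and the sketch uses `timezone.utc` so that it also runs on Python versions before 3.11.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for CacheEntry, holding a plain string."""

    value: str
    cached_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def is_expired(self, ttl_seconds: int) -> bool:
        age = datetime.now(timezone.utc) - self.cached_at
        return age > timedelta(seconds=ttl_seconds)


class SmallCache:
    """Hypothetical cache mirroring the shape of _set_cached / _evict_oldest."""

    def __init__(self, max_size: int, batch: int) -> None:
        self.max_size = max_size
        self.batch = batch
        self.entries: dict[str, Entry] = {}

    def set(self, key: str, entry: Entry) -> None:
        # Assumption of this sketch: only evict when a new key would overflow.
        if key not in self.entries and len(self.entries) >= self.max_size:
            oldest = sorted(self.entries, key=lambda k: self.entries[k].cached_at)
            for old_key in oldest[: self.batch]:
                del self.entries[old_key]
        self.entries[key] = entry


# Insert four entries one second apart into a cache that holds three
# and evicts two at a time.
base = datetime(2024, 1, 1, tzinfo=timezone.utc)
cache = SmallCache(max_size=3, batch=2)
for i, key in enumerate(["a", "b", "c", "d"]):
    cache.set(key, Entry(value=key, cached_at=base + timedelta(seconds=i)))

remaining = sorted(cache.entries)
print(remaining)  # "a" and "b" were the two oldest, so "c" and "d" remain
```

Evicting in batches means the sort runs once per overflow rather than on every insert, at the cost of briefly holding fewer entries than the limit allows. If true LRU behavior is wanted, one option is to refresh an entry's timestamp (or position) on each cache hit; the code shown on this page does not do that.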