This template represents a production-ready plugin that demonstrates the patterns most plugins need: configuration handling, HTTP API integration, response caching, error handling, and evidence submission. Use this template when building real plugins. For exhaustive documentation of every pattern, see the Advanced Template.
"""
Standard Template Plugin for DVP CMS.
This template represents a production-ready plugin that demonstrates the
patterns most plugins need:
- Configuration handling
- HTTP API integration
- Response caching
- Error handling
- Evidence submission
Use this template when building real plugins. For exhaustive documentation
of every pattern, see `_template-canonical/`.
Example:
>>> from dvp_cms.kernel.event_bus import EventBus
>>>
>>> event_bus = EventBus()
>>> plugin = StandardTemplatePlugin(event_bus)
>>> await plugin.initialize({
... "api_key": "your-api-key",
... "cache_ttl": 3600,
... })
"""
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta, UTC
from typing import TYPE_CHECKING
import httpx
from dvp_cms.kernel.plugin import Plugin
from dvp_cms.kernel.event import Event, EventType
from dvp_cms.plugins.hookspec import ContentLifecycleHooks
if TYPE_CHECKING:
from dvp_cms.kernel.event_bus import EventBus
# -----------------------------------------------------------------------------
# Data Classes
# -----------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class APIResponse:
"""Response from the external API.
Attributes:
data: The response data dictionary.
confidence: Confidence score (0.0 to 1.0).
source: Data source identifier.
fetched_at: When the data was fetched.
"""
data: dict[str, object]
confidence: float
source: str
fetched_at: datetime = field(default_factory=lambda: datetime.now(UTC))
@dataclass(slots=True)
class CacheEntry:
"""A cached API response with expiration tracking.
Attributes:
response: The cached API response.
cached_at: When this entry was cached.
"""
response: APIResponse
cached_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def is_expired(self, ttl_seconds: int) -> bool:
"""Check if this cache entry has expired."""
age = datetime.now(UTC) - self.cached_at
return age > timedelta(seconds=ttl_seconds)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class APIConfig:
"""API configuration constants."""
BASE_URL = "https://api.example.com/v1"
TIMEOUT_SECONDS = 30.0
class CacheConfig:
"""Cache configuration constants."""
DEFAULT_TTL_SECONDS = 3600
MAX_SIZE = 500
EVICTION_BATCH_SIZE = 50
class ContentFields:
"""Content field names."""
TEXT_FIELDS = ("body", "content", "text", "description")
# -----------------------------------------------------------------------------
# Plugin Implementation
# -----------------------------------------------------------------------------
class StandardTemplatePlugin(Plugin, ContentLifecycleHooks):
"""A production-ready template plugin with API integration.
This plugin demonstrates the standard patterns for:
- Configuration handling with validation
- HTTP client management
- Response caching with TTL
- Error handling
- Evidence submission
Configuration:
api_key: API key for the external service (optional, uses mock if missing).
cache_ttl: Cache time-to-live in seconds. Default: 3600.
Example:
>>> plugin = StandardTemplatePlugin(event_bus)
>>> await plugin.initialize({"api_key": "your-key"})
"""
plugin_id = "standard-template"
plugin_version = "1.0.0"
plugin_name = "Standard Template"
plugin_description = "Production-ready template with API integration"
plugin_author = "DVP CMS Team"
plugin_capabilities = ["template", "api_integration"]
def __init__(self, event_bus: EventBus) -> None:
"""Initialize the plugin."""
super().__init__(event_bus)
# Configuration
self._api_key: str = ""
self._cache_ttl: int = CacheConfig.DEFAULT_TTL_SECONDS
# Runtime state
self._cache: dict[str, CacheEntry] = {}
self._http_client: httpx.AsyncClient | None = None
async def initialize(self, config: dict[str, object]) -> None:
"""Initialize the plugin with configuration."""
self._api_key = str(config.get("api_key", ""))
self._cache_ttl = int(config.get("cache_ttl", CacheConfig.DEFAULT_TTL_SECONDS))
self._http_client = httpx.AsyncClient(timeout=APIConfig.TIMEOUT_SECONDS)
await self.subscribe(EventType.CONTENT_CREATED, self._handle_content_created)
if not self._api_key:
self._logger.warning("No API key configured; using mock data")
self._logger.info("Initialized with cache_ttl=%ds", self._cache_ttl)
async def shutdown(self) -> None:
"""Shut down the plugin and release resources."""
if self._http_client is not None:
await self._http_client.aclose()
self._http_client = None
self._cache.clear()
await self.unsubscribe_all()
self._logger.info("Shutdown complete")
# -------------------------------------------------------------------------
# Event Handlers
# -------------------------------------------------------------------------
async def _handle_content_created(self, event: Event) -> None:
"""Handle content creation events."""
content = event.data.get("content")
if not isinstance(content, dict):
return
text = self._extract_text(content)
if not text:
return
response = await self._fetch_data(text)
if response is not None:
await self._submit_evidence(event.aggregate_id, response)
# -------------------------------------------------------------------------
# API Integration
# -------------------------------------------------------------------------
async def _fetch_data(self, text: str) -> APIResponse | None:
"""Fetch data from API with caching."""
cache_key = hashlib.sha256(text.encode()).hexdigest()
# Check cache
cached = self._cache.get(cache_key)
if cached is not None and not cached.is_expired(self._cache_ttl):
return cached.response
# Fetch fresh data
if not self._api_key:
response = self._create_mock_response()
else:
response = await self._call_api(text)
if response is not None:
self._set_cached(cache_key, response)
return response
async def _call_api(self, text: str) -> APIResponse | None:
"""Make API call with error handling."""
if self._http_client is None:
return None
try:
# In a real plugin, make the actual API call here
self._logger.debug("Would call API with text length: %d", len(text))
return self._create_mock_response()
except httpx.TimeoutException:
self._logger.warning("API timeout")
return None
except httpx.HTTPStatusError as e:
self._logger.warning("API HTTP error %d", e.response.status_code)
return None
except httpx.RequestError as e:
self._logger.warning("API request error: %s", e)
return None
def _create_mock_response(self) -> APIResponse:
"""Create mock response for testing."""
return APIResponse(
data={"result": "mock_data", "score": 0.75},
confidence=0.80,
source="Mock Data",
)
# -------------------------------------------------------------------------
# Evidence Submission
# -------------------------------------------------------------------------
async def _submit_evidence(
self,
content_id: str,
response: APIResponse,
) -> None:
"""Submit API response as evidence."""
await self.emit_event(
EventType.EVIDENCE_ADDED,
{
"content_id": content_id,
"claim": f"API data for {content_id}",
"source_type": "EXTERNAL_API",
"supports_claim": True,
"confidence": response.confidence,
"evidence_data": {
"api_data": response.data,
"source": response.source,
"fetched_at": response.fetched_at.isoformat(),
},
},
{"source_plugin": self.plugin_id},
)
self._logger.info(
"Submitted evidence for %s (confidence: %.2f)",
content_id,
response.confidence,
)
# -------------------------------------------------------------------------
# Helper Methods
# -------------------------------------------------------------------------
def _extract_text(self, content: dict[str, object]) -> str:
"""Extract text from content fields."""
parts: list[str] = []
for field_name in ContentFields.TEXT_FIELDS:
value = content.get(field_name)
if isinstance(value, str) and value.strip():
parts.append(value.strip())
return " ".join(parts)
def _set_cached(self, key: str, response: APIResponse) -> None:
"""Store response in cache with eviction if needed."""
if len(self._cache) >= CacheConfig.MAX_SIZE:
self._evict_oldest(CacheConfig.EVICTION_BATCH_SIZE)
self._cache[key] = CacheEntry(response=response)
def _evict_oldest(self, count: int) -> None:
"""Evict oldest cache entries."""
sorted_keys = sorted(
self._cache.keys(),
key=lambda k: self._cache[k].cached_at,
)
for key in sorted_keys[:count]:
del self._cache[key]
# -------------------------------------------------------------------------
# Hook Implementations
# -------------------------------------------------------------------------
def dvp_before_content_create(
self,
content: dict[str, object],
metadata: dict[str, object] | None = None,
) -> dict[str, object]:
"""Process content before creation.
This hook passes through content unchanged. Processing happens
asynchronously via event subscription.
"""
return content