Files
api/app/workflow_registry.py
2025-12-30 11:27:14 +07:00

22 lines
606 B
Python

from typing import Callable, Dict
ACTION_REGISTRY: Dict[str, Callable] = {}
ACTION_SCHEMAS: Dict[str, dict] = {}
def register_action(name: str, schema=None):
def decorator(func):
ACTION_REGISTRY[name] = func
ACTION_SCHEMAS[name] = schema or {}
return func
return decorator
def validate_action_schema(action_name, params):
schema = ACTION_SCHEMAS.get(action_name, {})
required = schema.get("required", [])
for key in required:
if key not in params:
raise Exception(f"Action '{action_name}' missing required param: {key}")
return True