feat: add challenge and red-blue competitions across API and web

This commit is contained in:
Joey Yakimowich-Payne 2025-10-01 06:49:09 -06:00
commit 8fd3c4bb64
No known key found for this signature in database
GPG key ID: 6BFE655FA5ABD1E1
77 changed files with 5355 additions and 24 deletions

View file

@ -58,6 +58,9 @@ class NodeType(StrEnum):
DOCUMENT_EXTRACTOR = "document-extractor"
LIST_OPERATOR = "list-operator"
AGENT = "agent"
CHALLENGE_EVALUATOR = "challenge-evaluator"
JUDGING_LLM = "judging-llm"
TEAM_CHALLENGE = "team-challenge"
class NodeExecutionType(StrEnum):

View file

@ -0,0 +1,3 @@
from .node import ChallengeEvaluatorNode
__all__ = ['ChallengeEvaluatorNode']

View file

@ -0,0 +1,258 @@
# pyright: reportImplicitRelativeImport=none
from __future__ import annotations
import logging
import time
from collections.abc import Mapping
from typing import Any
from core.variables.segments import Segment
from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.base.node import Node
from extensions.ext_database import db
from models.challenge import Challenge
from services.challenge_scorer_service import ChallengeScorerService
from services.challenge_service import ChallengeService
logger = logging.getLogger(__name__)
class ChallengeEvaluatorNode(Node):
node_type = NodeType.CHALLENGE_EVALUATOR
execution_type = NodeExecutionType.EXECUTABLE
_node_data: BaseNodeData
def init_node_data(self, data: Mapping[str, Any]):
# Using BaseNodeData to carry title/desc; node data is accessed directly
self._node_data = BaseNodeData.model_validate(data)
self._config: dict[str, Any] = data
def _get_error_strategy(self) -> ErrorStrategy | None:
return getattr(self._node_data, 'error_strategy', None)
def _get_retry_config(self) -> RetryConfig:
return getattr(self._node_data, 'retry_config', RetryConfig())
def _get_title(self) -> str:
return getattr(self._node_data, 'title', 'Challenge Evaluator')
def _get_description(self) -> str | None:
return getattr(self._node_data, 'desc', None)
def _get_default_value_dict(self) -> dict[str, Any]:
return getattr(self._node_data, 'default_value_dict', {})
def get_base_node_data(self) -> BaseNodeData:
return self._node_data
@classmethod
def version(cls) -> str:
return "1"
def _run(self) -> NodeRunResult:
# Resolve response text from selector in config.inputs.response (frontend schema)
output_text = ''
source_selector = None
inputs_cfg = self._config.get('inputs') or {}
if isinstance(inputs_cfg, dict):
source_selector = inputs_cfg.get('response')
# fallback to older key if any
source_selector = source_selector or self._config.get('value_selector')
# Check evaluation mode from config
evaluation_mode = self._config.get('evaluation_mode', 'rules')
logger.info("ChallengeEvaluator - evaluation_mode: %s, source_selector: %s", evaluation_mode, source_selector)
# Initialize judge variables
is_judge_input = False
judge_passed = False
judge_rating = 0
judge_feedback_from_input = ''
output_text = ''
def _segment_to_value(segment: Segment | None) -> Any:
if segment is None:
return None
if hasattr(segment, "to_object"):
try:
return segment.to_object()
except Exception: # pragma: no cover - defensive
pass
return getattr(segment, "value", segment)
# If evaluation_mode is 'llm-judge', try to read from upstream Judging LLM node
if evaluation_mode == 'llm-judge' and source_selector and len(source_selector) >= 1:
try:
node_id = source_selector[0]
# Retrieve judge outputs as Segments and convert to primitive values
passed_segment = self.graph_runtime_state.variable_pool.get([node_id, 'judge_passed'])
rating_segment = self.graph_runtime_state.variable_pool.get([node_id, 'judge_rating'])
feedback_segment = self.graph_runtime_state.variable_pool.get([node_id, 'judge_feedback'])
potential_judge_passed = _segment_to_value(passed_segment)
potential_judge_rating = _segment_to_value(rating_segment)
potential_judge_feedback = _segment_to_value(feedback_segment)
logger.info(
"ChallengeEvaluator - Reading judge outputs: passed=%s, rating=%s, feedback=%s",
potential_judge_passed,
potential_judge_rating,
potential_judge_feedback,
)
# If judge_passed exists, we successfully read from a Judging LLM node
if potential_judge_passed is not None:
is_judge_input = True
judge_passed = bool(potential_judge_passed)
judge_rating = int(potential_judge_rating or 0)
judge_feedback_from_input = str(potential_judge_feedback or '')
logger.info(
"ChallengeEvaluator - Judge input successfully read! passed=%s, rating=%s, feedback=%s",
judge_passed,
judge_rating,
judge_feedback_from_input,
)
except Exception as e:
logger.error("ChallengeEvaluator - Error reading judge outputs: %s", e, exc_info=True)
is_judge_input = False
# If not using judge input, get text output for rules-based evaluation
if not is_judge_input and source_selector:
try:
segment = self.graph_runtime_state.variable_pool.get(source_selector)
if segment is None:
output_text = ''
elif hasattr(segment, 'text'):
output_text = segment.text
else:
output_text = str(_segment_to_value(segment) or '')
except Exception:
output_text = ''
# Evaluate based on mode
if is_judge_input:
ok = judge_passed
details = {
'mode': 'llm-judge',
'rating': judge_rating,
'feedback': judge_feedback_from_input,
}
else:
# Rules-based evaluation (only if not using judge input)
ok, details = ChallengeService.evaluate_outcome(output_text, self._config)
# optional persistence if config carries challenge_id
challenge_id = self._config.get('challenge_id')
if challenge_id:
try:
# Calculate elapsed time in milliseconds
elapsed_ms = int((time.time() - self.graph_runtime_state.start_at) * 1000)
# Get total tokens used in the workflow so far
tokens_total = self.graph_runtime_state.total_tokens
# Extract judge_rating from details if available (for highest_rating strategy)
judge_rating = None
judge_feedback = None
if isinstance(details, dict):
judge_rating = details.get('rating')
judge_feedback = details.get('feedback')
# Load challenge to check scoring strategy
challenge = db.session.get(Challenge, str(challenge_id))
# Score field is reserved for custom scoring plugins.
# For built-in strategies (first, fastest, fewest_tokens, highest_rating),
# the leaderboard sorts by specific columns (created_at, elapsed_ms, tokens_total, judge_rating).
score = None
# If custom scoring is configured, compute score using plugin
if challenge and challenge.scoring_strategy == 'custom':
try:
metrics = {
'succeeded': ok,
'tokens_total': tokens_total,
'elapsed_ms': elapsed_ms,
'rating': judge_rating,
'created_at': int(time.time() * 1000),
}
ctx = {
'tenant_id': self.tenant_id,
'app_id': self.app_id,
'workflow_id': self.workflow_id,
'challenge_id': str(challenge_id),
'end_user_id': None,
'timeout_ms': 5000,
}
result = ChallengeScorerService.score_with_plugin(
scorer_plugin_id=challenge.scoring_plugin_id,
scorer_entrypoint=challenge.scoring_entrypoint,
metrics=metrics,
config=challenge.scoring_config or {},
ctx=ctx,
)
score = result.get('score')
logger.info(
"Custom scorer computed score: %s (details: %s)",
score,
result.get('details'),
)
except Exception as e:
logger.error("Custom scorer failed: %s", e, exc_info=True)
# Continue with score=None on error
ChallengeService.record_attempt(
tenant_id=self.tenant_id,
challenge_id=challenge_id,
end_user_id=None,
account_id=None,
workflow_run_id=None,
succeeded=ok,
score=score,
judge_rating=judge_rating,
judge_feedback=judge_feedback,
tokens_total=tokens_total,
elapsed_ms=elapsed_ms,
session=db.session,
)
except Exception:
# do not crash the workflow if recording fails
pass
# Always provide all output variables to match frontend getOutputVars
outputs: dict[str, Any] = {
'challenge_succeeded': ok,
'judge_rating': 0,
'judge_feedback': '',
'message': '',
}
# Override with actual values if evaluator provides them
if isinstance(details, dict):
logger.debug("ChallengeEvaluator - details: %s", details)
if 'rating' in details:
outputs['judge_rating'] = details.get('rating')
if 'feedback' in details:
outputs['judge_feedback'] = details.get('feedback')
if 'message' in details:
outputs['message'] = details.get('message')
# If no explicit message, create one from evaluation details
if not outputs['message']:
if ok:
outputs['message'] = f"Success: {details.get('mode', 'evaluation')} matched"
else:
outputs['message'] = f"Failed: {details.get('mode', 'evaluation')} did not match"
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs=outputs,
)

View file

@ -0,0 +1,188 @@
from __future__ import annotations
import json
import re
from collections.abc import Mapping
from typing import Any
from core.model_manager import ModelManager
from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import (
PromptMessageContentType,
SystemPromptMessage,
UserPromptMessage,
)
from core.model_runtime.entities.model_entities import ModelType
from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.base.node import Node
from services.challenge_service import ChallengeService
class JudgingLLMNode(Node):
node_type = NodeType.JUDGING_LLM
execution_type = NodeExecutionType.EXECUTABLE
_node_data: BaseNodeData
def init_node_data(self, data: Mapping[str, Any]):
self._node_data = BaseNodeData.model_validate(data)
# Access data directly from node_data, not from a 'config' key
self._config: dict[str, Any] = data
def _get_error_strategy(self) -> ErrorStrategy | None:
return getattr(self._node_data, 'error_strategy', None)
def _get_retry_config(self) -> RetryConfig:
return getattr(self._node_data, 'retry_config', RetryConfig())
def _get_title(self) -> str:
return getattr(self._node_data, 'title', 'Judging LLM')
def _get_description(self) -> str | None:
return getattr(self._node_data, 'desc', None)
def _get_default_value_dict(self) -> dict[str, Any]:
return getattr(self._node_data, 'default_value_dict', {})
def get_base_node_data(self) -> BaseNodeData:
return self._node_data
@classmethod
def version(cls) -> str:
return "1"
def _run(self) -> NodeRunResult:
# Placeholder with FE-compatible keys. Extract inputs for future wiring.
inputs_cfg = self._config.get('inputs') or {}
goal_selector = None
response_selector = None
if isinstance(inputs_cfg, dict):
goal_selector = inputs_cfg.get('goal')
response_selector = inputs_cfg.get('response')
# Attempt to read variables (not used in placeholder decision)
_ = None
try:
if goal_selector:
_ = self.graph_runtime_state.variable_pool.get(goal_selector)
if response_selector:
_ = self.graph_runtime_state.variable_pool.get(response_selector)
except Exception:
pass
outputs = {
'judge_passed': False,
'judge_rating': 0,
'judge_feedback': '',
}
# If model config and rubric provided, invoke LLM synchronously to judge
judge_model = self._config.get('judge_model') or {}
rubric = self._config.get('rubric_prompt_template') or ''
provider = (judge_model or {}).get('provider')
model_name = (judge_model or {}).get('name')
completion_params = (judge_model or {}).get('completion_params') or {}
def _segment_to_text(seg: Any) -> str:
try:
# Many variable types expose .text
if hasattr(seg, 'text'):
return str(seg.text)
if isinstance(seg, (dict, list)):
return json.dumps(seg, ensure_ascii=False)
return str(seg)
except Exception:
return ''
# Debug: log what we're checking
import logging
logger = logging.getLogger(__name__)
logger.info(
"JudgingLLM check - provider: %s, model: %s, rubric_len: %s, response_selector: %s",
provider,
model_name,
len(rubric) if rubric else 0,
response_selector,
)
if provider and model_name and rubric and response_selector:
logger.info("JudgingLLM: All conditions met, invoking LLM...")
try:
goal_val = self.graph_runtime_state.variable_pool.get(goal_selector) if goal_selector else None
response_val = self.graph_runtime_state.variable_pool.get(response_selector)
goal_text = _segment_to_text(goal_val)
response_text = _segment_to_text(response_val)
json_template = '{"passed": boolean, "rating": number (0-10), "feedback": string}'
prompt_body = (
f"Goal:\n{goal_text}\n\n"
f"Response:\n{response_text}\n\n"
f"Return JSON with rating 0-10: {json_template}"
)
prompt_messages = [
SystemPromptMessage(content=rubric),
UserPromptMessage(content=prompt_body),
]
model_instance = ModelManager().get_model_instance(
tenant_id=self.tenant_id,
model_type=ModelType.LLM,
provider=provider,
model=model_name,
)
result: LLMResult = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters=completion_params,
stop=[],
stream=False,
user=self.user_id,
) # type: ignore
# Extract text from result
text_out = ''
content = getattr(result.message, 'content', '')
if isinstance(content, str):
text_out = content
elif isinstance(content, list):
for item in content:
if getattr(item, 'type', None) == PromptMessageContentType.TEXT:
text_out += str(getattr(item, 'data', ''))
else:
text_out = str(content)
# Parse last JSON object in output
verdict: dict[str, Any] | None = None
try:
matches = re.findall(r"\{[\s\S]*\}", text_out)
if matches:
verdict = json.loads(matches[-1])
except Exception:
verdict = None
if isinstance(verdict, dict):
outputs['judge_passed'] = bool(verdict.get('passed'))
outputs['judge_rating'] = int(verdict.get('rating') or 0)
outputs['judge_feedback'] = str(verdict.get('feedback') or '')
outputs['judge_raw'] = json.dumps(verdict)
else:
# Fallback to simple rules if configured
success_type = self._config.get('success_type')
success_pattern = self._config.get('success_pattern')
if success_type and success_pattern:
ok, _ = ChallengeService.evaluate_outcome(response_text, {
'success_type': success_type,
'success_pattern': success_pattern,
})
outputs['judge_passed'] = ok
outputs['judge_rating'] = 10 if ok else 0
outputs['judge_feedback'] = 'passed by rules' if ok else 'failed by rules'
except Exception as e:
# keep default outputs on error
logger.error("JudgingLLM error: %s", e, exc_info=True)
pass
else:
logger.warning("JudgingLLM skipped - missing required fields")
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs=outputs)

View file

@ -24,6 +24,9 @@ from core.workflow.nodes.tool import ToolNode
from core.workflow.nodes.variable_aggregator import VariableAggregatorNode
from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1
from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode as VariableAssignerNodeV2
from core.workflow.nodes.challenge_evaluator.node import ChallengeEvaluatorNode
from core.workflow.nodes.judging_llm.node import JudgingLLMNode
from core.workflow.nodes.team_challenge.node import TeamChallengeNode
LATEST_VERSION = "latest"
@ -142,4 +145,16 @@ NODE_TYPE_CLASSES_MAPPING: Mapping[NodeType, Mapping[str, type[Node]]] = {
LATEST_VERSION: KnowledgeIndexNode,
"1": KnowledgeIndexNode,
},
NodeType.CHALLENGE_EVALUATOR: {
LATEST_VERSION: ChallengeEvaluatorNode,
"1": ChallengeEvaluatorNode,
},
NodeType.JUDGING_LLM: {
LATEST_VERSION: JudgingLLMNode,
"1": JudgingLLMNode,
},
NodeType.TEAM_CHALLENGE: {
LATEST_VERSION: TeamChallengeNode,
"1": TeamChallengeNode,
},
}

View file

@ -0,0 +1,68 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.base.node import Node
class TeamChallengeNode(Node):
node_type = NodeType.TEAM_CHALLENGE
execution_type = NodeExecutionType.EXECUTABLE
_node_data: BaseNodeData
def init_node_data(self, data: Mapping[str, Any]):
self._node_data = BaseNodeData.model_validate(data)
self._config: dict[str, Any] = data
def _get_error_strategy(self) -> ErrorStrategy | None:
return getattr(self._node_data, 'error_strategy', None)
def _get_retry_config(self) -> RetryConfig:
return getattr(self._node_data, 'retry_config', RetryConfig())
def _get_title(self) -> str:
return getattr(self._node_data, 'title', 'Team Challenge')
def _get_description(self) -> str | None:
return getattr(self._node_data, 'desc', None)
def _get_default_value_dict(self) -> dict[str, Any]:
return getattr(self._node_data, 'default_value_dict', {})
def get_base_node_data(self) -> BaseNodeData:
return self._node_data
@classmethod
def version(cls) -> str:
return "1"
def _run(self) -> NodeRunResult:
# Read inputs.team_choice for consistency with FE
inputs_cfg = self._config.get('inputs') or {}
team_choice = ''
if isinstance(inputs_cfg, dict):
team_choice_selector = inputs_cfg.get('team_choice')
if team_choice_selector:
try:
v = self.graph_runtime_state.variable_pool.get_value_by_selector(team_choice_selector)
team_choice = str(v or '')
except Exception:
team_choice = ''
outputs = {
'team': team_choice,
'judge_passed': False,
'judge_rating': 0,
'judge_feedback': '',
'categories': {},
'team_points': 0.0,
'total_points': 0.0,
}
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs=outputs)