Refactor: LLMRouterComponent with Advanced OpenRouter Integration (#8220)

* Refactor: LLMRouterComponent with Advanced OpenRouter Integration and Detailed Output

* [autofix.ci] apply automated fixes

* feat: refactor LLM Router with OpenRouter API integration and enhanced routing logic
  - Add OpenRouter API integration for fetching model specifications
  - Implement an intelligent model name mapping and caching system
  - Add structured outputs (selected_model_info, routing_decision)
  - Improve error handling with specific exception types
  - Add constants for magic values and enhanced judge LLM prompts
  - Add a fallback mechanism and fix all linting issues

* fix: resolve final linting issues in LLM Router
  - Fix E501: break long log message into multiple lines
  - Fix BLE001: replace generic Exception with specific exceptions (AttributeError, IndexError)
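
A minimal sketch of the name-mapping idea behind the caching system (illustrative only;
names and values are made up, mirroring the _simplify_model_name and _model_name_to_api_id
logic in the diff below):

    def simplify(name: str) -> str:
        # "OpenAI: GPT-4o" -> "openaigpt4o"
        return "".join(c.lower() for c in name if c.isalnum())

    # Every alias of a model points at its canonical OpenRouter API ID.
    name_to_api_id: dict[str, str] = {}
    for api_id, display_name in [("openai/gpt-4o", "OpenAI: GPT-4o")]:
        short_id = api_id.split("/", 1)[1]  # "gpt-4o"
        for key in (api_id, display_name, simplify(display_name), short_id, simplify(short_id)):
            name_to_api_id[key] = api_id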

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Raphael Valdetaro, 2025-06-02 15:11:26 -03:00, committed by GitHub
commit 21af370e5b


@@ -1,11 +1,15 @@
import asyncio
import http  # Added for HTTPStatus
import json
from typing import Any

import aiohttp

from langflow.base.models.chat_result import get_chat_result
from langflow.base.models.model_utils import get_model_name
from langflow.custom import Component
from langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, MultilineInput, Output
from langflow.schema import Data
from langflow.schema.message import Message
@@ -14,6 +18,10 @@ class LLMRouterComponent(Component):
    description = "Routes the input to the most appropriate LLM based on OpenRouter model specifications"
    icon = "git-branch"

    # Constants for magic values
    MAX_DESCRIPTION_LENGTH = 500  # max characters of a model description passed to the judge
    QUERY_PREVIEW_MAX_LENGTH = 1000  # max characters of the user query echoed in the judge prompt

    inputs = [
        HandleInput(
            name="models",
@@ -23,16 +31,17 @@ class LLMRouterComponent(Component):
            is_list=True,
            info="List of LLMs to route between",
        ),
        MultilineInput(
            name="input_value",
            display_name="Input",
            required=True,
            info="The input message to be routed",
        ),
        HandleInput(
            name="judge_llm",
            display_name="Judge LLM",
            input_types=["LanguageModel"],
            required=True,
            info="LLM that will evaluate and select the most appropriate model",
        ),
        DropdownInput(
@@ -42,137 +51,447 @@ class LLMRouterComponent(Component):
            value="balanced",
            info="Optimization preference for model selection",
        ),
        BoolInput(
            name="use_openrouter_specs",
            display_name="Use OpenRouter Specs",
            value=True,
            info=(
                "Fetch model specifications from OpenRouter API for enhanced routing decisions. "
                "If false, only model names will be used."
            ),
            advanced=True,
        ),
        IntInput(
            name="timeout",
            display_name="API Timeout",
            value=10,
            info="Timeout for API requests in seconds",
            advanced=True,
        ),
        BoolInput(
            name="fallback_to_first",
            display_name="Fallback to First Model",
            value=True,
            info="Use first model as fallback when routing fails",
            advanced=True,
        ),
    ]
    outputs = [
        Output(display_name="Output", name="output", method="route_to_model"),
        Output(
            display_name="Selected Model Info",
            name="selected_model_info",
            method="get_selected_model_info",
            types=["Data"],
        ),
        Output(
            display_name="Routing Decision",
            name="routing_decision",
            method="get_routing_decision",
        ),
    ]
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._selected_model_name: str | None = None
        self._selected_api_model_id: str | None = None
        self._routing_decision: str = ""
        self._models_api_cache: dict[str, dict[str, Any]] = {}
        self._model_name_to_api_id: dict[str, str] = {}
    def _simplify_model_name(self, name: str) -> str:
        """Simplify model name for matching by lowercasing and removing non-alphanumerics."""
        return "".join(c.lower() for c in name if c.isalnum())
    async def _fetch_openrouter_models_data(self) -> None:
        """Fetch all models from OpenRouter API and cache them along with name mappings."""
        if self._models_api_cache and self._model_name_to_api_id:
            return

        if not self.use_openrouter_specs:
            self.log("OpenRouter specs are disabled. Skipping fetch.")
            return

        try:
            self.status = "Fetching OpenRouter model specifications..."
            self.log("Fetching all model specifications from OpenRouter API: https://openrouter.ai/api/v1/models")
            async with (
                aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session,
                session.get("https://openrouter.ai/api/v1/models") as response,
            ):
                if response.status == http.HTTPStatus.OK:
                    data = await response.json()
                    models_list = data.get("data", [])

                    _models_api_cache_temp = {}
                    _model_name_to_api_id_temp = {}

                    for model_data in models_list:
                        api_model_id = model_data.get("id")
                        if not api_model_id:
                            continue

                        _models_api_cache_temp[api_model_id] = model_data
                        _model_name_to_api_id_temp[api_model_id] = api_model_id

                        # Map the human-readable name (and its simplified form) to the API ID.
                        api_model_name = model_data.get("name")
                        if api_model_name:
                            _model_name_to_api_id_temp[api_model_name] = api_model_id
                            simplified_api_name = self._simplify_model_name(api_model_name)
                            _model_name_to_api_id_temp[simplified_api_name] = api_model_id

                        # Map the Hugging Face ID (and its simplified form), when present.
                        hugging_face_id = model_data.get("hugging_face_id")
                        if hugging_face_id:
                            _model_name_to_api_id_temp[hugging_face_id] = api_model_id
                            simplified_hf_id = self._simplify_model_name(hugging_face_id)
                            _model_name_to_api_id_temp[simplified_hf_id] = api_model_id

                        # Map the part of the ID after the provider prefix, e.g. "openai/gpt-4o" -> "gpt-4o".
                        if "/" in api_model_id:
                            try:
                                model_name_part_of_id = api_model_id.split("/", 1)[1]
                                if model_name_part_of_id:
                                    _model_name_to_api_id_temp[model_name_part_of_id] = api_model_id
                                    simplified_part_id = self._simplify_model_name(model_name_part_of_id)
                                    _model_name_to_api_id_temp[simplified_part_id] = api_model_id
                            except IndexError:
                                pass  # Should not happen if '/' is present

                    self._models_api_cache = _models_api_cache_temp
                    self._model_name_to_api_id = _model_name_to_api_id_temp
                    log_msg = (
                        f"Successfully fetched and cached {len(self._models_api_cache)} "
                        f"model specifications from OpenRouter."
                    )
                    self.log(log_msg)
                else:
                    err_text = await response.text()
                    self.log(f"Failed to fetch OpenRouter models: HTTP {response.status} - {err_text}")
                    self._models_api_cache = {}
                    self._model_name_to_api_id = {}
        except aiohttp.ClientError as e:
            self.log(f"AIOHTTP ClientError fetching OpenRouter models: {e!s}", "error")
            self._models_api_cache = {}
            self._model_name_to_api_id = {}
        except asyncio.TimeoutError:
            self.log("Timeout fetching OpenRouter model specifications.", "error")
            self._models_api_cache = {}
            self._model_name_to_api_id = {}
        except json.JSONDecodeError as e:
            self.log(f"JSON decode error fetching OpenRouter models: {e!s}", "error")
            self._models_api_cache = {}
            self._model_name_to_api_id = {}
        finally:
            self.status = ""
    def _get_api_model_id_for_langflow_model(self, langflow_model_name: str) -> str | None:
        """Attempt to find the OpenRouter API ID for a given Langflow model name."""
        if not langflow_model_name:
            return None

        potential_names_to_check = [langflow_model_name, self._simplify_model_name(langflow_model_name)]

        if langflow_model_name.startswith("models/"):
            name_without_prefix = langflow_model_name[len("models/") :]
            potential_names_to_check.append(name_without_prefix)
            potential_names_to_check.append(self._simplify_model_name(name_without_prefix))
        elif langflow_model_name.startswith("community_models/"):
            name_without_prefix = langflow_model_name[len("community_models/") :]
            potential_names_to_check.append(name_without_prefix)
            potential_names_to_check.append(self._simplify_model_name(name_without_prefix))

        unique_names_to_check = list(dict.fromkeys(potential_names_to_check))
        for name_variant in unique_names_to_check:
            if name_variant in self._model_name_to_api_id:
                return self._model_name_to_api_id[name_variant]

        self.log(
            f"Could not map Langflow model name '{langflow_model_name}' "
            f"(tried variants: {unique_names_to_check}) to an OpenRouter API ID."
        )
        return None
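
    # Lookup sketch (illustrative): for langflow_model_name = "models/gemini-1.5-pro",
    # the variants tried are ["models/gemini-1.5-pro", "modelsgemini15pro",
    # "gemini-1.5-pro", "gemini15pro"]; the first variant present in
    # _model_name_to_api_id wins.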
    def _get_model_specs_dict(self, langflow_model_name: str) -> dict[str, Any]:
        """Get a dictionary of relevant model specifications for a given Langflow model name."""
        if not self.use_openrouter_specs or not self._models_api_cache:
            return {
                "id": langflow_model_name,
                "name": langflow_model_name,
                "description": "Specifications not available.",
            }

        api_model_id = self._get_api_model_id_for_langflow_model(langflow_model_name)
        if not api_model_id or api_model_id not in self._models_api_cache:
            log_msg = (
                f"No cached API data found for Langflow model '{langflow_model_name}' "
                f"(mapped API ID: {api_model_id}). Returning basic info."
            )
            self.log(log_msg)
            return {
                "id": langflow_model_name,
                "name": langflow_model_name,
                "description": "Full specifications not found in cache.",
            }

        model_data = self._models_api_cache[api_model_id]
        top_provider_data = model_data.get("top_provider", {})
        architecture_data = model_data.get("architecture", {})
        pricing_data = model_data.get("pricing", {})

        description = model_data.get("description", "No description available")
        truncated_description = (
            description[: self.MAX_DESCRIPTION_LENGTH - 3] + "..."
            if len(description) > self.MAX_DESCRIPTION_LENGTH
            else description
        )

        specs = {
            "id": model_data.get("id"),
            "name": model_data.get("name"),
            "description": truncated_description,
            "context_length": top_provider_data.get("context_length") or model_data.get("context_length"),
            "max_completion_tokens": (
                top_provider_data.get("max_completion_tokens") or model_data.get("max_completion_tokens")
            ),
            "tokenizer": architecture_data.get("tokenizer"),
            "input_modalities": architecture_data.get("input_modalities", []),
            "output_modalities": architecture_data.get("output_modalities", []),
            "pricing_prompt": pricing_data.get("prompt"),
            "pricing_completion": pricing_data.get("completion"),
            "is_moderated": top_provider_data.get("is_moderated"),
            "supported_parameters": model_data.get("supported_parameters", []),
        }
        # Drop keys whose values are None so the judge prompt stays compact.
        return {k: v for k, v in specs.items() if v is not None}
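
    # Illustrative return value (keys come from the specs dict above; values are made up):
    #   {"id": "openai/gpt-4o", "name": "OpenAI: GPT-4o", "description": "...",
    #    "context_length": 128000, "tokenizer": "GPT", "input_modalities": ["text", "image"],
    #    "pricing_prompt": "0.0000025", "supported_parameters": ["temperature", "top_p"]}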
    def _create_system_prompt(self) -> str:
        """Create system prompt for the judge LLM."""
        return """\
You are an expert AI model selection specialist. Your task is to analyze the user's input query,
their optimization preference, and a list of available models with their specifications,
then select the most appropriate model.

Each model will be presented as a JSON object with its capabilities and characteristics.

Your decision should be based on:
1. Task complexity and requirements derived from the user's query.
2. Context length needed for the input.
3. Model capabilities (e.g., context window, input/output modalities, tokenizer).
4. Pricing considerations, if relevant to the optimization preference.
5. User's stated optimization preference (quality, speed, cost, balanced).

Return ONLY the index number (0, 1, 2, etc.) of the best model from the provided list.
Do not provide any explanation or reasoning, just the index number.
If multiple models seem equally suitable according to the preference, you may pick the first one that matches.
If no model seems suitable, pick the first model in the list (index 0) as a fallback."""
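
    # Illustrative judge exchange (message shapes follow route_to_model below; content is made up):
    #   system: <the prompt returned by _create_system_prompt()>
    #   user:   'User Query: "..." / Optimization Preference: balanced / Available Models (JSON list): [...]'
    #   reply:  "1"  -> the component routes to the model at index 1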
    async def route_to_model(self) -> Message:
        """Main routing method."""
        if not self.models or not self.input_value or not self.judge_llm:
            error_msg = "Missing required inputs: models, input_value, or judge_llm"
            self.status = error_msg
            self.log(f"Validation Error: {error_msg}", "error")
            raise ValueError(error_msg)

        successful_result: Message | None = None
        try:
            self.log(f"Starting model routing with {len(self.models)} available Langflow models.")
            self.log(f"Optimization preference: {self.optimization}")
            self.log(f"Input length: {len(self.input_value)} characters")

            if self.use_openrouter_specs and not self._models_api_cache:
                await self._fetch_openrouter_models_data()

            system_prompt_content = self._create_system_prompt()
            system_message = {"role": "system", "content": system_prompt_content}

            self.status = "Analyzing available models and preparing specifications..."
            model_specs_for_judge = []
            for i, langflow_model_instance in enumerate(self.models):
                langflow_model_name = get_model_name(langflow_model_instance)
                if not langflow_model_name:
                    self.log(f"Warning: Could not determine name for model at index {i}. Using placeholder.", "warning")
                    spec_dict = {
                        "id": f"unknown_model_{i}",
                        "name": f"Unknown Model {i}",
                        "description": "Name could not be determined.",
                    }
                else:
                    spec_dict = self._get_model_specs_dict(langflow_model_name)
                model_specs_for_judge.append({"index": i, "langflow_name": langflow_model_name, "specs": spec_dict})
                self.log(
                    f"Prepared specs for Langflow model {i} ('{langflow_model_name}'): {spec_dict.get('name', 'N/A')}"
                )

            estimated_tokens = len(self.input_value.split()) * 1.3
            self.log(f"Estimated input tokens: {int(estimated_tokens)}")

            query_preview = self.input_value[: self.QUERY_PREVIEW_MAX_LENGTH]
            if len(self.input_value) > self.QUERY_PREVIEW_MAX_LENGTH:
                query_preview += "..."

            user_message_content = f"""User Query: "{query_preview}"

Optimization Preference: {self.optimization}
Estimated Input Tokens: ~{int(estimated_tokens)}

Available Models (JSON list):
{json.dumps(model_specs_for_judge, indent=2)}

Based on the user query, optimization preference, and the detailed model specifications,
select the index of the most appropriate model.
Return ONLY the index number:"""
            user_message = {"role": "user", "content": user_message_content}

            self.log("Requesting model selection from judge LLM...")
            self.status = "Judge LLM analyzing options..."
            response = await self.judge_llm.ainvoke([system_message, user_message])

            selected_index, chosen_model_instance = self._parse_judge_response(response.content.strip())

            self._selected_model_name = get_model_name(chosen_model_instance)
            if self._selected_model_name:
                self._selected_api_model_id = (
                    self._get_api_model_id_for_langflow_model(self._selected_model_name) or self._selected_model_name
                )
            else:
                self._selected_api_model_id = "unknown_model"

            specs_source = (
                "OpenRouter API"
                if self.use_openrouter_specs and self._models_api_cache
                else "Basic (Langflow model names only)"
            )
            self._routing_decision = f"""Model Selection Decision:
- Selected Model Index: {selected_index}
- Selected Langflow Model Name: {self._selected_model_name}
- Selected API Model ID (if resolved): {self._selected_api_model_id}
- Optimization Preference: {self.optimization}
- Input Query Length: {len(self.input_value)} characters (~{int(estimated_tokens)} tokens)
- Number of Models Considered: {len(self.models)}
- Specifications Source: {specs_source}"""

            log_msg = (
                f"DECISION by Judge LLM: Selected model index {selected_index} -> "
                f"Langflow Name: '{self._selected_model_name}', API ID: '{self._selected_api_model_id}'"
            )
            self.log(log_msg)

            self.status = f"Generating response with: {self._selected_model_name}"
            input_message_obj = Message(text=self.input_value)
            raw_result = get_chat_result(
                runnable=chosen_model_instance,
                input_value=input_message_obj,
            )
            result = Message(text=str(raw_result)) if not isinstance(raw_result, Message) else raw_result
            self.status = f"Successfully routed to: {self._selected_model_name}"
            successful_result = result

        except (ValueError, TypeError, AttributeError, KeyError, RuntimeError) as e:
            error_msg = f"Routing error: {type(e).__name__} - {e!s}"
            self.log(f"{error_msg}", "error")
            self.log("Detailed routing error occurred. Check logs for details.", "error")
            self.status = error_msg

            if self.fallback_to_first and self.models:
                self.log("Activating fallback to first model due to error.", "warning")
                chosen_model_instance = self.models[0]
                self._selected_model_name = get_model_name(chosen_model_instance)
                if self._selected_model_name:
                    mapped_id = self._get_api_model_id_for_langflow_model(self._selected_model_name)
                    self._selected_api_model_id = mapped_id or self._selected_model_name
                else:
                    self._selected_api_model_id = "fallback_model"

                self._routing_decision = f"""Fallback Decision:
- Error During Routing: {error_msg}
- Fallback Model Langflow Name: {self._selected_model_name}
- Fallback Model API ID (if resolved): {self._selected_api_model_id}
- Reason: Automatic fallback enabled"""

                self.status = f"Fallback: Using {self._selected_model_name}"
                input_message_obj = Message(text=self.input_value)
                raw_fallback_result = get_chat_result(
                    runnable=chosen_model_instance,
                    input_value=input_message_obj,
                )
                if not isinstance(raw_fallback_result, Message):
                    successful_result = Message(text=str(raw_fallback_result))
                else:
                    successful_result = raw_fallback_result
            else:
                self.log("No fallback model available or fallback disabled. Raising error.", "error")
                raise

        if successful_result is None:
            error_message = "Unexpected state in route_to_model: No result produced."
            self.log(f"Error: {error_message}", "error")
            raise RuntimeError(error_message)
        return successful_result
    def _parse_judge_response(self, response_content: str) -> tuple[int, Any]:
        """Parse the judge's response to extract model index."""
        try:
            cleaned_response = "".join(filter(str.isdigit, response_content.strip()))
            if not cleaned_response:
                self.log(f"Judge LLM response was non-numeric: '{response_content}'. Defaulting to index 0.", "warning")
                return 0, self.models[0]

            selected_index = int(cleaned_response)
            if 0 <= selected_index < len(self.models):
                self.log(f"Judge LLM selected index: {selected_index}")
                return selected_index, self.models[selected_index]

            log_msg = (
                f"Judge LLM selected index {selected_index} is out of bounds "
                f"(0-{len(self.models) - 1}). Defaulting to index 0."
            )
            self.log(log_msg, "warning")
            return 0, self.models[0]
        except ValueError:
            self.log(
                f"Could not parse judge LLM response to integer: '{response_content}'. Defaulting to index 0.",
                "warning",
            )
            return 0, self.models[0]
        except (AttributeError, IndexError) as e:
            self.log(f"Error parsing judge response '{response_content}': {e!s}. Defaulting to index 0.", "error")
            return 0, self.models[0]
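
    # Parsing sketch (illustrative inputs):
    #   "2"             -> index 2
    #   " Model 1. "    -> digits "1" -> index 1
    #   "the third one" -> no digits  -> warning, default index 0
    #   "42" (3 models) -> out of bounds -> warning, default index 0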
    def get_selected_model_info(self) -> list[Data]:
        """Return detailed information about the selected model as a list of Data objects."""
        if self._selected_model_name:
            specs_dict = self._get_model_specs_dict(self._selected_model_name)
            if "langflow_name" not in specs_dict:
                specs_dict["langflow_model_name_used_for_lookup"] = self._selected_model_name
            if self._selected_api_model_id and specs_dict.get("id") != self._selected_api_model_id:
                specs_dict["resolved_api_model_id"] = self._selected_api_model_id
            data_output = [Data(data=specs_dict)]
            self.status = data_output
            return data_output

        data_output = [Data(data={"info": "No model selected yet - run the router first."})]
        self.status = data_output
        return data_output
    def get_routing_decision(self) -> Message:
        """Return the comprehensive routing decision explanation."""
        if self._routing_decision:
            message_output = Message(text=self._routing_decision)
            self.status = message_output
            return message_output

        message_output = Message(text="No routing decision made yet - run the router first.")
        self.status = message_output
        return message_output