Skip to content

Eval Runner Service

Batch evaluation execution via the Axion evaluation engine.

Classes

EvalRunnerError

Bases: Exception

Base exception for eval runner errors.

MetricEvaluationError

Bases: EvalRunnerError

Error during metric evaluation.

AgentConnectionError

Bases: EvalRunnerError

Error connecting to agent API.

Functions

get_available_metrics()

Return all available metrics from the registry.

Source code in backend/app/services/eval_runner_service.py
def get_available_metrics() -> list[MetricInfo]:
    """List every metric registered in the global metric registry."""
    registry: list[MetricInfo] = METRIC_REGISTRY
    return registry

get_metric_by_key(key)

Get a specific metric by its key.

Source code in backend/app/services/eval_runner_service.py
def get_metric_by_key(key: str) -> MetricInfo | None:
    """Look up a metric by key; return None when the key is unknown."""
    return METRIC_REGISTRY_MAP.get(key, None)

evaluate_llm_metric(metric_key, item, model_name, llm_provider) async

Evaluate a single item using an LLM-based metric.

Parameters:

Name Type Description Default
metric_key str

The metric to evaluate

required
item dict[str, Any]

The item data with required fields

required
model_name str

LLM model to use

required
llm_provider str

Provider (openai, anthropic)

required

Returns:

Type Description
tuple[float, str]

Tuple of (score, reasoning)

Source code in backend/app/services/eval_runner_service.py
async def evaluate_llm_metric(
    metric_key: str,
    item: dict[str, Any],
    model_name: str,
    llm_provider: str,
) -> tuple[float, str]:
    """Score a single item with an LLM-judged metric.

    Args:
        metric_key: The metric to evaluate
        item: The item data with required fields
        model_name: LLM model to use
        llm_provider: Provider (openai, anthropic)

    Returns:
        Tuple of (score, reasoning), with score clamped to [0, 1]

    Raises:
        MetricEvaluationError: For unknown metric keys or LLM call failures.
    """
    metric = METRIC_REGISTRY_MAP.get(metric_key)
    if not metric:
        raise MetricEvaluationError(f"Unknown metric: {metric_key}")

    # Metric-specific user prompt built before any network activity.
    prompt = _build_metric_prompt(metric_key, item)

    system_message = (
        "You are an expert evaluator. Evaluate the given content "
        "and respond with a JSON object containing:\n"
        '- "score": a float between 0.0 and 1.0\n'
        '- "reasoning": a brief explanation\n\n'
        "Respond ONLY with valid JSON, no other text."
    )

    try:
        llm = LLMRegistry(provider=llm_provider).get_llm(model_name)
        response = await llm.achat(
            [
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ]
        )

        # Normalize the provider-specific response object into plain text.
        if hasattr(response, "content"):
            text = response.content
        elif hasattr(response, "choices") and response.choices:
            text = response.choices[0].message.content
        else:
            text = str(response)

        try:
            # Strip markdown code fences if the model wrapped its JSON.
            if "```json" in text:
                text = text.split("```json")[1].split("```")[0]
            elif "```" in text:
                text = text.split("```")[1].split("```")[0]

            parsed = json.loads(text.strip())
            raw_score = float(parsed.get("score", 0.5))
            reasoning = parsed.get("reasoning", "No reasoning provided")

            # Clamp score to [0, 1]
            return max(0.0, min(1.0, raw_score)), reasoning

        except (json.JSONDecodeError, KeyError, ValueError) as e:
            # A malformed judge response degrades to a neutral score
            # instead of failing the whole evaluation.
            logger.warning(f"Failed to parse LLM response: {e}")
            return 0.5, f"Failed to parse evaluation response: {text[:100]}"

    except Exception as e:
        logger.error(f"LLM metric evaluation failed: {e}")
        raise MetricEvaluationError(f"Failed to evaluate {metric_key}: {e}") from e

evaluate_heuristic_metric(metric_key, item)

Evaluate a single item using a heuristic (non-LLM) metric.

Parameters:

Name Type Description Default
metric_key str

The metric to evaluate

required
item dict[str, Any]

The item data with required fields

required

Returns:

Type Description
tuple[float, str]

Tuple of (score, reasoning)

Source code in backend/app/services/eval_runner_service.py
def evaluate_heuristic_metric(
    metric_key: str,
    item: dict[str, Any],
) -> tuple[float, str]:
    """Evaluate a single item using a heuristic (non-LLM) metric.

    Args:
        metric_key: The metric to evaluate
        item: The item data with required fields

    Returns:
        Tuple of (score, reasoning), with score in [0, 1]. Unknown metric
        keys return a neutral (0.5, ...) rather than raising.
    """
    actual_output = str(item.get("actual_output", ""))
    expected_output = str(item.get("expected_output", ""))

    if metric_key == "exact_string_match":
        # Whitespace-insensitive exact comparison.
        match = actual_output.strip() == expected_output.strip()
        return (1.0 if match else 0.0, "Exact match" if match else "No exact match")

    elif metric_key == "levenshtein_ratio":
        ratio = SequenceMatcher(None, actual_output, expected_output).ratio()
        return (ratio, f"Similarity ratio: {ratio:.2%}")

    elif metric_key == "sentence_bleu":
        score = _calculate_bleu(actual_output, expected_output)
        return (score, f"BLEU score: {score:.4f}")

    elif metric_key == "contains_match":
        # Case-insensitive substring check; falls back to the expected
        # output when no explicit substrings are provided.
        expected_subs = item.get("expected_substrings", [expected_output])
        if isinstance(expected_subs, str):
            expected_subs = [expected_subs]
        matches = sum(1 for sub in expected_subs if sub.lower() in actual_output.lower())
        score = matches / len(expected_subs) if expected_subs else 0.0
        return (score, f"Matched {matches}/{len(expected_subs)} substrings")

    elif metric_key == "length_constraint":
        length = len(actual_output)
        min_len = item.get("min_length", 1)
        max_len = item.get("max_length", 10000)
        # BUG FIX: both messages previously hard-coded "52,246" as the
        # upper bound; report the actual configured max_len instead.
        if min_len <= length <= max_len:
            return (1.0, f"Length {length} within bounds [{min_len}, {max_len}]")
        return (0.0, f"Length {length} outside bounds [{min_len}, {max_len}]")

    elif metric_key == "citation_presence":
        # Simple heuristic: check for common citation patterns
        citation_patterns = [
            "[1]",
            "[2]",
            "(1)",
            "(2)",
            "http://",
            "https://",
            "source:",
            "reference:",
        ]
        has_citation = any(p in actual_output.lower() for p in citation_patterns)
        return (
            1.0 if has_citation else 0.0,
            "Citation found" if has_citation else "No citation found",
        )

    elif metric_key == "latency":
        latency = float(item.get("latency", 0))
        # Normalize: lower latency is better; scores reach 0 at/above the
        # threshold, so values beyond 1s all score 0.
        threshold = 1000.0  # 1 second baseline
        score = max(0.0, 1.0 - (latency / threshold))
        return (score, f"Latency: {latency}ms")

    elif metric_key in ["hit_rate_at_k", "precision_at_k", "recall_at_k", "ndcg_at_k", "mrr"]:
        # Retrieval metrics - simplified implementation
        retrieved = item.get("retrieved_content", "")
        expected = item.get("expected_output", "")
        # Simple overlap-based approximation
        if not retrieved or not expected:
            return (0.0, "Missing retrieved content or expected output")
        overlap = SequenceMatcher(None, retrieved, expected).ratio()
        return (overlap, f"Content overlap: {overlap:.2%}")

    elif metric_key == "tool_correctness":
        tools_called = item.get("tools_called", [])
        expected_tools = item.get("expected_tools", [])
        # Accept comma-separated strings as well as lists.
        if isinstance(tools_called, str):
            tools_called = [t.strip() for t in tools_called.split(",")]
        if isinstance(expected_tools, str):
            expected_tools = [t.strip() for t in expected_tools.split(",")]

        if not expected_tools:
            return (1.0, "No expected tools specified")

        correct = sum(1 for t in expected_tools if t in tools_called)
        score = correct / len(expected_tools)
        return (score, f"Correct tools: {correct}/{len(expected_tools)}")

    return (0.5, f"Unknown metric: {metric_key}")

call_agent_api(agent_config, query) async

Call an external agent API to generate output.

Parameters:

Name Type Description Default
agent_config AgentConfig

Agent configuration

required
query str

The query to send

required

Returns:

Type Description
tuple[str, float]

Tuple of (output, latency_ms)

Source code in backend/app/services/eval_runner_service.py
async def call_agent_api(
    agent_config: AgentConfig,
    query: str,
) -> tuple[str, float]:
    """Dispatch a query to the configured external agent.

    Args:
        agent_config: Agent configuration
        query: The query to send

    Returns:
        Tuple of (output, latency_ms)

    Raises:
        AgentConnectionError: When no agent is configured or the agent
            type is not recognized.
    """
    agent_type = agent_config.type

    if agent_type == AgentType.NONE:
        raise AgentConnectionError("No agent configured")
    if agent_type == AgentType.API:
        return await _call_http_agent(agent_config, query)
    if agent_type == AgentType.PROMPT:
        return await _call_prompt_agent(agent_config, query)

    raise AgentConnectionError(f"Unknown agent type: {agent_config.type}")

prepare_evaluation_data(dataset_data, column_mapping, metrics)

Validate data, build DatasetItems, and instantiate metrics.

Returns:

Type Description
tuple[list[DatasetItem], list[Any], list[str], list[str]]

Tuple of (dataset_items, scoring_metrics, valid_metric_keys, warnings).

Warnings are human-readable strings for surfacing to the user.

Source code in backend/app/services/eval_runner_service.py
def prepare_evaluation_data(
    dataset_data: list[dict[str, Any]],
    column_mapping: ColumnMapping,
    metrics: list[str],
) -> tuple[list[DatasetItem], list[Any], list[str], list[str]]:
    """Validate data, build DatasetItems, and instantiate metrics.

    Returns:
        Tuple of (dataset_items, scoring_metrics, valid_metric_keys, warnings).
        Warnings are human-readable strings for surfacing to the user.

    Raises:
        EvalRunnerError: If no requested metric could be instantiated.
    """
    warnings: list[str] = []
    dataset_items: list[DatasetItem] = []

    for row_idx, row in enumerate(dataset_data):
        fields: dict[str, Any] = {}

        # query / actual_output are always coerced to str when mapped,
        # even if the cell is empty.
        if column_mapping.query:
            fields["query"] = str(row.get(column_mapping.query, ""))
        if column_mapping.actual_output:
            fields["actual_output"] = str(row.get(column_mapping.actual_output, ""))

        # The remaining fields are set only when the cell holds a truthy value.
        if column_mapping.expected_output:
            cell = row.get(column_mapping.expected_output)
            if cell:
                fields["expected_output"] = str(cell)
        if column_mapping.retrieved_content:
            cell = row.get(column_mapping.retrieved_content)
            if cell:
                # axion expects retrieved_content as a list of strings
                values = cell if isinstance(cell, list) else [cell]
                fields["retrieved_content"] = [str(v) for v in values]
        if column_mapping.latency:
            cell = row.get(column_mapping.latency)
            if cell:
                fields["latency"] = float(cell)
        if column_mapping.tools_called:
            cell = row.get(column_mapping.tools_called)
            if cell:
                fields["tools_called"] = cell if isinstance(cell, list) else [cell]
        if column_mapping.expected_tools:
            cell = row.get(column_mapping.expected_tools)
            if cell:
                fields["expected_tools"] = cell if isinstance(cell, list) else [cell]
        if column_mapping.acceptance_criteria:
            cell = row.get(column_mapping.acceptance_criteria)
            if cell:
                fields["acceptance_criteria"] = str(cell)

        try:
            dataset_items.append(DatasetItem(**fields))
        except Exception as e:
            msg = f"Row {row_idx}: validation issue — {e}"
            warnings.append(msg)
            logger.warning(f"Failed to create DatasetItem for row {row_idx}: {e}")
            # Fall back to a minimal item so the run can still proceed.
            dataset_items.append(
                DatasetItem(
                    query=fields.get("query", f"Item {row_idx}"),
                    actual_output=fields.get("actual_output", ""),
                )
            )

    # Record which DatasetItem fields hold data anywhere in the dataset,
    # so per-metric requirement warnings can be emitted below.
    populated_fields: set[str] = {
        field_name
        for item in dataset_items
        for field_name in ("query", "actual_output", "expected_output", "retrieved_content")
        if getattr(item, field_name)
    }

    # Instantiate each requested metric from axion's registry.
    scoring_metrics: list[Any] = []
    valid_metric_keys: list[str] = []
    for metric_key in metrics:
        try:
            metric_class = axion_metric_registry.get(metric_key)
            if not metric_class:
                msg = f"Metric '{metric_key}' not found in axion registry"
                warnings.append(msg)
                logger.warning(msg)
                continue

            metric_instance = metric_class()

            # Warn (but do not fail) when the data lacks fields this metric needs.
            metric_info = METRIC_REGISTRY_MAP.get(metric_key)
            if metric_info:
                missing_fields = [
                    f for f in metric_info.required_fields if f not in populated_fields
                ]
                if missing_fields:
                    msg = (
                        f"Metric '{metric_info.name}' requires {missing_fields} "
                        f"but data only has {sorted(populated_fields)}"
                    )
                    warnings.append(msg)
                    logger.warning(msg)

            scoring_metrics.append(metric_instance)
            valid_metric_keys.append(metric_key)
        except Exception as e:
            msg = f"Failed to instantiate metric '{metric_key}': {e}"
            warnings.append(msg)
            logger.warning(msg)

    if not scoring_metrics:
        raise EvalRunnerError("No valid metrics could be instantiated")

    return dataset_items, scoring_metrics, valid_metric_keys, warnings

run_evaluation_sync(evaluation_name, dataset_data, column_mapping, metrics, model_name, llm_provider, max_concurrent, thresholds, agent_config)

Run evaluation using axion's evaluation_runner (synchronous).

Convenience wrapper that validates data and runs evaluation in one call.

Source code in backend/app/services/eval_runner_service.py
def run_evaluation_sync(
    evaluation_name: str,
    dataset_data: list[dict[str, Any]],
    column_mapping: ColumnMapping,
    metrics: list[str],
    model_name: str,
    llm_provider: str,
    max_concurrent: int,
    thresholds: dict[str, float] | None,
    agent_config: AgentConfig | None,
) -> EvaluationSummary:
    """Run evaluation using axion's evaluation_runner (synchronous).

    Convenience wrapper that validates data and runs evaluation in one call.
    Validation warnings are discarded here; use prepare_evaluation_data
    directly when callers need them.
    """
    items, metric_instances, metric_keys, _warnings = prepare_evaluation_data(
        dataset_data, column_mapping, metrics
    )
    return _run_evaluation_core(
        evaluation_name=evaluation_name,
        dataset_items=items,
        scoring_metrics=metric_instances,
        valid_metric_keys=metric_keys,
        model_name=model_name,
        llm_provider=llm_provider,
        max_concurrent=max_concurrent,
        thresholds=thresholds,
    )

run_evaluation(evaluation_name, dataset_data, column_mapping, metrics, model_name, llm_provider, max_concurrent, thresholds, agent_config, on_progress=None, on_log=None) async

Run evaluation asynchronously using axion's evaluation_runner.

Runs the synchronous evaluation_runner in a thread pool to not block the async event loop while still showing progress in the terminal.

Source code in backend/app/services/eval_runner_service.py
async def run_evaluation(
    evaluation_name: str,
    dataset_data: list[dict[str, Any]],
    column_mapping: ColumnMapping,
    metrics: list[str],
    model_name: str,
    llm_provider: str,
    max_concurrent: int,
    thresholds: dict[str, float] | None,
    agent_config: AgentConfig | None,
    on_progress: Any | None = None,
    on_log: Any | None = None,
) -> EvaluationSummary:
    """Run evaluation asynchronously using axion's evaluation_runner.

    Runs the synchronous evaluation_runner in a thread pool to not block
    the async event loop while still showing progress in the terminal.

    Args:
        evaluation_name: Name for this evaluation run.
        dataset_data: Raw rows; mutated in place when an agent fills in
            missing actual outputs.
        column_mapping: Maps dataset columns to DatasetItem fields.
        metrics: Metric keys to evaluate.
        model_name: LLM model for LLM-based metrics.
        llm_provider: Provider (openai, anthropic).
        max_concurrent: Maximum concurrent metric evaluations.
        thresholds: Optional per-metric pass thresholds.
        agent_config: Optional agent used to generate missing outputs.
        on_progress: Accepted but not invoked in this implementation.
        on_log: Accepted but not invoked in this implementation.

    Returns:
        The completed EvaluationSummary.
    """
    # Generate outputs from agent if configured
    if agent_config and agent_config.type != AgentType.NONE:
        logger.info("Generating outputs from agent...")
        for i, row in enumerate(dataset_data):
            output_col = column_mapping.actual_output
            if output_col and not row.get(output_col):
                try:
                    query_col = column_mapping.query
                    query = str(row.get(query_col, "")) if query_col else ""
                    output, latency = await call_agent_api(agent_config, query)
                    row[output_col] = output
                    if column_mapping.latency:
                        row[column_mapping.latency] = latency
                except AgentConnectionError as e:
                    # Best-effort: record an empty output rather than abort the run.
                    logger.error(f"Agent call failed for row {i}: {e}")
                    row[output_col] = ""

    # Run the synchronous evaluation in a thread pool.
    # FIX: asyncio.get_event_loop() is deprecated inside coroutines;
    # get_running_loop() is the supported way to obtain the active loop.
    loop = asyncio.get_running_loop()
    summary = await loop.run_in_executor(
        None,
        run_evaluation_sync,
        evaluation_name,
        dataset_data,
        column_mapping,
        metrics,
        model_name,
        llm_provider,
        max_concurrent,
        thresholds,
        agent_config,
    )

    return summary

run_evaluation_stream(evaluation_name, dataset_data, column_mapping, metrics, model_name, llm_provider, max_concurrent, thresholds, agent_config) async

Run evaluation with SSE streaming updates.

Yields SSE events for progress, logs, and completion. Runs validation in the async context for real progress tracking, then runs axion evaluation in a thread pool.

Source code in backend/app/services/eval_runner_service.py
async def run_evaluation_stream(
    evaluation_name: str,
    dataset_data: list[dict[str, Any]],
    column_mapping: ColumnMapping,
    metrics: list[str],
    model_name: str,
    llm_provider: str,
    max_concurrent: int,
    thresholds: dict[str, float] | None,
    agent_config: AgentConfig | None,
) -> AsyncGenerator[dict[str, Any], None]:
    """Run evaluation with SSE streaming updates.

    Yields SSE events for progress, logs, and completion.
    Runs validation in the async context for real progress tracking,
    then runs axion evaluation in a thread pool.

    Yields:
        Event dicts with an "event" key ("log", "progress", "complete",
        or "error") and a matching "data" payload.
    """
    import concurrent.futures

    # FIX: hoisted — this import previously ran on every iteration of the
    # progress-polling loop below.
    import math

    total_evaluations = len(dataset_data) * len(metrics)
    thresholds = thresholds or {}

    def _make_log(level: str, message: str) -> dict[str, Any]:
        # Build an SSE "log" event with a UTC timestamp.
        return {
            "event": "log",
            "data": {
                "timestamp": datetime.now(UTC).isoformat(),
                "level": level,
                "message": message,
            },
        }

    def _make_progress(
        current: int, total: int, status: str, message: str, phase: str = "running"
    ) -> dict[str, Any]:
        # Build an SSE "progress" event.
        return {
            "event": "progress",
            "data": {
                "current": current,
                "total": total,
                "status": status,
                "message": message,
                "phase": phase,
            },
        }

    try:
        yield _make_log("INFO", f"Starting evaluation: {evaluation_name}")
        yield _make_log(
            "INFO",
            f"Processing {len(dataset_data)} items with {len(metrics)} metrics "
            f"({total_evaluations} total evaluations)",
        )

        # Phase 1: Validate data and prepare metrics (real progress)
        yield _make_progress(0, total_evaluations, "running", "Validating data...", "validating")

        # Generate outputs from agent if configured
        if agent_config and agent_config.type != AgentType.NONE:
            yield _make_log("INFO", f"Generating outputs from agent ({agent_config.type.value})...")
            for i, row in enumerate(dataset_data):
                output_col = column_mapping.actual_output
                if output_col and not row.get(output_col):
                    try:
                        query_col = column_mapping.query
                        query = str(row.get(query_col, "")) if query_col else ""
                        output, latency = await call_agent_api(agent_config, query)
                        row[output_col] = output
                        if column_mapping.latency:
                            row[column_mapping.latency] = latency
                    except AgentConnectionError as e:
                        # Best-effort: log and continue with an empty output.
                        yield _make_log("WARNING", f"Agent call failed for row {i}: {e}")
                        row[output_col] = ""

        dataset_items, scoring_metrics, valid_metric_keys, warnings = prepare_evaluation_data(
            dataset_data, column_mapping, metrics
        )

        yield _make_log("INFO", f"Validated {len(dataset_items)} items successfully")
        yield _make_log("INFO", f"Instantiated {len(scoring_metrics)} metrics: {valid_metric_keys}")

        # Surface validation warnings as log events
        if warnings:
            yield _make_log(
                "WARNING",
                f"{len(warnings)} validation warning(s) detected:",
            )
            for warn_msg in warnings:
                yield _make_log("WARNING", warn_msg)

        # Phase 2: Run axion evaluation in thread pool
        yield _make_progress(0, total_evaluations, "running", "Running evaluation...", "evaluating")
        yield _make_log("INFO", "Starting metric evaluation with axion...")

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(
                _run_evaluation_core,
                evaluation_name,
                dataset_items,
                scoring_metrics,
                valid_metric_keys,
                model_name,
                llm_provider,
                max_concurrent,
                thresholds,
            )

            # Poll for completion while sending progress updates
            elapsed: float = 0
            poll_interval = 0.5  # Faster polling for better UX
            while not future.done():
                await asyncio.sleep(poll_interval)
                elapsed += poll_interval

                # Estimate progress — exponential approach toward 100%,
                # capped below the total so it never reads "done" early.
                # FIX: guard the divisor so an empty dataset (total == 0)
                # cannot raise ZeroDivisionError.
                denominator = max(total_evaluations, 1)
                progress_pct = 1 - math.exp(-elapsed * max_concurrent * 0.3 / denominator)
                estimated_progress = min(
                    int(progress_pct * total_evaluations),
                    max(total_evaluations - 1, 0),
                )

                yield _make_progress(
                    estimated_progress,
                    total_evaluations,
                    "running",
                    f"Evaluating metrics... ({int(elapsed)}s elapsed)",
                    "evaluating",
                )

            # Get the result (may raise)
            summary = future.result()

        # Phase 3: Complete
        logger.info(
            f"Evaluation complete, preparing response with "
            f"{len(summary.dataframe_records)} dataframe records"
        )

        yield _make_progress(
            total_evaluations, total_evaluations, "complete", "Evaluation complete!", "complete"
        )

        # Check for potential issues in results
        zero_metrics = [
            mr.metric_name
            for mr in summary.metric_results
            if mr.average_score == 0.0 and mr.pass_rate == 0.0
        ]
        if zero_metrics:
            yield _make_log(
                "WARNING",
                f"Metrics with 0% scores: {zero_metrics}. "
                "This may indicate missing required data fields (e.g., expected_output, "
                "retrieved_content) or metric execution failures.",
            )

        # Serialize summary carefully to handle any problematic values
        try:
            summary_dict = summary.model_dump()
            if summary_dict.get("dataframe_records"):
                # Stringify any value that is not JSON-serializable by type.
                for record in summary_dict["dataframe_records"]:
                    for key, val in list(record.items()):
                        if val is not None and not isinstance(
                            val, str | int | float | bool | list | dict
                        ):
                            record[key] = str(val)
            logger.info(
                f"Serialized summary with {len(summary_dict.get('dataframe_records', []))} records"
            )
        except Exception as e:
            # Fall back to a summary without the records rather than
            # failing the whole stream.
            logger.error(f"Failed to serialize summary: {e}")
            summary_dict = summary.model_dump(exclude={"dataframe_records", "dataframe_columns"})
            summary_dict["dataframe_records"] = []
            summary_dict["dataframe_columns"] = []

        yield {
            "event": "complete",
            "data": {
                "run_id": summary.run_id,
                "summary": summary_dict,
            },
        }

    except Exception as e:
        logger.error(f"Evaluation stream error: {e}", exc_info=True)
        yield {
            "event": "error",
            "data": {
                "message": str(e),
                "details": None,
            },
        }