Skip to content

Implementing an Evaluation Pipeline

This guide walks through setting up evaluation to measure extraction quality.

Prerequisites

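  • A working Extraction Pipeline — the snippets below reuse its LeaseSchema, LocalReader, PDFConverter, and GeminiImageExtractor classes (and their config classes) without redefining or re-importing them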
  • Ground truth data for your documents
  • Understanding of relevant metrics for your use case

Step 1: Prepare Ground Truth Data

Create a dataset of known-correct extractions. The example below uses lease data similar to the data in the examples repository:

data/evaluation/ground_truth.json
[
  {
    "file_path": "data/leases/lease_001.pdf",
    "ground_truth": {
      "landlord_name": "John Smith",
      "tenant_name": "Jane Doe",
      "property_address": {
        "street": "123 Main Street",
        "city": "San Francisco",
        "state": "CA",
        "postal_code": "94102"
      },
      "lease_start_date": "2024-01-01",
      "lease_end_date": "2024-12-31",
      "monthly_rent": 2500.00,
      "security_deposit": 5000.00,
      "currency": "USD"
    }
  },
  {
    "file_path": "data/leases/lease_002.pdf",
    "ground_truth": {
      "landlord_name": "ABC Properties LLC",
      "tenant_name": "Bob Wilson",
      "property_address": {
        "street": "456 Oak Avenue, Unit 2B",
        "city": "Los Angeles",
        "state": "CA",
        "postal_code": "90001"
      },
      "lease_start_date": "2024-03-15",
      "lease_end_date": "2025-03-14",
      "monthly_rent": 1800.00,
      "security_deposit": 3600.00,
      "currency": "USD"
    }
  }
]

Step 2: Implement TestDataLoader

import json
from pathlib import Path
from document_extraction_tools.base import BaseTestDataLoader
from document_extraction_tools.config import BaseTestDataLoaderConfig
from document_extraction_tools.types import (
    EvaluationExample,
    ExtractionResult,
    PathIdentifier,
    PipelineContext,
)


class JSONTestDataLoaderConfig(BaseTestDataLoaderConfig):
    """Configuration for ``JSONTestDataLoader``.

    Declares no settings of its own: the loader finds the JSON file through the
    ``path_identifier`` it is given when loading test data.
    """


class JSONTestDataLoader(BaseTestDataLoader[LeaseSchema]):
    """Load evaluation examples from a JSON ground-truth file.

    The file must contain a list of objects, each with a ``file_path`` entry and a
    ``ground_truth`` mapping that is unpacked into ``LeaseSchema``.
    """

    def load_test_data(
        self,
        path_identifier: PathIdentifier,
        context: PipelineContext | None = None,
    ) -> list[EvaluationExample[LeaseSchema]]:
        """Read the ground-truth file and build one example per entry.

        Args:
            path_identifier: Identifier whose ``path`` points at the JSON file.
            context: Optional shared pipeline context; not used by this loader.

        Returns:
            One ``EvaluationExample`` per JSON entry, in file order. Each example
            uses the entry's ``file_path`` both as its ``id`` and as the path of
            the document to extract from.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If an entry lacks ``file_path`` or ``ground_truth``.
        """
        # Be explicit about the encoding: JSON is UTF-8, while the default of
        # open() depends on the platform locale and can mangle non-ASCII names.
        with open(path_identifier.path, encoding="utf-8") as f:
            records = json.load(f)

        return [
            EvaluationExample(
                id=record["file_path"],
                path_identifier=PathIdentifier(path=record["file_path"]),
                true=ExtractionResult(
                    data=LeaseSchema(**record["ground_truth"]),
                ),
            )
            for record in records
        ]

Step 3: Implement Evaluators

Create one or more evaluators to measure different aspects of quality.

Field Accuracy Evaluator

The examples repository includes accuracy and F1 evaluators. Here's a field accuracy evaluator:

from document_extraction_tools.base import BaseEvaluator
from document_extraction_tools.config import BaseEvaluatorConfig
from document_extraction_tools.types import EvaluationResult, ExtractionResult, PipelineContext


class FieldAccuracyEvaluatorConfig(BaseEvaluatorConfig):
    """Configuration for ``FieldAccuracyEvaluator``; declares no settings of its own."""


class FieldAccuracyEvaluator(BaseEvaluator[LeaseSchema]):
    """Measures percentage of fields that exactly match."""

    def evaluate(
        self,
        true: ExtractionResult[LeaseSchema],
        pred: ExtractionResult[LeaseSchema],
        context: PipelineContext | None = None,
    ) -> EvaluationResult:
        """Compare every top-level schema field of ``pred`` against ``true``.

        Args:
            true: Ground-truth extraction.
            pred: Predicted extraction to score.
            context: Optional shared pipeline context; not used here.

        Returns:
            An ``EvaluationResult`` named ``field_accuracy`` whose ``result`` is
            the fraction of top-level fields that compare equal (``0.0`` when the
            schema has no fields). Nested values are compared as a whole with
            ``==``, so a nested object counts as one field.
        """
        true_data = true.data
        pred_data = pred.data
        # Read model_fields from the class: accessing it on an instance is
        # deprecated in recent Pydantic releases.
        fields = list(type(true_data).model_fields)
        correct = sum(
            1 for field in fields
            if getattr(true_data, field) == getattr(pred_data, field)
        )

        accuracy = correct / len(fields) if fields else 0.0

        return EvaluationResult(
            name="field_accuracy",
            result=accuracy,
            description=f"{correct}/{len(fields)} fields correct"
        )

Numeric Tolerance Evaluator

class NumericToleranceEvaluatorConfig(BaseEvaluatorConfig):
    """Configuration for ``NumericToleranceEvaluator``."""

    # Maximum accepted relative error; the default of 0.01 means 1%.
    tolerance: float = 0.01


class NumericToleranceEvaluator(BaseEvaluator[LeaseSchema]):
    """Checks whether the predicted ``monthly_rent`` is within a relative tolerance."""

    def evaluate(
        self,
        true: ExtractionResult[LeaseSchema],
        pred: ExtractionResult[LeaseSchema],
        context: PipelineContext | None = None,
    ) -> EvaluationResult:
        """Score the predicted monthly rent against the ground truth.

        Args:
            true: Ground-truth extraction.
            pred: Predicted extraction to score.
            context: Optional shared pipeline context; not used here.

        Returns:
            An ``EvaluationResult`` named ``rent_within_tolerance`` whose
            ``result`` is ``1.0`` when the relative error is at most
            ``self.config.tolerance`` and ``0.0`` otherwise.
        """
        expected_rent = true.data.monthly_rent
        predicted_rent = pred.data.monthly_rent

        if expected_rent == 0:
            # Relative error is undefined for a zero reference; require an exact match.
            within_tolerance = predicted_rent == 0
        else:
            # Divide by the magnitude of the reference: dividing by a negative
            # value would give a negative "error" that always passes the check.
            relative_error = abs(expected_rent - predicted_rent) / abs(expected_rent)
            within_tolerance = relative_error <= self.config.tolerance

        return EvaluationResult(
            name="rent_within_tolerance",
            result=1.0 if within_tolerance else 0.0,
            description=f"Rent {'within' if within_tolerance else 'outside'} {self.config.tolerance:.1%} tolerance"
        )

String Similarity Evaluator

from difflib import SequenceMatcher


class StringSimilarityEvaluatorConfig(BaseEvaluatorConfig):
    """Configuration for ``StringSimilarityEvaluator``."""

    # Name of the schema attribute whose values are compared.
    field_name: str = "landlord_name"


class StringSimilarityEvaluator(BaseEvaluator[LeaseSchema]):
    """Measures string similarity for a specific field."""

    def evaluate(
        self,
        true: ExtractionResult[LeaseSchema],
        pred: ExtractionResult[LeaseSchema],
        context: PipelineContext | None = None,
    ) -> EvaluationResult:
        """Compute a case-insensitive similarity ratio for the configured field.

        Args:
            true: Ground-truth extraction.
            pred: Predicted extraction to score.
            context: Optional shared pipeline context; not used here.

        Returns:
            An ``EvaluationResult`` named ``<field_name>_similarity`` whose
            ``result`` is the ``SequenceMatcher`` ratio of the two lower-cased
            values. A missing attribute or a ``None`` value is treated as the
            empty string, and non-string values are converted with ``str``.
        """
        field_name = self.config.field_name
        true_value = getattr(true.data, field_name, None)
        pred_value = getattr(pred.data, field_name, None)

        # The getattr default only covers a missing attribute; a field that is
        # present but None (or not a string) would otherwise crash on .lower().
        true_text = "" if true_value is None else str(true_value)
        pred_text = "" if pred_value is None else str(pred_value)

        similarity = SequenceMatcher(
            None,
            true_text.lower(),
            pred_text.lower()
        ).ratio()

        return EvaluationResult(
            name=f"{field_name}_similarity",
            result=similarity,
            description=f"String similarity: {similarity:.2%}"
        )

Step 4: Implement EvaluationExporter

import csv
from pathlib import Path
from document_extraction_tools.base import BaseEvaluationExporter
from document_extraction_tools.config import BaseEvaluationExporterConfig
from document_extraction_tools.types import Document, EvaluationResult, PipelineContext


class CSVEvaluationExporterConfig(BaseEvaluationExporterConfig):
    """Configuration for ``CSVEvaluationExporter``."""

    # Destination of the CSV report; required, no default.
    output_path: str


class CSVEvaluationExporter(BaseEvaluationExporter):
    """Write per-document metrics to a CSV file and print a summary table."""

    async def export(
        self,
        results: list[tuple[Document, list[EvaluationResult]]],
        context: PipelineContext | None = None,
    ) -> None:
        """Export evaluation results to ``self.config.output_path``.

        The CSV has one row per document (its ``path_identifier.path``) and one
        column per metric name, sorted alphabetically; a metric missing for a
        document is written as an empty cell. If a document carries the same
        metric name more than once, the CSV keeps the last value while the
        printed summary aggregates all of them.

        Args:
            results: Pairs of a document and the evaluation results computed for it.
            context: Optional shared pipeline context; not used here.
        """
        output_path = Path(self.config.output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Group every metric value by name in a single pass; this gives both the
        # column set for the CSV and the inputs for the summary statistics.
        values_by_metric: dict[str, list] = {}
        for _, eval_results in results:
            for result in eval_results:
                values_by_metric.setdefault(result.name, []).append(result.result)
        metric_names = sorted(values_by_metric)

        # Explicit encoding so non-ASCII file paths are written identically on
        # every platform; newline="" is what the csv module requires.
        with open(output_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)

            # Header
            writer.writerow(["file", *metric_names])

            # Data rows
            for document, eval_results in results:
                metrics = {r.name: r.result for r in eval_results}
                writer.writerow(
                    [
                        document.path_identifier.path,
                        *(metrics.get(name, "") for name in metric_names),
                    ]
                )

        # Print summary
        print("\nEvaluation Results Summary:")
        print(f"{'Metric':<30} {'Mean':>10} {'Min':>10} {'Max':>10}")
        print("-" * 62)

        for metric in metric_names:
            # Every name in values_by_metric has at least one value.
            values = values_by_metric[metric]
            print(f"{metric:<30} {sum(values)/len(values):>10.3f} {min(values):>10.3f} {max(values):>10.3f}")

Step 5: Create Configuration Files

config/yaml/evaluator.yaml
FieldAccuracyEvaluatorConfig: {}

NumericToleranceEvaluatorConfig:
  tolerance: 0.01

StringSimilarityEvaluatorConfig:
  field_name: "landlord_name"
config/yaml/evaluation_exporter.yaml
output_path: "./output/evaluation_results.csv"
config/yaml/evaluation_orchestrator.yaml
max_workers: 4
max_concurrency: 10

Step 6: Run Evaluation

import asyncio
import uuid
from pathlib import Path
from document_extraction_tools.config import load_evaluation_config
from document_extraction_tools.runners import EvaluationOrchestrator
from document_extraction_tools.types import PathIdentifier, PipelineContext

async def main():
    """Load the evaluation config, build the orchestrator, and run the evaluation."""
    # Evaluator configs and evaluator classes are listed in matching order.
    evaluator_config_classes = [
        FieldAccuracyEvaluatorConfig,
        NumericToleranceEvaluatorConfig,
        StringSimilarityEvaluatorConfig,
    ]
    evaluator_classes = [
        FieldAccuracyEvaluator,
        NumericToleranceEvaluator,
        StringSimilarityEvaluator,
    ]

    # Read every component's settings from the YAML directory.
    config = load_evaluation_config(
        test_data_loader_config_cls=JSONTestDataLoaderConfig,
        evaluator_config_classes=evaluator_config_classes,
        reader_config_cls=LocalReaderConfig,
        converter_config_cls=PDFConverterConfig,
        extractor_config_cls=GeminiExtractorConfig,
        evaluation_exporter_config_cls=CSVEvaluationExporterConfig,
        config_dir=Path("config/yaml"),
    )

    # Wire the extraction and evaluation components together.
    orchestrator = EvaluationOrchestrator.from_config(
        config=config,
        schema=LeaseSchema,
        reader_cls=LocalReader,
        converter_cls=PDFConverter,
        extractor_cls=GeminiImageExtractor,
        test_data_loader_cls=JSONTestDataLoader,
        evaluator_classes=evaluator_classes,
        evaluation_exporter_cls=CSVEvaluationExporter,
    )

    # Fetch the ground-truth examples that will be evaluated.
    ground_truth_file = PathIdentifier(path="data/evaluation/ground_truth.json")
    loader = JSONTestDataLoader(config)
    examples = loader.load_test_data(ground_truth_file)

    print(f"Evaluating {len(examples)} lease documents...")

    # Tag the run with a short random id, shared through the optional context.
    run_id = uuid.uuid4().hex[:8]
    shared_context = PipelineContext(context={"run_id": run_id})
    await orchestrator.run(examples, context=shared_context)

    print(f"\nResults saved to {config.evaluation_exporter.output_path}")


if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Evaluation Tips

  • Use multiple metrics - Different metrics capture different aspects of quality
  • Track metrics over time - Save results to monitor improvements
  • Stratify your test set - Include edge cases and representative samples
  • Validate ground truth - Errors in ground truth corrupt your metrics