Evaluation Pipeline¶
The evaluation pipeline measures extraction quality against ground truth data.
Pipeline Flow¶
flowchart LR
subgraph Input
TDL[TestDataLoader]
end
subgraph ThreadPool["Thread Pool (CPU-bound)"]
R[Reader]
C[Converter]
EV[Evaluators]
end
subgraph AsyncIO["Async Event Loop (I/O-bound)"]
E[Extractor]
end
subgraph Output
EX[Exporter]
end
TDL -->|"list[EvaluationExample]"| R
R -->|DocumentBytes| C
C -->|Document| E
E -->|ExtractionResult| EV
TDL -.->|GroundTruth| EV
EV -->|"list[EvaluationResult]"| EX
EX -->|Persisted| Storage[(Storage)]
Per-Example Processing¶
For each evaluation example, the orchestrator runs:
sequenceDiagram
participant O as Orchestrator
participant TP as Thread Pool
participant R as Reader
participant C as Converter
participant E as Extractor
participant EV as Evaluators
participant EX as Exporter
Note over O: EvaluationExample contains<br/>PathIdentifier + GroundTruth
Note over O: PipelineContext passed to all components
O->>TP: Submit ingest task
TP->>R: read(path_identifier, context)
R-->>TP: DocumentBytes
TP->>C: convert(document_bytes, context)
C-->>TP: Document
TP-->>O: Document
O->>E: extract(document, schema, context)
Note over E: Async I/O (e.g., LLM API call)
E-->>O: ExtractionResult
O->>TP: Submit evaluation tasks
TP->>EV: evaluate(ground_truth, prediction)
EV-->>TP: EvaluationResult
TP-->>O: list[EvaluationResult]
Note over O: After all examples complete
O->>EX: export(all_results, context)
Note over EX: Async I/O
EX-->>O: Done
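Stripped of the diagram notation, the per-example flow is a thread-pool hop for ingestion, an awaited extraction, and another thread-pool hop for evaluation. Below is a minimal sketch of that idea, using `asyncio.to_thread` for illustration; the real orchestrator manages its own thread pool and semaphore, as described under Concurrency Model below:

import asyncio

async def process_example(example, reader, converter, extractor, evaluators, schema, context):
    # Blocking ingestion (file read + parsing) runs off the event loop
    document_bytes = await asyncio.to_thread(reader.read, example.path_identifier, context)
    document = await asyncio.to_thread(converter.convert, document_bytes, context)

    # Extraction is async I/O (e.g., an LLM API call) and is awaited directly
    prediction = await extractor.extract(document, schema, context)

    # Each evaluator compares ground truth against the prediction off the loop
    return [
        await asyncio.to_thread(ev.evaluate, example.true, prediction, context)
        for ev in evaluators
    ]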
Components¶
1. TestDataLoader¶
Loads evaluation examples (ground truth + file paths). Called before the orchestrator runs.
from document_extraction_tools.base import BaseTestDataLoader
from document_extraction_tools.types import EvaluationExample, ExtractionResult, PathIdentifier

class MyTestDataLoader(BaseTestDataLoader[InvoiceSchema]):
    def load_test_data(
        self, path_identifier: PathIdentifier
    ) -> list[EvaluationExample[InvoiceSchema]]:
        # Load from JSON, CSV, database, etc.
        examples = []
        for row in load_ground_truth_data():
            examples.append(EvaluationExample(
                id=row["file_path"],
                path_identifier=PathIdentifier(path=row["file_path"]),
                true=ExtractionResult(data=InvoiceSchema(**row["true"])),
            ))
        return examples
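`load_ground_truth_data()` above is a stand-in. A minimal JSON-backed version might look like this (the file name and record layout are assumptions):

import json
from pathlib import Path

def load_ground_truth_data() -> list[dict]:
    # One record per document: {"file_path": "...", "true": {...}}
    return json.loads(Path("ground_truth.json").read_text())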
2. Evaluator¶
Computes metrics by comparing predictions against ground truth. Runs in the thread pool.
from document_extraction_tools.base import BaseEvaluator
from document_extraction_tools.types import EvaluationResult, ExtractionResult, PipelineContext

class FieldAccuracyEvaluator(BaseEvaluator[InvoiceSchema]):
    def evaluate(
        self,
        true: ExtractionResult[InvoiceSchema],
        pred: ExtractionResult[InvoiceSchema],
        context: PipelineContext | None = None,
    ) -> EvaluationResult:
        # Compare each schema field for exact equality.
        # model_fields is read from the class; instance access is deprecated in Pydantic v2.
        fields = type(true.data).model_fields
        correct = sum(
            1 for field in fields
            if getattr(true.data, field) == getattr(pred.data, field)
        )
        return EvaluationResult(
            name="field_accuracy",
            result=correct / len(fields),
            description="Fraction of fields correctly extracted",
        )
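Because this evaluator compares fields with `==`, it is an exact-match metric: whitespace, casing, or number-formatting differences all count as errors. Looser metrics such as Levenshtein distance or numeric tolerance (see Multiple Evaluators below) are a better fit for free-text and numeric fields.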
3. EvaluationExporter¶
Persists evaluation results. Called once after all examples are processed. Runs in the async event loop.
from document_extraction_tools.base import BaseEvaluationExporter
from document_extraction_tools.types import Document, EvaluationResult, PipelineContext

class MyEvaluationExporter(BaseEvaluationExporter):
    async def export(
        self,
        results: list[tuple[Document, list[EvaluationResult]]],
        context: PipelineContext | None = None,
    ) -> None:
        # Save to file, database, monitoring system, etc.
        for document, eval_results in results:
            for result in eval_results:
                print(f"{document.path_identifier.path}: {result.name}={result.result}")
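Printing is handy for a first run; for a persistent record, the same hook can write one JSON line per metric. A file-backed variant as a sketch (the output path is an assumption; synchronous file I/O is acceptable here since export runs once at the end):

import json

class JsonlEvaluationExporter(BaseEvaluationExporter):
    async def export(
        self,
        results: list[tuple[Document, list[EvaluationResult]]],
        context: PipelineContext | None = None,
    ) -> None:
        # One JSON line per (document, metric) pair
        with open("evaluation_results.jsonl", "w") as f:
            for document, eval_results in results:
                for result in eval_results:
                    record = {
                        "path": document.path_identifier.path,
                        "metric": result.name,
                        "value": result.result,
                    }
                    f.write(json.dumps(record) + "\n")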
Multiple Evaluators¶
You can run multiple evaluators to compute different metrics:
flowchart LR
subgraph Evaluators
E1[FieldAccuracyEvaluator]
E2[LevenshteinDistanceEvaluator]
E3[NumericToleranceEvaluator]
end
GT[GroundTruth] --> E1 & E2 & E3
Pred[Prediction] --> E1 & E2 & E3
E1 --> R1[EvaluationResult]
E2 --> R2[EvaluationResult]
E3 --> R3[EvaluationResult]
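`FieldAccuracyEvaluator` is defined above; the other two appear here by name only. As a sketch, a numeric-tolerance metric could look like this (the `total` field and the 1% tolerance are assumptions):

import math

class NumericToleranceEvaluator(BaseEvaluator[InvoiceSchema]):
    def evaluate(
        self,
        true: ExtractionResult[InvoiceSchema],
        pred: ExtractionResult[InvoiceSchema],
        context: PipelineContext | None = None,
    ) -> EvaluationResult:
        # Treat the amount as correct when within 1% relative tolerance
        match = math.isclose(pred.data.total, true.data.total, rel_tol=0.01)
        return EvaluationResult(
            name="numeric_tolerance",
            result=1.0 if match else 0.0,
            description="Invoice total within 1% relative tolerance",
        )

All evaluators are then registered with the orchestrator together: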
evaluator_classes = [
    FieldAccuracyEvaluator,
    LevenshteinDistanceEvaluator,
    NumericToleranceEvaluator,
]

orchestrator = EvaluationOrchestrator.from_config(
    config=config,
    schema=InvoiceSchema,
    reader_cls=MyReader,
    converter_cls=MyConverter,
    extractor_cls=MyExtractor,
    test_data_loader_cls=MyTestDataLoader,
    evaluator_classes=evaluator_classes,
    evaluation_exporter_cls=MyEvaluationExporter,
)
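Each example yields one EvaluationResult per evaluator; once every example has completed, the combined list is handed to the exporter in a single call.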
Concurrency Model¶
flowchart TB
subgraph "Orchestrator.run(examples)"
direction TB
TP["ThreadPoolExecutor<br/>max_workers"]
SEM["Semaphore<br/>max_concurrency"]
subgraph "Per Example Task"
I["Ingest: Read + Convert"]
EX["Extract"]
EV["Evaluate (all evaluators)"]
end
TP -.->|"CPU-bound"| I
SEM -.->|"I/O-bound"| EX
TP -.->|"CPU-bound"| EV
subgraph "After All Examples"
Export["Export all results"]
end
end
| Stage | Execution Model | Reason |
|---|---|---|
| Reader | Thread pool | File I/O is blocking |
| Converter | Thread pool | CPU-bound parsing |
| Extractor | Async | Network I/O (LLM calls) |
| Evaluators | Thread pool | CPU-bound comparison |
| Exporter | Async | Network/disk I/O |
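The table maps onto two standard primitives: a `ThreadPoolExecutor` for the CPU-bound stages and an `asyncio.Semaphore` capping in-flight extractions. A condensed sketch of the pattern, with placeholder `ingest`, `extract`, and `evaluate` callables standing in for the real components:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def ingest(example): ...                 # Reader + Converter (placeholder)
async def extract(document): ...         # async LLM call (placeholder)
def evaluate(example, prediction): ...   # all evaluators (placeholder)

executor = ThreadPoolExecutor(max_workers=8)  # CPU-bound stages
semaphore = asyncio.Semaphore(4)              # caps concurrent extract calls

async def run_example(example):
    loop = asyncio.get_running_loop()
    document = await loop.run_in_executor(executor, ingest, example)
    async with semaphore:  # bound concurrent LLM calls
        prediction = await extract(document)
    return await loop.run_in_executor(executor, evaluate, example, prediction)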
Running Evaluation¶
import uuid

from document_extraction_tools.config import load_evaluation_config
from document_extraction_tools.runners import EvaluationOrchestrator
from document_extraction_tools.types import PathIdentifier, PipelineContext

config = load_evaluation_config(
    test_data_loader_config_cls=MyTestDataLoaderConfig,
    evaluator_config_classes=[FieldAccuracyEvaluatorConfig],
    reader_config_cls=MyReaderConfig,
    converter_config_cls=MyConverterConfig,
    extractor_config_cls=MyExtractorConfig,
    evaluation_exporter_config_cls=MyEvaluationExporterConfig,
)

orchestrator = EvaluationOrchestrator.from_config(...)  # wired up as shown above

# Load test data
examples = MyTestDataLoader(config).load_test_data(
    PathIdentifier(path="/path/to/eval-set")
)

# Run evaluation with optional context
context = PipelineContext(context={"run_id": str(uuid.uuid4())[:8]})
await orchestrator.run(examples, context=context)
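`orchestrator.run` is a coroutine, so the `await` above assumes an already-running event loop (a notebook, for example). In a plain script, wrap the call with `asyncio.run`:

import asyncio

asyncio.run(orchestrator.run(examples, context=context))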