Base Classes

Abstract base classes that define the interfaces for pipeline components.

Extraction Pipeline

BaseFileLister

Bases: ABC

Abstract interface for file discovery.

Attributes:

    config (BaseFileListerConfig): Component-specific configuration.
    pipeline_config (ExtractionPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseFileListerConfig | ExtractionPipelineConfig): Configuration specific to the file lister implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/file_lister/base_file_lister.py
def __init__(
    self,
    config: BaseFileListerConfig | ExtractionPipelineConfig,
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseFileListerConfig | ExtractionPipelineConfig):
            Configuration specific to the file lister implementation or full pipeline configuration.
    """
    if isinstance(config, ExtractionPipelineConfig):
        self.pipeline_config = config
        self.config = config.file_lister
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseFileListerConfig

pipeline_config instance-attribute

pipeline_config: ExtractionPipelineConfig | None

list_files abstractmethod

list_files(context: PipelineContext | None = None) -> list[PathIdentifier]

Scans the target source and returns a list of file identifiers.

This method should handle the logic to return a clean list of work items.

Parameters:

    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    list[PathIdentifier]: A list of standardized objects containing the path and any necessary execution context.

Source code in src/document_extraction_tools/base/file_lister/base_file_lister.py
@abstractmethod
def list_files(
    self, context: PipelineContext | None = None
) -> list[PathIdentifier]:
    """Scans the target source and returns a list of file identifiers.

    This method should handle the logic to return a clean list of work items.

    Args:
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        List[PathIdentifier]: A list of standardized objects containing the
                              path and any necessary execution context.
    """
    pass
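
For illustration only, a concrete lister might walk a local directory. This is a sketch, not part of the library: the `input_dir` field on the config, the `PathIdentifier(path=...)` constructor, and the exact import locations are assumptions to check against your installation.

from pathlib import Path

from document_extraction_tools.base import BaseFileLister
from document_extraction_tools import PathIdentifier  # import path assumed


class LocalPdfLister(BaseFileLister):
    """Hypothetical lister returning every PDF under a directory."""

    def list_files(self, context=None) -> list[PathIdentifier]:
        # `input_dir` on the concrete config is an assumption.
        root = Path(self.config.input_dir)
        return [PathIdentifier(path=str(p)) for p in sorted(root.rglob("*.pdf"))]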

BaseReader

Bases: ABC

Abstract interface for document ingestion.

Attributes:

    config (BaseReaderConfig): Component-specific configuration.
    pipeline_config (ExtractionPipelineConfig | EvaluationPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseReaderConfig | ExtractionPipelineConfig | EvaluationPipelineConfig): Configuration specific to the reader implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/reader/base_reader.py
def __init__(
    self,
    config: BaseReaderConfig | ExtractionPipelineConfig | EvaluationPipelineConfig,
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseReaderConfig | ExtractionPipelineConfig | EvaluationPipelineConfig):
            Configuration specific to the reader implementation or full pipeline configuration.
    """
    if isinstance(config, (ExtractionPipelineConfig, EvaluationPipelineConfig)):
        self.pipeline_config = config
        self.config = config.reader
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseReaderConfig

pipeline_config instance-attribute

pipeline_config: ExtractionPipelineConfig | EvaluationPipelineConfig | None

read abstractmethod

read(path_identifier: PathIdentifier, context: PipelineContext | None = None) -> DocumentBytes

Reads a document from a specific source and returns its raw bytes.

Parameters:

    path_identifier (PathIdentifier): The identifier for the file. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    DocumentBytes: A standardized container with raw bytes and source metadata.

Source code in src/document_extraction_tools/base/reader/base_reader.py
@abstractmethod
def read(
    self, path_identifier: PathIdentifier, context: PipelineContext | None = None
) -> DocumentBytes:
    """Reads a document from a specific source and returns its raw bytes.

    Args:
        path_identifier (PathIdentifier): The identifier for the file.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        DocumentBytes: A standardized container with raw bytes and source metadata.
    """
    pass
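
A local-filesystem reader could be as small as the sketch below; the `path` attribute on PathIdentifier, the DocumentBytes constructor, and the import location are assumptions.

from pathlib import Path

from document_extraction_tools.base import BaseReader
from document_extraction_tools import DocumentBytes  # import path assumed


class LocalFileReader(BaseReader):
    """Hypothetical reader for files on the local filesystem."""

    def read(self, path_identifier, context=None) -> DocumentBytes:
        # Field names on PathIdentifier and DocumentBytes are assumptions.
        raw = Path(path_identifier.path).read_bytes()
        return DocumentBytes(data=raw, source=path_identifier)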

BaseConverter

Bases: ABC

Abstract interface for document transformation.

Attributes:

    config (BaseConverterConfig): Component-specific configuration.
    pipeline_config (ExtractionPipelineConfig | EvaluationPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseConverterConfig | ExtractionPipelineConfig | EvaluationPipelineConfig): Configuration specific to the converter implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/converter/base_converter.py
def __init__(
    self,
    config: (
        BaseConverterConfig | ExtractionPipelineConfig | EvaluationPipelineConfig
    ),
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseConverterConfig | ExtractionPipelineConfig | EvaluationPipelineConfig):
            Configuration specific to the converter implementation or full pipeline configuration.
    """
    if isinstance(config, (ExtractionPipelineConfig, EvaluationPipelineConfig)):
        self.pipeline_config = config
        self.config = config.converter
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseConverterConfig

pipeline_config instance-attribute

pipeline_config: ExtractionPipelineConfig | EvaluationPipelineConfig | None

convert abstractmethod

convert(document_bytes: DocumentBytes, context: PipelineContext | None = None) -> Document

Transforms raw document bytes into a structured Document object.

This method should handle the parsing logic and map the metadata from the input bytes to the output document.

Parameters:

    document_bytes (DocumentBytes): The standardized raw input containing file bytes and source metadata. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    Document: The fully structured document model ready for extraction.

Source code in src/document_extraction_tools/base/converter/base_converter.py
@abstractmethod
def convert(
    self, document_bytes: DocumentBytes, context: PipelineContext | None = None
) -> Document:
    """Transforms raw document bytes into a structured Document object.

    This method should handle the parsing logic and map the metadata from the
    input bytes to the output document.

    Args:
        document_bytes (DocumentBytes): The standardized raw input containing
                                        file bytes and source metadata.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        Document: The fully structured document model ready for extraction.
    """
    pass
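
As a sketch of the contract, a trivial converter could decode the bytes as UTF-8 and carry the source path over as the document id; the DocumentBytes fields and the Document constructor below are assumptions.

from document_extraction_tools.base import BaseConverter
from document_extraction_tools import Document  # import path assumed


class PlainTextConverter(BaseConverter):
    """Hypothetical converter that decodes raw bytes as UTF-8 text."""

    def convert(self, document_bytes, context=None) -> Document:
        # `data` and `source` on DocumentBytes are assumptions; map whatever
        # metadata your models actually define.
        text = document_bytes.data.decode("utf-8", errors="replace")
        return Document(id=document_bytes.source.path, text=text)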

BaseExtractor

Bases: ABC

Abstract interface for data extraction.

Attributes:

    config (BaseExtractorConfig): Component-specific configuration.
    pipeline_config (ExtractionPipelineConfig | EvaluationPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseExtractorConfig | ExtractionPipelineConfig | EvaluationPipelineConfig): Configuration specific to the extractor implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/extractor/base_extractor.py
def __init__(
    self,
    config: (
        BaseExtractorConfig | ExtractionPipelineConfig | EvaluationPipelineConfig
    ),
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseExtractorConfig | ExtractionPipelineConfig | EvaluationPipelineConfig):
            Configuration specific to the extractor implementation or full pipeline configuration.
    """
    if isinstance(config, (ExtractionPipelineConfig, EvaluationPipelineConfig)):
        self.pipeline_config = config
        self.config = config.extractor
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseExtractorConfig

pipeline_config instance-attribute

pipeline_config: ExtractionPipelineConfig | EvaluationPipelineConfig | None

extract abstractmethod async

extract(document: Document, schema: type[ExtractionSchema], context: PipelineContext | None = None) -> ExtractionResult[ExtractionSchema]

Extracts structured data from a Document to match the provided Schema.

This is an asynchronous operation to support I/O-bound tasks.

Parameters:

    document (Document): The fully parsed document. (required)
    schema (type[ExtractionSchema]): The Pydantic model class defining the target structure. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    ExtractionResult[ExtractionSchema]: The extracted data with metadata.

Source code in src/document_extraction_tools/base/extractor/base_extractor.py
@abstractmethod
async def extract(
    self,
    document: Document,
    schema: type[ExtractionSchema],
    context: PipelineContext | None = None,
) -> ExtractionResult[ExtractionSchema]:
    """Extracts structured data from a Document to match the provided Schema.

    This is an asynchronous operation to support I/O-bound tasks.

    Args:
        document (Document): The fully parsed document.
        schema (type[ExtractionSchema]): The Pydantic model class defining the target structure.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        ExtractionResult[ExtractionSchema]: The extracted data with metadata.
    """
    pass
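
Because extract is async, implementations typically await an LLM client and validate the response against the schema. The sketch below skips the model call entirely: `model_validate` is the standard Pydantic v2 API, while the "title" field, the Document attributes, and the ExtractionResult constructor are assumptions.

from document_extraction_tools.base import BaseExtractor
from document_extraction_tools import ExtractionResult  # import path assumed


class StubExtractor(BaseExtractor):
    """Hypothetical extractor that validates a canned payload."""

    async def extract(self, document, schema, context=None):
        # A real implementation would await an LLM client here and parse
        # its JSON output before validation.
        payload = {"title": document.text[:80]}  # schema field assumed
        data = schema.model_validate(payload)
        return ExtractionResult(data=data, document_id=document.id)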

BaseExtractionExporter

Bases: ABC

Abstract interface for data persistence.

Attributes:

    config (BaseExtractionExporterConfig): Component-specific configuration.
    pipeline_config (ExtractionPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseExtractionExporterConfig | ExtractionPipelineConfig): Configuration specific to the exporter implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/exporter/base_extraction_exporter.py
def __init__(
    self,
    config: BaseExtractionExporterConfig | ExtractionPipelineConfig,
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseExtractionExporterConfig | ExtractionPipelineConfig):
            Configuration specific to the exporter implementation or full pipeline configuration.
    """
    if isinstance(config, ExtractionPipelineConfig):
        self.pipeline_config = config
        self.config = config.extraction_exporter
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseExtractionExporterConfig

pipeline_config instance-attribute

pipeline_config: ExtractionPipelineConfig | None

export abstractmethod async

export(document: Document, data: ExtractionResult[ExtractionSchema], context: PipelineContext | None = None) -> None

Persists extracted data to the configured destination.

This is an asynchronous operation to support non-blocking I/O writes.

Parameters:

    document (Document): The source document for this extraction. (required)
    data (ExtractionResult[ExtractionSchema]): The extracted data with metadata. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    None: The method should raise an exception if the export fails.

Source code in src/document_extraction_tools/base/exporter/base_extraction_exporter.py
@abstractmethod
async def export(
    self,
    document: Document,
    data: ExtractionResult[ExtractionSchema],
    context: PipelineContext | None = None,
) -> None:
    """Persists extracted data to the configured destination.

    This is an asynchronous operation to support non-blocking I/O writes.

    Args:
        document (Document): The source document for this extraction.
        data (ExtractionResult[ExtractionSchema]): The extracted data with metadata.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        None: The method should raise an exception if the export fails.
    """
    pass
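
An exporter might append one JSON line per document. Since synchronous file I/O inside an async method blocks the event loop, this sketch offloads the write with asyncio.to_thread; the `output_path` config field and the `.data` attribute on ExtractionResult are assumptions, while `model_dump` is Pydantic v2.

import asyncio
import json
from pathlib import Path

from document_extraction_tools.base import BaseExtractionExporter


class JsonlExtractionExporter(BaseExtractionExporter):
    """Hypothetical exporter appending one JSON line per document."""

    async def export(self, document, data, context=None) -> None:
        record = {"document_id": document.id, "data": data.data.model_dump()}
        line = json.dumps(record) + "\n"
        # Offload the blocking write so the event loop stays responsive.
        await asyncio.to_thread(self._append, Path(self.config.output_path), line)

    @staticmethod
    def _append(path: Path, line: str) -> None:
        with path.open("a", encoding="utf-8") as fh:
            fh.write(line)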

Evaluation Pipeline

BaseTestDataLoader

Bases: ABC, Generic[ExtractionSchema]

Abstract interface for loading evaluation test data.

Attributes:

    config (BaseTestDataLoaderConfig): Component-specific configuration.
    pipeline_config (EvaluationPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseTestDataLoaderConfig | EvaluationPipelineConfig): Configuration specific to the test data loader implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/test_data_loader/base_test_data_loader.py
def __init__(
    self,
    config: BaseTestDataLoaderConfig | EvaluationPipelineConfig,
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseTestDataLoaderConfig | EvaluationPipelineConfig):
            Configuration specific to the test data loader implementation or full pipeline configuration.
    """
    if isinstance(config, EvaluationPipelineConfig):
        self.pipeline_config = config
        self.config = config.test_data_loader
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseTestDataLoaderConfig

pipeline_config instance-attribute

pipeline_config: EvaluationPipelineConfig | None

load_test_data abstractmethod

load_test_data(path_identifier: PathIdentifier, context: PipelineContext | None = None) -> list[EvaluationExample[ExtractionSchema]]

Load test examples for evaluation.

This method should retrieve and return a list of EvaluationExample instances based on the provided path identifier.

Parameters:

    path_identifier (PathIdentifier): The source location for loading evaluation examples. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    list[EvaluationExample[ExtractionSchema]]: The loaded evaluation examples.

Source code in src/document_extraction_tools/base/test_data_loader/base_test_data_loader.py
@abstractmethod
def load_test_data(
    self,
    path_identifier: PathIdentifier,
    context: PipelineContext | None = None,
) -> list[EvaluationExample[ExtractionSchema]]:
    """Load test examples for evaluation.

    This method should retrieve and return a list of EvaluationExample instances
    based on the provided path identifier.

    Args:
        path_identifier (PathIdentifier): The source location for loading evaluation examples.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        list[EvaluationExample[ExtractionSchema]]: The loaded evaluation examples.
    """
    pass
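
A loader might parse a JSON manifest that pairs each source file with its ground-truth labels. Most names below are assumptions: the manifest layout, MySchema, and the ExtractionResult and PathIdentifier constructors; only the `path_identifier` and `true` fields on EvaluationExample are confirmed by the orchestrator code later on this page.

import json
from pathlib import Path

from document_extraction_tools.base import BaseTestDataLoader


class JsonManifestLoader(BaseTestDataLoader):
    """Hypothetical loader for [{"path": ..., "true": {...}}, ...] manifests."""

    def load_test_data(self, path_identifier, context=None):
        # EvaluationExample, PathIdentifier, ExtractionResult, MySchema:
        # imports and constructors assumed.
        records = json.loads(Path(path_identifier.path).read_text())
        return [
            EvaluationExample(
                path_identifier=PathIdentifier(path=rec["path"]),
                true=ExtractionResult(data=MySchema.model_validate(rec["true"])),
            )
            for rec in records
        ]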

BaseEvaluator

Bases: ABC, Generic[ExtractionSchema]

Abstract interface for evaluation metrics.

Attributes:

    config (BaseEvaluatorConfig): Component-specific configuration.
    pipeline_config (EvaluationPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseEvaluatorConfig | EvaluationPipelineConfig): Configuration specific to the evaluator implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/evaluator/base_evaluator.py
def __init__(self, config: BaseEvaluatorConfig | EvaluationPipelineConfig) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseEvaluatorConfig | EvaluationPipelineConfig):
            Configuration specific to the evaluator implementation or full pipeline configuration.
    """
    if isinstance(config, EvaluationPipelineConfig):
        self.pipeline_config = config
        self.config = self._resolve_config(config)
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseEvaluatorConfig

pipeline_config instance-attribute

pipeline_config: EvaluationPipelineConfig | None

_resolve_config

_resolve_config(pipeline_config: EvaluationPipelineConfig) -> BaseEvaluatorConfig

Select the evaluator-specific config from the pipeline config.

Parameters:

    pipeline_config (EvaluationPipelineConfig): Pipeline configuration with evaluator configs. (required)

Returns:

    BaseEvaluatorConfig: The config matching this evaluator.

Source code in src/document_extraction_tools/base/evaluator/base_evaluator.py
def _resolve_config(
    self, pipeline_config: EvaluationPipelineConfig
) -> BaseEvaluatorConfig:
    """Select the evaluator-specific config from the pipeline config.

    Args:
        pipeline_config (EvaluationPipelineConfig): Pipeline configuration with evaluator configs.

    Returns:
        BaseEvaluatorConfig: The config matching this evaluator.
    """
    evaluator_key = self.__class__.__name__
    config_lookup = {
        item.__class__.__name__.replace("Config", ""): item
        for item in pipeline_config.evaluators
    }
    evaluator_config = config_lookup.get(evaluator_key)
    if evaluator_config is None:
        raise ValueError(f"No configuration found for evaluator '{evaluator_key}'.")
    return evaluator_config
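
The lookup above encodes a naming contract: an evaluator is matched to the entry in pipeline_config.evaluators whose class name equals the evaluator's class name plus "Config". With hypothetical names:

class ExactMatchEvaluatorConfig(BaseEvaluatorConfig):  # hypothetical
    ...


class ExactMatchEvaluator(BaseEvaluator):
    # "ExactMatchEvaluatorConfig".replace("Config", "") == "ExactMatchEvaluator",
    # so _resolve_config selects that config for this class; if no entry
    # matches, construction from an EvaluationPipelineConfig raises ValueError.
    ...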

evaluate abstractmethod

evaluate(true: ExtractionResult[ExtractionSchema], pred: ExtractionResult[ExtractionSchema], context: PipelineContext | None = None) -> EvaluationResult

Compute a metric for a single document.

Parameters:

    true (ExtractionResult[ExtractionSchema]): Ground-truth data with metadata. (required)
    pred (ExtractionResult[ExtractionSchema]): Predicted data with metadata. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    EvaluationResult: The metric result for this document.

Source code in src/document_extraction_tools/base/evaluator/base_evaluator.py
@abstractmethod
def evaluate(
    self,
    true: ExtractionResult[ExtractionSchema],
    pred: ExtractionResult[ExtractionSchema],
    context: PipelineContext | None = None,
) -> EvaluationResult:
    """Compute a metric for a single document.

    Args:
        true (ExtractionResult[ExtractionSchema]): Ground-truth data with metadata.
        pred (ExtractionResult[ExtractionSchema]): Predicted data with metadata.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        EvaluationResult: The metric result for this document.
    """
    pass
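
A whole-document exact match is about the smallest useful metric. The `.data` payload attribute and the EvaluationResult constructor are assumptions; the comparison relies on Pydantic model equality.

from document_extraction_tools.base import BaseEvaluator


class ExactMatchEvaluator(BaseEvaluator):
    """Hypothetical metric: 1.0 when prediction equals ground truth."""

    def evaluate(self, true, pred, context=None):
        score = 1.0 if true.data == pred.data else 0.0
        return EvaluationResult(name="exact_match", score=score)  # constructor assumed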

BaseEvaluationExporter

Bases: ABC

Abstract interface for exporting evaluation results.

Attributes:

    config (BaseEvaluationExporterConfig): Component-specific configuration.
    pipeline_config (EvaluationPipelineConfig | None): Optional pipeline configuration when constructed with a pipeline config.

Initialize with a configuration object.

Parameters:

    config (BaseEvaluationExporterConfig | EvaluationPipelineConfig): Configuration specific to the evaluation exporter implementation or full pipeline configuration. (required)

Source code in src/document_extraction_tools/base/exporter/base_evaluation_exporter.py
def __init__(
    self,
    config: BaseEvaluationExporterConfig | EvaluationPipelineConfig,
) -> None:
    """Initialize with a configuration object.

    Args:
        config (BaseEvaluationExporterConfig | EvaluationPipelineConfig):
            Configuration specific to the evaluation exporter implementation or full pipeline configuration.
    """
    if isinstance(config, EvaluationPipelineConfig):
        self.pipeline_config = config
        self.config = config.evaluation_exporter
    else:
        self.pipeline_config = None
        self.config = config

config instance-attribute

config: BaseEvaluationExporterConfig

pipeline_config instance-attribute

pipeline_config: EvaluationPipelineConfig | None

export abstractmethod async

export(results: list[tuple[Document, list[EvaluationResult]]], context: PipelineContext | None = None) -> None

Persist evaluation results to a target destination.

This is an asynchronous operation to support non-blocking I/O writes.

Parameters:

    results (list[tuple[Document, list[EvaluationResult]]]): A list of tuples containing documents and their associated evaluation results. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Returns:

    None: The method should raise an exception if the export fails.

Source code in src/document_extraction_tools/base/exporter/base_evaluation_exporter.py
@abstractmethod
async def export(
    self,
    results: list[tuple[Document, list[EvaluationResult]]],
    context: PipelineContext | None = None,
) -> None:
    """Persist evaluation results to a target destination.

    This is an asynchronous operation to support non-blocking I/O writes.

    Args:
        results (list[tuple[Document, list[EvaluationResult]]]):
            A list of tuples containing documents and their associated evaluation results.
        context (PipelineContext | None): Optional shared pipeline context.

    Returns:
        None: The method should raise an exception if the export fails.
    """
    pass
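
Unlike the per-document extraction exporter, this hook receives all results at once, so it can aggregate before writing. In this sketch the `output_path` config field and the `name`/`score` attributes on EvaluationResult are assumptions.

import asyncio
import csv

from document_extraction_tools.base import BaseEvaluationExporter


class CsvEvaluationExporter(BaseEvaluationExporter):
    """Hypothetical exporter writing one CSV row per (document, metric)."""

    async def export(self, results, context=None) -> None:
        rows = [
            (document.id, res.name, res.score)
            for document, doc_results in results
            for res in doc_results
        ]
        # Offload the blocking write to keep the event loop responsive.
        await asyncio.to_thread(self._write, rows)

    def _write(self, rows) -> None:
        with open(self.config.output_path, "w", newline="", encoding="utf-8") as fh:
            writer = csv.writer(fh)
            writer.writerow(["document_id", "metric", "score"])
            writer.writerows(rows)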

Orchestrators

ExtractionOrchestrator

Bases: Generic[ExtractionSchema]

Coordinates the document extraction pipeline.

This class manages the lifecycle of document processing, ensuring that CPU-bound tasks (Reading/Converting) are offloaded to a thread pool while I/O-bound tasks (Extracting/Exporting) run concurrently in the async event loop.

Attributes:

    config (ExtractionOrchestratorConfig): Orchestrator configuration.
    file_lister (BaseFileLister): File lister component instance.
    reader (BaseReader): Reader component instance.
    converter (BaseConverter): Converter component instance.
    extractor (BaseExtractor): Extractor component instance.
    extraction_exporter (BaseExtractionExporter): Extraction exporter component instance.
    schema (type[ExtractionSchema]): Target extraction schema.

Initialize the orchestrator with pipeline components.

Parameters:

    config (ExtractionOrchestratorConfig): Configuration for the orchestrator. (required)
    file_lister (BaseFileLister): Component to list input files. (required)
    reader (BaseReader): Component to read raw file bytes. (required)
    converter (BaseConverter): Component to transform bytes into Document objects. (required)
    extractor (BaseExtractor): Component to extract structured data via LLM. (required)
    extraction_exporter (BaseExtractionExporter): Component to persist the extraction results. (required)
    schema (type[ExtractionSchema]): The target Pydantic model definition for extraction. (required)

Source code in src/document_extraction_tools/runners/extraction/extraction_orchestrator.py
def __init__(
    self,
    config: ExtractionOrchestratorConfig,
    file_lister: BaseFileLister,
    reader: BaseReader,
    converter: BaseConverter,
    extractor: BaseExtractor,
    extraction_exporter: BaseExtractionExporter,
    schema: type[ExtractionSchema],
) -> None:
    """Initialize the orchestrator with pipeline components.

    Args:
        config (ExtractionOrchestratorConfig): Configuration for the orchestrator.
        file_lister (BaseFileLister): Component to list input files.
        reader (BaseReader): Component to read raw file bytes.
        converter (BaseConverter): Component to transform bytes into Document objects.
        extractor (BaseExtractor): Component to extract structured data via LLM.
        extraction_exporter (BaseExtractionExporter): Component to persist the extraction results.
        schema (type[ExtractionSchema]): The target Pydantic model definition for extraction.
    """
    self.config = config
    self.file_lister = file_lister
    self.reader = reader
    self.converter = converter
    self.extractor = extractor
    self.extraction_exporter = extraction_exporter
    self.schema = schema

config instance-attribute

config = config

file_lister instance-attribute

file_lister = file_lister

reader instance-attribute

reader = reader

converter instance-attribute

converter = converter

extractor instance-attribute

extractor = extractor

extraction_exporter instance-attribute

extraction_exporter = extraction_exporter

schema instance-attribute

schema = schema

from_config classmethod

from_config(config: ExtractionPipelineConfig, schema: type[ExtractionSchema], file_lister_cls: type[BaseFileLister], reader_cls: type[BaseReader], converter_cls: type[BaseConverter], extractor_cls: type[BaseExtractor], extraction_exporter_cls: type[BaseExtractionExporter]) -> ExtractionOrchestrator[ExtractionSchema]

Factory method to create an Orchestrator from a PipelineConfig.

Parameters:

    config (ExtractionPipelineConfig): The full pipeline configuration. (required)
    schema (type[ExtractionSchema]): The target Pydantic model definition for extraction. (required)
    file_lister_cls (type[BaseFileLister]): The concrete FileLister class to instantiate. (required)
    reader_cls (type[BaseReader]): The concrete Reader class to instantiate. (required)
    converter_cls (type[BaseConverter]): The concrete Converter class to instantiate. (required)
    extractor_cls (type[BaseExtractor]): The concrete Extractor class to instantiate. (required)
    extraction_exporter_cls (type[BaseExtractionExporter]): The concrete ExtractionExporter class to instantiate. (required)

Returns:

    ExtractionOrchestrator[ExtractionSchema]: The configured orchestrator instance.

Source code in src/document_extraction_tools/runners/extraction/extraction_orchestrator.py
@classmethod
def from_config(
    cls,
    config: ExtractionPipelineConfig,
    schema: type[ExtractionSchema],
    file_lister_cls: type[BaseFileLister],
    reader_cls: type[BaseReader],
    converter_cls: type[BaseConverter],
    extractor_cls: type[BaseExtractor],
    extraction_exporter_cls: type[BaseExtractionExporter],
) -> "ExtractionOrchestrator[ExtractionSchema]":
    """Factory method to create an Orchestrator from a PipelineConfig.

    Args:
        config (ExtractionPipelineConfig): The full pipeline configuration.
        schema (type[ExtractionSchema]): The target Pydantic model definition for extraction.
        file_lister_cls (type[BaseFileLister]): The concrete FileLister class to instantiate.
        reader_cls (type[BaseReader]): The concrete Reader class to instantiate.
        converter_cls (type[BaseConverter]): The concrete Converter class to instantiate.
        extractor_cls (type[BaseExtractor]): The concrete Extractor class to instantiate.
        extraction_exporter_cls (type[BaseExtractionExporter]): The concrete
            ExtractionExporter class to instantiate.

    Returns:
        ExtractionOrchestrator[ExtractionSchema]: The configured orchestrator instance.
    """
    file_lister_instance = file_lister_cls(config)
    reader_instance = reader_cls(config)
    converter_instance = converter_cls(config)
    extractor_instance = extractor_cls(config)
    extraction_exporter_instance = extraction_exporter_cls(config)

    return cls(
        config=config.extraction_orchestrator,
        file_lister=file_lister_instance,
        reader=reader_instance,
        converter=converter_instance,
        extractor=extractor_instance,
        extraction_exporter=extraction_exporter_instance,
        schema=schema,
    )
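
Because each component constructor accepts the full pipeline config (see the base classes above), wiring the pipeline is a single call. The concrete classes here are the hypothetical sketches from earlier on this page:

orchestrator = ExtractionOrchestrator.from_config(
    config=pipeline_config,  # a loaded ExtractionPipelineConfig
    schema=InvoiceSchema,  # your Pydantic extraction model (hypothetical)
    file_lister_cls=LocalPdfLister,
    reader_cls=LocalFileReader,
    converter_cls=PlainTextConverter,
    extractor_cls=StubExtractor,
    extraction_exporter_cls=JsonlExtractionExporter,
)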

_ingest staticmethod

_ingest(path_identifier: PathIdentifier, reader: BaseReader, converter: BaseConverter, context: PipelineContext) -> Document

Performs the CPU-bound ingestion phase.

Parameters:

    path_identifier (PathIdentifier): The path identifier to the source file. (required)
    reader (BaseReader): The reader instance to use. (required)
    converter (BaseConverter): The converter instance to use. (required)
    context (PipelineContext): Shared pipeline context. (required)

Returns:

    Document: The fully parsed document object.

Source code in src/document_extraction_tools/runners/extraction/extraction_orchestrator.py
@staticmethod
def _ingest(
    path_identifier: PathIdentifier,
    reader: BaseReader,
    converter: BaseConverter,
    context: PipelineContext,
) -> Document:
    """Performs the CPU-bound ingestion phase.

    Args:
        path_identifier (PathIdentifier): The path identifier to the source file.
        reader (BaseReader): The reader instance to use.
        converter (BaseConverter): The converter instance to use.
        context (PipelineContext): Shared pipeline context.

    Returns:
        Document: The fully parsed document object.
    """
    doc_bytes: DocumentBytes = reader.read(path_identifier, context)
    return converter.convert(doc_bytes, context)

_run_in_executor_with_context async staticmethod

_run_in_executor_with_context(loop: AbstractEventLoop, pool: ThreadPoolExecutor, func: Callable[..., T], *args: object) -> T

Run a function in an executor while preserving contextvars.

Parameters:

    loop (asyncio.AbstractEventLoop): The event loop to use. (required)
    pool (ThreadPoolExecutor): The thread pool to run the function in. (required)
    func (Callable[..., T]): The function to execute. (required)
    *args (object): Arguments to pass to the function. (default: ())

Returns:

    T: The result of the function execution.

Source code in src/document_extraction_tools/runners/extraction/extraction_orchestrator.py
@staticmethod
async def _run_in_executor_with_context(
    loop: asyncio.AbstractEventLoop,
    pool: ThreadPoolExecutor,
    func: Callable[..., T],
    *args: object,
) -> T:
    """Run a function in an executor while preserving contextvars.

    Args:
        loop (asyncio.AbstractEventLoop): The event loop to use.
        pool (ThreadPoolExecutor): The thread pool to run the function in.
        func (Callable[..., T]): The function to execute.
        *args (object): Arguments to pass to the function.

    Returns:
        The result of the function execution.
    """
    ctx = contextvars.copy_context()
    return await loop.run_in_executor(pool, ctx.run, func, *args)
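
The copy-context step matters because a ThreadPoolExecutor worker thread does not inherit the caller's contextvars. This self-contained stdlib snippet demonstrates the same pattern outside the orchestrator:

import asyncio
import contextvars
from concurrent.futures import ThreadPoolExecutor

request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id")


def worker() -> str:
    # Without ctx.run this raises LookupError: the pool thread's own
    # context never saw request_id.set().
    return request_id.get()


async def main() -> None:
    request_id.set("doc-42")
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    with ThreadPoolExecutor() as pool:
        print(await loop.run_in_executor(pool, ctx.run, worker))  # doc-42


asyncio.run(main())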

process_document async

process_document(path_identifier: PathIdentifier, pool: ThreadPoolExecutor, semaphore: Semaphore, context: PipelineContext) -> None

Runs the full processing lifecycle for a single document.

  1. Ingest (Read+Convert) -> Offloaded to ThreadPool (CPU).
  2. Extract -> Async Wait (I/O).
  3. Export -> Async Wait (I/O).

Parameters:

    path_identifier (PathIdentifier): The input file to process. (required)
    pool (ThreadPoolExecutor): The shared pool for CPU tasks. (required)
    semaphore (asyncio.Semaphore): The shared limiter for I/O tasks. (required)
    context (PipelineContext): Shared pipeline context. (required)

Source code in src/document_extraction_tools/runners/extraction/extraction_orchestrator.py
async def process_document(
    self,
    path_identifier: PathIdentifier,
    pool: ThreadPoolExecutor,
    semaphore: asyncio.Semaphore,
    context: PipelineContext,
) -> None:
    """Runs the full processing lifecycle for a single document.

    1. Ingest (Read+Convert) -> Offloaded to ThreadPool (CPU).
    2. Extract -> Async Wait (I/O).
    3. Export -> Async Wait (I/O).

    Args:
        path_identifier (PathIdentifier): The input file to process.
        pool (ThreadPoolExecutor): The shared pool for CPU tasks.
        semaphore (asyncio.Semaphore): The shared limiter for I/O tasks.
        context (PipelineContext): Shared pipeline context.
    """
    loop = asyncio.get_running_loop()

    document: Document = await self._run_in_executor_with_context(
        loop,
        pool,
        self._ingest,
        path_identifier,
        self.reader,
        self.converter,
        context,
    )

    async with semaphore:
        extracted_data: ExtractionResult[ExtractionSchema] = (
            await self.extractor.extract(document, self.schema, context)
        )
        await self.extraction_exporter.export(document, extracted_data, context)

        logger.info("Completed extraction for %s", document.id)

run async

run(file_paths_to_process: list[PathIdentifier], context: PipelineContext | None = None) -> None

Main entry point. Orchestrates the execution of the provided file list.

Parameters:

    file_paths_to_process (list[PathIdentifier]): The list of file paths to process. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Source code in src/document_extraction_tools/runners/extraction/extraction_orchestrator.py
async def run(
    self,
    file_paths_to_process: list[PathIdentifier],
    context: PipelineContext | None = None,
) -> None:
    """Main entry point. Orchestrates the execution of the provided file list.

    Args:
        file_paths_to_process (list[PathIdentifier]): The list of file paths to process.
        context (PipelineContext | None): Optional shared pipeline context.
    """
    context = context or PipelineContext()
    semaphore = asyncio.Semaphore(self.config.max_concurrency)

    with ThreadPoolExecutor(max_workers=self.config.max_workers) as pool:

        tasks = [
            self.process_document(path_identifier, pool, semaphore, context)
            for path_identifier in file_paths_to_process
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        for path_identifier, result in zip(
            file_paths_to_process, results, strict=True
        ):
            if isinstance(result, BaseException):
                logger.error(
                    "Extraction pipeline failed for %s",
                    path_identifier,
                    exc_info=result,
                )
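
A typical entry point lists files with the configured lister and hands them to run. Both calls are documented on this page; the `orchestrator` object is assumed to come from the from_config sketch above, and the PipelineContext import path is an assumption.

import asyncio

from document_extraction_tools import PipelineContext  # import path assumed


async def main() -> None:
    context = PipelineContext()  # optional; run() creates one when omitted
    files = orchestrator.file_lister.list_files(context)
    await orchestrator.run(files, context)


asyncio.run(main())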

EvaluationOrchestrator

Bases: Generic[ExtractionSchema]

Coordinates evaluation across multiple evaluators.

Attributes:

    config (EvaluationOrchestratorConfig): Orchestrator configuration.
    test_data_loader (BaseTestDataLoader[ExtractionSchema]): Test data loader instance.
    reader (BaseReader): Reader component instance.
    converter (BaseConverter): Converter component instance.
    extractor (BaseExtractor): Extractor component instance.
    evaluators (list[BaseEvaluator[ExtractionSchema]]): Evaluator instances.
    evaluation_exporter (BaseEvaluationExporter): Evaluation exporter instance.
    schema (type[ExtractionSchema]): Target extraction schema.

Initialize the evaluation orchestrator with pipeline components.

Parameters:

    config (EvaluationOrchestratorConfig): Configuration for evaluation orchestration. (required)
    test_data_loader (BaseTestDataLoader[ExtractionSchema]): Component to load evaluation examples. (required)
    reader (BaseReader): Component to read raw file bytes. (required)
    converter (BaseConverter): Component to transform bytes into Document objects. (required)
    extractor (BaseExtractor): Component to generate predictions. (required)
    evaluators (Iterable[BaseEvaluator[ExtractionSchema]]): Metrics to apply to each example. (required)
    evaluation_exporter (BaseEvaluationExporter): Component to persist evaluation results. (required)
    schema (type[ExtractionSchema]): The target Pydantic model definition for extraction. (required)

Source code in src/document_extraction_tools/runners/evaluation/evaluation_orchestrator.py
def __init__(
    self,
    config: EvaluationOrchestratorConfig,
    test_data_loader: BaseTestDataLoader[ExtractionSchema],
    reader: BaseReader,
    converter: BaseConverter,
    extractor: BaseExtractor,
    evaluators: Iterable[BaseEvaluator[ExtractionSchema]],
    evaluation_exporter: BaseEvaluationExporter,
    schema: type[ExtractionSchema],
) -> None:
    """Initialize the evaluation orchestrator with pipeline components.

    Args:
        config (EvaluationOrchestratorConfig): Configuration for evaluation orchestration.
        test_data_loader (BaseTestDataLoader[ExtractionSchema]): Component to load evaluation examples.
        reader (BaseReader): Component to read raw file bytes.
        converter (BaseConverter): Component to transform bytes into Document objects.
        extractor (BaseExtractor): Component to generate predictions.
        evaluators (Iterable[BaseEvaluator[ExtractionSchema]]): Metrics to apply to each example.
        evaluation_exporter (BaseEvaluationExporter): Component to persist evaluation results.
        schema (type[ExtractionSchema]): The target Pydantic model definition for extraction.
    """
    self.config = config
    self.test_data_loader = test_data_loader
    self.reader = reader
    self.converter = converter
    self.extractor = extractor
    self.evaluators = list(evaluators)
    self.evaluation_exporter = evaluation_exporter
    self.schema = schema

config instance-attribute

config = config

test_data_loader instance-attribute

test_data_loader = test_data_loader

reader instance-attribute

reader = reader

converter instance-attribute

converter = converter

extractor instance-attribute

extractor = extractor

evaluators instance-attribute

evaluators = list(evaluators)

evaluation_exporter instance-attribute

evaluation_exporter = evaluation_exporter

schema instance-attribute

schema = schema

from_config classmethod

from_config(config: EvaluationPipelineConfig, schema: type[ExtractionSchema], reader_cls: type[BaseReader], converter_cls: type[BaseConverter], extractor_cls: type[BaseExtractor], test_data_loader_cls: type[BaseTestDataLoader[ExtractionSchema]], evaluator_classes: list[type[BaseEvaluator[ExtractionSchema]]], evaluation_exporter_cls: type[BaseEvaluationExporter]) -> EvaluationOrchestrator[ExtractionSchema]

Factory method to create an EvaluationOrchestrator from config.

Parameters:

    config (EvaluationPipelineConfig): The full evaluation pipeline configuration. (required)
    schema (type[ExtractionSchema]): The target Pydantic model definition for extraction. (required)
    reader_cls (type[BaseReader]): The concrete Reader class to instantiate. (required)
    converter_cls (type[BaseConverter]): The concrete Converter class to instantiate. (required)
    extractor_cls (type[BaseExtractor]): The concrete Extractor class to instantiate. (required)
    test_data_loader_cls (type[BaseTestDataLoader[ExtractionSchema]]): The concrete TestDataLoader class to instantiate. (required)
    evaluator_classes (list[type[BaseEvaluator[ExtractionSchema]]]): The evaluator classes available for instantiation. (required)
    evaluation_exporter_cls (type[BaseEvaluationExporter]): The concrete EvaluationExporter class to instantiate. (required)

Returns:

    EvaluationOrchestrator[ExtractionSchema]: The configured orchestrator.

Source code in src/document_extraction_tools/runners/evaluation/evaluation_orchestrator.py
@classmethod
def from_config(
    cls,
    config: EvaluationPipelineConfig,
    schema: type[ExtractionSchema],
    reader_cls: type[BaseReader],
    converter_cls: type[BaseConverter],
    extractor_cls: type[BaseExtractor],
    test_data_loader_cls: type[BaseTestDataLoader[ExtractionSchema]],
    evaluator_classes: list[type[BaseEvaluator[ExtractionSchema]]],
    evaluation_exporter_cls: type[BaseEvaluationExporter],
) -> "EvaluationOrchestrator[ExtractionSchema]":
    """Factory method to create an EvaluationOrchestrator from config.

    Args:
        config (EvaluationPipelineConfig): The full evaluation pipeline configuration.
        schema (type[ExtractionSchema]): The target Pydantic model definition for extraction.
        reader_cls (type[BaseReader]): The concrete Reader class to instantiate.
        converter_cls (type[BaseConverter]): The concrete Converter class to instantiate.
        extractor_cls (type[BaseExtractor]): The concrete Extractor class to instantiate.
        test_data_loader_cls (type[BaseTestDataLoader[ExtractionSchema]]): The
            concrete TestDataLoader class to instantiate.
        evaluator_classes (list[type[BaseEvaluator[ExtractionSchema]]]): The
            evaluator classes available for instantiation.
        evaluation_exporter_cls (type[BaseEvaluationExporter]): The concrete
            EvaluationExporter class to instantiate.

    Returns:
        EvaluationOrchestrator[ExtractionSchema]: The configured orchestrator.
    """
    reader_instance = reader_cls(config)
    converter_instance = converter_cls(config)
    extractor_instance = extractor_cls(config)
    test_data_loader_instance = test_data_loader_cls(config)
    evaluation_exporter_instance = evaluation_exporter_cls(config)

    evaluators = [evaluator_cls(config) for evaluator_cls in evaluator_classes]
    if not evaluators:
        raise ValueError("No evaluators provided.")

    return cls(
        config=config.evaluation_orchestrator,
        test_data_loader=test_data_loader_instance,
        reader=reader_instance,
        converter=converter_instance,
        extractor=extractor_instance,
        evaluators=evaluators,
        evaluation_exporter=evaluation_exporter_instance,
        schema=schema,
    )

_ingest staticmethod

_ingest(path_identifier: PathIdentifier, reader: BaseReader, converter: BaseConverter, context: PipelineContext) -> Document

Performs the CPU-bound ingestion phase.

Parameters:

    path_identifier (PathIdentifier): The path identifier to the source file. (required)
    reader (BaseReader): The reader instance to use. (required)
    converter (BaseConverter): The converter instance to use. (required)
    context (PipelineContext): Shared pipeline context. (required)

Returns:

    Document: The fully parsed document object.

Source code in src/document_extraction_tools/runners/evaluation/evaluation_orchestrator.py
@staticmethod
def _ingest(
    path_identifier: PathIdentifier,
    reader: BaseReader,
    converter: BaseConverter,
    context: PipelineContext,
) -> Document:
    """Performs the CPU-bound ingestion phase.

    Args:
        path_identifier (PathIdentifier): The path identifier to the source file.
        reader (BaseReader): The reader instance to use.
        converter (BaseConverter): The converter instance to use.
        context (PipelineContext): Shared pipeline context.

    Returns:
        Document: The fully parsed document object.
    """
    doc_bytes: DocumentBytes = reader.read(path_identifier, context)
    return converter.convert(doc_bytes, context)

_run_in_executor_with_context async staticmethod

_run_in_executor_with_context(loop: AbstractEventLoop, pool: ThreadPoolExecutor, func: Callable[..., T], *args: object) -> T

Run a function in an executor while preserving contextvars.

Parameters:

    loop (asyncio.AbstractEventLoop): The event loop to use. (required)
    pool (ThreadPoolExecutor): The thread pool to run the function in. (required)
    func (Callable[..., T]): The function to execute. (required)
    *args (object): Arguments to pass to the function. (default: ())

Returns:

    T: The result of the function execution.

Source code in src/document_extraction_tools/runners/evaluation/evaluation_orchestrator.py
@staticmethod
async def _run_in_executor_with_context(
    loop: asyncio.AbstractEventLoop,
    pool: ThreadPoolExecutor,
    func: Callable[..., T],
    *args: object,
) -> T:
    """Run a function in an executor while preserving contextvars.

    Args:
        loop (asyncio.AbstractEventLoop): The event loop to use.
        pool (ThreadPoolExecutor): The thread pool to run the function in.
        func (Callable[..., T]): The function to execute.
        *args (object): Arguments to pass to the function.

    Returns:
        The result of the function execution.
    """
    ctx = contextvars.copy_context()
    return await loop.run_in_executor(pool, ctx.run, func, *args)

process_example async

process_example(example: EvaluationExample[ExtractionSchema], pool: ThreadPoolExecutor, semaphore: Semaphore, context: PipelineContext) -> tuple[Document, list[EvaluationResult]]

Runs extraction, evaluation, and export for a single example.

Parameters:

    example (EvaluationExample[ExtractionSchema]): The evaluation example to process. (required)
    pool (ThreadPoolExecutor): The thread pool for CPU-bound tasks. (required)
    semaphore (asyncio.Semaphore): Semaphore to limit concurrency. (required)
    context (PipelineContext): Shared pipeline context. (required)

Returns:

    tuple[Document, list[EvaluationResult]]: The document and its evaluation results.

Source code in src/document_extraction_tools/runners/evaluation/evaluation_orchestrator.py
async def process_example(
    self,
    example: EvaluationExample[ExtractionSchema],
    pool: ThreadPoolExecutor,
    semaphore: asyncio.Semaphore,
    context: PipelineContext,
) -> tuple[Document, list[EvaluationResult]]:
    """Runs extraction, evaluation, and export for a single example.

    Args:
        example (EvaluationExample[ExtractionSchema]): The evaluation example to process.
        pool (ThreadPoolExecutor): The thread pool for CPU-bound tasks.
        semaphore (asyncio.Semaphore): Semaphore to limit concurrency.
        context (PipelineContext): Shared pipeline context.

    Returns:
        tuple[Document, list[EvaluationResult]]: The document and its evaluation results.
    """
    loop = asyncio.get_running_loop()

    document: Document = await self._run_in_executor_with_context(
        loop,
        pool,
        self._ingest,
        example.path_identifier,
        self.reader,
        self.converter,
        context,
    )

    async with semaphore:
        pred: ExtractionResult[ExtractionSchema] = await self.extractor.extract(
            document, self.schema, context
        )

        evaluation_tasks = [
            self._run_in_executor_with_context(
                loop, pool, evaluator.evaluate, example.true, pred, context
            )
            for evaluator in self.evaluators
        ]
        results: list[EvaluationResult] = list(
            await asyncio.gather(*evaluation_tasks)
        )

        logger.info("Completed evaluation for %s", document.id)
        return document, results

run async

run(examples: list[EvaluationExample[ExtractionSchema]], context: PipelineContext | None = None) -> None

Run all evaluators and export results for the provided examples.

Parameters:

    examples (list[EvaluationExample[ExtractionSchema]]): The evaluation examples to evaluate. (required)
    context (PipelineContext | None): Optional shared pipeline context. (default: None)

Source code in src/document_extraction_tools/runners/evaluation/evaluation_orchestrator.py
async def run(
    self,
    examples: list[EvaluationExample[ExtractionSchema]],
    context: PipelineContext | None = None,
) -> None:
    """Run all evaluators and export results for the provided examples.

    Args:
        examples (list[EvaluationExample[ExtractionSchema]]): The evaluation examples to evaluate.
        context (PipelineContext | None): Optional shared pipeline context.
    """
    context = context or PipelineContext()
    semaphore = asyncio.Semaphore(self.config.max_concurrency)

    with ThreadPoolExecutor(max_workers=self.config.max_workers) as pool:
        tasks = [
            self.process_example(example, pool, semaphore, context)
            for example in examples
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        valid_results: list[tuple[Document, list[EvaluationResult]]] = []

        for example, result in zip(examples, results, strict=True):
            if isinstance(result, BaseException):
                logger.error(
                    "Evaluation pipeline failed for %s",
                    example.path_identifier,
                    exc_info=result,
                )
            else:
                valid_results.append(result)

        if valid_results:
            await self.evaluation_exporter.export(valid_results, context)
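
The evaluation entry point mirrors the extraction one: load examples through the configured loader, then run them. Here `evaluation_orchestrator` is assumed to come from EvaluationOrchestrator.from_config, and `manifest` is a PathIdentifier for the test-data location.

import asyncio


async def main() -> None:
    examples = evaluation_orchestrator.test_data_loader.load_test_data(manifest)
    await evaluation_orchestrator.run(examples)


asyncio.run(main())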

Import Shortcuts

All base classes can be imported from the top-level base module:

from document_extraction_tools.base import (
    BaseFileLister,
    BaseReader,
    BaseConverter,
    BaseExtractor,
    BaseExtractionExporter,
    BaseTestDataLoader,
    BaseEvaluator,
    BaseEvaluationExporter,
)