Extraction Pipeline

The extraction pipeline transforms raw documents into structured data that conforms to a Pydantic schema you define.

Pipeline Flow

flowchart LR
    subgraph Input
        FL[FileLister]
    end

    subgraph ThreadPool["Thread Pool (CPU-bound)"]
        R[Reader]
        C[Converter]
    end

    subgraph AsyncIO["Async Event Loop (I/O-bound)"]
        E[Extractor]
        EX[Exporter]
    end

    FL -->|"list[PathIdentifier]"| R
    R -->|DocumentBytes| C
    C -->|Document| E
    E -->|ExtractionResult| EX
    EX -->|Persisted| Output[(Storage)]
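
The payloads handed between stages can be sketched as plain dataclasses. This is an illustration only: the field names are inferred from the component examples below, and the library's actual types may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")

@dataclass
class PathIdentifier:
    """Locates one input document (local path, object-store key, etc.)."""
    path: str

@dataclass
class DocumentBytes:
    """Raw bytes read from the source, plus where they came from."""
    file_bytes: bytes
    path_identifier: PathIdentifier
    metadata: dict[str, Any] = field(default_factory=dict)

@dataclass
class Document:
    """Parsed document with per-page content."""
    id: str
    path_identifier: PathIdentifier
    pages: list[Any]
    content_type: str

@dataclass
class ExtractionResult(Generic[T]):
    """Extracted data wrapped with optional metadata (tokens, latency, ...)."""
    data: T
    metadata: dict[str, Any] = field(default_factory=dict)
```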

Per-Document Processing

For each document, the orchestrator runs:

sequenceDiagram
    participant O as Orchestrator
    participant TP as Thread Pool
    participant R as Reader
    participant C as Converter
    participant E as Extractor
    participant EX as Exporter

    Note over O: PipelineContext passed to all components

    O->>TP: Submit ingest task
    TP->>R: read(path_identifier, context)
    R-->>TP: DocumentBytes
    TP->>C: convert(document_bytes, context)
    C-->>TP: Document
    TP-->>O: Document

    O->>E: extract(document, schema, context)
    Note over E: Async I/O (e.g., LLM API call)
    E-->>O: ExtractionResult

    O->>EX: export(document, data, context)
    Note over EX: Async I/O (e.g., DB write)
    EX-->>O: Done
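
The hand-off in the diagram can be sketched as a per-document coroutine. This is a simplified illustration of the execution model, not the library's actual orchestrator; the component and parameter names follow the examples below.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def process_document(path_identifier, reader, converter, extractor,
                           exporter, schema, executor: ThreadPoolExecutor,
                           semaphore: asyncio.Semaphore, context=None):
    """Run one document through ingest, extract, and export."""
    loop = asyncio.get_running_loop()

    def ingest():
        # CPU/disk-bound work: runs on a thread-pool worker.
        document_bytes = reader.read(path_identifier, context)
        return converter.convert(document_bytes, context)

    document = await loop.run_in_executor(executor, ingest)

    # I/O-bound work: stays on the event loop, gated by the semaphore.
    async with semaphore:
        result = await extractor.extract(document, schema, context)
        await exporter.export(document, result, context)
```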

Components

1. FileLister

Discovers input files and returns a list of PathIdentifier objects. Called before the orchestrator runs.

from pathlib import Path

from document_extraction_tools.base import BaseFileLister
from document_extraction_tools.types import PathIdentifier

class MyFileLister(BaseFileLister):
    def list_files(self) -> list[PathIdentifier]:
        # List files from local directory, cloud storage, database, etc.
        return [PathIdentifier(path=str(p)) for p in Path("data").glob("*.pdf")]

2. Reader

Reads raw bytes from the source and returns DocumentBytes. Runs in the thread pool.

from document_extraction_tools.base import BaseReader
from document_extraction_tools.types import DocumentBytes, PathIdentifier, PipelineContext

class MyReader(BaseReader):
    def read(
        self, path_identifier: PathIdentifier, context: PipelineContext | None = None
    ) -> DocumentBytes:
        with open(path_identifier.path, "rb") as f:
            return DocumentBytes(
                file_bytes=f.read(),
                path_identifier=path_identifier,
                metadata={"mime_type": "application/pdf"},
            )

3. Converter

Converts raw bytes into a structured Document with pages and content. Runs in the thread pool.

from document_extraction_tools.base import BaseConverter
from document_extraction_tools.types import Document, DocumentBytes, PipelineContext

class MyConverter(BaseConverter):
    def convert(
        self, document_bytes: DocumentBytes, context: PipelineContext | None = None
    ) -> Document:
        # Use PDF library, OCR engine, etc.
        pages = parse_pdf(document_bytes.file_bytes)
        return Document(
            id=str(document_bytes.path_identifier.path),
            path_identifier=document_bytes.path_identifier,
            pages=pages,
            content_type="text",
        )
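
The parse_pdf call above stands in for whatever parsing backend you use. For plain-text sources, a minimal page splitter might look like this (a sketch only; a real converter would delegate to a PDF library or OCR engine):

```python
def parse_text_pages(file_bytes: bytes, encoding: str = "utf-8") -> list[str]:
    """Split raw text bytes into pages on form-feed (0x0C) boundaries."""
    text = file_bytes.decode(encoding, errors="replace")
    # One list entry per non-empty page, whitespace-trimmed.
    return [page.strip() for page in text.split("\x0c") if page.strip()]
```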

4. Extractor

Asynchronously extracts structured data into your Pydantic schema, returning an ExtractionResult that wraps the data with optional metadata. Runs in the async event loop.

from typing import TypeVar

from document_extraction_tools.base import BaseExtractor
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext

T = TypeVar("T")  # the Pydantic schema type passed to extract()

class MyExtractor(BaseExtractor):
    async def extract(
        self, document: Document, schema: type[T], context: PipelineContext | None = None
    ) -> ExtractionResult[T]:
        # Call LLM API, run rules engine, etc.
        response = await self.llm_client.extract(
            document=document,
            schema=schema,
        )
        return ExtractionResult(
            data=schema.model_validate(response),
            metadata={"tokens_used": response.usage, "latency_ms": response.latency},
        )

5. ExtractionExporter

Asynchronously persists extracted data to your destination. Receives the full ExtractionResult as its data argument, carrying both the extracted payload and its metadata. Runs in the async event loop.

from typing import TypeVar

from document_extraction_tools.base import BaseExtractionExporter
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext

T = TypeVar("T")  # the Pydantic schema type of the extracted data

class MyExtractionExporter(BaseExtractionExporter):
    async def export(
        self,
        document: Document,
        data: ExtractionResult[T],
        context: PipelineContext | None = None,
    ) -> None:
        # Save to database, cloud storage, API, etc.
        await self.db.insert(
            path=document.path_identifier.path,
            data=data.data.model_dump(),
            metadata=data.metadata,
        )
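
As a concrete (hypothetical) destination, an exporter could write one JSON file per document. The helper below is a sketch; it pushes the blocking file write to a worker thread so the event loop stays free.

```python
import asyncio
import json
from pathlib import Path

async def export_to_json(path: str, data: dict, metadata: dict,
                         out_dir: str = "output") -> None:
    """Persist one extraction result as <out_dir>/<stem>.json."""
    record = {"path": path, "data": data, "metadata": metadata}
    out_path = Path(out_dir) / (Path(path).stem + ".json")

    def write() -> None:
        # Blocking filesystem work, run in a thread via asyncio.to_thread.
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(record, indent=2))

    await asyncio.to_thread(write)
```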

ExtractionOrchestrator

The orchestrator coordinates all components:

import uuid
from document_extraction_tools.runners import ExtractionOrchestrator
from document_extraction_tools.types import PipelineContext

orchestrator = ExtractionOrchestrator.from_config(
    config=config,
    schema=InvoiceSchema,
    file_lister_cls=MyFileLister,
    reader_cls=MyReader,
    converter_cls=MyConverter,
    extractor_cls=MyExtractor,
    extraction_exporter_cls=MyExtractionExporter,
)

# Run the pipeline with optional context; file_paths is the
# list[PathIdentifier] produced by your FileLister
context = PipelineContext(context={"run_id": str(uuid.uuid4())[:8]})
await orchestrator.run(file_paths, context=context)
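
InvoiceSchema above is user-defined; a minimal version (the field names here are illustrative) could be:

```python
from pydantic import BaseModel

class InvoiceSchema(BaseModel):
    """Illustrative extraction target; define whatever fields you need."""
    invoice_number: str
    vendor_name: str
    total_amount: float
```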

Concurrency Model

flowchart TB
    subgraph "Orchestrator.run(file_paths)"
        direction TB
        TP["ThreadPoolExecutor<br/>max_workers"]
        SEM["Semaphore<br/>max_concurrency"]

        subgraph "Per Document Task"
            I["Ingest: Read + Convert"]
            EE["Extract + Export"]
        end

        TP -.->|"CPU-bound"| I
        SEM -.->|"I/O-bound"| EE
    end

Stage      Execution model   Reason
Reader     Thread pool       File I/O is blocking
Converter  Thread pool       CPU-bound parsing
Extractor  Async             Network I/O (LLM calls)
Exporter   Async             Network/disk I/O

Configure via extraction_orchestrator.yaml:

max_workers: 4        # Thread pool size for Reader/Converter
max_concurrency: 10   # Max concurrent Extractor/Exporter calls
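
These two knobs map onto the primitives in the diagram above; roughly (a sketch of the model, not the library's code):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# max_workers bounds the threads running Reader/Converter (ingest).
executor = ThreadPoolExecutor(max_workers=4)

# max_concurrency bounds in-flight Extractor/Exporter coroutines.
semaphore = asyncio.Semaphore(10)
```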