Extraction Pipeline¶
The extraction pipeline transforms raw documents into structured data in five stages: file listing, reading, conversion, extraction, and export.
Pipeline Flow¶
```mermaid
flowchart LR
    subgraph Input
        FL[FileLister]
    end
    subgraph ThreadPool["Thread Pool (CPU-bound)"]
        R[Reader]
        C[Converter]
    end
    subgraph AsyncIO["Async Event Loop (I/O-bound)"]
        E[Extractor]
        EX[Exporter]
    end
    FL -->|"list[PathIdentifier]"| R
    R -->|DocumentBytes| C
    C -->|Document| E
    E -->|ExtractionResult| EX
    EX -->|Persisted| Output[(Storage)]
```
Per-Document Processing¶
For each document, the orchestrator runs:
```mermaid
sequenceDiagram
    participant O as Orchestrator
    participant TP as Thread Pool
    participant R as Reader
    participant C as Converter
    participant E as Extractor
    participant EX as Exporter
    Note over O: PipelineContext passed to all components
    O->>TP: Submit ingest task
    TP->>R: read(path_identifier, context)
    R-->>TP: DocumentBytes
    TP->>C: convert(document_bytes, context)
    C-->>TP: Document
    TP-->>O: Document
    O->>E: extract(document, schema, context)
    Note over E: Async I/O (e.g., LLM API call)
    E-->>O: ExtractionResult
    O->>EX: export(document, data, context)
    Note over EX: Async I/O (e.g., DB write)
    EX-->>O: Done
```
Components¶
1. FileLister¶
Discovers input files and returns a list of PathIdentifier objects. Called before the orchestrator runs.
```python
from pathlib import Path

from document_extraction_tools.base import BaseFileLister
from document_extraction_tools.types import PathIdentifier


class MyFileLister(BaseFileLister):
    def list_files(self) -> list[PathIdentifier]:
        # List files from a local directory, cloud storage, a database, etc.
        return [PathIdentifier(path=str(p)) for p in Path("data").glob("*.pdf")]
```
2. Reader¶
Reads raw bytes from the source and returns DocumentBytes. Runs in the thread pool.
```python
from document_extraction_tools.base import BaseReader
from document_extraction_tools.types import DocumentBytes, PathIdentifier, PipelineContext


class MyReader(BaseReader):
    def read(
        self, path_identifier: PathIdentifier, context: PipelineContext | None = None
    ) -> DocumentBytes:
        with open(path_identifier.path, "rb") as f:
            return DocumentBytes(
                file_bytes=f.read(),
                path_identifier=path_identifier,
                metadata={"mime_type": "application/pdf"},
            )
```
3. Converter¶
Converts raw bytes into a structured Document with pages and content. Runs in the thread pool.
```python
from document_extraction_tools.base import BaseConverter
from document_extraction_tools.types import Document, DocumentBytes, PipelineContext


class MyConverter(BaseConverter):
    def convert(
        self, document_bytes: DocumentBytes, context: PipelineContext | None = None
    ) -> Document:
        # Use a PDF library, OCR engine, etc.; parse_pdf is a placeholder for
        # whatever parsing step produces the document's pages.
        pages = parse_pdf(document_bytes.file_bytes)
        return Document(
            id=str(document_bytes.path_identifier.path),
            path_identifier=document_bytes.path_identifier,
            pages=pages,
            content_type="text",
        )
```
4. Extractor¶
Asynchronously extracts structured data into your Pydantic schema, returning an ExtractionResult that wraps the data with optional metadata. Runs in the async event loop.
```python
from typing import TypeVar

from pydantic import BaseModel

from document_extraction_tools.base import BaseExtractor
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext

# Type variable for the target Pydantic schema.
T = TypeVar("T", bound=BaseModel)


class MyExtractor(BaseExtractor):
    async def extract(
        self, document: Document, schema: type[T], context: PipelineContext | None = None
    ) -> ExtractionResult[T]:
        # Call an LLM API, run a rules engine, etc.
        # (self.llm_client stands in for whatever client your extractor wraps.)
        response = await self.llm_client.extract(
            document=document,
            schema=schema,
        )
        return ExtractionResult(
            data=schema.model_validate(response),
            metadata={"tokens_used": response.usage, "latency_ms": response.latency},
        )
```
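The schema passed to `extract` is an ordinary Pydantic model. As a rough illustration, the `InvoiceSchema` that reappears in the orchestrator example below might look something like this; the field names here are hypothetical, not prescribed by the library:

```python
from pydantic import BaseModel, Field


class InvoiceSchema(BaseModel):
    """Hypothetical target schema; the fields are illustrative only."""

    invoice_number: str = Field(description="Identifier printed on the invoice")
    vendor_name: str = Field(description="Name of the issuing vendor")
    total_amount: float = Field(description="Invoice total in the stated currency")
    currency: str | None = Field(default=None, description="ISO 4217 code, if present")
```

Field descriptions are a convenient place to put per-field guidance for an LLM-backed extractor, although whether they are used at all depends on your extractor implementation.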
5. ExtractionExporter¶
Asynchronously persists extracted data to your destination. Receives the ExtractionResult containing both the extracted data and metadata. Runs in the async event loop.
```python
from typing import TypeVar

from pydantic import BaseModel

from document_extraction_tools.base import BaseExtractionExporter
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext

# Type variable for the target Pydantic schema.
T = TypeVar("T", bound=BaseModel)


class MyExtractionExporter(BaseExtractionExporter):
    async def export(
        self,
        document: Document,
        data: ExtractionResult[T],
        context: PipelineContext | None = None,
    ) -> None:
        # Save to a database, cloud storage, an API, etc.
        # (self.db stands in for whatever client your exporter wraps.)
        await self.db.insert(
            path=document.path_identifier.path,
            data=data.data.model_dump(),
            metadata=data.metadata,
        )
```
ExtractionOrchestrator¶
The orchestrator coordinates all components:
```python
import uuid

from document_extraction_tools.runners import ExtractionOrchestrator
from document_extraction_tools.types import PipelineContext

# `config` is the pipeline configuration (see extraction_orchestrator.yaml below).
orchestrator = ExtractionOrchestrator.from_config(
    config=config,
    schema=InvoiceSchema,
    file_lister_cls=MyFileLister,
    reader_cls=MyReader,
    converter_cls=MyConverter,
    extractor_cls=MyExtractor,
    extraction_exporter_cls=MyExtractionExporter,
)

# Run the pipeline with optional context.
# `file_paths` is the list[PathIdentifier] produced by the FileLister.
context = PipelineContext(context={"run_id": str(uuid.uuid4())[:8]})
await orchestrator.run(file_paths, context=context)
```
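The same context object reaches every component (see the sequence diagram above). As a hedged sketch, assuming `PipelineContext` exposes the dict it was constructed with through a `context` attribute, a component can pick up the `run_id` like this:

```python
from document_extraction_tools.types import DocumentBytes, PathIdentifier, PipelineContext


class LoggingReader(MyReader):
    """Hypothetical subclass showing how a component can read per-run metadata."""

    def read(
        self, path_identifier: PathIdentifier, context: PipelineContext | None = None
    ) -> DocumentBytes:
        # Assumes PipelineContext exposes the dict passed to its constructor
        # via a `context` attribute, as the orchestrator example above suggests.
        run_id = context.context.get("run_id") if context is not None else None
        print(f"[run {run_id}] reading {path_identifier.path}")
        return super().read(path_identifier, context)
```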
Concurrency Model¶
```mermaid
flowchart TB
    subgraph "Orchestrator.run(file_paths)"
        direction TB
        TP["ThreadPoolExecutor<br/>max_workers"]
        SEM["Semaphore<br/>max_concurrency"]
        subgraph "Per Document Task"
            I["Ingest: Read + Convert"]
            EE["Extract + Export"]
        end
        TP -.->|"CPU-bound"| I
        SEM -.->|"I/O-bound"| EE
    end
```
| Stage | Execution Model | Reason |
|---|---|---|
| Reader | Thread pool | File I/O is blocking |
| Converter | Thread pool | CPU-bound parsing |
| Extractor | Async | Network I/O (LLM calls) |
| Exporter | Async | Network/disk I/O |
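To make the split concrete, the sketch below shows one way to wire it up with asyncio. It is illustrative only, not the library's implementation; `ingest`, `extract`, and `export` are hypothetical stand-ins for the Reader/Converter, Extractor, and Exporter calls shown earlier, and the parameters mirror the `max_workers` and `max_concurrency` knobs in the diagram.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# `ingest`, `extract`, and `export` are hypothetical stand-ins for the
# pipeline components; this sketch only illustrates the execution model.


async def process_all(file_paths, max_workers: int = 4, max_concurrency: int = 8) -> None:
    loop = asyncio.get_running_loop()
    pool = ThreadPoolExecutor(max_workers=max_workers)  # CPU-bound: Read + Convert
    semaphore = asyncio.Semaphore(max_concurrency)      # caps in-flight Extract + Export

    async def process_one(path) -> None:
        # Blocking file reads and parsing run in the thread pool...
        document = await loop.run_in_executor(pool, ingest, path)
        # ...while network-bound extraction and export stay on the event loop,
        # gated by the semaphore.
        async with semaphore:
            result = await extract(document)
            await export(document, result)

    try:
        await asyncio.gather(*(process_one(p) for p in file_paths))
    finally:
        pool.shutdown(wait=True)
```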
Configure via extraction_orchestrator.yaml: