# Implementing an Extraction Pipeline
This guide walks through implementing a complete extraction pipeline.
## Prerequisites
- Document Extraction Tools installed
- Understanding of Core Concepts
- A clear definition of what data you want to extract
## Step 1: Define Your Extraction Schema
Start by defining a Pydantic model for your target output. This example shows a lease details schema similar to the one in the examples repository:
```python
from pydantic import BaseModel, Field


class Address(BaseModel):
    """Property address details."""

    street: str = Field(..., description="Street address")
    city: str = Field(..., description="City name")
    state: str = Field(..., description="State or region")
    postal_code: str = Field(..., description="Postal/ZIP code")


class LeaseSchema(BaseModel):
    """Schema for extracted lease data."""

    landlord_name: str = Field(
        ...,
        description="Full name of the landlord or property owner"
    )
    tenant_name: str = Field(
        ...,
        description="Full name of the tenant"
    )
    property_address: Address = Field(
        ...,
        description="Address of the leased property"
    )
    lease_start_date: str = Field(
        ...,
        description="Start date of the lease in YYYY-MM-DD format"
    )
    lease_end_date: str = Field(
        ...,
        description="End date of the lease in YYYY-MM-DD format"
    )
    monthly_rent: float = Field(
        ...,
        description="Monthly rent amount"
    )
    security_deposit: float = Field(
        default=0.0,
        description="Security deposit amount"
    )
    currency: str = Field(
        default="USD",
        description="Currency code"
    )
```
**Schema Design Tips**

- Use descriptive `Field` descriptions - they help LLM extractors understand what to extract
- Set sensible defaults for optional fields
- Break complex structures into nested models (like `Address` above)
- Use standard date formats (ISO 8601) for date fields
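A quick way to sanity-check the schema before wiring it into the pipeline is to validate a hand-written sample and confirm that the defaults kick in for omitted fields (the sample values below are purely illustrative):

```python
sample = LeaseSchema.model_validate(
    {
        "landlord_name": "Jane Doe",
        "tenant_name": "John Smith",
        "property_address": {
            "street": "123 Main St",
            "city": "Springfield",
            "state": "IL",
            "postal_code": "62704",
        },
        "lease_start_date": "2024-01-01",
        "lease_end_date": "2024-12-31",
        "monthly_rent": 1500.0,
        # security_deposit and currency are omitted, so the defaults apply
    }
)
print(sample.security_deposit, sample.currency)  # 0.0 USD
```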
## Step 2: Implement Pipeline Components
### FileLister
```python
from pathlib import Path

from document_extraction_tools.base import BaseFileLister
from document_extraction_tools.config import BaseFileListerConfig
from document_extraction_tools.types import PathIdentifier, PipelineContext


class LocalFileListerConfig(BaseFileListerConfig):
    input_directory: str
    file_pattern: str = "*.pdf"


class LocalFileLister(BaseFileLister):
    def list_files(
        self, context: PipelineContext | None = None
    ) -> list[PathIdentifier]:
        directory = Path(self.config.input_directory)
        return [
            PathIdentifier(path=str(p))
            for p in directory.glob(self.config.file_pattern)
        ]
```
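Assuming the base classes take their config in the constructor and expose it as `self.config` (the pattern the Gemini extractor below follows), the lister can be exercised on its own; the directory name here is illustrative:

```python
config = LocalFileListerConfig(input_directory="./leases", file_pattern="*.pdf")
lister = LocalFileLister(config)

# Print every matched file before running the full pipeline
for identifier in lister.list_files():
    print(identifier.path)
```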
### Reader
```python
from pathlib import Path

from document_extraction_tools.base import BaseReader
from document_extraction_tools.config import BaseReaderConfig
from document_extraction_tools.types import DocumentBytes, PathIdentifier, PipelineContext


class LocalReaderConfig(BaseReaderConfig):
    pass  # No additional config needed


class LocalReader(BaseReader):
    def read(
        self, path_identifier: PathIdentifier, context: PipelineContext | None = None
    ) -> DocumentBytes:
        path = Path(path_identifier.path)
        with open(path, "rb") as f:
            return DocumentBytes(
                file_bytes=f.read(),
                path_identifier=path_identifier,
            )
```
### Converter
```python
from document_extraction_tools.base import BaseConverter
from document_extraction_tools.config import BaseConverterConfig
from document_extraction_tools.types import Document, DocumentBytes, Page, PipelineContext


class PDFConverterConfig(BaseConverterConfig):
    dpi: int = 300


class PDFConverter(BaseConverter):
    def convert(
        self, document_bytes: DocumentBytes, context: PipelineContext | None = None
    ) -> Document:
        # Use your preferred PDF library (pypdf, pymupdf, etc.)
        pages = self._parse_pdf(document_bytes.file_bytes)
        return Document(
            id=str(document_bytes.path_identifier.path),
            path_identifier=document_bytes.path_identifier,
            pages=pages,
            metadata={"page_count": len(pages)},
            content_type="image",
        )

    def _parse_pdf(self, pdf_bytes: bytes) -> list[Page]:
        # Implementation depends on your PDF library
        ...
```
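For reference, here is a minimal `_parse_pdf` sketch using PyMuPDF that renders each page to PNG bytes at the configured DPI. It assumes `Page` accepts a `data` field and `ImageData` a `content` field of raw image bytes, matching how the extractor below reads `page.data.content`:

```python
import fitz  # PyMuPDF

from document_extraction_tools.types import ImageData, Page


# Inside PDFConverter:
def _parse_pdf(self, pdf_bytes: bytes) -> list[Page]:
    pages: list[Page] = []
    with fitz.open(stream=pdf_bytes, filetype="pdf") as pdf:
        for pdf_page in pdf:
            # Render the page to PNG bytes at the configured resolution
            pixmap = pdf_page.get_pixmap(dpi=self.config.dpi)
            # Page/ImageData field names are assumed from the extractor's
            # use of page.data.content
            pages.append(Page(data=ImageData(content=pixmap.tobytes("png"))))
    return pages
```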
### Extractor
This example shows a Gemini-based extractor similar to the one in the examples repository:
```python
import google.generativeai as genai

from document_extraction_tools.base import BaseExtractor
from document_extraction_tools.config import BaseExtractorConfig
from document_extraction_tools.types import Document, ExtractionResult, ImageData, PipelineContext


class GeminiExtractorConfig(BaseExtractorConfig):
    model_name: str = "gemini-1.5-flash"
    temperature: float = 0.0


class GeminiImageExtractor(BaseExtractor):
    def __init__(self, config: GeminiExtractorConfig) -> None:
        super().__init__(config)
        self.model = genai.GenerativeModel(self.config.model_name)

    async def extract(
        self,
        document: Document,
        schema: type[LeaseSchema],
        context: PipelineContext | None = None,
    ) -> ExtractionResult[LeaseSchema]:
        # Build prompt with schema description
        prompt = self._build_prompt(schema)

        # Prepare image parts from document pages
        parts = [prompt]
        for page in document.pages:
            if isinstance(page.data, ImageData):
                parts.append(page.data.content)

        # Call Gemini with images
        response = await self.model.generate_content_async(
            parts,
            generation_config=genai.GenerationConfig(
                temperature=self.config.temperature,
                response_mime_type="application/json",
                response_schema=schema,
            ),
        )

        # Parse and validate response, wrap in ExtractionResult
        data = schema.model_validate_json(response.text)
        return ExtractionResult(
            data=data,
            metadata={
                "model": self.config.model_name,
                "temperature": self.config.temperature,
            },
        )

    def _build_prompt(self, schema: type) -> str:
        return f"""Extract the following information from the lease document images.

Return a JSON object matching this schema:
{schema.model_json_schema()}

Be precise and extract exactly what is stated in the document."""
```
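Note that the `google.generativeai` client needs an API key before the extractor can run. One common pattern is to configure it once at startup from an environment variable:

```python
import os

import google.generativeai as genai

# Reads the key from the environment rather than hard-coding it
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
```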
### Exporter
```python
import json
from pathlib import Path

from document_extraction_tools.base import BaseExtractionExporter
from document_extraction_tools.config import BaseExtractionExporterConfig
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext


class JSONExporterConfig(BaseExtractionExporterConfig):
    output_directory: str


class JSONExporter(BaseExtractionExporter):
    async def export(
        self,
        document: Document,
        data: ExtractionResult[LeaseSchema],
        context: PipelineContext | None = None,
    ) -> None:
        output_dir = Path(self.config.output_directory)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Create output filename from input
        input_name = Path(document.path_identifier.path).stem
        output_path = output_dir / f"{input_name}.json"

        # Export both data and metadata
        output = {
            "data": data.data.model_dump(),
            "metadata": data.metadata,
        }
        with open(output_path, "w") as f:
            json.dump(output, f, indent=2)
```
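Because the exporter writes both the payload and its metadata, downstream code can load a result file straight back into the schema for type-safe post-processing. A small sketch, assuming the output layout written above (the file path is hypothetical and follows the exporter's `<input stem>.json` naming):

```python
import json
from pathlib import Path

raw = json.loads(Path("output/lease_001.json").read_text())

# Re-validate the "data" payload into the Pydantic model
lease = LeaseSchema.model_validate(raw["data"])
print(lease.monthly_rent, lease.currency)
```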
## Step 3: Create Configuration Files
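The runner in Step 4 loads YAML configuration from `config/yaml`. The exact filenames and top-level keys depend on how `load_extraction_config` maps config classes to files, so treat the following as an illustrative sketch that simply mirrors the fields defined on each config class above:

```yaml
# config/yaml/extraction.yaml (filename and layout are illustrative)
file_lister:
  input_directory: "./leases"
  file_pattern: "*.pdf"

reader: {}  # LocalReaderConfig has no extra fields

converter:
  dpi: 300

extractor:
  model_name: "gemini-1.5-flash"
  temperature: 0.0

extraction_exporter:
  output_directory: "./output"
```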
## Step 4: Run the Pipeline
```python
import asyncio
import uuid
from pathlib import Path

from document_extraction_tools.config import load_extraction_config
from document_extraction_tools.runners import ExtractionOrchestrator
from document_extraction_tools.types import PipelineContext


async def main():
    # Load configuration
    config = load_extraction_config(
        lister_config_cls=LocalFileListerConfig,
        reader_config_cls=LocalReaderConfig,
        converter_config_cls=PDFConverterConfig,
        extractor_config_cls=GeminiExtractorConfig,
        extraction_exporter_config_cls=JSONExporterConfig,
        config_dir=Path("config/yaml"),
    )

    # Create orchestrator
    orchestrator = ExtractionOrchestrator.from_config(
        config=config,
        schema=LeaseSchema,
        file_lister_cls=LocalFileLister,
        reader_cls=LocalReader,
        converter_cls=PDFConverter,
        extractor_cls=GeminiImageExtractor,
        extraction_exporter_cls=JSONExporter,
    )

    # List files
    file_lister = LocalFileLister(config)
    file_paths = file_lister.list_files()
    print(f"Processing {len(file_paths)} lease documents...")

    # Run pipeline with optional shared context
    context = PipelineContext(context={"run_id": str(uuid.uuid4())[:8]})
    await orchestrator.run(file_paths, context=context)
    print("Done!")


if __name__ == "__main__":
    asyncio.run(main())
```
## Next Steps
- Add Evaluation to measure extraction quality
- See the Examples Repository for the complete Simple Lease Extraction implementation