Pipeline Context

The PipelineContext is a shared state container that flows through all pipeline components, enabling communication and data sharing across the extraction or evaluation process.

Why Use PipelineContext?

Pipeline components are designed to be stateless and independent. However, real-world pipelines often need to:

  • Track execution metadata - Run IDs, timestamps, batch identifiers
  • Share configuration - Environment-specific settings, feature flags
  • Pass runtime state - Accumulated metrics, processing hints
  • Enable observability - Correlation IDs for logging and tracing

PipelineContext provides a clean way to share this information without coupling components together.
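
Under the hood, PipelineContext is a thin Pydantic model wrapping a single dictionary (see the API Reference at the bottom of this page). A roughly equivalent definition, shown purely to illustrate its shape:

from typing import Any

from pydantic import BaseModel, Field


class PipelineContext(BaseModel):
    # Shared context values available across pipeline components
    context: dict[str, Any] = Field(default_factory=dict)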

Creating a Context

from document_extraction_tools.types import PipelineContext

# Empty context (default)
context = PipelineContext()

# Context with initial values
context = PipelineContext(
    context={
        "run_id": "extraction-2024-01-15-001",
        "batch_size": 100,
    }
)

Passing Context to the Pipeline

Pass the context when calling orchestrator.run():

from document_extraction_tools.runners import ExtractionOrchestrator
from document_extraction_tools.types import PipelineContext

# Create context
context = PipelineContext(
    context={
        "run_id": "daily-extraction-001",
    }
)

# Pass to orchestrator
await orchestrator.run(file_paths, context=context)

If no context is provided, an empty PipelineContext() is created automatically.
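
In other words, the orchestrator presumably applies a fallback along these lines. This is a simplified, hypothetical sketch of that behavior, not the actual implementation:

from document_extraction_tools.types import PipelineContext


class _OrchestratorSketch:
    # Hypothetical stand-in for ExtractionOrchestrator, shown only to illustrate the default
    async def run(self, file_paths: list, context: PipelineContext | None = None) -> None:
        # Fall back to an empty context so every downstream call receives a PipelineContext
        context = context or PipelineContext()
        # ... read, extract, and export each document, passing context along ...
        ...

Components should still handle context=None defensively, since they can be called directly outside the orchestrator.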

Accessing Context in Components

Every component method receives an optional context parameter. Access values through its context attribute, which is a plain dictionary:

In a Reader

from document_extraction_tools.base import BaseReader
from document_extraction_tools.types import DocumentBytes, PathIdentifier, PipelineContext


class MyReader(BaseReader):
    def read(
        self, path_identifier: PathIdentifier, context: PipelineContext | None = None
    ) -> DocumentBytes:
        # Access context values safely
        run_id = context.context.get("run_id") if context else None

        # Use for logging
        print(f"[{run_id}] Reading: {path_identifier.path}")

        with open(path_identifier.path, "rb") as f:
            return DocumentBytes(
                file_bytes=f.read(),
                path_identifier=path_identifier,
            )

In an Extractor

from document_extraction_tools.base import BaseExtractor
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext


class MyExtractor(BaseExtractor):
    async def extract(
        self,
        document: Document,
        schema: type,
        context: PipelineContext | None = None,
    ) -> ExtractionResult:
        # Access runtime context values
        run_id = context.context.get("run_id") if context else None

        # Use context for logging/tracing
        print(f"[{run_id}] Extracting from: {document.id}")

        # Extract data
        result = await self._call_llm(document, schema)
        return ExtractionResult(data=result)

In an Exporter

from document_extraction_tools.base import BaseExtractionExporter
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext


class MyExporter(BaseExtractionExporter):
    async def export(
        self,
        document: Document,
        data: ExtractionResult,
        context: PipelineContext | None = None,
    ) -> None:
        # Include context metadata in exports
        run_id = context.context.get("run_id") if context else "unknown"

        output = {
            "run_id": run_id,
            "document_id": document.id,
            "data": data.data.model_dump(),
            "metadata": data.metadata,
        }

        await self._save_to_database(output)

Common Use Cases

1. Run Tracking and Logging

import uuid
from datetime import datetime

context = PipelineContext(
    context={
        "run_id": str(uuid.uuid4()),
        "started_at": datetime.now().isoformat(),
    }
)

2. Batch Processing Metadata

context = PipelineContext(
    context={
        "batch_id": "batch-2024-01-15",
        "total_documents": len(file_paths),
    }
)

3. Correlation IDs for Distributed Tracing

context = PipelineContext(
    context={
        "correlation_id": request_id,  # From incoming request
        "trace_id": span.trace_id,     # From tracing system
        "span_id": span.span_id,
    }
)
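
Inside a component, these identifiers can then be attached to log records. Here is a minimal sketch using the standard logging module; the extra field names are illustrative, not prescribed by the library:

import logging

from document_extraction_tools.types import PipelineContext

logger = logging.getLogger(__name__)


def log_with_trace(message: str, context: PipelineContext | None) -> None:
    # Attach tracing identifiers (when present) to the log record
    values = context.context if context else {}
    logger.info(
        message,
        extra={
            "correlation_id": values.get("correlation_id"),
            "trace_id": values.get("trace_id"),
        },
    )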

Modifying Context During Execution

While PipelineContext is passed to all components, it's designed for read-mostly access. Components can read values but should be careful about modifications since multiple documents may be processed concurrently.

If you need to accumulate state (like metrics), consider using thread-safe structures:

from threading import Lock
from document_extraction_tools.types import PipelineContext


# A minimal thread-safe counter for accumulating metrics
class ThreadSafeCounter:
    def __init__(self) -> None:
        self._count = 0
        self._lock = Lock()

    def increment(self) -> int:
        with self._lock:
            self._count += 1
            return self._count


context = PipelineContext(
    context={
        "processed_counter": ThreadSafeCounter(),
    }
)

# In a component
counter = context.context.get("processed_counter")
if counter:
    count = counter.increment()
    print(f"Processed document #{count}")
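
Because the extract and export hooks are async, documents may also be processed as concurrent asyncio tasks rather than threads. An asyncio.Lock-based variant of the same idea (a sketch, not a library type) avoids blocking the event loop:

import asyncio


class AsyncCounter:
    def __init__(self) -> None:
        self._count = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._count += 1
            return self._count

In purely single-threaded asyncio code a bare integer increment is already safe (task switches only happen at await points), but the lock keeps the counter correct if the accumulation ever involves an await or a thread pool.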

Best Practices

Do

  • Use descriptive key names (run_id not id)
  • Provide default values when accessing (context.context.get("key", default))
  • Always check if context is None before accessing it (see the helper sketch after this list)
  • Keep context values serializable when possible (for logging/debugging)
  • Use context for cross-cutting concerns (logging, tracing, configuration)
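
The two access rules above can be wrapped in a small helper so components don't repeat the None check. This is a local convenience function, not part of the library:

from typing import Any

from document_extraction_tools.types import PipelineContext


def ctx_get(context: PipelineContext | None, key: str, default: Any = None) -> Any:
    # Safely read a context value, tolerating both a missing context and a missing key
    return context.context.get(key, default) if context else default


# Usage inside any component method:
# run_id = ctx_get(context, "run_id", "unknown")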

Don't

  • Store large objects (file contents, model instances) in context
  • Rely on context for component-specific configuration (use config classes instead)
  • Mutate context values in components without thread safety
  • Use context to pass data that should flow through the pipeline types (use metadata fields instead)

Context vs Component Config

Use Context For                               Use Config For
Runtime values (run IDs, timestamps)          Static settings (model names, paths)
Cross-cutting concerns (logging, tracing)     Component-specific parameters
Values that change per-run                    Environment variables, feature flags
Correlation IDs, batch metadata               Values that change per-deployment
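
To make the split concrete, here is a sketch of an exporter that takes static settings through its own config object and reads runtime values from the context. MyExporterConfig is hypothetical, and the custom constructor assumes the base class permits one; adapt this to whatever configuration mechanism your components already use:

from dataclasses import dataclass

from document_extraction_tools.base import BaseExtractionExporter
from document_extraction_tools.types import Document, ExtractionResult, PipelineContext


@dataclass
class MyExporterConfig:
    # Static, per-deployment settings belong in config (hypothetical class)
    table_name: str
    batch_write_size: int = 50


class ConfiguredExporter(BaseExtractionExporter):
    def __init__(self, config: MyExporterConfig) -> None:
        self._config = config

    async def export(
        self,
        document: Document,
        data: ExtractionResult,
        context: PipelineContext | None = None,
    ) -> None:
        # Runtime, per-run values come from the context
        run_id = context.context.get("run_id") if context else None
        print(f"[{run_id}] Writing {document.id} to {self._config.table_name}")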

API Reference

PipelineContext

Bases: BaseModel

Shared context passed through pipeline components.

Attributes:

  • context (dict[str, Any]) - Shared context values available across pipeline components. Defaults to an empty dict (Field(default_factory=dict)).