
Claim Check Pattern with Temporal

Last updated Oct 7, 2025

The Claim Check pattern enables efficient handling of large payloads by storing them externally and passing only keys through Temporal workflows and activities.

This pattern is particularly useful when:

  • Working with large files, images, or datasets
  • Processing bulk data operations
  • Handling payloads that exceed Temporal's payload size limits (2 MB per payload by default)
  • Improving performance by reducing payload serialization overhead

How the Claim Check Pattern Works

The Claim Check pattern implements a PayloadCodec that:

  1. Encode: Replaces large payloads with unique keys and stores the original data in external storage (Redis, S3, etc.)
  2. Decode: Retrieves the original payload using the key when needed

This allows Temporal workflows to operate with small, lightweight keys instead of large payloads, while maintaining transparent access to the full data through automatic encoding/decoding.
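
To make the exchange concrete, here is an illustrative sketch (not part of the sample) of a payload before and after claim checking, using the metadata fields the codec below attaches; the JSON body and the key value are made up.

from temporalio.api.common.v1 import Payload

# Illustrative only: the original payload's bytes are stored externally under a
# generated key; the payload that travels through Temporal carries only that key.
original = Payload(
    metadata={"encoding": b"json/plain"},
    data=b'{"data": [...], "metadata": {...}}',  # potentially megabytes of JSON
)

claim_checked = Payload(
    metadata={
        "encoding": b"claim-checked",
        "temporal.io/claim-check-codec": b"v1",
    },
    data=b"abc123-def4-5678-9abc-def012345678",  # just the storage key
)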

Claim Check Codec Implementation

The ClaimCheckCodec class implements the PayloadCodec interface to handle the encoding and decoding of payloads.

import uuid
import redis.asyncio as redis
from typing import Iterable, List

from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec

class ClaimCheckCodec(PayloadCodec):
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port)

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Replace large payloads with keys and store original data in Redis."""
        out: List[Payload] = []
        for payload in payloads:
            encoded = await self.encode_payload(payload)
            out.append(encoded)
        return out

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        """Retrieve original payloads from Redis using stored keys."""
        out: List[Payload] = []
        for payload in payloads:
            if payload.metadata.get("temporal.io/claim-check-codec", b"").decode() != "v1":
                # Not a claim-checked payload, pass through unchanged
                out.append(payload)
                continue

            redis_key = payload.data.decode("utf-8")
            stored_data = await self.redis_client.get(redis_key)
            if stored_data is None:
                raise ValueError(f"Claim check key not found in Redis: {redis_key}")

            original_payload = Payload.FromString(stored_data)
            out.append(original_payload)
        return out

    async def encode_payload(self, payload: Payload) -> Payload:
        """Store payload in Redis and return a key-based payload."""
        key = str(uuid.uuid4())
        serialized_data = payload.SerializeToString()

        # Store the original payload data in Redis
        await self.redis_client.set(key, serialized_data)

        # Return a lightweight payload containing only the key
        return Payload(
            metadata={
                "encoding": b"claim-checked",
                "temporal.io/claim-check-codec": b"v1",
            },
            data=key.encode("utf-8"),
        )

Claim Check Plugin

The ClaimCheckPlugin integrates the codec with Temporal's client configuration.

import os
from temporalio.client import Plugin, ClientConfig
from temporalio.converter import DataConverter

from claim_check_codec import ClaimCheckCodec

class ClaimCheckPlugin(Plugin):
    def __init__(self):
        self.redis_host = os.getenv("REDIS_HOST", "localhost")
        self.redis_port = int(os.getenv("REDIS_PORT", "6379"))

    def get_data_converter(self, config: ClientConfig) -> DataConverter:
        """Configure the data converter with claim check codec."""
        default_converter_class = config["data_converter"].payload_converter_class
        claim_check_codec = ClaimCheckCodec(self.redis_host, self.redis_port)

        return DataConverter(
            payload_converter_class=default_converter_class,
            payload_codec=claim_check_codec,
        )

    def configure_client(self, config: ClientConfig) -> ClientConfig:
        """Apply the claim check configuration to the client."""
        config["data_converter"] = self.get_data_converter(config)
        return super().configure_client(config)
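
A minimal sketch of connecting a client with the plugin. The plugins argument on Client.connect is assumed to be available in your SDK version; if it is not, you can attach the codec by passing a DataConverter built with payload_codec=ClaimCheckCodec() as the client's data_converter instead. The module name claim_check_plugin is an assumption.

import asyncio

from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin  # assumed module name

async def main() -> None:
    # `plugins=` assumes an SDK version with client plugin support.
    client = await Client.connect(
        "localhost:7233",
        plugins=[ClaimCheckPlugin()],
    )
    # All workflow inputs and results on this client are now claim-checked
    # through Redis by the codec the plugin installs.

asyncio.run(main())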

Example: Large Data Processing Workflow

This example demonstrates processing large datasets using the Claim Check pattern.

from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow, activity

@dataclass
class LargeDataset:
    """Represents a large dataset that would benefit from claim check pattern."""
    data: List[dict]
    metadata: dict

@dataclass
class ProcessingResult:
    """Result of processing the large dataset."""
    processed_count: int
    summary: dict
    errors: List[str]

@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    """Process a large dataset - this would normally cause payload size issues.

    This activity demonstrates how the claim check pattern allows processing
    of large datasets without hitting Temporal's payload size limits.

    Note: For production workloads processing very large datasets, consider adding
    activity heartbeats to prevent timeout issues during long-running processing.

    Args:
        dataset: Large dataset to process

    Returns:
        ProcessingResult with processing statistics and any errors
    """
    processed_count = 0
    errors = []
    summary = {"total_items": len(dataset.data)}

    for item in dataset.data:
        try:
            # Simulate processing each item
            if "value" in item:
                item["processed_value"] = item["value"] * 2
                processed_count += 1
            elif "text" in item:
                # Simulate text processing
                item["word_count"] = len(item["text"].split())
                processed_count += 1
        except Exception as e:
            errors.append(f"Error processing item {item.get('id', 'unknown')}: {str(e)}")

    summary["processed_items"] = processed_count
    summary["error_count"] = len(errors)

    return ProcessingResult(
        processed_count=processed_count,
        summary=summary,
        errors=errors,
    )

@workflow.defn
class LargeDataProcessingWorkflow:
    @workflow.run
    async def run(self, dataset: LargeDataset) -> ProcessingResult:
        """Process large dataset using claim check pattern."""
        result = await workflow.execute_activity(
            process_large_dataset,
            dataset,
            start_to_close_timeout=timedelta(minutes=10),
            summary="Process large dataset",
        )
        return result
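
For reference, here is a hedged sketch of what the start_workflow starter referenced below could look like; the module names, task queue, workflow ID, and generated dataset are illustrative assumptions.

import asyncio

from temporalio.client import Client

from claim_check_plugin import ClaimCheckPlugin                   # assumed module name
from workflows import LargeDataProcessingWorkflow, LargeDataset   # assumed module name

async def main() -> None:
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])

    # Build a dataset large enough that it would normally hit payload limits.
    dataset = LargeDataset(
        data=[{"id": i, "value": i, "text": "lorem ipsum " * 50} for i in range(50_000)],
        metadata={"source": "example"},
    )

    result = await client.execute_workflow(
        LargeDataProcessingWorkflow.run,
        dataset,
        id="large-data-processing-workflow",      # assumed workflow ID
        task_queue="claim-check-task-queue",      # assumed task queue name
    )
    print(f"Processed {result.processed_count} items")

asyncio.run(main())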

Configuration

Set environment variables to configure the Redis connection:

# Configure Redis connection (optional - defaults to localhost:6379)
export REDIS_HOST=localhost
export REDIS_PORT=6379

Prerequisites

  • Redis Server: Required for external storage of large payloads

Running the Example

  1. Start Redis server:
redis-server
  2. Start the Temporal Dev Server:
temporal server start-dev
  3. Run the worker (a minimal sketch of a possible worker module follows this list):
uv run python -m worker
  4. Start execution:
uv run python -m start_workflow
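
As referenced in step 3, this is a hedged sketch of a minimal worker module; the module names and task queue are illustrative assumptions, and the plugin wiring mirrors the client sketch above.

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from claim_check_plugin import ClaimCheckPlugin                            # assumed module name
from workflows import LargeDataProcessingWorkflow, process_large_dataset  # assumed module name

async def main() -> None:
    client = await Client.connect("localhost:7233", plugins=[ClaimCheckPlugin()])

    worker = Worker(
        client,
        task_queue="claim-check-task-queue",   # assumed task queue name
        workflows=[LargeDataProcessingWorkflow],
        activities=[process_large_dataset],
    )
    await worker.run()

asyncio.run(main())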

Codec Server for Web UI

When using the Claim Check pattern, the Temporal Web UI will show encoded Redis keys instead of the actual payload data. This makes debugging and monitoring difficult since you can't see what data is being passed through your workflows.

The Problem

Without a codec server, the Web UI displays raw claim check keys like:

abc123-def4-5678-9abc-def012345678

This provides no context about what data is stored or how to access it, making workflow debugging and monitoring challenging.

Our Solution: Lightweight Codec Server

We've designed a codec server that provides helpful information without the risks of reading large payload data:

Design Principles

  1. No Data Reading: The codec server never reads the actual payload data during decode operations
  2. On-Demand Access: Full data is available via a separate endpoint when needed
  3. Simple & Safe: Just provides the Redis key and a link - no assumptions about data content
  4. Performance First: Zero impact on Web UI performance

What It Shows

Instead of raw keys, the Web UI displays:

"Claim check data (key: abc123-def4-5678-9abc-def012345678) - View at: http://localhost:8081/view/abc123-def4-5678-9abc-def012345678"

This gives you:

  • Clear identification: You know this is claim check data
  • Redis key: The actual key used for storage
  • Direct access: Click the URL to view the full payload data

Running the Codec Server

  1. Start the codec server:
uv run python -m codec_server
  2. Configure the Temporal Web UI to use the codec server. For temporal server start-dev, see the Temporal documentation on configuring codec servers for the appropriate configuration method.

  3. Access the Temporal Web UI and you'll see helpful summaries instead of raw keys.

Why This Approach?

Avoiding Common Pitfalls

❌ What we DON'T do:

  • Parse or analyze payload data (could be huge or malformed)
  • Attempt to summarize content (assumes data structure)
  • Read data during decode operations (performance impact)

✅ What we DO:

  • Provide the Redis key for manual inspection
  • Offer a direct link to view full data when needed
  • Keep the Web UI responsive with minimal information

Benefits

  • Performance: No Redis calls during Web UI operations
  • Safety: No risk of parsing problematic data
  • Flexibility: Works with any data type or size
  • Debugging: Full data accessible when needed via /view/{key} endpoint

Configuration Details

The codec server implements the Temporal codec server protocol with two endpoints:

  • /decode: Returns helpful text with Redis key and view URL
  • /view/{key}: Serves the raw payload data for inspection

When you click the view URL, you'll see the complete payload data as stored in Redis, formatted appropriately for text or binary content.
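
For illustration, here is a hedged sketch of such a codec server using FastAPI (an assumption; any HTTP framework works). It follows Temporal's codec server JSON protocol (payloads with base64-encoded metadata and data fields), never reads Redis in /decode, and serves raw bytes from /view/{key}; the VIEW_BASE_URL default and port are assumptions based on the URL shown above.

import base64
import json
import os

import redis.asyncio as redis
from fastapi import FastAPI, Request, Response

app = FastAPI()
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
)
VIEW_BASE_URL = os.getenv("VIEW_BASE_URL", "http://localhost:8081")  # assumed default

@app.post("/decode")
async def decode(request: Request) -> dict:
    """Replace claim-checked payloads with a readable summary - no Redis reads here."""
    body = await request.json()
    out = []
    for p in body.get("payloads", []):
        metadata = p.get("metadata", {})
        marker = base64.b64decode(metadata.get("temporal.io/claim-check-codec", "")).decode()
        if marker != "v1":
            out.append(p)  # not claim-checked, pass through unchanged
            continue
        key = base64.b64decode(p.get("data", "")).decode("utf-8")
        summary = f"Claim check data (key: {key}) - View at: {VIEW_BASE_URL}/view/{key}"
        out.append({
            "metadata": {"encoding": base64.b64encode(b"json/plain").decode()},
            "data": base64.b64encode(json.dumps(summary).encode("utf-8")).decode(),
        })
    return {"payloads": out}

@app.get("/view/{key}")
async def view(key: str) -> Response:
    """Serve the raw stored payload bytes for manual inspection."""
    stored = await redis_client.get(key)
    if stored is None:
        return Response(content=f"Key not found: {key}", status_code=404)
    return Response(content=stored, media_type="application/octet-stream")

# Run with, for example: uvicorn codec_server:app --port 8081
# (CORS headers may be required depending on how the Web UI reaches the server.)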

Considerations

Performance Trade-offs

  • Benefits: Reduces payload size, improves workflow performance, enables handling of large data
  • Costs: Additional network calls to external storage, potential latency increase

Storage Backend Options

While this example uses Redis, production systems typically use:

  • AWS S3: For AWS environments
  • Google Cloud Storage: For GCP environments
  • Azure Blob Storage: For Azure environments
  • Redis: For development and testing
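
As an illustration of swapping backends, this sketch replaces the Redis calls with S3 ones. It assumes aioboto3 and a CLAIM_CHECK_BUCKET environment variable, neither of which is part of this sample; the claim check metadata and key handling stay exactly as in ClaimCheckCodec.

import os
import uuid

import aioboto3  # assumed dependency

from temporalio.api.common.v1 import Payload

BUCKET = os.getenv("CLAIM_CHECK_BUCKET", "my-claim-check-bucket")  # assumed bucket name

async def store_payload_s3(payload: Payload) -> str:
    """Store the serialized payload in S3 and return the object key."""
    key = str(uuid.uuid4())
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        await s3.put_object(Bucket=BUCKET, Key=key, Body=payload.SerializeToString())
    return key

async def load_payload_s3(key: str) -> Payload:
    """Fetch the serialized payload from S3 and deserialize it."""
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        response = await s3.get_object(Bucket=BUCKET, Key=key)
        data = await response["Body"].read()
    return Payload.FromString(data)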

Activity Heartbeats

For production workloads processing very large datasets, consider implementing activity heartbeats to prevent timeout issues:

@activity.defn
async def process_large_dataset(dataset: LargeDataset) -> ProcessingResult:
    total_items = len(dataset.data)

    for i, item in enumerate(dataset.data):
        # Send heartbeat every 100 items to prevent timeout.
        # activity.heartbeat() is synchronous in the Python SDK.
        if i % 100 == 0:
            activity.heartbeat(f"Processed {i}/{total_items} items")

        # Process item...
This ensures Temporal knows the activity is still making progress during long-running operations.
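
Heartbeats only take effect when the activity is scheduled with a heartbeat timeout. An illustrative snippet for the workflow's run method shown earlier; the 30-second value is an example, not a recommendation.

# Illustrative only: enable heartbeat monitoring by setting heartbeat_timeout
# when the workflow schedules the activity.
result = await workflow.execute_activity(
    process_large_dataset,
    dataset,
    start_to_close_timeout=timedelta(minutes=10),
    heartbeat_timeout=timedelta(seconds=30),
)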

Error Handling

The codec raises a clear error when a claim check key is missing from storage. For production use, also plan for:

  • Storage connection failures (Redis unavailable or timing out)
  • Serialization/deserialization errors when reading stored payloads
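
Inside decode(), the Redis lookup can be wrapped with explicit handling; a minimal sketch (the RuntimeError wrapper is an illustrative choice, not part of the sample):

from redis.exceptions import ConnectionError as RedisConnectionError

# Fragment of ClaimCheckCodec.decode(): surface storage outages explicitly
# instead of letting the raw connection error bubble up.
try:
    stored_data = await self.redis_client.get(redis_key)
except RedisConnectionError as e:
    raise RuntimeError(f"Claim check storage unavailable: {e}") from e

if stored_data is None:
    raise ValueError(f"Claim check key not found in Redis: {redis_key}")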

Cleanup

Consider implementing cleanup strategies for stored data:

  • TTL (Time To Live) for automatic expiration
  • Manual cleanup workflows
  • Lifecycle policies for cloud storage
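
For the Redis backend used here, a TTL can be set directly when the payload is stored; a minimal sketch of the change to encode_payload (the seven-day window is an illustrative assumption). Choose a TTL that comfortably exceeds your namespace's workflow retention period, otherwise decoding older histories in the Web UI will fail once the keys expire.

# Illustrative only: store the claim check value with a TTL so Redis expires it
# automatically. The `ex` argument to SET is the expiration in seconds.
CLAIM_CHECK_TTL_SECONDS = 7 * 24 * 60 * 60  # assumed retention window: 7 days

await self.redis_client.set(key, serialized_data, ex=CLAIM_CHECK_TTL_SECONDS)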

Best Practices

  1. Enable globally: The claim check pattern applies to all payloads when enabled, so consider the performance impact across your entire system
  2. Monitor storage: Track storage usage and costs since all payloads will be stored externally
  3. Implement cleanup: Prevent storage bloat with appropriate cleanup strategies
  4. Test thoroughly: Verify the pattern works correctly with your specific data types and doesn't introduce unexpected latency
  5. Consider alternatives: Evaluate if data compression or other optimizations might be sufficient before implementing claim check