Leveraging Anthropic's New Message Batches API and Prompt Caching: A Practical Guide

Anthropic has recently introduced two powerful features to its API: the Message Batches API and Prompt Caching. In this blog post, we'll explore how to use both through a practical example.

Introduction

Anthropic's new Message Batches API and Prompt Caching features are primarily designed to reduce costs, and the savings are substantial:

  • Message Batches API: Offers up to 50% cost savings by allowing you to process large volumes of messages asynchronously in a single API call.
  • Prompt Caching: Reduces costs by up to 90% on input tokens for cached content by avoiding redundant processing of repeated prompts.

These two discounts stack together as well!
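
To put rough numbers on the stacking: at the time of writing, Claude 3.5 Sonnet lists at $3 per million input tokens, cache reads are billed at roughly 10% of that rate, and the Batches API halves the bill again. Assuming the two discounts multiply, a cached input token read inside a batch works out to about 5% of the normal price. The sketch below is illustrative only; check Anthropic's pricing page for current numbers.

# Illustrative only: assumed list prices at the time of writing.
BASE_INPUT_PRICE = 3.00        # USD per million input tokens (Claude 3.5 Sonnet)
CACHE_READ_MULTIPLIER = 0.10   # cache reads billed at ~10% of the base input rate
BATCH_DISCOUNT = 0.50          # Message Batches API: 50% off

effective_price = BASE_INPUT_PRICE * CACHE_READ_MULTIPLIER * BATCH_DISCOUNT
print(f"${effective_price:.2f} per million cached input tokens in a batch")  # $0.15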

The Use Case

Suppose you have a long blog post stored in long_prompt.md, and you want to ask multiple questions about it:

  1. What companies are mentioned in this blog post?
  2. What are the key takeaways from this blog post?
  3. List all the people mentioned in this blog post.

By using Prompt Caching, we can store the long blog post once, which significantly reduces input token costs for future requests—up to 90% in some cases.

At the same time, the Message Batches API lets us handle multiple questions in one go, processing them asynchronously in a single API call while enjoying a 50% discount.

The Code

If you'd rather skip the explanation of how this works, here's a link to the full code on GitHub.

Overview

The script we're examining performs the following tasks:

  1. Reads a long prompt from a markdown file (long_prompt.md).
  2. Caches the prompt by making a single regular Messages API request, so the cache exists before the upcoming Batches API requests are processed.
  3. Sends a batch of questions related to the long prompt using the Message Batches API.
  4. Polls for batch processing status and retrieves the results once processing is complete.
  5. Logs and saves the results for further analysis.

Prerequisites

We recommend using a dedicated Anthropic Workspace for each project, because the Batches API lets any API key in a Workspace read back all batch prompts and results created in that Workspace. In other words, if you use only the default Workspace for every Batches API request, then every API key can read every batch's results.

The Code Explained

Let's break down the script to understand how it leverages these features.

Imports and Setup

After installing the dependencies from requirements.txt with pip3.11 install -r requirements.txt, we're ready to start coding.

import asyncio
import logging
import httpx
import anthropic
import os
import hashlib

# Logging setup
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
c_handler = logging.StreamHandler()
logger.addHandler(c_handler)

# API Base URL and Model
BASE_URL = os.getenv("ANTHROPIC_API_BASE", "https://api.anthropic.com")
MODEL_TO_USE = "claude-3-5-sonnet-20240620"

Setting up logging and API configuration for Anthropic.

  • Logging: Configured to display DEBUG-level messages for detailed output.
  • BASE_URL: Points to Anthropic's API endpoint, or alternatively another endpoint like Cloudflare AI Gateway.
  • MODEL_TO_USE: Specifies the model to be used; in this case, claude-3-5-sonnet-20240620.

Main Function

async def main():
    # Initialize the asynchronous client
    client = anthropic.AsyncAnthropic(
        http_client=httpx.AsyncClient(
            http2=True,
            limits=httpx.Limits(
                max_connections=None,
                max_keepalive_connections=None,
                keepalive_expiry=None,
            ),
        ),
        default_headers={
            "Priority": "u=0",
            "Accept-Encoding": "zstd;q=1.0, br;q=0.9, gzip;q=0.8, deflate;q=0.7",
            "cf-skip-cache": "true",
        },
        max_retries=0,
        timeout=3600,
        base_url=BASE_URL,
    )

Setting up an asynchronous client with custom configurations.

  • Asynchronous Client: Configured with httpx for HTTP/2 support and customized headers.
  • Headers: Includes cf-skip-cache to bypass Cloudflare caching, along with custom encoding preferences. These aren't required if you're not using Cloudflare AI Gateway, but they don't hurt to include.

Reading and Hashing the Prompt

We're going to use this example long_prompt.md, since a prompt needs to be at least 1024 tokens long to be cached.

    # Read the long content from a markdown file
    with open("long_prompt.md", "r") as file:
        long_content = file.read()

    # Create an MD5 hash of the content for unique identification
    long_content_md5_hash = hashlib.md5(long_content.encode()).hexdigest()

Generating an MD5 hash of the markdown file's content.

  • Long Prompt: Reads the content from long_prompt.md.
  • Hashing: Generates an MD5 hash of the content to create unique identifiers for tracking.
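
One thing to keep in mind: prompt caching only takes effect once a block meets the model's minimum cacheable length (1024 tokens for Claude 3.5 Sonnet). If you want a quick guard against prompts that are too short, a rough character-count heuristic is enough; this is just an approximation and not part of the original script:

    # Rough sanity check: ~4 characters per token is a common approximation,
    # so the real tokenizer count may differ. Treat this as a loose guard only.
    approx_tokens = len(long_content) / 4
    if approx_tokens < 1024:
        logger.warning("long_prompt.md may be below the 1024-token caching minimum")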

Defining Questions

In this section, we can define a list of questions that we want to ask about the long_prompt.md content. (If these questions themselves were longer than 1024 tokens, we could cache them too! This would be beneficial if we were asking long questions about multiple long blog posts.)

    # Define the list of questions to ask the model
    questions_to_ask = [
        "What companies are mentioned in this blog post?",
        "What are the key takeaways from this blog post?",
        "List all the people mentioned in this blog post.",
    ]

Preparing questions for blog post analysis.

  • Questions: A list of queries related to the long prompt.
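
As an aside, caching a long question would look exactly like caching the blog post: give that question its own text block with a cache_control marker (Anthropic supports multiple cache breakpoints per request). A hypothetical sketch, where some_very_long_question is a placeholder for a question exceeding the 1024-token minimum:

    # Hypothetical: a very long question marked as its own cache breakpoint.
    long_question_block = {
        "type": "text",
        "text": some_very_long_question,  # placeholder; must exceed the caching minimum
        "cache_control": {"type": "ephemeral"},
    }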

Preparing Batch Requests

A simple loop to prepare the batch requests will do the trick.

    # Prepare a list to hold all the request payloads
    requests = []
    for question in questions_to_ask:
        custom_id = f"{long_content_md5_hash}{hashlib.md5(question.encode()).hexdigest()}"
        user_id = f"ai.moda-dev-{custom_id}"
        request = {
            "custom_id": custom_id,
            "params": {
                "model": MODEL_TO_USE,
                "max_tokens": 8192,
                "temperature": 0.0,
                "metadata": {"user_id": user_id},
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": long_content,
                                "cache_control": {"type": "ephemeral"},
                            },
                            {
                                "type": "text",
                                "text": question,
                            },
                        ],
                    }
                ],
            },
        }
        requests.append(request)

Generating request payloads with unique identifiers for questions.

  • Custom ID: Combines the prompt and question hashes to uniquely identify each request.
  • User ID: Created for metadata tracking. This is optional, but is helpful in tracking abuse.
  • Request Structure: Each request includes the model, parameters, metadata, and messages.
  • Cache Control: The cache_control parameter with type: "ephemeral" marks the long_content for caching.

Caching the Long Context

Currently, this step is necessary to ensure the cache is created before the batch requests are processed. If omitted, Anthropic will process most of the batch requests in parallel, and the majority will miss the cache. (Hopefully Anthropic will optimize the Batches API so this step isn't needed in the future; we'll update this blog post when that happens.)

    # Send a single message to cache the long content
    message = await client.messages.create(
        model=MODEL_TO_USE,
        max_tokens=1,
        temperature=0.0,
        metadata={"user_id": f"ai.moda-dev-{long_content_md5_hash}"},
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": long_content,
                        "cache_control": {"type": "ephemeral"},
                    }
                ],
            }
        ],
        extra_headers={
            "anthropic-beta": "prompt-caching-2024-07-31",
        },
    )

Caching long content with a single message request.

  • Caching Call: Sends a minimal request to cache the long_content.
  • Extra Headers: Includes anthropic-beta to enable the Prompt Caching beta feature.
  • Max Tokens: Set to 1 since we're only interested in caching, not the response.

The Cache Response Log

After the request above runs, you'll see the payload below logged to your console.

{
  "id": "msg_01BiscezbkaJCkRHEydAtEM7",
  "content": [
    {
      "text": "Here",
      "type": "text"
    }
  ],
  "model": "claude-3-5-sonnet-20240620",
  "role": "assistant",
  "stop_reason": "max_tokens",
  "stop_sequence": null,
  "type": "message",
  "usage": {
    "input_tokens": 3,
    "output_tokens": 1,
    "cache_creation_input_tokens": 1121,
    "cache_read_input_tokens": 0
  }
}

Result from Messages API.

Notice cache_creation_input_tokens? That's what we need to see.

Sending the Batch of Questions

Now that we've created the cache, we're ready to use the Batches API!

    # Send the batch of questions to the API
    batch_create_result = await client.beta.messages.batches.create(
        requests=requests,
        extra_headers={
            "anthropic-beta": "prompt-caching-2024-07-31,message-batches-2024-09-24"
        },
    )

Sending the batch of questions to the API with custom headers.

  • Batch Creation: Uses the Message Batches API to send all questions in a single batch.
  • Beta Features: Enables both Prompt Caching and Message Batches via the anthropic-beta header.

Batches API Creation Response

If the request above succeeds, we should see a message similar to the one below logged.

{
  "id": "msgbatch_01EbNKhSWYyTQbWtvL8c173Z",
  "cancel_initiated_at": null,
  "created_at": "2024-10-09T05:09:37.276591Z",
  "ended_at": null,
  "expires_at": "2024-10-10T05:09:37.276591Z",
  "processing_status": "in_progress",
  "request_counts": {
    "canceled": 0,
    "errored": 0,
    "expired": 0,
    "processing": 3,
    "succeeded": 0
  },
  "results_url": null,
  "type": "message_batch"
}

Result from Batches API Creation.

Polling for Batch Status

Since the Batches API isn't instant, we need to poll for results.

    # Poll the API until the batch processing has ended, starting from the
    # status returned when the batch was created (typically "in_progress")
    processing_status = batch_create_result.processing_status
    while processing_status != "ended":
        await asyncio.sleep(10)
        try:
            batch_response = await client.beta.messages.batches.retrieve(
                batch_create_result.id,
            )
            processing_status = batch_response.processing_status
            logger.debug(
                f"Batch Id: {batch_create_result.id}, Status: {processing_status}"
            )
        except Exception as e:
            logger.debug(f"An error occurred: {e}")

Continuously checking the API until batch processing completes.

  • Polling Loop: Checks the processing status every 10 seconds.
  • Status Check: Updates processing_status based on the batch's current state.

Batch Questions Response

Once the Message Batch is complete, you'll see the message below logged to the console.

{
  "id": "msgbatch_01EbNKhSWYyTQbWtvL8c173Z",
  "cancel_initiated_at": null,
  "created_at": "2024-10-09T05:09:37.276591Z",
  "ended_at": "2024-10-09T05:10:10.804706Z",
  "expires_at": "2024-10-10T05:09:37.276591Z",
  "processing_status": "ended",
  "request_counts": {
    "canceled": 0,
    "errored": 0,
    "expired": 0,
    "processing": 0,
    "succeeded": 3
  },
  "results_url": "https://api.anthropic.com/v1/messages/batches/msgbatch_01EbNKhSWYyTQbWtvL8c173Z/results",
  "type": "message_batch"
}

Result from Message Batch.

A mere ~34 seconds for all three messages! Not bad.

(Note: if a message batch takes longer than 5 minutes to process, some requests may miss the cache, since the ephemeral cache expires 5 minutes after it was last used. This could be dealt with by sending a small request every few minutes to keep the cache warm, but batches currently complete quickly enough in practice that this seems unnecessarily complicated.)
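
If you ever do see batches outlast the cache, one option is a small keep-alive task that re-sends the cached prompt while the batch is in flight. Here's a minimal sketch (not part of the original script) that reuses the client, MODEL_TO_USE, and long_content from above, plus a stop_event you control from the polling loop:

async def keep_cache_warm(client, long_content, stop_event):
    # Re-send the cached prompt every ~4 minutes so the 5-minute TTL never lapses.
    while not stop_event.is_set():
        await client.messages.create(
            model=MODEL_TO_USE,
            max_tokens=1,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": long_content,
                            "cache_control": {"type": "ephemeral"},
                        }
                    ],
                }
            ],
            extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"},
        )
        try:
            # Wait up to 4 minutes, waking early if the batch finishes first.
            await asyncio.wait_for(stop_event.wait(), timeout=240)
        except asyncio.TimeoutError:
            pass

You would launch this with asyncio.create_task(keep_cache_warm(client, long_content, stop_event)) just before creating the batch, where stop_event is an asyncio.Event(), and call stop_event.set() once the polling loop reports the batch has ended.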

Retrieving and Saving Results

We're ready to get our results!

    # Retrieve the results from the batch processing
    batch_result = await client.beta.messages.batches.results(
        batch_create_result.id,
    )

    # Make sure the output directory exists, then iterate asynchronously
    # over each result in the batch
    os.makedirs("output", exist_ok=True)
    async for result in batch_result:
        json_string = result.model_dump_json(indent=2)
        logger.info(json_string)
        with open(f"output/{result.custom_id}.json", "w") as f:
            f.write(json_string)

  • Results Retrieval: Accesses the batch results after processing has ended.
  • Asynchronous Iteration: Processes each result individually.
  • Saving Outputs: Writes each result to a JSON file named after its custom_id.

Final Results

Three results should appear in the console logs. Here's one example:

{
  "custom_id": "5f01134f8c259dada84776ca175ee6e3cb65b47fde2de43d28ca12a3d70c5461",
  "result": {
    "message": {
      "id": "msg_01F1AvFVramG9FwXii9hHyBQ",
      "content": [
        {
          "text": "The companies mentioned in this press release are:\n\n1. Paladin Capital Group - The main company announcing the new fund\n\n2. CalypsoAI - A company Paladin has previously invested in that develops AI/ML monitoring systems for the U.S. military\n\n3. Hack The Box - Another previous Paladin investment, described as a hacking playground and cybersecurity community\n\n4. Expel - A managed detection and response provider that Paladin has invested in\n\n5. Corellium - A company Paladin is investing in from the new fund, which provides testing for IoT devices\n\n6. Nisos - Another new investment, described as a managed intelligence provider\n\n7. Virtuoso - A London-based company Paladin is investing in that provides AI-powered testing automation\n\nThe press release also mentions that Paladin has invested in over 60 companies since 2008, but these are the specific companies named.",
          "type": "text"
        }
      ],
      "model": "claude-3-5-sonnet-20240620",
      "role": "assistant",
      "stop_reason": "end_turn",
      "stop_sequence": null,
      "type": "message",
      "usage": {
        "cache_creation_input_tokens": 0,
        "cache_read_input_tokens": 1121,
        "input_tokens": 13,
        "output_tokens": 207
      }
    },
    "type": "succeeded"
  }
}

Example completion result from the Batches API.

Notice the cache_read_input_tokens? That means our caching worked as expected too!

Running the Script

Ensure you have your environment set up with the necessary dependencies and your Anthropic API key configured as ANTHROPIC_API_KEY. Place your long prompt content in a file named long_prompt.md. Finally, execute the script:

python3.11 main.py

Best Practices

  • Cache Reusable Content: Place large, static content in its own text block at the beginning of the content array and use cache_control to mark it for caching.
  • Use Meaningful Custom IDs: This helps in tracking and matching requests with their responses.
  • Monitor Cache Performance: Check the cache_creation_input_tokens and cache_read_input_tokens response fields to verify cache usage (see the sketch after this list).
  • Handle Batch Processing Carefully: Since batches are processed asynchronously and may return results in any order, always rely on custom_id for matching.
  • Use Isolated Workspaces: Since the Batches API allows reading all batch prompts for 29 days, you should use an isolated Workspace with unique API keys for each project.
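
For the "Monitor Cache Performance" point above, a quick way to verify caching after the fact is to tally the usage fields across the saved result files. A small sketch, assuming the output/*.json files written by the script:

import glob
import json

# Sum cache usage across all saved batch results.
cache_reads = cache_writes = uncached = 0
for path in glob.glob("output/*.json"):
    with open(path) as f:
        usage = json.load(f)["result"]["message"]["usage"]
    cache_reads += usage.get("cache_read_input_tokens") or 0
    cache_writes += usage.get("cache_creation_input_tokens") or 0
    uncached += usage.get("input_tokens", 0)

# With caching working, cache_reads should dwarf the uncached input tokens.
print(f"cache reads: {cache_reads}, cache writes: {cache_writes}, uncached input: {uncached}")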