
Building a Scalable Document Processing System with Gemini 2.0 and Elasticsearch

April 2, 2025



Processing documents, especially large ones, via an API like Gemini's can get expensive fast. Every call involves uploading files, generating responses, and burning through tokens. If you're dealing with the same documents repeatedly (say, a user asking multiple questions about a PDF), reprocessing them from scratch each time is wasteful. So I set out to create a small service that's smart about caching: it remembers what it's seen, reuses it when possible, and scales gracefully.

I wanted a caching system that was dynamic: capable of handling both ephemeral in-memory needs and persistent storage across sessions. It had to be smart enough to recognize when a document has already been processed, flexible enough to work with URLs, and robust enough to recover from failures. This led me to a hybrid approach: an in-memory cache for speed, paired with Elasticsearch for persistence and metadata management.
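Before diving into the real code, here's a minimal sketch of that two-tier idea. The index name and document id happen to match what we'll build later, but the helper itself is illustrative, not part of the service:

from elasticsearch import AsyncElasticsearch, NotFoundError

_memory_cache: dict[str, str] = {}

async def lookup_cache_name(es: AsyncElasticsearch, key: str) -> str | None:
    """Two-tier lookup: in-process dict first, Elasticsearch second."""
    if key in _memory_cache:               # fast, ephemeral layer
        return _memory_cache[key]
    try:
        doc = await es.get(index="context_cache", id="cache_mapping")
    except NotFoundError:
        return None                        # nothing persisted yet
    value = doc["_source"].get("file_cache_mapping", {}).get(key)
    if value is not None:
        _memory_cache[key] = value         # promote to the fast layer
    return value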

To build this kind of infrastructure, you'll basically need two things:

  • A Gemini API key
  • An Elasticsearch deployment

You can see how to get an API key here:

Get a Gemini API key | Google AI for Developers

For running Elasticsearch in your local environment for development purposes:

Install Elasticsearch with Docker | Elasticsearch Guide [8.17] | Elastic
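Before implementing anything, it can help to sanity-check that both prerequisites are reachable. A minimal sketch, assuming Elasticsearch runs locally with default dev credentials and the API key is exported as GOOGLE_API_KEY:

from elasticsearch import Elasticsearch
from google import genai

# Assumed local dev settings; adjust to your deployment.
es = Elasticsearch("http://localhost:9200", basic_auth=("elastic", "changeme"))
print("Elasticsearch reachable:", es.ping())

client = genai.Client()  # picks up GOOGLE_API_KEY from the environment
reply = client.models.generate_content(model="gemini-1.5-flash-002", contents="ping")
print("Gemini reachable:", bool(reply.text))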

With those two prerequisites in place, we can move on to the implementation, but one more point deserves a mention. Because the caches are stored on Gemini's own infrastructure, a GCP billing account is charged at the following rates once you exceed the free-tier limits:

All paid-tier prices are per 1M tokens, in USD:

  • Input price: free of charge on the free tier; $0.10 (text / image / video) or $0.70 (audio) on the paid tier.
  • Output price: free of charge on the free tier; $0.40 on the paid tier.
  • Context caching price: free of charge on the free tier; $0.025 per 1M tokens (text / image / video) or $0.175 per 1M tokens (audio) on the paid tier. Available April 15, 2025.
  • Context caching (storage): free of charge up to 1,000,000 tokens of storage per hour on the free tier; $1.00 per 1M tokens per hour on the paid tier. Available April 15, 2025.
  • Tuning price: not available on either tier.
  • Grounding with Google Search: free of charge up to 500 RPD on the free tier; 1,500 RPD free, then $35 per 1,000 requests, on the paid tier.
  • Used to improve Google's products: yes on the free tier; no on the paid tier.
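To put the storage line in perspective: a document that tokenizes to roughly 100,000 tokens costs about 0.1 × $1.00 = $0.10 per hour to keep cached on the paid tier, i.e. around $2.40 per day, plus a one-time caching fee of roughly 0.1 × $0.025 ≈ $0.0025. Long TTLs are convenient, but they are not free.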

Steps to Implement

Now we can talk about implementation! The code lives in three main files: main.py (the core logic), config.py (settings), and prompts.py (question formatting). In this post, I'll break it down module by module, explain the implementation, and share the rationale behind my choices, especially why we use a hybrid approach with Elasticsearch.

Step 1: Setting the Stage with config.py

First, to keep the core module's configuration manageable, we create a configuration class. It's where I stash all the knobs and dials: API keys, model names, and connection details:

import ssl
from elasticsearch import AsyncElasticsearch

class CachingConfig:
    # Google API Key (placeholder; load from the environment in production)
    GEMINI_API_KEY = "AIza-1234567890"
    # Google Gemini Model
    GEMINI_MODEL = "gemini-1.5-flash-002"
    # Cache TTL
    CACHE_TTL = 365  # 365 days
    # Elasticsearch Index Name
    ELASTICSEARCH_INDEX_NAME = "context_cache"
    # Elasticsearch Host
    ELASTICSEARCH_HOST = "http://localhost:9200"
    # Elasticsearch Username
    ELASTICSEARCH_USERNAME = "elastic"
    # Elasticsearch Password
    ELASTICSEARCH_PASSWORD = "changeme"
    # Elasticsearch SSL
    ELASTICSEARCH_SSL = False

    # Async Elasticsearch client; every call on it in main.py is awaited
    elasticsearch_client = AsyncElasticsearch(
        hosts=ELASTICSEARCH_HOST,
        basic_auth=(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD),
        ssl_context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if ELASTICSEARCH_SSL
        else None,
    )

The Gemini API key and model (gemini-1.5-flash-002) are placeholders here; please don't hardcode keys in production, and prefer environment variables or a secrets manager. The CACHE_TTL of 365 days is a starting point: long enough to be useful, but not forever. And then there's Elasticsearch: I'm running it locally for now (http://localhost:9200), with basic auth and optional SSL. The elasticsearch_client is instantiated here so it's reusable across the app; note that it's the AsyncElasticsearch client, since main.py awaits every call on it.
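If you'd rather not hardcode anything, a variant of the class that reads from the environment might look like this (the variable names are my own choice, not a convention from any library):

import os
from elasticsearch import AsyncElasticsearch

class CachingConfig:
    # Hypothetical environment variable names; pick what fits your deployment.
    GEMINI_API_KEY = os.environ["GEMINI_API_KEY"]
    GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-1.5-flash-002")
    CACHE_TTL = int(os.environ.get("CACHE_TTL_DAYS", "365"))
    ELASTICSEARCH_INDEX_NAME = os.environ.get("ELASTICSEARCH_INDEX", "context_cache")
    ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "http://localhost:9200")

    elasticsearch_client = AsyncElasticsearch(
        hosts=ELASTICSEARCH_HOST,
        basic_auth=(
            os.environ.get("ELASTICSEARCH_USERNAME", "elastic"),
            os.environ["ELASTICSEARCH_PASSWORD"],
        ),
    )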

Why Elasticsearch? I'll get into this more later, but it's all about persistence and searchability. In-memory caches are fast but ephemeral; in our scenario, Elasticsearch lets us store cache mappings across restarts and query them efficiently.

Step 2: Formatting Questions with Prompts

Next up is prompts.py, a tiny but critical piece:

def generate_document_question_prompt(question: str) -> str:
    return (
        f"Based on the content of this document, please provide a detailed answer to the following question:\n\n"
        f"{question}\n\n"
    )

This function formats questions for Gemini. It's simple (it just prepends some context to the user's question), but it ensures Gemini knows to focus on the document. I kept it separate from main.py because prompt engineering can become a detailed art of its own. If you want to tweak how questions are phrased later (say, to handle multi-document queries), you can do it here without touching the core logic. Small, focused modules like this keep the system flexible.
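For instance, calling it directly yields the following (the question is made up):

>>> print(generate_document_question_prompt("What is the invoice total?"))
Based on the content of this document, please provide a detailed answer to the following question:

What is the invoice total?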

Step 3: The Heart of It All

Now we're at the core of the system: main.py. This is where the GeminiCacheStore class lives, orchestrating document processing, caching, and Elasticsearch integration. It's a static utility class, which I chose deliberately to keep things stateless and easy to reason about: no instantiation overhead, just pure functionality. It manages two layers of caching:

  • In-Memory Cache (_file_cache): This is a simple dictionary that holds uploaded file objects during a session. It's ephemeral—when the process dies, it's gone. Perfect for quick lookups within a single run, but not enough on its own.
  • Persistent Cache (Elasticsearch): This is where the real magic happens. I store mappings between files, their content hashes, filenames and Gemini cache names in an Elasticsearch index.

The class exposes methods like process_document_with_gemini, list_gemini_caches and get_cache_metadata, but the real work happens in how it orchestrates these two layers.

Processing Documents: The Workflow

Let's break down process_document_with_gemini, the main entry point. You give it a question and a document URL, and it returns a response from Gemini. Here's how it works:

  1. Initialization: It kicks off by loading cache mappings from Elasticsearch via _init_cache_mappings. This ensures we're starting with the latest state—whether it's a fresh run or a continuation from a previous session.
  2. Document Fetching: If the document isn't cached in memory (via a URL-based cache key), it downloads it using httpx. I went with an async client here because blocking I/O is a performance killer, especially with network calls.
  3. Cache Lookup: Before uploading to Gemini, it checks if we've seen this document before. It looks at both the filename (stored in Elasticsearch) and the content hash. If there's a hit, it reuses the existing Gemini cache—skipping the upload entirely.
  4. Uploading and Caching: If it's a new document, it uploads it to Gemini, generates a content hash, and creates a new cache with a TTL (defaulting to 365 days). The mappings—file ID to cache name, filename and hash—are saved back to Elasticsearch.
  5. Response Generation: Finally, it sends the formatted question to Gemini, leveraging the cached context, and returns the result.

This flow is designed to minimize redundant work. If you ask the same question about the same document twice, the second call is lightning-fast—no re-upload, no reprocessing.
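To make the flow concrete, here's a minimal calling sketch. The URL is made up, and I'm assuming the module layout described above:

import asyncio

from main import GeminiCacheStore

async def demo():
    url = "https://example.com/files/annual-report.pdf"  # hypothetical document
    # First call: downloads the PDF, uploads it to Gemini, creates the context cache.
    first = await GeminiCacheStore.process_document_with_gemini(
        question="What were the key findings?", document_link=url
    )
    # Second call: hits the existing cache, so nothing is re-uploaded.
    second = await GeminiCacheStore.process_document_with_gemini(
        question="Summarize the methodology.", document_link=url
    )
    print(first, second, sep="\n---\n")

asyncio.run(demo())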

Additionally, Elasticsearch brings a few killer features to the table:

  • Scalability: If this system grows to handle thousands of documents or multiple users, Elasticsearch can scale horizontally. A JSON file can't.
  • Searchability: The mappings (file IDs, cache names, filenames, hashes) are stored as structured documents. Need to find all caches for a specific filename? Elasticsearch makes that trivial (see the sketch after this list). Try doing that efficiently with a flat file.
  • Durability: It's a distributed system with replication and fault tolerance built in. Since the cache state has to persist across sessions, I don't want to lose it when a server crashes; Elasticsearch ensures the mappings survive restarts and failures.
  • Flexibility: Adding new metadata (like usage stats or expiration policies) is as simple as updating the index schema. No need to rewrite the storage layer.
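As a quick illustration of the searchability point, here's a sketch of how you could pull every cache name recorded for a given filename out of the mapping document this module stores. It assumes the single cache_mapping document layout used below; the helper itself is mine, not part of the service:

from config import CachingConfig

async def caches_for_filename(filename: str) -> list[str]:
    """Look up Gemini cache names recorded for a given filename."""
    es = CachingConfig.elasticsearch_client
    doc = await es.get(
        index=CachingConfig.ELASTICSEARCH_INDEX_NAME, id="cache_mapping"
    )
    src = doc["_source"]
    # File IDs whose recorded filename matches, then their cache names.
    file_ids = [
        fid
        for fid, name in src.get("document_filenames", {}).items()
        if name == filename
    ]
    mapping = src.get("file_cache_mapping", {})
    return [mapping[fid] for fid in file_ids if fid in mapping]

With the rationale covered, here is main.py in full: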
"""
This module contains functions to process documents using Gemini services.
It includes a dynamic caching strategy for context caches.
"""

import os
import logging
import asyncio
import hashlib
import urllib.parse

from datetime import datetime
from types import SimpleNamespace  # used for creating a dummy cache object
from typing import Any, Optional, List, Union, Dict

from config import CachingConfig
from prompts import generate_document_question_prompt

logger = logging.getLogger(__name__)

# Set your Google API Key in environment
# WARNING: Don't hardcode API keys in production code.
# Use a proper configuration management system or environment variables.
os.environ["GOOGLE_API_KEY"] = CachingConfig.GEMINI_API_KEY

class GeminiCacheStore:
    # In-memory file cache (ephemeral)
    _file_cache: dict[str, Any] = {}

    # Elasticsearch index for cache mapping
    _cache_mapping_index = CachingConfig.ELASTICSEARCH_INDEX_NAME

    @classmethod
    async def _ensure_elasticsearch_index_exists(cls):
        """Ensure that the Elasticsearch index exists, creating it if needed."""
        try:
            es_client = CachingConfig.elasticsearch_client
            index_exists = await es_client.indices.exists(
                index=cls._cache_mapping_index
            )
            if not index_exists:
                await es_client.indices.create(index=cls._cache_mapping_index)
                logger.info(
                    f"Created Elasticsearch index {cls._cache_mapping_index} for cache mappings"
                )
            return True
        except Exception as e:
            logger.error(f"Error checking/creating Elasticsearch index: {str(e)}")
            return False

    # Initialize cache mappings from Elasticsearch
    @classmethod
    async def _init_cache_mappings(cls):
        """Initialize cache mappings from Elasticsearch."""
        if not hasattr(cls, "_cache_mappings_initialized"):
            cls._file_cache_mapping = {}
            cls._multi_file_cache_mapping = {}
            cls._document_content_hashes = {}
            cls._document_filenames = {}

            # Load from Elasticsearch
            try:
                es_client = CachingConfig.elasticsearch_client

                # Check if the index exists
                index_exists = await cls._ensure_elasticsearch_index_exists()
                if index_exists:
                    # Get the cache mapping document
                    try:
                        response = await es_client.get(
                            index=cls._cache_mapping_index, id="cache_mapping"
                        )

                        if response and response.get("found", False):
                            cache_data = response.get("_source", {})
                            cls._file_cache_mapping = cache_data.get(
                                "file_cache_mapping", {}
                            )
                            cls._multi_file_cache_mapping = cache_data.get(
                                "multi_file_cache_mapping", {}
                            )
                            cls._document_content_hashes = cache_data.get(
                                "document_content_hashes", {}
                            )
                            cls._document_filenames = cache_data.get(
                                "document_filenames", {}
                            )
                            logger.info(
                                f"Loaded cache mappings from Elasticsearch index {cls._cache_mapping_index}"
                            )
                    except Exception as e:
                        logger.warning(
                            f"Error loading cache mappings from Elasticsearch: {str(e)}"
                        )
                        # Initialize with empty mappings if Elasticsearch fails
                        logger.info("Initializing with empty cache mappings")
            except Exception as e:
                logger.error(f"Error connecting to Elasticsearch: {str(e)}")
                # Initialize with empty mappings if Elasticsearch is unavailable
                logger.info(
                    "Initializing with empty cache mappings due to Elasticsearch connection error"
                )

            cls._cache_mappings_initialized = True

    @classmethod
    async def _save_cache_mappings(cls):
        """Save cache mappings to Elasticsearch."""
        cache_data = {
            "file_cache_mapping": cls._file_cache_mapping,
            "multi_file_cache_mapping": cls._multi_file_cache_mapping,
            "document_content_hashes": cls._document_content_hashes,
            "document_filenames": cls._document_filenames,
            "last_updated": datetime.now().isoformat(),
        }

        # Save to Elasticsearch
        try:
            es_client = CachingConfig.elasticsearch_client

            # Ensure the index exists
            if await cls._ensure_elasticsearch_index_exists():
                # Save the cache mapping document
                await es_client.index(
                    index=cls._cache_mapping_index,
                    id="cache_mapping",
                    document=cache_data,
                    refresh=True,
                )
                logger.info(
                    f"Saved cache mappings to Elasticsearch index {cls._cache_mapping_index}"
                )
        except Exception as e:
            logger.error(f"Error saving cache mappings to Elasticsearch: {str(e)}")
            logger.warning("Cache mappings could not be saved. Data may be lost.")

    @staticmethod
    def normalize_url(url: str) -> str:
        """Normalize URL for consistent cache key generation.

        Args:
            url (str): The URL to normalize.

        Returns:
            str: The normalized URL.
        """
        parsed = urllib.parse.urlparse(url)
        path = parsed.path.rstrip("/")
        normalized = urllib.parse.urlunparse(
            (
                parsed.scheme,
                parsed.netloc,
                path,
                "",  # params
                "",  # query
                "",  # fragment
            )
        )
        return normalized

    @staticmethod
    def _generate_url_cache_key(url: str) -> str:
        """Generate a cache key for a URL-based document.

        Args:
            url (str): The URL of the document.

        Returns:
            str: The cache key.
        """
        normalized = GeminiCacheStore.normalize_url(url)
        return hashlib.md5(normalized.encode()).hexdigest()
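
    # Example for _generate_url_cache_key above (illustrative URL, not from the post):
    #   _generate_url_cache_key("https://example.com/docs/report.pdf?sig=abc")
    # normalizes the URL to "https://example.com/docs/report.pdf" first, so the
    # MD5 key ignores volatile query strings and trailing slashes.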
    @staticmethod
    async def list_gemini_caches() -> Union[List[Any], str]:
        """List all caches currently stored in Gemini.

        Returns:
            Union[List[Any], str]: List of cache objects or an error string.
        """
        from google import genai

        try:
            logger.info("Initializing Gemini client for listing caches")
            gemini_client = genai.Client()
            logger.info("Retrieving cache list")

            # Materialize the pager so len() works and callers get a plain list
            caches = list(gemini_client.caches.list())
            logger.info("Found %d caches", len(caches))

            # Initialize cache mappings from Elasticsearch
            await GeminiCacheStore._init_cache_mappings()

            return caches

        except Exception as e:
            logger.error("Error listing caches: %s", str(e), exc_info=True)
            return f"Error listing caches: {str(e)}"

    @staticmethod
    async def get_cache_metadata() -> Union[List[Dict[str, Any]], Dict[str, Any], str]:
        """Get metadata for all caches currently stored in Gemini and Elasticsearch.

        Returns:
            Union[List[Dict[str, Any]], Dict[str, Any], str]: Cache metadata (plus
            Elasticsearch info when available) or an error string.
        """
        try:
            caches = await GeminiCacheStore.list_gemini_caches()
            if isinstance(caches, str):  # Error occurred
                return caches

            # Initialize cache mappings from Elasticsearch
            await GeminiCacheStore._init_cache_mappings()

            cache_metadata = []
            for cache in caches:
                metadata = {
                    "name": cache.name,
                    "model": cache.model,
                    "display_name": cache.display_name,
                    "create_time": cache.create_time,
                    "update_time": cache.update_time,
                    "expire_time": cache.expire_time,
                }

                # Add usage metadata if available
                if hasattr(cache, "usage_metadata"):
                    metadata["usage_metadata"] = cache.usage_metadata

                # Add file information from our mappings if available
                file_info = {}

                # Check if this cache is in our file mapping
                for (
                    file_key,
                    cache_name,
                ) in GeminiCacheStore._file_cache_mapping.items():
                    if cache_name == cache.name:
                        file_info["file_key"] = file_key
                        if file_key in GeminiCacheStore._document_filenames:
                            file_info["filename"] = (
                                GeminiCacheStore._document_filenames[file_key]
                            )
                        if file_key in GeminiCacheStore._document_content_hashes:
                            file_info["content_hash"] = (
                                GeminiCacheStore._document_content_hashes[file_key]
                            )
                        break

                # Check if this cache is in our multi-file mapping
                for (
                    multi_key,
                    cache_name,
                ) in GeminiCacheStore._multi_file_cache_mapping.items():
                    if cache_name == cache.name and not multi_key.startswith(
                        "filenames:"
                    ):
                        file_info["multi_file_key"] = multi_key
                        break

                if file_info:
                    metadata["file_info"] = file_info

                cache_metadata.append(metadata)

            # Get elasticsearch info separately
            elasticsearch_info = None
            try:
                es_client = CachingConfig.elasticsearch_client
                if await es_client.indices.exists(
                    index=GeminiCacheStore._cache_mapping_index
                ):
                    # Get the cache mapping document (ignore 404 if it doesn't exist)
                    response = await es_client.options(ignore_status=404).get(
                        index=GeminiCacheStore._cache_mapping_index,
                        id="cache_mapping",
                    )

                    if response and response.get("found", False):
                        elasticsearch_info = {
                            "index": GeminiCacheStore._cache_mapping_index,
                            "last_updated": response.get("_source", {}).get(
                                "last_updated", "unknown"
                            ),
                        }
            except Exception as e:
                logger.warning(f"Error getting Elasticsearch metadata: {str(e)}")

            # Add Elasticsearch info to the response
            if elasticsearch_info:
                return {
                    "caches": cache_metadata,
                    "elasticsearch_info": elasticsearch_info,
                }

            return cache_metadata

        except Exception as e:
            logger.error("Error getting cache metadata: %s", str(e), exc_info=True)
            return f"Error getting cache metadata: {str(e)}"

    @staticmethod
    async def check_cache_exists(cache_name: str) -> Union[bool, str]:
        """Check if a specific cache exists in Gemini and our Elasticsearch mappings.

        Args:
            cache_name (str): The name of the cache to check.

        Returns:
            Union[bool, str]: True if the cache exists, False if not, or an error string.
        """
        from google import genai

        try:
            logger.info("Checking if cache exists: %s", cache_name)

            # Initialize cache mappings from Elasticsearch
            await GeminiCacheStore._init_cache_mappings()

            # Check if the cache exists in our mappings
            in_file_mapping = (
                cache_name in GeminiCacheStore._file_cache_mapping.values()
            )
            in_multi_file_mapping = (
                cache_name in GeminiCacheStore._multi_file_cache_mapping.values()
            )

            if in_file_mapping or in_multi_file_mapping:
                logger.info("Cache found in local mappings: %s", cache_name)
                return True

            # If not in our mappings, check Gemini directly
            gemini_client = genai.Client()

            try:
                # Attempt to get the cache by name
                cache = gemini_client.caches.get(name=cache_name)
                logger.info("Cache found in Gemini: %s", cache_name)
                return True
            except Exception as e:
                if "NOT_FOUND" in str(e):
                    logger.info("Cache not found: %s", cache_name)
                    return False
                else:
                    # Re-raise if it's a different error
                    raise

        except Exception as e:
            logger.error("Error checking cache existence: %s", str(e), exc_info=True)
            return f"Error checking cache existence: {str(e)}"
    @staticmethod
    async def process_document_with_gemini(
        question: str,
        document_link: Optional[Any] = None,
    ) -> str:
        """Process a document from a URL using Gemini services with caching.

        Args:
            question (str): The question or intention.
            document_link (Optional[Any]): The URL to the document.

        Returns:
            str: The resulting text response from Gemini.
        """
        # Initialize cache mappings from Elasticsearch
        await GeminiCacheStore._init_cache_mappings()

        import io
        import httpx
        import urllib.parse
        import datetime
        from google import genai
        from google.genai import types

        logger.info("Initializing Gemini client")
        gemini_client = genai.Client()

        try:
            logger.info(
                f"Starting document processing with params: document_link={document_link}"
            )

            if not document_link:
                logger.info("No document link provided, skipping document processing")
                return "No document link provided, skipping document processing"

            # Extract the document filename from the URL
            parsed_url = urllib.parse.urlparse(document_link)
            document_filename = urllib.parse.unquote(parsed_url.path.split("/")[-1])
            logger.info(f"Document filename: {document_filename}")

            # Format the question for Gemini
            formatted_question = generate_document_question_prompt(question)

            # Check if we already have a cache for this filename
            existing_cache = None
            for file_id, cache_name in GeminiCacheStore._file_cache_mapping.items():
                if (
                    file_id in GeminiCacheStore._document_filenames
                    and GeminiCacheStore._document_filenames[file_id]
                    == document_filename
                ):
                    # Verify the cache still exists
                    cache_exists = await GeminiCacheStore.check_cache_exists(cache_name)
                    if isinstance(cache_exists, bool) and cache_exists:
                        logger.info(
                            f"Found existing cache for filename {document_filename}: {cache_name}"
                        )
                        existing_cache = cache_name
                        break

            # If we found an existing cache, we can skip downloading and uploading the file
            if existing_cache:
                logger.info(f"Using existing cache for this filename: {existing_cache}")

                # Use a dummy uploaded_file object with just the name property
                # This avoids uploading the file again
                uploaded_file = SimpleNamespace(name="dummy_file_id")

                # Use the existing cache directly
                response = await asyncio.to_thread(
                    gemini_client.models.generate_content,
                    model=CachingConfig.GEMINI_MODEL,
                    contents=formatted_question,
                    config=types.GenerateContentConfig(cached_content=existing_cache),
                )

                logger.info(
                    "Successfully received response from Gemini using existing cache (response length: %d characters)",
                    len(response.text),
                )
                return response.text

            # Continue with normal processing if no existing cache was found
            cache_key = GeminiCacheStore._generate_url_cache_key(document_link)
            if cache_key in GeminiCacheStore._file_cache:
                logger.info("Using cached document file for URL: %s", document_link)
                uploaded_file = GeminiCacheStore._file_cache[cache_key]
            else:
                logger.info("Fetching document from URL: %s", document_link)
                # Downloading the document via httpx AsyncClient
                async with httpx.AsyncClient() as http_client:
                    response = await http_client.get(document_link)
                    # Create an in-memory byte-stream using the downloaded content
                    doc_io = io.BytesIO(response.content)
                    # Also create a copy for hashing
                    content_for_hash = response.content

                logger.info("Downloaded %d bytes from URL", len(response.content))
                uploaded_file = gemini_client.files.upload(
                    file=doc_io, config=dict(mime_type="application/pdf")
                )
                GeminiCacheStore._file_cache[cache_key] = uploaded_file
                logger.info("Document uploaded and cached with key: %s", cache_key)

                # Generate a deterministic cache name based on document content
                document_content_hash = hashlib.md5(content_for_hash).hexdigest()
                # Store the content hash for later use
                GeminiCacheStore._document_content_hashes[uploaded_file.name] = (
                    document_content_hash
                )

                # Store the document filename for later use
                GeminiCacheStore._document_filenames[uploaded_file.name] = (
                    document_filename
                )

                # Save the updated mappings to Elasticsearch
                await GeminiCacheStore._save_cache_mappings()

            # Check if we already have a cache for this file
            cache_name = None
            if uploaded_file.name in GeminiCacheStore._file_cache_mapping:
                cache_name = GeminiCacheStore._file_cache_mapping[uploaded_file.name]
                # Verify the cache still exists
                cache_exists = await GeminiCacheStore.check_cache_exists(cache_name)
                if isinstance(cache_exists, bool) and cache_exists:
                    logger.info(
                        "Using existing cached context from mapping: %s",
                        cache_name,
                    )
                else:
                    # Cache doesn't exist anymore, need to create a new one
                    cache_name = None
                    # Remove the stale mapping
                    del GeminiCacheStore._file_cache_mapping[uploaded_file.name]

            # Get the document filename for metadata
            document_filename_for_metadata = document_filename
            # If we don't have a valid cache yet, 
            # try to find one or create a new one
            if not cache_name:
                # Get all caches
                caches = await GeminiCacheStore.list_gemini_caches()
                if isinstance(caches, str):  # Error occurred
                    logger.warning("Error listing caches: %s", caches)
                    # Continue with creating a new cache
                else:
                    logger.info("Found %d caches to check", len(caches))
                    # Search for existing caches that might match our content
                    for cache in caches:
                        try:
                            # Check if this cache contains our file or has our document filename in the display name
                            if (
                                document_filename_for_metadata
                                and cache.display_name
                                and document_filename_for_metadata in cache.display_name
                            ):
                                logger.info(
                                    "Found cache with matching document filename in display name: %s",
                                    cache.display_name,
                                )

                                # Get cache details to verify it contains our file
                                cache_detail = await asyncio.to_thread(
                                    gemini_client.caches.get, name=cache.name
                                )

                                if (
                                    hasattr(cache_detail, "file_ids")
                                    and cache_detail.file_ids
                                ):
                                    logger.info(
                                        "Cache %s contains files: %s",
                                        cache.name,
                                        cache_detail.file_ids,
                                    )

                                    # If this cache contains only one file, use it
                                    if len(cache_detail.file_ids) == 1:
                                        cache_name = cache.name
                                        # Store in our mapping for future use
                                        GeminiCacheStore._file_cache_mapping[
                                            uploaded_file.name
                                        ] = cache_name
                                        logger.info(
                                            "Found existing cache for this document: %s",
                                            cache.name,
                                        )
                                        break

                                    # If this cache contains our specific file, use it
                                    if uploaded_file.name in cache_detail.file_ids:
                                        cache_name = cache.name
                                        # Store in our mapping for future use
                                        GeminiCacheStore._file_cache_mapping[
                                            uploaded_file.name
                                        ] = cache_name
                                        logger.info(
                                            "Found existing cache containing our file: %s",
                                            cache.name,
                                        )
                                        break
                        except Exception as e:
                            logger.warning(
                                "Error checking cache %s: %s",
                                cache.name,
                                str(e),
                            )
                            continue

            if not cache_name:
                # Create a display name that includes the document filename
                sanitized_filename = document_filename_for_metadata.replace(" ", "_")[
                    :50
                ]  # Limit length and replace spaces
                display_name = f"document-{sanitized_filename}"

                logger.info(
                    "Creating new context cache for document with display name: %s",
                    display_name,
                )

                # Create the cache with our deterministic display name
                cache = gemini_client.caches.create(
                    model=CachingConfig.GEMINI_MODEL,
                    config=types.CreateCachedContentConfig(
                        contents=[uploaded_file], display_name=display_name
                    ),
                )

                ttl = f"{int(datetime.timedelta(days=CachingConfig.CACHE_TTL).total_seconds())}s"
                gemini_client.caches.update(
                    name=cache.name, config=types.UpdateCachedContentConfig(ttl=ttl)
                )

                cache_name = cache.name
                # Store in our mapping for future use
                GeminiCacheStore._file_cache_mapping[uploaded_file.name] = cache_name
                # Save the updated mapping to Elasticsearch
                await GeminiCacheStore._save_cache_mappings()
                logger.info(
                    "Created new cache: %s with display name: %s and %d-day TTL",
                    cache_name,
                    display_name,
                    CachingConfig.CACHE_TTL,
                )
            else:
                logger.info("Using existing cached context: %s", cache_name)

            # Offload the blocking remote call to a separate thread
            response = await asyncio.to_thread(
                gemini_client.models.generate_content,
                model=CachingConfig.GEMINI_MODEL,
                contents=formatted_question,
                config=types.GenerateContentConfig(cached_content=cache_name),
            )

            logger.info(
                "Successfully received response from Gemini (response length: %d characters)",
                len(response.text),
            )
            return response.text

        except Exception as e:
            logger.error(
                "Error in process_document_with_gemini: %s", str(e), exc_info=True
            )
            return f"Error processing document: {str(e)}"

Utility Methods

Finally, GeminiCacheStore ships a few helpers for inspecting Gemini's file store and wiping cache state:

    @staticmethod
    async def list_gemini_files() -> Union[List[Dict[str, Any]], str]:
        """List all files currently stored in Gemini, with any known metadata.

        Returns:
            Union[List[Dict[str, Any]], str]: List of file-info dictionaries or an error string.
        """
        from google import genai

        try:
            # Initialize cache mappings from Elasticsearch
            await GeminiCacheStore._init_cache_mappings()

            logger.info("Initializing Gemini client for listing files")
            gemini_client = genai.Client()
            logger.info("Retrieving file list")
            files = gemini_client.files.list()
            file_list = [file.name for file in files]

            # Add file information from our mappings
            file_info = []
            for file_name in file_list:
                info = {"name": file_name}

                # Add filename if available
                if file_name in GeminiCacheStore._document_filenames:
                    info["filename"] = GeminiCacheStore._document_filenames[file_name]

                # Add content hash if available
                if file_name in GeminiCacheStore._document_content_hashes:
                    info["content_hash"] = GeminiCacheStore._document_content_hashes[
                        file_name
                    ]

                file_info.append(info)

            logger.info("Found %d files", len(file_list))
            return file_info

        except Exception as e:
            logger.error(
                "Error listing files: %s",
                str(e),
                exc_info=True,
            )
            return f"Error listing files: {str(e)}"

    @staticmethod
    async def clear_cache() -> str:
        """Clear the file cache and optionally the persistent cache mappings.

        Returns:
            str: A success message.
        """
        GeminiCacheStore._file_cache.clear()

        # Clear the persistent mappings if requested
        if hasattr(GeminiCacheStore, "_file_cache_mapping"):
            GeminiCacheStore._file_cache_mapping.clear()
        if hasattr(GeminiCacheStore, "_multi_file_cache_mapping"):
            GeminiCacheStore._multi_file_cache_mapping.clear()
        if hasattr(GeminiCacheStore, "_document_content_hashes"):
            GeminiCacheStore._document_content_hashes.clear()
        if hasattr(GeminiCacheStore, "_document_filenames"):
            GeminiCacheStore._document_filenames.clear()

        # Save the cleared mappings
        await GeminiCacheStore._save_cache_mappings()

        # Also delete the cache mapping document from Elasticsearch
        try:
            es_client = CachingConfig.elasticsearch_client

            # Verify index exists before attempting to delete
            index_exists = await GeminiCacheStore._ensure_elasticsearch_index_exists()
            if index_exists:
                # Delete the cache mapping document (ignore 404 if already gone)
                await es_client.options(ignore_status=404).delete(
                    index=GeminiCacheStore._cache_mapping_index,
                    id="cache_mapping",
                )
                logger.info(
                    f"Deleted cache mapping document from Elasticsearch index {GeminiCacheStore._cache_mapping_index}"
                )
        except Exception as e:
            logger.error(f"Error deleting cache mapping from Elasticsearch: {str(e)}")

        logger.info("Document processing file cache and persistent mappings cleared")
        return "File cache and persistent mappings cleared successfully"