Processing documents—especially large ones—via an API like Gemini's can get expensive fast. Every call involves uploading files, generating responses, and burning through tokens. If you're dealing with the same documents repeatedly (say, a user asking multiple questions about a PDF), reprocessing them from scratch each time is wasteful. So I set out to create a small service that's smart about caching: it remembers what it's seen, reuses it when possible and scales gracefully.
I wanted a caching system that was dynamic—capable of handling both ephemeral in-memory needs and persistent storage across sessions. It had to be smart enough to recognize when a document's already been processed, flexible enough to work with URLs and robust enough to recover from failures. This led me to a hybrid approach: an in-memory cache for speed, paired with Elasticsearch for persistence and metadata management.
To use this kind of infrastructure, you'll basically need two things:
- Gemini API Key
- Elasticsearch Deployment
You can see how to easily get an API key here:
Get a Gemini API key | Google AI for Developers
For running Elasticsearch in your local environment for development purposes:
Install Elasticsearch with Docker | Elasticsearch Guide [8.17] | Elastic
With these two prerequisites in place, we can move on to the implementation, but I'd like to mention another important point first. When the caches are stored on Gemini's own infrastructure and you have a billing account on GCP, you'll be subject to the following charges once you exceed the free-tier limits:
| | Free Tier | Paid Tier, per 1M tokens in USD |
| --- | --- | --- |
| Input price | Free of charge | $0.10 (text / image / video), $0.70 (audio) |
| Output price | Free of charge | $0.40 |
| Context caching price | Free of charge | $0.025 / 1,000,000 tokens (text / image / video), $0.175 / 1,000,000 tokens (audio); available April 15, 2025 |
| Context caching (storage) | Free of charge, up to 1,000,000 tokens of storage per hour (available April 15, 2025) | $1.00 / 1,000,000 tokens per hour (available April 15, 2025) |
| Tuning price | Not available | Not available |
| Grounding with Google Search | Free of charge, up to 500 RPD | 1,500 RPD (free), then $35 / 1,000 requests |
| Used to improve our products | Yes | No |
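To put the storage price in perspective: on the paid tier, keeping a 500,000-token document cached costs 0.5 × $1.00 = $0.50 per hour, or roughly $12 per day. The long TTL we'll configure later (365 days) is therefore a deliberate trade-off between convenience and storage cost, not a free default.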
Steps to Implement
Now we can talk about implementation! The code lives in three main files: main.py (the core logic), config.py (settings) and prompts.py (question formatting). In this post, I'll break it down module by module, explain the implementation, and share the rationale behind my choices—especially why we're using a hybrid approach with Elasticsearch.
Step 1: Setting the Stage with config.py
First, to make our core module's configuration manageable, we create a configuration class. It's basically where I stash all the knobs and dials—API keys, model names, and connection details:
import ssl
from elasticsearch import AsyncElasticsearch
class CachingConfig:
# Google API Key
GEMINI_API_KEY = "AIza-1234567890"
# Google Gemini Model
GEMINI_MODEL = "gemini-1.5-flash-002"
# Cache TTL
CACHE_TTL = 365 # 365 days
# Elasticsearch Index Name
ELASTICSEARCH_INDEX_NAME = "context_cache"
# Elasticsearch Host
ELASTICSEARCH_HOST = "http://localhost:9200"
# Elasticsearch Username
ELASTICSEARCH_USERNAME = "elastic"
# Elasticsearch Password
ELASTICSEARCH_PASSWORD = "changeme"
# Elasticsearch SSL
ELASTICSEARCH_SSL = False
# Elasticsearch Client
elasticsearch_client = AsyncElasticsearch(
hosts=ELASTICSEARCH_HOST,
basic_auth=(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD),
ssl_context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
if ELASTICSEARCH_SSL
else None,
)
The Gemini API key and model (gemini-1.5-flash-002) are placeholders here—please don't hardcode keys in production; use environment variables or a secrets manager instead. A CACHE_TTL of 365 days is a starting point; it's long enough to be useful but not forever. And then there's Elasticsearch: I'm running it locally for now (http://localhost:9200), with basic auth and optional SSL. The elasticsearch_client is instantiated here so it's reusable across the app. Note that it has to be the async variant, AsyncElasticsearch, because the rest of the code awaits its methods (the async client needs the aiohttp extra: pip install "elasticsearch[async]").
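If you'd rather keep secrets out of the source entirely, here's a minimal sketch of an environment-based version of the same class (the environment variable names are my own choice, not a convention of any library):

import os
from elasticsearch import AsyncElasticsearch

class CachingConfig:
    # Hypothetical environment variable names -- adjust to your deployment
    GEMINI_API_KEY = os.environ["GEMINI_API_KEY"]  # fail loudly if missing
    GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-1.5-flash-002")
    CACHE_TTL = int(os.environ.get("CACHE_TTL_DAYS", "365"))
    ELASTICSEARCH_INDEX_NAME = os.environ.get("ELASTICSEARCH_INDEX_NAME", "context_cache")
    ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "http://localhost:9200")

    elasticsearch_client = AsyncElasticsearch(
        hosts=ELASTICSEARCH_HOST,
        basic_auth=(
            os.environ.get("ELASTICSEARCH_USERNAME", "elastic"),
            os.environ.get("ELASTICSEARCH_PASSWORD", ""),
        ),
    )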
Why Elasticsearch? I'll get into this more later, but it's all about persistence and searchability. In-memory caches are fast but ephemeral; in our scenario, Elasticsearch lets us store cache mappings across restarts and query them efficiently.
Step 2: Formatting Questions with prompts.py
Next up is prompts.py, a tiny but critical piece:
def generate_document_question_prompt(question: str) -> str:
return (
f"Based on the content of this document, please provide a detailed answer to the following question:\n\n"
f"{question}\n\n"
)
This function formats questions for Gemini. It's simple—just prepends some context to the user's question—but it ensures Gemini knows to focus on the document. I kept it separate from main.py because prompt engineering can be a detailed art in its own right. If you want to tweak how questions are phrased later (say, to handle multi-document queries), you can do it here without touching the core logic. Small, focused modules like this keep the system flexible.
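For instance, a quick sanity check of the output (the question is a made-up example):

from prompts import generate_document_question_prompt

print(generate_document_question_prompt("What is the termination clause?"))
# Based on the content of this document, please provide a detailed answer to the following question:
#
# What is the termination clause?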
Step 3: The Heart of It All
Now we're at the core of the system: main.py. This is where the GeminiCacheStore class lives, orchestrating document processing, caching and Elasticsearch integration. It's a static utility class, which I chose deliberately to keep things easy to reason about—no instances to manage, just class-level methods and a bit of shared cache state. It manages two layers of caching:
- In-Memory Cache (_file_cache): This is a simple dictionary that holds uploaded file objects during a session. It's ephemeral—when the process dies, it's gone. Perfect for quick lookups within a single run, but not enough on its own.
- Persistent Cache (Elasticsearch): This is where the real magic happens. I store mappings between files, their content hashes, filenames and Gemini cache names in an Elasticsearch index.
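Concretely, the mapping document that ends up in Elasticsearch looks roughly like this (the field names come from the code below; the file ID, hash and cache name are made-up examples):

{
    "file_cache_mapping": {"files/abc123": "cachedContents/xyz789"},
    "multi_file_cache_mapping": {},
    "document_content_hashes": {"files/abc123": "5f4dcc3b5aa765d61d8327deb882cf99"},
    "document_filenames": {"files/abc123": "report.pdf"},
    "last_updated": "2025-04-15T10:00:00"
}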
The class exposes methods like process_document_with_gemini, list_gemini_caches and get_cache_metadata, but the real work happens in how it orchestrates these two layers.
Processing Documents: The Workflow
Let's break down process_document_with_gemini, the main entry point. You give it a question and a document URL, and it returns a response from Gemini. Here's how it works:
- Initialization: It kicks off by loading cache mappings from Elasticsearch via _init_cache_mappings. This ensures we're starting with the latest state—whether it's a fresh run or a continuation from a previous session.
- Document Fetching: If the document isn't cached in memory (via a URL-based cache key), it downloads it using httpx. I went with an async client here because blocking I/O is a performance killer, especially with network calls.
- Cache Lookup: Before uploading to Gemini, it checks if we've seen this document before. It looks at both the filename (stored in Elasticsearch) and the content hash. If there's a hit, it reuses the existing Gemini cache—skipping the upload entirely.
- Uploading and Caching: If it's a new document, it uploads it to Gemini, generates a content hash, and creates a new cache with a TTL (defaulting to 365 days). The mappings—file ID to cache name, filename and hash—are saved back to Elasticsearch.
- Response Generation: Finally, it sends the formatted question to Gemini, leveraging the cached context, and returns the result.
This flow is designed to minimize redundant work. If you ask the same question about the same document twice, the second call is lightning-fast—no re-upload, no reprocessing.
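Before we get into the code, here's a minimal usage sketch (the URL is a placeholder):

import asyncio
from main import GeminiCacheStore

async def demo():
    # First call: downloads the PDF, uploads it to Gemini, creates a context cache
    print(await GeminiCacheStore.process_document_with_gemini(
        question="What are the key findings?",
        document_link="https://example.com/report.pdf",
    ))
    # Second call: reuses the cached context -- no re-download, no re-upload
    print(await GeminiCacheStore.process_document_with_gemini(
        question="Summarize the methodology.",
        document_link="https://example.com/report.pdf",
    ))

asyncio.run(demo())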
Additionally, Elasticsearch brings a few killer features to the table:
- Scalability: If this system grows to handle thousands of documents or multiple users, Elasticsearch can scale horizontally. A JSON file can't.
- Searchability: The mappings (file IDs, cache names, filenames, hashes) are stored as structured documents. Need to find all caches for a specific filename? Elasticsearch makes that trivial with a query (see the sketch after this list). Try doing that efficiently with a flat file.
- Durability: It's a distributed system with replication and fault tolerance built in. Since the cache mappings need to persist across sessions, I don't want to lose them if a server crashes. Elasticsearch ensures the mappings survive restarts and failures.
- Flexibility: Adding new metadata (like usage stats or expiration policies) is as simple as updating the index schema. No need to rewrite the storage layer.
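To make the searchability point concrete, here's a minimal sketch of looking up the Gemini caches for a given filename. It assumes the single mapping document (id cache_mapping) that the code below maintains; if you stored one document per file instead, this would become a genuine search query:

from config import CachingConfig

async def find_caches_for_filename(filename: str) -> list[str]:
    es = CachingConfig.elasticsearch_client
    resp = await es.get(index=CachingConfig.ELASTICSEARCH_INDEX_NAME, id="cache_mapping")
    src = resp["_source"]
    # File IDs whose stored filename matches...
    file_ids = [fid for fid, name in src["document_filenames"].items() if name == filename]
    # ...and the Gemini cache names they map to
    return [src["file_cache_mapping"][fid] for fid in file_ids if fid in src["file_cache_mapping"]]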
"""
This module contains functions to process documents using Gemini services.
It includes a dynamic caching strategy for context caches.
"""
import os
import logging
import asyncio
import hashlib
import urllib.parse
from datetime import datetime
from types import SimpleNamespace # used for creating a dummy uploaded-file object
from typing import Any, Optional, List, Union, Dict
from config import CachingConfig
from prompts import generate_document_question_prompt
logger = logging.getLogger(__name__)
# Set your Google API Key in environment
# WARNING: Don't hardcode API keys in production code.
# Use a proper configuration management system or environment variables.
os.environ["GOOGLE_API_KEY"] = CachingConfig.GEMINI_API_KEY
class GeminiCacheStore:
# In-memory file cache (ephemeral)
_file_cache: dict[str, Any] = {}
# Elasticsearch index for cache mapping
_cache_mapping_index = CachingConfig.ELASTICSEARCH_INDEX_NAME
@classmethod
async def _ensure_elasticsearch_index_exists(cls):
"""Ensure that the Elasticsearch index exists, creating it if needed."""
try:
es_client = CachingConfig.elasticsearch_client
index_exists = await es_client.indices.exists(
index=cls._cache_mapping_index
)
if not index_exists:
await es_client.indices.create(index=cls._cache_mapping_index)
logger.info(
f"Created Elasticsearch index {cls._cache_mapping_index} for cache mappings"
)
return True
except Exception as e:
logger.error(f"Error checking/creating Elasticsearch index: {str(e)}")
return False
# Initialize cache mappings from Elasticsearch
@classmethod
async def _init_cache_mappings(cls):
"""Initialize cache mappings from Elasticsearch."""
if not hasattr(cls, "_cache_mappings_initialized"):
cls._file_cache_mapping = {}
cls._multi_file_cache_mapping = {}
cls._document_content_hashes = {}
cls._document_filenames = {}
# Load from Elasticsearch
try:
es_client = CachingConfig.elasticsearch_client
# Check if the index exists
index_exists = await cls._ensure_elasticsearch_index_exists()
if index_exists:
# Get the cache mapping document
try:
response = await es_client.get(
index=cls._cache_mapping_index, id="cache_mapping"
)
if response and response.get("found", False):
cache_data = response.get("_source", {})
cls._file_cache_mapping = cache_data.get(
"file_cache_mapping", {}
)
cls._multi_file_cache_mapping = cache_data.get(
"multi_file_cache_mapping", {}
)
cls._document_content_hashes = cache_data.get(
"document_content_hashes", {}
)
cls._document_filenames = cache_data.get(
"document_filenames", {}
)
logger.info(
f"Loaded cache mappings from Elasticsearch index {cls._cache_mapping_index}"
)
except Exception as e:
logger.warning(
f"Error loading cache mappings from Elasticsearch: {str(e)}"
)
# Initialize with empty mappings if Elasticsearch fails
logger.info("Initializing with empty cache mappings")
except Exception as e:
logger.error(f"Error connecting to Elasticsearch: {str(e)}")
# Initialize with empty mappings if Elasticsearch is unavailable
logger.info(
"Initializing with empty cache mappings due to Elasticsearch connection error"
)
cls._cache_mappings_initialized = True
@classmethod
async def _save_cache_mappings(cls):
"""Save cache mappings to Elasticsearch."""
cache_data = {
"file_cache_mapping": cls._file_cache_mapping,
"multi_file_cache_mapping": cls._multi_file_cache_mapping,
"document_content_hashes": cls._document_content_hashes,
"document_filenames": cls._document_filenames,
"last_updated": datetime.now().isoformat(),
}
# Save to Elasticsearch
try:
es_client = CachingConfig.elasticsearch_client
# Ensure the index exists
if await cls._ensure_elasticsearch_index_exists():
# Save the cache mapping document
await es_client.index(
index=cls._cache_mapping_index,
id="cache_mapping",
document=cache_data,
refresh=True,
)
logger.info(
f"Saved cache mappings to Elasticsearch index {cls._cache_mapping_index}"
)
except Exception as e:
logger.error(f"Error saving cache mappings to Elasticsearch: {str(e)}")
logger.warning("Cache mappings could not be saved. Data may be lost.")
@staticmethod
def normalize_url(url: str) -> str:
"""Normalize URL for consistent cache key generation.
Args:
url (str): The URL to normalize.
Returns:
str: The normalized URL.
"""
parsed = urllib.parse.urlparse(url)
path = parsed.path.rstrip("/")
normalized = urllib.parse.urlunparse(
(
parsed.scheme,
parsed.netloc,
path,
"", # params
"", # query
"", # fragment
)
)
return normalized
@staticmethod
def _generate_url_cache_key(url: str) -> str:
"""Generate a cache key for a URL-based document.
Args:
url (str): The URL of the document.
Returns:
str: The cache key.
"""
normalized = GeminiCacheStore.normalize_url(url)
return hashlib.md5(normalized.encode()).hexdigest()
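# Examples (hypothetical URLs):
#   normalize_url("https://example.com/docs/report.pdf/?v=2#page=3")
#   -> "https://example.com/docs/report.pdf"
# Because query strings and fragments are dropped, two links that differ
# only in tracking parameters hash to the same cache key:
#   _generate_url_cache_key("https://example.com/docs/report.pdf?v=2")
#   == _generate_url_cache_key("https://example.com/docs/report.pdf?v=3")  # True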
@staticmethod
async def list_gemini_caches() -> Union[List[Any], str]:
"""List all caches currently stored in Gemini.
Returns:
Union[List[Any], str]: List of cache objects or an error string.
"""
from google import genai
try:
logger.info("Initializing Gemini client for listing caches")
gemini_client = genai.Client()
logger.info("Retrieving cache list")
caches = list(gemini_client.caches.list()) # materialize the pager so it can be counted and re-iterated
logger.info("Found %d caches", len(caches))
# Initialize cache mappings from Elasticsearch
await GeminiCacheStore._init_cache_mappings()
return caches
except Exception as e:
logger.error("Error listing caches: %s", str(e), exc_info=True)
return f"Error listing caches: {str(e)}"
@staticmethod
async def get_cache_metadata() -> Union[List[Dict[str, Any]], Dict[str, Any], str]:
"""Get metadata for all caches currently stored in Gemini and Elasticsearch.
Returns:
Union[List[Dict[str, Any]], Dict[str, Any], str]: List of cache metadata dictionaries (wrapped in a dict together with Elasticsearch info when available) or an error string.
"""
try:
caches = await GeminiCacheStore.list_gemini_caches()
if isinstance(caches, str): # Error occurred
return caches
# Initialize cache mappings from Elasticsearch
await GeminiCacheStore._init_cache_mappings()
cache_metadata = []
for cache in caches:
metadata = {
"name": cache.name,
"model": cache.model,
"display_name": cache.display_name,
"create_time": cache.create_time,
"update_time": cache.update_time,
"expire_time": cache.expire_time,
}
# Add usage metadata if available
if hasattr(cache, "usage_metadata"):
metadata["usage_metadata"] = cache.usage_metadata
# Add file information from our mappings if available
file_info = {}
# Check if this cache is in our file mapping
for (
file_key,
cache_name,
) in GeminiCacheStore._file_cache_mapping.items():
if cache_name == cache.name:
file_info["file_key"] = file_key
if file_key in GeminiCacheStore._document_filenames:
file_info["filename"] = (
GeminiCacheStore._document_filenames[file_key]
)
if file_key in GeminiCacheStore._document_content_hashes:
file_info["content_hash"] = (
GeminiCacheStore._document_content_hashes[file_key]
)
break
# Check if this cache is in our multi-file mapping
for (
multi_key,
cache_name,
) in GeminiCacheStore._multi_file_cache_mapping.items():
if cache_name == cache.name and not multi_key.startswith(
"filenames:"
):
file_info["multi_file_key"] = multi_key
break
if file_info:
metadata["file_info"] = file_info
cache_metadata.append(metadata)
# Get elasticsearch info separately
elasticsearch_info = None
try:
es_client = CachingConfig.elasticsearch_client
if await es_client.indices.exists(
index=GeminiCacheStore._cache_mapping_index
):
# Get the cache mapping document
response = await es_client.options(ignore_status=404).get(
index=GeminiCacheStore._cache_mapping_index,
id="cache_mapping",
) # a missing document is tolerated instead of raising
if response and response.get("found", False):
elasticsearch_info = {
"index": GeminiCacheStore._cache_mapping_index,
"last_updated": response.get("_source", {}).get(
"last_updated", "unknown"
),
}
except Exception as e:
logger.warning(f"Error getting Elasticsearch metadata: {str(e)}")
# Add Elasticsearch info to the response
if elasticsearch_info:
return {
"caches": cache_metadata,
"elasticsearch_info": elasticsearch_info,
}
return cache_metadata
except Exception as e:
logger.error("Error getting cache metadata: %s", str(e), exc_info=True)
return f"Error getting cache metadata: {str(e)}"
@staticmethod
async def check_cache_exists(cache_name: str) -> Union[bool, str]:
"""Check if a specific cache exists in Gemini and our Elasticsearch mappings.
Args:
cache_name (str): The name of the cache to check.
Returns:
Union[bool, str]: True if the cache exists, False if not, or an error string.
"""
from google import genai
try:
logger.info("Checking if cache exists: %s", cache_name)
# Initialize cache mappings from Elasticsearch
await GeminiCacheStore._init_cache_mappings()
# Check if the cache exists in our mappings
in_file_mapping = (
cache_name in GeminiCacheStore._file_cache_mapping.values()
)
in_multi_file_mapping = (
cache_name in GeminiCacheStore._multi_file_cache_mapping.values()
)
if in_file_mapping or in_multi_file_mapping:
logger.info("Cache found in local mappings: %s", cache_name)
return True
# If not in our mappings, check Gemini directly
gemini_client = genai.Client()
try:
# Attempt to get the cache by name
cache = gemini_client.caches.get(name=cache_name)
logger.info("Cache found in Gemini: %s", cache_name)
return True
except Exception as e:
if "NOT_FOUND" in str(e):
logger.info("Cache not found: %s", cache_name)
return False
else:
# Re-raise if it's a different error
raise
except Exception as e:
logger.error("Error checking cache existence: %s", str(e), exc_info=True)
return f"Error checking cache existence: {str(e)}"
@staticmethod
async def process_document_with_gemini(
question: str,
document_link: Optional[Any] = None,
) -> str:
"""Process a document from a URL using Gemini services with caching.
Args:
question (str): The question or intention.
document_link (Optional[Any]): The URL to the document.
Returns:
str: The resulting text response from Gemini.
"""
# Initialize cache mappings from Elasticsearch
await GeminiCacheStore._init_cache_mappings()
import io
import httpx
import urllib.parse
import datetime
from google import genai
from google.genai import types
logger.info("Initializing Gemini client")
gemini_client = genai.Client()
try:
logger.info(
f"Starting document processing with params: document_link={document_link}"
)
if not document_link:
logger.info("No document link provided, skipping document processing")
return "No document link provided, skipping document processing"
# Extract the document filename from the URL
parsed_url = urllib.parse.urlparse(document_link)
document_filename = urllib.parse.unquote(parsed_url.path.split("/")[-1])
logger.info(f"Document filename: {document_filename}")
# Format the question for Gemini
formatted_question = generate_document_question_prompt(question)
# Check if we already have a cache for this filename
existing_cache = None
for file_id, cache_name in GeminiCacheStore._file_cache_mapping.items():
if (
file_id in GeminiCacheStore._document_filenames
and GeminiCacheStore._document_filenames[file_id]
== document_filename
):
# Verify the cache still exists
cache_exists = await GeminiCacheStore.check_cache_exists(cache_name)
if isinstance(cache_exists, bool) and cache_exists:
logger.info(
f"Found existing cache for filename {document_filename}: {cache_name}"
)
existing_cache = cache_name
break
# If we found an existing cache, we can skip downloading and uploading the file
if existing_cache:
logger.info(f"Using existing cache for this filename: {existing_cache}")
# Use a dummy uploaded_file object with just the name property
# This avoids uploading the file again
uploaded_file = SimpleNamespace(name="dummy_file_id")
# Use the existing cache directly
response = await asyncio.to_thread(
gemini_client.models.generate_content,
model=CachingConfig.GEMINI_MODEL,
contents=formatted_question,
config=types.GenerateContentConfig(cached_content=existing_cache),
)
logger.info(
"Successfully received response from Gemini using existing cache (response length: %d characters)",
len(response.text),
)
return response.text
# Continue with normal processing if no existing cache was found
cache_key = GeminiCacheStore._generate_url_cache_key(document_link)
if cache_key in GeminiCacheStore._file_cache:
logger.info("Using cached document file for URL: %s", document_link)
uploaded_file = GeminiCacheStore._file_cache[cache_key]
else:
logger.info("Fetching document from URL: %s", document_link)
# Downloading the document via httpx AsyncClient
async with httpx.AsyncClient() as http_client:
response = await http_client.get(document_link)
response.raise_for_status() # fail fast on HTTP errors instead of uploading an error page
# Create an in-memory byte-stream using the downloaded content
doc_io = io.BytesIO(response.content)
# Also create a copy for hashing
content_for_hash = response.content
logger.info("Downloaded %d bytes from URL", len(response.content))
uploaded_file = gemini_client.files.upload(
file=doc_io, config=dict(mime_type="application/pdf")
)
GeminiCacheStore._file_cache[cache_key] = uploaded_file
logger.info("Document uploaded and cached with key: %s", cache_key)
# Generate a deterministic cache name based on document content
document_content_hash = hashlib.md5(content_for_hash).hexdigest()
# Store the content hash for later use
GeminiCacheStore._document_content_hashes[uploaded_file.name] = (
document_content_hash
)
# Store the document filename for later use
GeminiCacheStore._document_filenames[uploaded_file.name] = (
document_filename
)
# Save the updated mappings to Elasticsearch
await GeminiCacheStore._save_cache_mappings()
# Check if we already have a cache for this file
cache_name = None
if uploaded_file.name in GeminiCacheStore._file_cache_mapping:
cache_name = GeminiCacheStore._file_cache_mapping[uploaded_file.name]
# Verify the cache still exists
cache_exists = await GeminiCacheStore.check_cache_exists(cache_name)
if isinstance(cache_exists, bool) and cache_exists:
logger.info(
"Using existing cached context from mapping: %s",
cache_name,
)
else:
# Cache doesn't exist anymore, need to create a new one
cache_name = None
# Remove the stale mapping
del GeminiCacheStore._file_cache_mapping[uploaded_file.name]
# Get the document filename for metadata
document_filename_for_metadata = document_filename
# If we don't have a valid cache yet,
# try to find one or create a new one
if not cache_name:
# Get all caches
caches = await GeminiCacheStore.list_gemini_caches()
if isinstance(caches, str): # Error occurred
logger.warning("Error listing caches: %s", caches)
# Continue with creating a new cache
else:
logger.info("Found %d caches to check", len(caches))
# Search for existing caches that might match our content
for cache in caches:
try:
# Check if this cache contains our file or has our document filename in the display name
if (
document_filename_for_metadata
and cache.display_name
and document_filename_for_metadata in cache.display_name
):
logger.info(
"Found cache with matching document filename in display name: %s",
cache.display_name,
)
# Get cache details to verify it contains our file
cache_detail = await asyncio.to_thread(
gemini_client.caches.get, name=cache.name
)
if (
hasattr(cache_detail, "file_ids")
and cache_detail.file_ids
):
logger.info(
"Cache %s contains files: %s",
cache.name,
cache_detail.file_ids,
)
# If this cache contains only one file, use it
if len(cache_detail.file_ids) == 1:
cache_name = cache.name
# Store in our mapping for future use
GeminiCacheStore._file_cache_mapping[
uploaded_file.name
] = cache_name
logger.info(
"Found existing cache for this document: %s",
cache.name,
)
break
# If this cache contains our specific file, use it
if uploaded_file.name in cache_detail.file_ids:
cache_name = cache.name
# Store in our mapping for future use
GeminiCacheStore._file_cache_mapping[
uploaded_file.name
] = cache_name
logger.info(
"Found existing cache containing our file: %s",
cache.name,
)
break
except Exception as e:
logger.warning(
"Error checking cache %s: %s",
cache.name,
str(e),
)
continue
if not cache_name:
# Create a display name that includes the document filename
sanitized_filename = document_filename_for_metadata.replace(" ", "_")[
:50
] # Limit length and replace spaces
display_name = f"document-{sanitized_filename}"
logger.info(
"Creating new context cache for document with display name: %s",
display_name,
)
# Create the cache with our deterministic display name
cache = gemini_client.caches.create(
model=CachingConfig.GEMINI_MODEL,
config=types.CreateCachedContentConfig(
contents=[uploaded_file], display_name=display_name
),
)
ttl = f"{datetime.timedelta(days=CachingConfig.CACHE_TTL).total_seconds()}s"
gemini_client.caches.update(
name=cache.name, config=types.UpdateCachedContentConfig(ttl=ttl)
)
cache_name = cache.name
# Store in our mapping for future use
GeminiCacheStore._file_cache_mapping[uploaded_file.name] = cache_name
# Save the updated mapping to Elasticsearch
await GeminiCacheStore._save_cache_mappings()
logger.info(
"Created new cache: %s with display name: %s and %d-day TTL",
cache_name,
display_name,
CachingConfig.CACHE_TTL,
)
else:
logger.info("Using existing cached context: %s", cache_name)
# Offload the blocking remote call to a separate thread
response = await asyncio.to_thread(
gemini_client.models.generate_content,
model=CachingConfig.GEMINI_MODEL,
contents=formatted_question,
config=types.GenerateContentConfig(cached_content=cache_name),
)
logger.info(
"Successfully received response from Gemini (response length: %d characters)",
len(response.text),
)
return response.text
except Exception as e:
logger.error(
"Error in process_document_with_gemini: %s", str(e), exc_info=True
)
return f"Error processing document: {str(e)}"
Utility Methods
@staticmethod
async def list_gemini_files() -> Union[List[Dict[str, Any]], str]:
"""List all files currently stored in Gemini.
Returns:
Union[List[Dict[str, Any]], str]: List of file info dictionaries or an error string.
"""
from google import genai
try:
# Initialize cache mappings from Elasticsearch
await GeminiCacheStore._init_cache_mappings()
logger.info("Initializing Gemini client for listing files")
gemini_client = genai.Client()
logger.info("Retrieving file list")
files = gemini_client.files.list()
file_list = [file.name for file in files]
# Add file information from our mappings
file_info = []
for file_name in file_list:
info = {"name": file_name}
# Add filename if available
if file_name in GeminiCacheStore._document_filenames:
info["filename"] = GeminiCacheStore._document_filenames[file_name]
# Add content hash if available
if file_name in GeminiCacheStore._document_content_hashes:
info["content_hash"] = GeminiCacheStore._document_content_hashes[
file_name
]
file_info.append(info)
logger.info("Found %d files", len(file_list))
return file_info
except Exception as e:
logger.error(
"Error listing files: %s",
str(e),
exc_info=True,
)
return f"Error listing files: {str(e)}"
@staticmethod
async def clear_cache() -> str:
"""Clear the file cache and optionally the persistent cache mappings.
Returns:
str: A success message.
"""
GeminiCacheStore._file_cache.clear()
# Clear the persistent mappings as well
if hasattr(GeminiCacheStore, "_file_cache_mapping"):
GeminiCacheStore._file_cache_mapping.clear()
if hasattr(GeminiCacheStore, "_multi_file_cache_mapping"):
GeminiCacheStore._multi_file_cache_mapping.clear()
if hasattr(GeminiCacheStore, "_document_content_hashes"):
GeminiCacheStore._document_content_hashes.clear()
if hasattr(GeminiCacheStore, "_document_filenames"):
GeminiCacheStore._document_filenames.clear()
# Save the cleared mappings
await GeminiCacheStore._save_cache_mappings()
# Also delete the cache mapping document from Elasticsearch
try:
es_client = CachingConfig.elasticsearch_client
# Verify index exists before attempting to delete
index_exists = await GeminiCacheStore._ensure_elasticsearch_index_exists()
if index_exists:
# Delete the cache mapping document
await es_client.options(ignore_status=404).delete(
index=GeminiCacheStore._cache_mapping_index,
id="cache_mapping",
) # a missing document is tolerated instead of raising
logger.info(
f"Deleted cache mapping document from Elasticsearch index {GeminiCacheStore._cache_mapping_index}"
)
except Exception as e:
logger.error(f"Error deleting cache mapping from Elasticsearch: {str(e)}")
logger.info("Document processing file cache and persistent mappings cleared")
return "File cache and persistent mappings cleared successfully"