
上下文压缩后缓存失效:RAG 系统的真实踩坑与解法
用 RAG(检索增强生成)跑生产系统,最怕两件事:上下文超限,和明明压缩了却没省下 token。第二个问题更隐蔽——你上了上下文压缩,LLM 输入字符数却没降多少;或者明明压缩了,检索缓存命中率却变成了零。
这不是你的用法有问题,是上下文压缩和缓存系统之间有一个设计冲突:压缩改变的是内容,但缓存的 key 通常还是按原始内容生成的。本文记录我在生产环境里遇到的真实案例,给出可复制的代码和具体阈值。
问题:压缩后 key 不变,缓存逻辑却失效了
先看一个最常见的场景——基于向量相似度的检索缓存。我在用 ChromaDB 做语义缓存,逻辑很简单:如果用户的新问题和缓存里某个问题的向量相似度超过 0.85,就直接返回缓存结果,不走 LLM。
import chromadb
from chromadb.utils import embedding_functions
chroma_client = chromadb.Client()
collection = chroma_client.get_or_create_collection(
name="semantic_cache",
embedding_function=embedding_functions.DefaultEmbeddingFunction()
)
def get_cached_response(question: str, threshold: float = 0.85) -> str | None:
"""语义缓存查询"""
results = collection.query(
query_texts=[question],
n_results=1
)
if results["distances"] and results["distances"][0]:
similarity = 1 - results["distances"][0][0] # cosine distance -> similarity
if similarity >= threshold:
return results["metadatas"][0][0].get("response")
return None
这个逻辑在原始版本里工作正常。但某次我给 LLM 输入加了上下文压缩之后,缓存命中率从 60% 掉到了接近 0。
原因是什么?我加的压缩是这样的——把对话历史里每条消息都用一个"摘要+原文"的结构替换:
def compress_conversation(messages: list[dict], max_turns: int = 10) -> list[dict]:
"""只保留最近 N 轮,之前的压缩成摘要"""
if len(messages) <= max_turns:
return messages
system = messages[0] # system prompt 不动
recent = messages[-(max_turns * 2):] # 最近 N 轮对话
older = messages[1:-(max_turns * 2)] # 中间被压缩的部分
summary_prompt = f"摘要以下对话,不超50字:{[m['content'] for m in older]}"
# 调用 LLM 生成摘要(这里简化处理)
summary = f"[已摘要{len(older)}轮对话]"
compressed = [{"role": "system", "content": summary}]
return [system] + compressed + recent
压缩前后,用户问题的向量发生了变化——因为原始问题和"已摘要+原始问题拼接"之间的语义重叠度,在不同压缩比例下差异很大。我的实测数据:
原始问题向量相似度(问同一个问题):0.91
压缩后(中间历史变成摘要)的相似度:0.73
阈值 0.85 不满足,缓存未命中
这不是 ChromaDB 的 bug,是语义缓存的原始设计与上下文压缩之间的根本矛盾:缓存命中依赖向量相似度,但压缩改变了向量空间。
根因分析:三个层面的冲突
这个问题在三个层面同时存在:
1. 缓存 key 的生成方式
大多数语义缓存用问题文本或问题的 embedding 作为 key。压缩后文本变了,key 就对不上了。
2. 压缩比例不可控
压缩摘要的长度取决于被压缩内容的多少。不同轮数的对话压缩后,对同一个原始问题的向量相似度影响不同,无法用固定阈值解决。
3. 缓存粒度和压缩粒度不匹配
缓存按"问题→答案"粒度存储,但压缩发生在"整个对话历史"粒度。一处压缩,影响所有历史问题的向量。
解法一:压缩感知缓存(Compression-Aware Cache)
最直接的解法:缓存 key 不存储原始问题文本,而是存储问题的"语义指纹"——在压缩前生成,压缩后仍然可用。
import hashlib
def semantic_key(question: str, history_compressed: bool = False) -> str:
"""
生成压缩感知的缓存 key。
不依赖问题文本,而是用问题的核心语义特征。
"""
# 提取问题中的实体和意图词(忽略具体措辞)
import re
# 移除停用词
stopwords = {"的", "了", "在", "是", "我", "你", "他", "她", "它", "这个", "那个", "什么", "怎么", "为什么"}
words = re.findall(r'[\w]+', question.lower())
core_terms = [w for w in words if w not in stopwords and len(w) > 1]
# 用核心词生成稳定 key
key_body = "|".join(sorted(set(core_terms)))
suffix = "cmp" if history_compressed else "raw"
return f"{hashlib.md5(key_body.encode()).hexdigest()[:12]}_{suffix}"
这个方案的问题是精度损失——两个不同问题可能生成同一个 key。需要配合答案置信度过滤。
解法二:版本化缓存(Versioned Cache)
更可靠的做法是给缓存加版本号。每次上下文压缩后,版本号加 1,压缩前的缓存自然失效,不会返回错误结果。
cache_version = 0
def compress_and_increment_version(messages: list[dict]) -> tuple[list[dict], int]:
global cache_version
compressed = do_compress(messages) # 执行压缩
cache_version += 1
return compressed, cache_version
# ChromaDB metadata 里存储版本号
def cache_response(question: str, response: str, version: int):
collection.add(
documents=[question],
metadatas=[{"response": response, "cache_version": version}],
ids=[f"q_{semantic_key(question)}"]
)
def get_cached(question: str, current_version: int) -> str | None:
results = collection.query(query_texts=[question], n_results=1)
if not results["metadatas"]:
return None
cached_version = results["metadatas"][0][0].get("cache_version")
# 版本不匹配,说明上下文已被压缩,缓存不可用
if cached_version != current_version:
print(f"[Cache] version mismatch: cached={cached_version}, current={current_version}, skipping")
return None
return results["metadatas"][0][0].get("response")
实测效果:
压缩前缓存命中:60%(60/100 次查询)
压缩后(v1→v2)命中:0%(正确行为,不返回错误结果)
压缩后新查询建立缓存(v2):后续命中正常
解法三:旁路缓存(Side Cache)
如果你不想改缓存 key 的生成逻辑,可以加一层旁路缓存,专门存压缩后的问答对:
from collections import OrderedDict
class LRUCompressionCache:
"""LRU 旁路缓存,专门存压缩后场景下的问答对"""
def __init__(self, max_size: int = 100):
self.cache: OrderedDict[str, str] = OrderedDict()
self.max_size = max_size
self.compression_count = 0
def record_compression(self):
"""每次压缩操作后调用,清空旧缓存"""
self.compression_count += 1
# 压缩后旧缓存全部失效,因为向量空间已变
cleared = len(self.cache)
self.cache.clear()
print(f"[SideCache] cleared {cleared} entries on compression #{self.compression_count}")
def get(self, question: str) -> str | None:
return self.cache.get(question)
def put(self, question: str, response: str):
if question in self.cache:
self.cache.move_to_end(question)
else:
if len(self.cache) >= self.max_size:
self.cache.popitem(last=False)
self.cache[question] = response
在主流程里:
def chat_with_cache(messages: list[dict]) -> str:
global side_cache, cache_version
current_question = messages[-1]["content"]
# 1. 检查旁路缓存(压缩后场景)
cached = side_cache.get(current_question)
if cached:
return cached
# 2. 检查向量缓存(未压缩场景,版本需匹配)
cached = get_cached(current_question, cache_version)
if cached:
side_cache.put(current_question, cached) # 同步到旁路缓存
return cached
# 3. 走 LLM
response = call_llm(messages)
side_cache.put(current_question, response)
return response
避坑检查清单
上下文压缩 + 缓存这个组合,在上线前必须过一遍这个清单:
- [ ] 压缩前后,对同一个测试问题跑一遍向量相似度,确认阈值仍然有效
- [ ] 用不同压缩比例(20轮、50轮、100轮历史)分别测试缓存命中率
- [ ] 检查缓存未命中时的 fallback 逻辑——是否会把压缩后的内容错误地当作压缩前缓存?
- [ ] 如果用版本化缓存,确认版本号在压缩失败/回退时不会错误递增
- [ ] 监控缓存命中率,上线后如果发现持续低于预期,立即查是否是压缩导致的
现在你可以做什么
第一步:在你的 RAG 代码里,找到缓存查询函数,加上版本号字段(cache_version),初始值 0。
第二步:找到上下文压缩的调用位置,在压缩完成后执行 cache_version += 1 并打印日志。
第三步:修改缓存查询逻辑,如果 cached.cache_version != current_version,打印 version mismatch 并跳过,不返回旧答案。
第四步:用真实对话历史(至少50轮)跑一遍,验证压缩后缓存行为符合预期。
上线后持续监控 cache_hit_rate 和 version_mismatch_count 两个指标。如果后者持续增长,说明压缩频率或版本管理逻辑有问题,需要调优。
更多交流点击入群






