Milvus Vector and BM25 Retrieval

Published on 2024-08-24
 
Step 1: Create the schema
 
Databases like Milvus and Elasticsearch share an annoying limitation: a schema cannot be modified once it has been created. The only option is to create a new schema and then migrate the data into it.
 

```python
from pymilvus import DataType, Function, FunctionType


async def create_collection(self, collection_name: str = "document_chunks"):
    """Create collection if not exists"""
    if self.client is None:
        await self.connect()
    if await self.has_collection(collection_name):
        return

    # 1. Create schema
    schema = self.client.create_schema(
        auto_id=True,
        enable_dynamic_field=False,
    )
    analyzer_params = {"type": "chinese"}
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="doc_id", datatype=DataType.INT64)
    schema.add_field(field_name="chunk_id", datatype=DataType.INT64)
    schema.add_field(field_name="doc_title", datatype=DataType.VARCHAR, max_length=800)
    schema.add_field(field_name="doc_text", datatype=DataType.VARCHAR, max_length=10000,
                     enable_analyzer=True, analyzer_params=analyzer_params, enable_match=True)
    schema.add_field(field_name="sparse_bm25", datatype=DataType.SPARSE_FLOAT_VECTOR)
    schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=self.dim)

    # BM25 function: derives the sparse vector from doc_text at insert time
    bm25_function = Function(
        name="bm25",
        function_type=FunctionType.BM25,
        input_field_names=["doc_text"],
        output_field_names="sparse_bm25"
    )
    schema.add_function(bm25_function)

    # 2. Create indexes
    index_params = self.client.prepare_index_params()
    index_params.add_index(
        field_name="embedding",
        index_type="AUTOINDEX",
        metric_type="COSINE"
    )
    index_params.add_index(
        field_name="sparse_bm25",
        index_name="sparse_bm25_index",
        index_type="SPARSE_WAND",
        metric_type="BM25",
        params={
            "inverted_index_algo": "DAAT_MAXSCORE",
            "bm25_k1": 1.2,
            "bm25_b": 0.75
        }
    )
    # index_params.add_index(
    #     field_name="doc_title",
    #     index_type="TEXT"
    # )

    self.client.create_collection(
        collection_name=collection_name,
        schema=schema,
        index_params=index_params
    )
```
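For context, rows written into this collection only need the scalar fields and the dense embedding; the sparse_bm25 field must not be supplied, because the BM25 function computes it from doc_text on insert. Below is a minimal insertion sketch, assuming a hypothetical insert_chunks helper on the same class and embeddings produced elsewhere with dimension self.dim:

```python
async def insert_chunks(self, doc_id: int, doc_title: str,
                        chunks: list[str], embeddings: list[list[float]],
                        collection_name: str = "document_chunks"):
    """Insert pre-embedded chunks; sparse_bm25 is filled in by the BM25 function."""
    if self.client is None:
        await self.connect()
    rows = [
        {
            "doc_id": doc_id,
            "chunk_id": i,
            "doc_title": doc_title,
            "doc_text": text,
            "embedding": vector,  # dense vector, length must equal self.dim
            # no "sparse_bm25" key here: Milvus derives it from doc_text
        }
        for i, (text, vector) in enumerate(zip(chunks, embeddings))
    ]
    self.client.insert(collection_name=collection_name, data=rows)
```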
 
We implement three different retrieval methods: BM25 search, vector search, and hybrid search.
 

```python
import logging

import jieba
from pymilvus import AnnSearchRequest, RRFRanker

log = logging.getLogger(__name__)  # module-level logger; adapt to your own setup


async def bm25_search(self,
                      question: str,
                      limit: int = 100,
                      collection_name: str = "document_chunks",
                      filters: list[str] = [],
                      match: str = None,
                      ):
    if self.client is None:
        await self.connect()

    cut_res = jieba.cut_for_search(question)  # tokenize the query with jieba
    seg_list = list(cut_res)
    query_text = " ".join(seg_list)  # join the tokens into a single BM25 query string

    search_params = {
        'params': {'drop_ratio_search': 0.2},
        'metric_type': 'BM25',
    }

    # Build the filter expression (based on filters)
    filter_expr = None
    if filters:
        like_clauses = [f"doc_title like '{value}%'" for value in filters]
        filter_expr = "(" + " or ".join(like_clauses) + ")"
    if match:
        if filter_expr:
            filter_expr = filter_expr + f" and TEXT_MATCH(doc_text, '{match}')"
        else:
            filter_expr = f"TEXT_MATCH(doc_text, '{match}')"

    res = self.client.search(
        collection_name=collection_name,
        data=[query_text],
        filter=filter_expr,
        search_params=search_params,
        limit=limit,
        output_fields=["doc_id", "chunk_id", "doc_title", "doc_text"]
    )

    hits = []
    if res and len(res) > 0:
        for hit in res[0]:
            hits.append({
                "doc_id": hit["entity"].get("doc_id"),
                "chunk_id": hit["entity"].get("chunk_id"),
                "doc_title": hit["entity"].get("doc_title"),
                "doc_text": hit["entity"].get("doc_text"),
                "score": hit["distance"]
            })
    return hits


async def hybrid_search(self,
                        question: str,
                        query_vector: list,
                        limit: int = 100,
                        collection_name: str = "document_chunks",
                        threshold: float = 0.5,
                        filters: list[str] = [],
                        match: str = None,
                        ):
    if self.client is None:
        await self.connect()

    ranker = RRFRanker(60)
    # ranker = WeightedRanker(0.4, 0.6)

    # Dense vector request
    search_param_1 = {
        "data": [query_vector],
        "anns_field": "embedding",
        "param": {
            "metric_type": "COSINE",
            "params": {"nprobe": 10}
        },
        "limit": 2
    }
    request_1 = AnnSearchRequest(**search_param_1)

    # Sparse BM25 request
    cut_res = jieba.cut_for_search(question)  # tokenize the query with jieba
    seg_list = list(cut_res)
    search_param_2 = {
        "data": [" ".join(seg_list)],
        "anns_field": "sparse_bm25",
        "param": {
            "metric_type": "BM25",
            "params": {"drop_ratio_build": 0.2}
        },
        "limit": 2
    }
    request_2 = AnnSearchRequest(**search_param_2)

    reqs = [request_1, request_2]

    # Build the filter expression (based on filters)
    filter_expr = None
    if filters:
        like_clauses = [f"doc_title like '{value}%'" for value in filters]
        filter_expr = "(" + " or ".join(like_clauses) + ")"
    if match:
        if filter_expr:
            filter_expr = filter_expr + f" and TEXT_MATCH(doc_text, '{match}')"
        else:
            filter_expr = f"TEXT_MATCH(doc_text, '{match}')"

    res = self.client.hybrid_search(
        collection_name=collection_name,
        reqs=reqs,
        filter=filter_expr,
        limit=limit,
        ranker=ranker,
        output_fields=["doc_id", "chunk_id", "doc_title", "doc_text"]
    )

    hits = []
    if res and len(res) > 0:
        for hit in res[0]:
            hits.append({
                "doc_id": hit["entity"].get("doc_id"),
                "chunk_id": hit["entity"].get("chunk_id"),
                "doc_title": hit["entity"].get("doc_title"),
                "doc_text": hit["entity"].get("doc_text"),
                "score": hit["distance"]
            })
    return hits


async def search(self,
                 query_vector: list,
                 limit: int = 100,
                 collection_name: str = "document_chunks",
                 threshold: float = 0.5,
                 start_time: str = None,
                 end_time: str = None,
                 filters: list[str] = [],
                 ):
    """
    Search similar vectors
    API reference: https://milvus.io/api-reference/pymilvus/v2.5.x/MilvusClient/Vector/search.md
    Filter docs: https://milvus.io/docs/zh/boolean.md
    """
    if self.client is None:
        await self.connect()

    # With COSINE, the radius value should be lower than range_filter.
    search_params = {
        "metric_type": "COSINE",
        "params": {
            "radius": 0.6,
            "range_filter": 1.0,  # maximum distance
            "nprobe": 10
        }
    }

    # Build the filter expression (based on filters)
    filter_expr = None
    if filters:
        like_clauses = [f"doc_title like '{value}%'" for value in filters]
        filter_expr = "(" + " or ".join(like_clauses) + ")"
    # print("filter", filter)

    res = self.client.search(
        collection_name=collection_name,
        data=[query_vector],
        anns_field="embedding",
        search_params=search_params,
        filter=filter_expr,
        limit=limit,
        output_fields=["doc_id", "chunk_id", "doc_title", "doc_text"],
    )

    hits = []
    if res and len(res) > 0:
        for hit in res[0]:
            hits.append({
                "doc_id": hit["entity"].get("doc_id"),
                "chunk_id": hit["entity"].get("chunk_id"),
                "doc_title": hit["entity"].get("doc_title"),
                "doc_text": hit["entity"].get("doc_text"),
                "score": hit["distance"]
            })
            # TODO: when answering over all knowledge bases, filter by similarity.
            # For a specific knowledge base, do not filter.
            if hit["distance"] < threshold and collection_name == 'document_chunks':
                doc_title = hit["entity"].get("doc_title")
                log.info(f"drop document {doc_title}")
    return hits
```
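For completeness, here is a hedged usage sketch. MilvusStore is an assumed name for the class that holds the methods above, and the query vector would normally come from the same embedding model used at insert time; a random vector is used here only so the snippet runs:

```python
import asyncio
import random


async def main():
    store = MilvusStore()  # assumed class name for the wrapper shown above
    await store.create_collection()

    question = "如何在 Milvus 中做全文检索"
    # Placeholder query vector; in practice use your embedding model's output.
    query_vector = [random.random() for _ in range(store.dim)]

    bm25_hits = await store.bm25_search(question, limit=20)
    dense_hits = await store.search(query_vector, limit=20, threshold=0.5)
    hybrid_hits = await store.hybrid_search(question, query_vector, limit=20)

    for hit in hybrid_hits[:5]:
        print(hit["doc_title"], hit["score"])


asyncio.run(main())
```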
 
From a product-design point of view, you can let users create agents and pick the retrieval method that best fits each use case.
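A minimal sketch of that idea, assuming a small Agent config object; the retrieval_mode field and the retrieve dispatcher are my own illustration, not part of the code above:

```python
from dataclasses import dataclass


@dataclass
class Agent:
    name: str
    retrieval_mode: str  # "bm25" | "vector" | "hybrid"
    collection_name: str = "document_chunks"


async def retrieve(self, agent: Agent, question: str,
                   query_vector: list[float], limit: int = 100):
    """Dispatch to the retrieval method configured on the agent."""
    if agent.retrieval_mode == "bm25":
        return await self.bm25_search(question, limit=limit,
                                      collection_name=agent.collection_name)
    if agent.retrieval_mode == "vector":
        return await self.search(query_vector, limit=limit,
                                 collection_name=agent.collection_name)
    # Default: hybrid search that fuses BM25 and dense results with RRF
    return await self.hybrid_search(question, query_vector, limit=limit,
                                    collection_name=agent.collection_name)
```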
 
 
If the schema needs to be adjusted later, the data has to be migrated as well. I wrote a script for this; it breaks down into roughly three steps. Suppose the old collection is called v0 and the new one is v1:
  • Create the new v1
  • Copy the data from the old v0 into the new v1
  • Drop v0 and rename v1 to v0
 
The complete code is below.

```python
from pymilvus import MilvusClient, DataType, Function, FunctionType

OLD_COLLECTION = "document_chunks"
NEW_COLLECTION = "document_chunks_v1"
DIM = 1024  # replace with your actual vector dimension

client = MilvusClient(uri="http://localhost:19530")  # change to your address

# 1. Read all data from the old collection
# print(f"Reading data from collection: {OLD_COLLECTION}")
# old_data = client.query(
#     collection_name=OLD_COLLECTION,
#     filter=None,
#     output_fields=["id", "doc_id", "chunk_id", "doc_title", "doc_text", "embedding"],
#     limit=2000
# )
# print(f"Fetched {len(old_data)} rows.")

# 2. Back up the collection if it already exists
# if client.has_collection(NEW_COLLECTION):
#     client.rename_collection("document_chunks", "document_chunks_backup")

analyzer_params = {"type": "chinese"}

client.drop_collection(NEW_COLLECTION)

# 3. Create the new schema, with the analyzer and text match enabled on doc_text
schema = client.create_schema(auto_id=True, enable_dynamic_field=False)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("doc_id", DataType.INT64)
schema.add_field("chunk_id", DataType.INT64)
schema.add_field(field_name="doc_title", datatype=DataType.VARCHAR, max_length=800)
schema.add_field(field_name="doc_text", datatype=DataType.VARCHAR, max_length=10000,
                 enable_analyzer=True, analyzer_params=analyzer_params, enable_match=True)
schema.add_field(field_name="sparse_bm25", datatype=DataType.SPARSE_FLOAT_VECTOR)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=DIM)

bm25_function = Function(
    name="bm25",
    function_type=FunctionType.BM25,
    input_field_names=["doc_text"],
    output_field_names="sparse_bm25"
)
schema.add_function(bm25_function)

index_params = client.prepare_index_params()
# Add the vector index
index_params.add_index(
    field_name="embedding",
    index_type="AUTOINDEX",
    metric_type="COSINE"
)
index_params.add_index(
    field_name="sparse_bm25",
    index_name="sparse_bm25_index",
    index_type="SPARSE_INVERTED_INDEX",
    metric_type="BM25",
    params={
        "inverted_index_algo": "DAAT_MAXSCORE",
        "bm25_k1": 1.2,
        "bm25_b": 0.75
    }
)

# 4. Create the new collection
print(f"Creating new collection: {NEW_COLLECTION}")
client.create_collection(
    collection_name=NEW_COLLECTION,
    schema=schema,
    index_params=index_params
)

# Drop the id field from old_data (the new collection uses auto_id=True)
# for row in old_data:
#     row.pop("id", None)

# 5. Insert the data
# print(f"Inserting data into: {NEW_COLLECTION}")
# client.insert(
#     collection_name=NEW_COLLECTION,
#     data=old_data
# )

print("✅ Migration complete.")
```
 
Migrate the data

```python
import datetime

from pymilvus import MilvusClient, DataType, Function, FunctionType

OLD_COLLECTION = "document_chunks"
NEW_COLLECTION = "document_chunks_v1"
DIM = 1024  # replace with your actual vector dimension
BATCH_SIZE = 1000  # number of rows to read per batch

client = MilvusClient(uri="http://localhost:19530")  # change to your address

# 1. Read the old collection in batches, paging on the primary key id
print(f"Reading data from collection: {OLD_COLLECTION}")

all_data = []
all_rows = 0
last_id = -1  # assume ids start at 0 or 1

while True:
    batch = client.query(
        collection_name=OLD_COLLECTION,
        filter=f"id > {last_id}",
        output_fields=["id", "doc_id", "chunk_id", "doc_title", "doc_text", "embedding"],
        limit=BATCH_SIZE
    )
    if not batch:
        break

    last_id = max(row["id"] for row in batch)

    # Drop the id field (the new collection uses auto_id=True)
    for row in batch:
        row.pop("id", None)

    # all_data.extend(batch)
    all_rows += len(batch)
    print(f"Fetched {all_rows} rows so far...")

    client.insert(
        collection_name=NEW_COLLECTION,
        data=batch
    )
    print(f"Inserted {len(batch)} rows into new collection.")

print(f"✅ Total fetched: {all_rows} rows.")
```
 
Finally, rename the collection

```python
from pymilvus import MilvusClient

OLD_COLLECTION = "document_chunks"
NEW_COLLECTION = "document_chunks_v1"

client = MilvusClient(uri="http://localhost:19530")  # change to your address

# Drop the old collection, then rename the new one to take its place
if client.has_collection(OLD_COLLECTION):
    client.drop_collection(OLD_COLLECTION)

if client.has_collection(NEW_COLLECTION):
    client.rename_collection(NEW_COLLECTION, OLD_COLLECTION)

print("✅ Migration complete.")
```