网站首页 > 技术教程 正文
书接上回...既然 Python 环境搭好了那么马上就可以开展工作了,首先需要回顾一下我们的最终实现效果是什么。按照预定计划是要做一个基于微信小程序的生成式人工智能机器人。
好,以此为目标先看看手头上有什么资源。
呃...目前手头上只有一张 1060 显卡,显卡虽然有 6G 显存但是在大模型面前还是不太够看。除了显卡外就是 16G 内存和一块 i5 的 CPU。相比于这些,磁盘空间就比较富裕一共有 4 T(256G SSD + 4T HDD)。基于这种配置下想通过大数据做模型微调是不太实际的了(主要是算力不够)。要达到较为理想的效果,剩下的就
只能“手搓”一个 RAG(检索增强生成)应用这个方法了(之所以“手搓”也是为了减少不必要的功能,减少资源消耗)。
既然考虑做 RAG 应用,那么向量数据库就是必不可少的了。现在可用资源吃紧向量数据库的选型也要考虑多方因素。业界推荐的 Milvus 只能存储向量数据而不能存储原文,因此使用 Milvus 时还需搭配其他存储库进行映射,这就不在考虑范围内了。当然了,某些特殊的向量机是可以将词向量或句子向量重新转换成文本的,但是一般不建议这样做。这里面涉及到低维度与高维度的转换开销问题,还不如直接通过字段关联映射来得快速直接。
那有没有一步到位的解决方案呢?
答案是肯定的,就目前所知支持向量的存储库就有 Elasticsearch、Lucene、Cassandra 、MongoDB、Clickhouse、PostgreSQL等。由于本次 RAG 数据主要来自文本,因此最终选择 Elasticsearch(以下简称“es”) 作为向量存储库。(关于 es 的安装可以参考另一片文章《【Docker】Elasticsearch 8.12 安装与搭建》这里就不过多叙述了)。
接下来将 es 的操作封装成 Python 工具类中(顺便将这个 RAG 项目也建起来吧)。项目结构如下图:
brain-mix
|-- resources
| `-- config
| `-- elastic_cnf.yml
|-- test
| `-- unit-test
| `-- elastic_util
| |-- batch_insert_test.py
| |-- delete_by_body_test.py
| |-- delete_by_id_test.py
| |-- find_and_create_index_test.py
| |-- find_by_body_nopaging_test.py
| |-- find_by_body_test.py
| |-- find_by_id_test.py
| |-- find_by_sql_test.py
| |-- insert_test.py
| `-- refresh_index_test.py
`-- utils
|-- elastic_util.py
`-- yaml_util.py
其中 resources 文件夹中将存放项目的配置文件,数据源文件(jsonl、csv 等)。test 文件夹将存放各种测试文件,譬如压力测试、单元测试等。而 utils 文件夹中将存放工具类,其中 elastic_util.py 就是本次实现的 es 工具类。为了方便配置信息的读取也封装了一个 yaml 文件读取工具类,下面就先看看 yaml_util.py 的内容:
import yaml
class YamlConfig:
_instance = None
def __init__(self, path):
"""
初始化 YamlConfig 实例.
此构造函数设置YAML文件的路径并加载其内容在`config`属性中。
参数:
path (str): YAML配置文档路径
"""
self.path = path
self.config = self.load_config()
def __new__(cls, path):
"""
一个静态方法,用于实例化 YamlConfig 的单例对象.
由于 YamlConfig 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建.
参数:
path (str): YAML配置文档路径.
返回:
YamlConfig: YamlConfig 的单例对象.
"""
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.path = path
cls._instance.config = cls._instance.load_config()
return cls._instance
def load_config(self):
"""
读取YAML配置文档的内容。
读取并解析YAML配置文档,返回解析后的内容。
返回:
dict: 解析后的YAML配置文档内容。
"""
with open(self.path, 'r', encoding='utf-8') as file:
return yaml.safe_load(file)
def get_value(self, key):
"""
通过key获取YAML配置文档中的值。
参数:
key (str): 键名,可能包含多个部分,例如a.b.c。
返回:
object: 通过key获取的值,可能是None。
"""
key_parts = key.split('.')
value = self.config
for part in key_parts:
value = value.get(part)
if value is None:
break
return value
这里的设计思路是按需加载,加载的时候会将配置文件内容加载到内存中,然后需要时直接从内存中获取,这样就不用每次都读取 yml 文件了。有了这个 yaml 文件读取工具后,在 elastic_util.py 中读取配置就变得简单起来了,如下图:
from yaml_util import YamlConfig
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import os
project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
class ElasticUtil:
instance = None
init_flag = False
# 读取 elasticsearch 配置
elastic_config = YamlConfig(os.path.join(project_dir, 'resources', 'config', 'elastic_cnf.yml'))
def __init__(self):
"""
初始化 ElasticUtil 实例。
此构造函数检查类的初始化标志。如果尚未初始化,则调用私有方法
`__elastic_init_model` 来初始化 Elasticsearch 客户端,并将初始化标志设置为 True。
"""
if not ElasticUtil.init_flag:
self.es = None
self.__elastic_init_model()
ElasticUtil.init_flag = True
def __new__(cls, *args, **kwargs):
"""
一个静态方法,用于实例化 elastic_util 的单例对象.
由于 elastic_util 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建.
"""
if cls.instance is None:
cls.instance = super().__new__(cls)
return cls.instance
def __elastic_init_model(self) -> None:
"""
初始化Elasticsearch的client对象.
该函数读取YAML配置文件,获取Elasticsearch的host、username、password、max_retries、max_size、timeout等配置项。
然后使用这些配置项实例化Elasticsearch的client对象,并将其赋值给全局变量`es`。
"""
host = ElasticUtil.elastic_config.get_value('es.host')
username = ElasticUtil.elastic_config.get_value('es.username')
password = ElasticUtil.elastic_config.get_value('es.password')
max_retries = ElasticUtil.elastic_config.get_value('es.max-retries')
max_size = ElasticUtil.elastic_config.get_value('es.max-size')
timeout = ElasticUtil.elastic_config.get_value('es.timeout')
self.es = Elasticsearch(host,
basic_auth=(username, password),
max_retries=max_retries,
connections_per_node=max_size,
request_timeout=timeout
)
def insert(self, name, data) -> dict:
"""
插入单个文档到Elasticsearch索引中。
参数:
name (str): Elasticsearch索引的名称。
data (dict): 要插入的文档数据。
返回:
dict: 插入操作的结果。
该函数在指定的Elasticsearch索引中插入一个文档。
如果索引不存在,则抛出异常。
"""
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
response = self.es.index(index=name, body=data)
return response["_shards"]["successful"],response['_id']
def batch_insert(self, name, datas) -> int:
"""
批量插入文档到Elasticsearch索引中。
该函数将多个文档插入到Elasticsearch索引中。
参数:
name (str): Elasticsearch索引的名称。
datas (list): 要插入的文档列表,列表中的每个元素必须是字典类型。
返回:
None
"""
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
if not all(isinstance(doc, dict) for doc in datas):
raise TypeError("datas 中的所有元素必须是字典类型")
actions = [
{
"_index": name,
"_source": doc
}
for doc in datas
]
response = bulk(self.es, actions)
return response[0]
def refresh_index(self, name) -> None:
"""
重新刷新Elasticsearch索引,以便于最近插入的文档能够被搜索到。
参数:
name (str): Elasticsearch索引的名称。
"""
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
self.es.indices.refresh(index=name)
def delete_by_body(self, name, body) -> None:
"""
根据给定的搜索体从Elasticsearch索引中删除文档。
参数:
name (str): Elasticsearch索引的名称。
body (dict): 用于查找要删除的文档的搜索体。
返回:
None
"""
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
self.es.delete_by_query(index=name, query=body,refresh=True)
def delete_by_id(self, name, id) -> dict:
"""
通过ID在Elasticsearch中删除文档。
参数:
name (str): Elasticsearch索引的名称。
id (str): 要删除的文档的ID。
返回:
dict: 删除操作的结果。
"""
if id == '' or name == '':
raise TypeError("params cannot be empty")
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
return self.es.delete(index=name, id=id,refresh=True)
def find_by_id(self, name, id) -> dict:
"""
通过ID在Elasticsearch中查找文档。
参数:
name (str): Elasticsearch索引的名称。
id (str): 文档的ID。
返回:
dict: 文档的详细信息。
"""
if id == '' or name == '':
raise TypeError("params cannot be empty")
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
return self.es.get(index=name, id=id)
def find_by_body(self, name, body) -> list:
"""
通过给定的body在Elasticsearch中搜索并返回结果。
参数:
name (str): Elasticsearch索引的名称。
body (dict): 搜索的body。
返回:
list: 搜索响应的结果列表。
该函数使用Elasticsearch的search API执行搜索操作,并将所有的结果都return回去。
"""
if name == '':
raise TypeError("index cannot be empty")
if body == {}:
raise KeyError("body cannot be empty")
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
response = self.es.search(index=name, body=body)
return response['hits']['hits']
def find_by_body_nopaging(self, name, body) -> list:
"""
通过给定的body在Elasticsearch中搜索并返回结果,且不分页。
参数:
name (str): Elasticsearch索引的名称。
body (dict): 搜索的body。
返回:
list: 搜索响应的结果列表。
该函数使用Elasticsearch的search API执行搜索操作,并使用scroll API来获取所有的结果。
"""
if name == '':
raise TypeError("index cannot be empty")
if body == {}:
raise KeyError("body cannot be empty")
if not self.es.indices.exists(index=name):
raise Exception(f"Index {name} does not exist")
response = self.es.search(index=name, scroll='1m', body=body)
# 获取 scroll_id 和初始结果
scroll_id = response['_scroll_id']
hits = response['hits']['hits']
# 处理初始结果
all_hits = hits
# 循环获取剩余结果
while len(hits) > 0:
response = self.es.scroll(scroll_id=scroll_id, scroll='1m')
hits = response['hits']['hits']
all_hits.extend(hits)
# 清除 scroll
self.es.clear_scroll(scroll_id=scroll_id)
return all_hits
def find_and_create_index(self, yaml_key, mapping) -> str:
"""
通过name从配置文件中获取对应的index_name,然后判断index是否存在,不存在则创建,最后返回index_name。
参数:
name (str): 在配置文件中配置的index name。
mapping (dict): index的mapping。
返回:
str: 创建的index_name。
"""
if yaml_key == '':
raise TypeError("yaml_key cannot be empty")
index_name = ElasticUtil.elastic_config.get_value(yaml_key)
if not self.es.indices.exists(index=index_name) and mapping is not None:
self.es.indices.create(index=index_name, body=mapping)
return index_name
def find_by_sql(self, sql, fetch_size=100) -> list:
"""
执行Elasticsearch的SQL查询。
参数:
sql (str): Elasticsearch的SQL语句。
fetch_size (int): 一次从Elasticsearch获取的文档数量。
返回:
list: JSON字符串列表,每个字符串表示一个文档。
该函数执行Elasticsearch的SQL查询,并将结果以JSON字符串列表的形式返回。
"""
return self.es.sql.query(format="json", query=sql, fetch_size=fetch_size)
def update(self, name, data, id) -> dict:
"""
更新Elasticsearch中的文档。
参数:
name (str): Elasticsearch索引的名称。
data (dict): 包含更新字段及其新值的数据字典。
id (str): 要更新的文档的ID。
返回:
dict: 更新操作的结果。
该函数在指定的Elasticsearch索引中通过文档ID更新文档。返回更新操作的结果。
"""
return self.es.update(index=name, id=id, body=data)
elastic_util 做成单例,并在 elastic_util 中提供了多种操作模式:
- 插入类:insert、batch_insert
- 删除类:delete_by_body、delete_by_id
- 查询类:find_by_id、find_by_body、find_by_body_nopaging、find_by_sql
- 更新类:update
- 辅助类:refresh_index、find_and_create_index
这些函数都有对应的单元测试用例, 在路径 ${project_path}/test/unit-test/elastic_util 下。以 insert 函数的单元测试用例为例:
import unittest
import os
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
import sys
sys.path.append(os.path.join(project_dir, 'utils'))
from elastic_util import ElasticUtil # type: ignore
class TestElasticUtilInsert(unittest.TestCase):
def setUp(self):
"""
初始化ElasticUtil实例。
此函数在每次测试之前运行,用于初始化测试中使用的ElasticUtil实例。
"""
self.elastic = ElasticUtil()
self.index_name = 'test_index'
self.data = {'key': 'value'}
self.elastic.es.indices.create(index=self.index_name)
def tearDown(self):
self.elastic.es.options(ignore_status=404).indices.delete(index=self.index_name)
def test_insert_success(self):
"""
测试向Elasticsearch索引中插入文档是否成功。
该测试函数向Elasticsearch索引中插入一个文档,然后使用get API来检查文档是否插入成功。
"""
_,insert_id = self.elastic.insert(self.index_name, self.data)
# 检查数据是否插入成功
result = self.elastic.es.get(index=self.index_name, id=insert_id)
self.assertEqual(result['_source'], self.data)
def test_insert_failure_index_not_exists(self):
"""
测试向不存在的Elasticsearch索引中插入文档是否失败。
该测试函数尝试向不存在的Elasticsearch索引中插入一个文档,并检查是否抛出异常。
"""
name = 'non_existent_index'
self.elastic.es.options(ignore_status=404).indices.delete(index=name) # 删除索引
with self.assertRaises(Exception):
self.elastic.insert(name, self.data)
def test_insert_failure_elasticsearch_connection_error(self):
"""
测试Elasticsearch连接出错时插入文档是否失败。
该测试函数模拟Elasticsearch连接错误,然后尝试向Elasticsearch索引中插入一个文档,并检查是否抛出异常。
"""
original_es = self.elastic.es
self.elastic.es = None
with self.assertRaises(Exception):
self.elastic.insert(self.index_name, self.data)
self.elastic.es = original_es
def test_insert_failure_data_format_error(self):
"""
测试插入格式错误的数据时是否抛出异常。
该测试函数尝试插入一个无效格式的数据到Elasticsearch索引中,并检查是否抛出异常。
"""
data = 'invalid data'
with self.assertRaises(Exception):
self.elastic.insert(self.index_name, data)
if __name__ == '__main__':
unittest.main()
为了保证单元测试的质量,每个单元测试中都应包含 setUp 和 tearDown 两个函数(部分代码中因为需要预设数据因此没有 tearDown 函数),对测试数据进行销毁。
注意:这里没有使用 Mock,主要是因为这次测试的是工具类,需要将真实数据插入 es 库看到效果。Mock 只会对处理逻辑进行模拟并没有真正的将数据插入到 es 中,因此没有使用 Mock 来测试。
至此,向量数据库(Elasticsearch)搭建完成。
项目地址:https://github.com/yzh0623/brain-mix
(未完待续...)
猜你喜欢
- 2025-01-12 Spring Boot RESTful API设计:最佳实践指南
- 2025-01-12 由 Mybatis 源码畅谈软件设计(二):MappedStatement 和 SqlSource
- 2025-01-12 详细介绍一下Spring Boot中如何使用Hive?
- 2025-01-12 OGG同步到Kafka
- 2025-01-12 由 Mybatis 源码畅谈软件设计(五):ResultMap 的循环引用
- 2025-01-12 Spring Boot 项目轻松集成 Redis
- 2025-01-12 How China's Drone Manufacturers Leapfrog From Latecomers to Global Leaders
- 2025-01-12 Spring Boot与MyBatis:简化数据库操作
- 2025-01-12 SpringBoot整合ElasticSearch实现数据存储?
- 2025-01-12 28个SpringBoot项目中常用注解,日常开发、求职面试不再懵圈
你 发表评论:
欢迎- 01-12Spring Boot RESTful API设计:最佳实践指南
- 01-12由 Mybatis 源码畅谈软件设计(二):MappedStatement 和 SqlSource
- 01-12详细介绍一下Spring Boot中如何使用Hive?
- 01-12OGG同步到Kafka
- 01-12由 Mybatis 源码畅谈软件设计(五):ResultMap 的循环引用
- 01-12【从零开始】5. 向量数据库选型与搭建
- 01-12Spring Boot 项目轻松集成 Redis
- 01-12How China's Drone Manufacturers Leapfrog From Latecomers to Global Leaders
- 最近发表
-
- Spring Boot RESTful API设计:最佳实践指南
- 由 Mybatis 源码畅谈软件设计(二):MappedStatement 和 SqlSource
- 详细介绍一下Spring Boot中如何使用Hive?
- OGG同步到Kafka
- 由 Mybatis 源码畅谈软件设计(五):ResultMap 的循环引用
- 【从零开始】5. 向量数据库选型与搭建
- Spring Boot 项目轻松集成 Redis
- How China's Drone Manufacturers Leapfrog From Latecomers to Global Leaders
- Spring Boot与MyBatis:简化数据库操作
- SpringBoot整合ElasticSearch实现数据存储?
- 标签列表
-
- sd分区 (65)
- raid5数据恢复 (81)
- 地址转换 (73)
- 手机存储卡根目录 (55)
- tcp端口 (74)
- project server (59)
- 双击ctrl (55)
- 鼠标 单击变双击 (67)
- debugview (59)
- 字符动画 (65)
- flushdns (57)
- ps复制快捷键 (57)
- 清除系统垃圾代码 (58)
- web服务器的架设 (67)
- 16进制转换 (69)
- xclient (55)
- ps源文件 (67)
- filezilla server (59)
- 句柄无效 (56)
- word页眉页脚设置 (59)
- ansys实例 (56)
- 6 1 3固件 (59)
- sqlserver2000挂起 (59)
- vm虚拟主机 (55)
- config (61)
本文暂时没有评论,来添加一个吧(●'◡'●)