分享免费的编程资源和教程

网站首页 > 技术教程 正文

【从零开始】5. 向量数据库选型与搭建

goqiw 2025-01-12 13:54:08 技术教程 2 ℃ 0 评论

书接上回...既然 Python 环境搭好了那么马上就可以开展工作了,首先需要回顾一下我们的最终实现效果是什么。按照预定计划是要做一个基于微信小程序的生成式人工智能机器人。

好,以此为目标先看看手头上有什么资源。

呃...目前手头上只有一张 1060 显卡,显卡虽然有 6G 显存但是在大模型面前还是不太够看。除了显卡外就是 16G 内存和一块 i5 的 CPU。相比于这些,磁盘空间就比较富裕一共有 4 T(256G SSD + 4T HDD)。基于这种配置下想通过大数据做模型微调是不太实际的了(主要是算力不够)。要达到较为理想的效果,剩下的就

只能“手搓”一个 RAG(检索增强生成)应用这个方法了(之所以“手搓”也是为了减少不必要的功能,减少资源消耗)。

既然考虑做 RAG 应用,那么向量数据库就是必不可少的了。现在可用资源吃紧向量数据库的选型也要考虑多方因素。业界推荐的 Milvus 只能存储向量数据而不能存储原文,因此使用 Milvus 时还需搭配其他存储库进行映射,这就不在考虑范围内了。当然了,某些特殊的向量机是可以将词向量或句子向量重新转换成文本的,但是一般不建议这样做。这里面涉及到低维度与高维度的转换开销问题,还不如直接通过字段关联映射来得快速直接。

那有没有一步到位的解决方案呢?

答案是肯定的,就目前所知支持向量的存储库就有 Elasticsearch、Lucene、Cassandra 、MongoDB、Clickhouse、PostgreSQL等。由于本次 RAG 数据主要来自文本,因此最终选择 Elasticsearch(以下简称“es”) 作为向量存储库。(关于 es 的安装可以参考另一片文章《【Docker】Elasticsearch 8.12 安装与搭建》这里就不过多叙述了)。

接下来将 es 的操作封装成 Python 工具类中(顺便将这个 RAG 项目也建起来吧)。项目结构如下图:

brain-mix
|-- resources
|   `-- config
|       `-- elastic_cnf.yml
|-- test
|   `-- unit-test
|       `-- elastic_util
|           |-- batch_insert_test.py
|           |-- delete_by_body_test.py
|           |-- delete_by_id_test.py
|           |-- find_and_create_index_test.py
|           |-- find_by_body_nopaging_test.py
|           |-- find_by_body_test.py
|           |-- find_by_id_test.py
|           |-- find_by_sql_test.py
|           |-- insert_test.py
|           `-- refresh_index_test.py
`-- utils
    |-- elastic_util.py
    `-- yaml_util.py

其中 resources 文件夹中将存放项目的配置文件,数据源文件(jsonl、csv 等)。test 文件夹将存放各种测试文件,譬如压力测试、单元测试等。而 utils 文件夹中将存放工具类,其中 elastic_util.py 就是本次实现的 es 工具类。为了方便配置信息的读取也封装了一个 yaml 文件读取工具类,下面就先看看 yaml_util.py 的内容:

import yaml

class YamlConfig:

    _instance = None

    def __init__(self, path):
        """
        初始化 YamlConfig 实例.
        此构造函数设置YAML文件的路径并加载其内容在`config`属性中。

        参数:
            path (str): YAML配置文档路径
        """
        self.path = path
        self.config = self.load_config()

    def __new__(cls, path):
        """
        一个静态方法,用于实例化 YamlConfig 的单例对象.
        由于 YamlConfig 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建.

        参数:
            path (str): YAML配置文档路径.

        返回:
            YamlConfig: YamlConfig 的单例对象.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.path = path
            cls._instance.config = cls._instance.load_config()
        return cls._instance

    def load_config(self):
        """
        读取YAML配置文档的内容。

        读取并解析YAML配置文档,返回解析后的内容。

        返回:
            dict: 解析后的YAML配置文档内容。
        """
        with open(self.path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def get_value(self, key):
        """
        通过key获取YAML配置文档中的值。

        参数:
            key (str): 键名,可能包含多个部分,例如a.b.c。

        返回:
            object: 通过key获取的值,可能是None。
        """
        key_parts = key.split('.')
        value = self.config
        for part in key_parts:
            value = value.get(part)
            if value is None:
                break
        return value

这里的设计思路是按需加载,加载的时候会将配置文件内容加载到内存中,然后需要时直接从内存中获取,这样就不用每次都读取 yml 文件了。有了这个 yaml 文件读取工具后,在 elastic_util.py 中读取配置就变得简单起来了,如下图:

from yaml_util import YamlConfig 
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import os
project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


class ElasticUtil:

    instance = None
    init_flag = False

    # 读取 elasticsearch 配置
    elastic_config = YamlConfig(os.path.join(project_dir, 'resources', 'config', 'elastic_cnf.yml'))

    def __init__(self):
        """
        初始化 ElasticUtil 实例。

        此构造函数检查类的初始化标志。如果尚未初始化,则调用私有方法
        `__elastic_init_model` 来初始化 Elasticsearch 客户端,并将初始化标志设置为 True。
        """
        if not ElasticUtil.init_flag:
            self.es = None
            self.__elastic_init_model()
            ElasticUtil.init_flag = True


    def __new__(cls, *args, **kwargs):
        """
        一个静态方法,用于实例化 elastic_util 的单例对象.
        由于 elastic_util 仅需要一个实例,可以使用单例模式来确保只有一个实例被创建.
        """
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance


    def __elastic_init_model(self) -> None:
        """
        初始化Elasticsearch的client对象.

        该函数读取YAML配置文件,获取Elasticsearch的host、username、password、max_retries、max_size、timeout等配置项。
        然后使用这些配置项实例化Elasticsearch的client对象,并将其赋值给全局变量`es`。
        """
        host = ElasticUtil.elastic_config.get_value('es.host')
        username = ElasticUtil.elastic_config.get_value('es.username')
        password = ElasticUtil.elastic_config.get_value('es.password')
        max_retries = ElasticUtil.elastic_config.get_value('es.max-retries')
        max_size = ElasticUtil.elastic_config.get_value('es.max-size')
        timeout = ElasticUtil.elastic_config.get_value('es.timeout')

        self.es = Elasticsearch(host,
                                basic_auth=(username, password),
                                max_retries=max_retries,
                                connections_per_node=max_size,
                                request_timeout=timeout
                               )


    def insert(self, name, data) -> dict:
        """
        插入单个文档到Elasticsearch索引中。

        参数:
            name (str): Elasticsearch索引的名称。
            data (dict): 要插入的文档数据。

        返回:
            dict: 插入操作的结果。

        该函数在指定的Elasticsearch索引中插入一个文档。
        如果索引不存在,则抛出异常。
        """
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")

        response = self.es.index(index=name, body=data)
        return response["_shards"]["successful"],response['_id']


    def batch_insert(self, name, datas) -> int:
        """
        批量插入文档到Elasticsearch索引中。

        该函数将多个文档插入到Elasticsearch索引中。

        参数:
            name (str): Elasticsearch索引的名称。
            datas (list): 要插入的文档列表,列表中的每个元素必须是字典类型。

        返回:
            None
        """
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")

        if not all(isinstance(doc, dict) for doc in datas):
            raise TypeError("datas 中的所有元素必须是字典类型")
        
        actions = [
            {
                "_index": name,
                "_source": doc
            }
            for doc in datas
        ]
        response = bulk(self.es, actions)
        return response[0]
    
    
    def refresh_index(self, name) -> None:
        """
        重新刷新Elasticsearch索引,以便于最近插入的文档能够被搜索到。

        参数:
            name (str): Elasticsearch索引的名称。
        """
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")
        
        self.es.indices.refresh(index=name)
           

    def delete_by_body(self, name, body) -> None:
        """
        根据给定的搜索体从Elasticsearch索引中删除文档。

        参数:
            name (str): Elasticsearch索引的名称。
            body (dict): 用于查找要删除的文档的搜索体。

        返回:
            None
        """
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")
        
        self.es.delete_by_query(index=name, query=body,refresh=True)
            

    def delete_by_id(self, name, id) -> dict:
        """
        通过ID在Elasticsearch中删除文档。
        
        参数:
            name (str): Elasticsearch索引的名称。
            id (str): 要删除的文档的ID。
        
        返回:
            dict: 删除操作的结果。
        """
        if id == '' or name == '':
            raise TypeError("params cannot be empty")
        
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")
        
        return self.es.delete(index=name, id=id,refresh=True)


    def find_by_id(self, name, id) -> dict:
        """
        通过ID在Elasticsearch中查找文档。

        参数:
            name (str): Elasticsearch索引的名称。
            id (str): 文档的ID。

        返回:
            dict: 文档的详细信息。
        """
        if id == '' or name == '':
            raise TypeError("params cannot be empty")
        
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")
        
        return self.es.get(index=name, id=id)
           

    def find_by_body(self, name, body) -> list:
        """
        通过给定的body在Elasticsearch中搜索并返回结果。

        参数:
            name (str): Elasticsearch索引的名称。
            body (dict): 搜索的body。

        返回:
            list: 搜索响应的结果列表。

        该函数使用Elasticsearch的search API执行搜索操作,并将所有的结果都return回去。
        """
        if name == '':
            raise TypeError("index cannot be empty")
        
        if body == {}:
            raise KeyError("body cannot be empty")
        
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")
        
        response = self.es.search(index=name, body=body)
        return response['hits']['hits']
         

    def find_by_body_nopaging(self, name, body) -> list:
        """
        通过给定的body在Elasticsearch中搜索并返回结果,且不分页。

        参数:
            name (str): Elasticsearch索引的名称。
            body (dict): 搜索的body。

        返回:
            list: 搜索响应的结果列表。

        该函数使用Elasticsearch的search API执行搜索操作,并使用scroll API来获取所有的结果。
        """
        if name == '':
            raise TypeError("index cannot be empty")
        
        if body == {}:
            raise KeyError("body cannot be empty")
        
        if not self.es.indices.exists(index=name):
            raise Exception(f"Index {name} does not exist")
        
        response = self.es.search(index=name, scroll='1m', body=body)
        # 获取 scroll_id 和初始结果
        scroll_id = response['_scroll_id']
        hits = response['hits']['hits']
        # 处理初始结果
        all_hits = hits
        # 循环获取剩余结果
        while len(hits) > 0:
            response = self.es.scroll(scroll_id=scroll_id, scroll='1m')
            hits = response['hits']['hits']
            all_hits.extend(hits)
        # 清除 scroll
        self.es.clear_scroll(scroll_id=scroll_id)
        return all_hits


    def find_and_create_index(self, yaml_key, mapping) -> str:
        """
        通过name从配置文件中获取对应的index_name,然后判断index是否存在,不存在则创建,最后返回index_name。

        参数:
            name (str): 在配置文件中配置的index name。
            mapping (dict): index的mapping。

        返回:
            str: 创建的index_name。
        """
        if yaml_key == '':
            raise TypeError("yaml_key cannot be empty")
        
        index_name = ElasticUtil.elastic_config.get_value(yaml_key)
        if not self.es.indices.exists(index=index_name) and mapping is not None:
            self.es.indices.create(index=index_name, body=mapping)
        return index_name
   
    
    def find_by_sql(self, sql, fetch_size=100) -> list:
        """
        执行Elasticsearch的SQL查询。

        参数:
            sql (str): Elasticsearch的SQL语句。
            fetch_size (int): 一次从Elasticsearch获取的文档数量。

        返回:
            list: JSON字符串列表,每个字符串表示一个文档。

        该函数执行Elasticsearch的SQL查询,并将结果以JSON字符串列表的形式返回。
        """
        return self.es.sql.query(format="json", query=sql, fetch_size=fetch_size)


    def update(self, name, data, id) -> dict:
        """
        更新Elasticsearch中的文档。

        参数:
            name (str): Elasticsearch索引的名称。
            data (dict): 包含更新字段及其新值的数据字典。
            id (str): 要更新的文档的ID。

        返回:
            dict: 更新操作的结果。

        该函数在指定的Elasticsearch索引中通过文档ID更新文档。返回更新操作的结果。
        """
        return self.es.update(index=name, id=id, body=data)
            

elastic_util 做成单例,并在 elastic_util 中提供了多种操作模式:

  • 插入类:insert、batch_insert
  • 删除类:delete_by_body、delete_by_id
  • 查询类:find_by_id、find_by_body、find_by_body_nopaging、find_by_sql
  • 更新类:update
  • 辅助类:refresh_index、find_and_create_index

这些函数都有对应的单元测试用例, 在路径 ${project_path}/test/unit-test/elastic_util 下。以 insert 函数的单元测试用例为例:

import unittest

import os
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
import sys
sys.path.append(os.path.join(project_dir, 'utils'))

from elastic_util import ElasticUtil # type: ignore 


class TestElasticUtilInsert(unittest.TestCase):

    def setUp(self):
        """
        初始化ElasticUtil实例。
        此函数在每次测试之前运行,用于初始化测试中使用的ElasticUtil实例。
        """
        self.elastic = ElasticUtil()
        self.index_name = 'test_index'
        self.data = {'key': 'value'}
        self.elastic.es.indices.create(index=self.index_name)
        
    def tearDown(self):
        self.elastic.es.options(ignore_status=404).indices.delete(index=self.index_name)

    def test_insert_success(self):
        """
        测试向Elasticsearch索引中插入文档是否成功。
        该测试函数向Elasticsearch索引中插入一个文档,然后使用get API来检查文档是否插入成功。
        """
        _,insert_id = self.elastic.insert(self.index_name, self.data)
        # 检查数据是否插入成功
        result = self.elastic.es.get(index=self.index_name, id=insert_id)
        self.assertEqual(result['_source'], self.data)

    def test_insert_failure_index_not_exists(self):
        """
        测试向不存在的Elasticsearch索引中插入文档是否失败。
        该测试函数尝试向不存在的Elasticsearch索引中插入一个文档,并检查是否抛出异常。
        """
        name = 'non_existent_index'
        self.elastic.es.options(ignore_status=404).indices.delete(index=name)  # 删除索引
        with self.assertRaises(Exception):
            self.elastic.insert(name, self.data)

    def test_insert_failure_elasticsearch_connection_error(self):
        """
        测试Elasticsearch连接出错时插入文档是否失败。
        该测试函数模拟Elasticsearch连接错误,然后尝试向Elasticsearch索引中插入一个文档,并检查是否抛出异常。
        """
        original_es = self.elastic.es
        self.elastic.es = None
        with self.assertRaises(Exception):
            self.elastic.insert(self.index_name, self.data)
        self.elastic.es = original_es

    def test_insert_failure_data_format_error(self):
        """
        测试插入格式错误的数据时是否抛出异常。
        该测试函数尝试插入一个无效格式的数据到Elasticsearch索引中,并检查是否抛出异常。
        """
        data = 'invalid data'
        with self.assertRaises(Exception):
            self.elastic.insert(self.index_name, data)


if __name__ == '__main__':
    unittest.main()

为了保证单元测试的质量,每个单元测试中都应包含 setUp 和 tearDown 两个函数(部分代码中因为需要预设数据因此没有 tearDown 函数),对测试数据进行销毁。

注意:这里没有使用 Mock,主要是因为这次测试的是工具类,需要将真实数据插入 es 库看到效果。Mock 只会对处理逻辑进行模拟并没有真正的将数据插入到 es 中,因此没有使用 Mock 来测试。

至此,向量数据库(Elasticsearch)搭建完成。

项目地址:https://github.com/yzh0623/brain-mix

(未完待续...)

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表