51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

向量数据库 milvus 使用

Milvus 是一款开源的向量数据库,支持针对 TB 级向量的增删改操作和近实时查询,具有高度灵活、稳定可靠以及高速查询等特点。。Milvus 集成了 Faiss、NMSLIB、Annoy 等广泛应用的向量索引库,提供了一整套简单直观的 API,让你可以针对不同场景选择不同的索引类型。

Milvus 在 Apache 2 License 协议下发布,于 2019 年 10 月正式开源,是 LF AI & DATA 基金会的孵化项目。Milvus 的源代码被托管于 Github。

Milvus 文档:https://milvus.io/docs

  1. 数据库安装 {#title-0} ===================

我们这里采用基于 Docker 的安装方式,使用下面命令,下载 docker-compose.yam 文件:

$ wget https://github.com/milvus-io/milvus/releases/download/v2.2.3/milvus-standalone-docker-compose.yml -O docker-compose.yml

该文件的内容如下:

version: '3.5'

services: etcd: container_name: milvus-etcd image: quay.io/coreos/etcd:v3.5.5 environment: - ETCD_AUTO_COMPACTION_MODE=revision - ETCD_AUTO_COMPACTION_RETENTION=1000 - ETCD_QUOTA_BACKEND_BYTES=4294967296 - ETCD_SNAPSHOT_COUNT=50000 volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd

minio: container_name: milvus-minio image: minio/minio:RELEASE.2022-03-17T06-34-49Z environment: MINIO_ACCESS_KEY: minioadmin MINIO_SECRET_KEY: minioadmin ports: - "9001:9001" - "9000:9000" volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data command: minio server /minio_data --console-address ":9001" healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] interval: 30s timeout: 20s retries: 3

standalone: container_name: milvus-standalone image: milvusdb/milvus:v2.2.3 command: ["milvus", "run", "standalone"] environment: ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 volumes: - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus ports: - "19530:19530" - "9091:9091" depends_on: - "etcd" - "minio"

networks: default: name: milvus

接着,使用下面命令执行上面下载的文件,该文件会使得 Docker 下载并实例化 3 个容器对象:etcd、minio、standalone,所以执行完毕后,会运行这三个容器:

sudo docker-compose up -d

其他有用的一些命令:

# 查看容器运行状态, 注意在刚下载的 docker-compose.yam 目录下执行该命令
sudo docker-compose ps

停止所有容器

sudo docker-compose down

删除容器关联的数据文件目录,默认在 docker-compose.yml 同级目录下会创建一个 volumes 目录,用于存储相关数据文件

sudo rm -rf volumes

前面提到 Milvus 提供了多语言的 SDK,我们这里下载其 Python 语言的版本,安装命令为:

pip install pymilvus==2.2.2
  1. 数据库连接 {#title-1} ===================

pymilvus 中通过维护一个全局、唯一的 Connections 类型的 connections 对象来管理所有的数据库连接对象,我们可以通过该对象相关方法来设置和连接相关的操作:

示例代码:

from pymilvus import connections
# 导入的 connections 是一个单例对象,维护用户所有的连接

def test():

# 连接数据库, 如果 alias 没有指定的话,默认名字为 default
connections.connect(alias='main1', host='localhost', port=19530)

# 判断连接对象状态
print(connections.has_connection('main1'))

# 获得 alias 对应的连接地址
print(connections.get_connection_addr('main1'))

# 该函数可以添加连接信息,当创建数据库时可以直接使用 alias 标记在那个连接中进行操作
# connections.add_connection(main2={"host": "localhost", "port": '19530'})
'''
     connections.add_connection(
            default={"host": "localhost", "port": "19530"},
            dev1={"host": "localhost", "port": "19531"},
            dev2={"uri": "http://random.com/random"},
            dev3={"uri": "http://localhost:19530"},
            dev4={"uri": "tcp://localhost:19530"},
            dev5={"address": "localhost:19530"},
            prod={"uri": "http://random.random.random.com:19530"},
        )
'''

# 获得所有连接对象
print(connections.list_connections())

# 断开名字为 alias 连接
connections.disconnect('main')

if name == 'main': test()

程序输出结果:

True
{'address': 'localhost:19530', 'user': ''}
[('default', None), ('main1', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fc6700dc0d0>)]
  1. 集合创建操作 {#title-2} ====================

在 Milvus中,没有传统意义上的关系数据库中数据库、表的概念。Milvus 使用向量数据库 Vector Database和集合 Collection 来存储和组织向量数据。

Vector Database 是一种专门用于存储向量数据的数据库,一个 Vector Database 可以包含多个 Collection,每个 Collection 都是一个独立的向量数据集合,每个Collection可以包含多个向量数据。

接下来,我们就在向量数据库中创建一个可以存储向量数据的 Collection 集合,其创建步骤如下:

  1. 定义 Collection 的字段信息
  2. 定义 Collection 的配置信息
  3. 创建 Collection 集合到向量数据库中

支持的字段类型:

For primary key field:
	DataType.INT64 (numpy.int64)
	DataType.VARCHAR (VARCHAR)

For scalar field: DataType.BOOL (Boolean) DataType.INT64 (numpy.int64) DataType.FLOAT (numpy.float32) DataType.DOUBLE (numpy.double)

For vector field: BINARY_VECTOR (Binary vector) FLOAT_VECTOR (Float vector)

示例代码:

from pymilvus import connections
from pymilvus import CollectionSchema
from pymilvus import Collection
from pymilvus import FieldSchema
from pymilvus import DataType
from pymilvus import list_collections
from pymilvus import has_collection
from pymilvus import drop_collection

def test():

# 连接数据库
connections.connect(alias='main', host='localhost', port=19530)


# 1. 定义 collection 字段信息
field1 = FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, description='主键')
field2 = FieldSchema(name='name', dtype=DataType.VARCHAR, max_length=100, description='名字')
field3 = FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=328, description='向量')

# 2. 定义 collection 配置信息
collection_schema = CollectionSchema(fields=[field1, field2, field3], description='数据库')

# 3. 创建数据库和表
# using 如果不指定,则使用 default 连接
collection = Collection(name='my_collection',        # collection 的名字
                        schema=collection_schema,    # collection 的配置信息
                        using='main')                # 向哪个连接中创建数据库和表

# 4. 其他关于 collection 的操作
# 获得指定连接中所有的 collection 名字
print('获得所有的集合:', list_collections(using='main'))

# 判断是否存在指定名字的 collection 集合
print('是否存在某集合:', has_collection(collection_name='my_collection', using='main'))

# 删除指定名字的 collection 集合
drop_collection(collection_name='my_collection', using='main')
print('是否存在某集合:', has_collection(collection_name='my_collection', using='main'))


# 断开数据库
connections.disconnect('main')

if name == 'main': test()

程序输出内容:

获得所有的集合: ['first_milvus', 'my_collection']
是否存在某集合: True
是否存在某集合: False
  1. 集合插入操作 {#title-3} ====================

在进行插入操作时,重复插入 PK 相同的数据,insert_count 始终返回 10,它表示插入了数据 10 条,但是可能这 10 条数据在真正 flush 到 collection 时,由于 PK 重复被过滤掉。

from pymilvus import connections
from pymilvus import Collection
from pymilvus import list_collections
import numpy as np

def test():

# 连接数据库
connections.connect(alias='main', host='localhost', port=19530)

# 打印所有的 collection 集合
print('获得所有的集合:', list_collections(using='main'))

# 获得已存在集合对象
collection = Collection(name='my_collection',using='main')

# 构建要插入的数据(id, name, vector)
# 注意: 数据需要按照字段顺序构建
data_num = 10
user_data = [np.arange(data_num), ['name_' + str(i) for i in range(data_num)], np.random.randn(data_num, 328)]

# 向默认的 default 分区中插入 data_num 条数据
# 如果 id 存在则进行更新操作,需要 flush 将内存中的更新写入到数据库
res = collection.insert(data=user_data, partition_name='_default')
collection.flush()
print('插入行数:', res.insert_count)
print('插入主键:', res.primary_keys)

# 删除数据, Milvus 仅支持删除具有明确指定主键的实体,什么主键大于几、小于几都是不支持的
expr = 'id &gt; 40'
res = collection.delete(expr=expr)
print('成功操作:', res.succ_count)
print('删除行数:', res.delete_count)
print('删除主键', res.primary_keys)

# 断开数据库
connections.disconnect('main')

if name == 'main': test()

程序输出内容:

获得所有的集合: ['my_collection']
插入行数: 10
插入主键: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  1. 集合查询操作 {#title-4} ====================

在 Milvus 中进行查询操作,必须对要进行查询的字段、以及向量字段构建索引(及时不搜索该字段,仍然需要构建索引),并且要将 collection load 到内存中,因为 Milvus 的所有查询操作都是在内存中进行的。

另外,一个 collection 可以存在多个分区 Partition。例如:我们有 1 万条数据,插入时可以将这些数据按照某些条件分成 5 个分区 p1、p2、p3、p4、p5,插入时候就可以指定数据的插入分区。

当 load 数据时,可以只 load 某个分区数据,加快搜索速度。

距离度量和索引类型:https://milvus.io/docs/metric.md#floating

示例代码:

from pymilvus import connections
from pymilvus import Collection

def test():

# 连接数据库
connections.connect(alias='main', host='localhost', port=19530)

# 获得 collection 对象
collection = Collection(name='my_collection',using='main')
collection.release()

# 我们要查询 ID 字段,给其构建索引
# 构建索引需要在 collection 未加载到内存时进行
collection.create_index(field_name='id', index_name='PK_index')

# 给向量字段构建索引,并指定索引类型,以及相似度度量方式
# nlist 表示簇的个数,该参数可以将向量划分成多个区域,有利于加快搜索
index_params = {
    &quot;metric_type&quot;: &quot;L2&quot;,
    &quot;index_type&quot;: &quot;IVF_FLAT&quot;,
    &quot;params&quot;: {&quot;nlist&quot;: 1024}
}
collection.create_index(field_name='vector', index_name='vector_index', index_params=index_params)

# 将整个 collection 加载到内存中,也可以只加载某个 Partition
collection.load()

# 根据某个字段查询
# query 函数的 output_fields 字段可以返回任意字段
res = collection.query(expr='id &gt; 0', output_fields=['id', 'name'])
print(len(res), res)


# 查询相似的向量
data = np.random.randn(1, 328)
res = collection.search(data=data,   # 要搜索的向量
                        limit=3,     # 返回记录数量
                        anns_field='vector',  # 要搜索的字段
                        param={'nprobe': 10, 'metric_type': 'L2'},   # nprobe 在最近的10个簇中搜索
                        output_fields=['id'])  # 输出字段名字,只能是标量字段(数字或者小数字段,字符串无法返回)

print(res)


# 断开数据库
connections.disconnect('main')

if name == 'main': test()

程序输出结果:

# query 函数根据 ID 字段的搜索结果
9 [{'id': 1, 'name': 'name_1'}, {'id': 2, 'name': 'name_%s2'}, {'id': 3, 'name': 'name_3'}, {'id': 4, 'name': 'name_%s4'}, {'id': 5, 'name': 'name_5'}, {'id': 6, 'name': 'name_6'}, {'id': 7, 'name': 'name_7'}, {'id': 8, 'name': 'name_8'}, {'id': 9, 'name': 'name_9'}]

search 函数搜索的最近向量

["['(distance: 535.2747802734375, id: 2)', '(distance: 552.648681640625, id: 1)', '(distance: 558.37158203125, id: 7)']"]

赞(5)
未经允许不得转载:工具盒子 » 向量数据库 milvus 使用