Milvus 是一款开源的向量数据库,支持针对 TB 级向量的增删改操作和近实时查询,具有高度灵活、稳定可靠以及高速查询等特点。。Milvus 集成了 Faiss、NMSLIB、Annoy 等广泛应用的向量索引库,提供了一整套简单直观的 API,让你可以针对不同场景选择不同的索引类型。
Milvus 在 Apache 2 License 协议下发布,于 2019 年 10 月正式开源,是 LF AI & DATA 基金会的孵化项目。Milvus 的源代码被托管于 Github。
Milvus 文档:https://milvus.io/docs
- 数据库安装 {#title-0} ===================
我们这里采用基于 Docker 的安装方式,使用下面命令,下载 docker-compose.yam 文件:
$ wget https://github.com/milvus-io/milvus/releases/download/v2.2.3/milvus-standalone-docker-compose.yml -O docker-compose.yml
该文件的内容如下:
version: '3.5'
services:
etcd:
container_name: milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
minio:
container_name: milvus-minio
image: minio/minio:RELEASE.2022-03-17T06-34-49Z
environment:
MINIO_ACCESS_KEY: minioadmin
MINIO_SECRET_KEY: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
standalone:
container_name: milvus-standalone
image: milvusdb/milvus:v2.2.3
command: ["milvus", "run", "standalone"]
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- "etcd"
- "minio"
networks:
default:
name: milvus
接着,使用下面命令执行上面下载的文件,该文件会使得 Docker 下载并实例化 3 个容器对象:etcd、minio、standalone,所以执行完毕后,会运行这三个容器:
sudo docker-compose up -d
其他有用的一些命令:
# 查看容器运行状态, 注意在刚下载的 docker-compose.yam 目录下执行该命令
sudo docker-compose ps
# 停止所有容器
sudo docker-compose down
# 删除容器关联的数据文件目录,默认在 docker-compose.yml 同级目录下会创建一个 volumes 目录,用于存储相关数据文件
# sudo rm -rf volumes
前面提到 Milvus 提供了多语言的 SDK,我们这里下载其 Python 语言的版本,安装命令为:
pip install pymilvus==2.2.2
- 数据库连接 {#title-1} ===================
pymilvus 中通过维护一个全局、唯一的 Connections 类型的 connections 对象来管理所有的数据库连接对象,我们可以通过该对象相关方法来设置和连接相关的操作:
示例代码:
from pymilvus import connections
# 导入的 connections 是一个单例对象,维护用户所有的连接
def test():
# 连接数据库, 如果 alias 没有指定的话,默认名字为 default
connections.connect(alias='main1', host='localhost', port=19530)
# 判断连接对象状态
print(connections.has_connection('main1'))
# 获得 alias 对应的连接地址
print(connections.get_connection_addr('main1'))
# 该函数可以添加连接信息,当创建数据库时可以直接使用 alias 标记在那个连接中进行操作
# connections.add_connection(main2={"host": "localhost", "port": '19530'})
'''
connections.add_connection(
default={"host": "localhost", "port": "19530"},
dev1={"host": "localhost", "port": "19531"},
dev2={"uri": "http://random.com/random"},
dev3={"uri": "http://localhost:19530"},
dev4={"uri": "tcp://localhost:19530"},
dev5={"address": "localhost:19530"},
prod={"uri": "http://random.random.random.com:19530"},
)
'''
# 获得所有连接对象
print(connections.list_connections())
# 断开名字为 alias 连接
connections.disconnect('main')
if __name__ == '__main__':
test()
程序输出结果:
True
{'address': 'localhost:19530', 'user': ''}
[('default', None), ('main1', <pymilvus.client.grpc_handler.GrpcHandler object at 0x7fc6700dc0d0>)]
- 集合创建操作 {#title-2} ====================
在 Milvus中,没有传统意义上的关系数据库中数据库、表的概念。Milvus 使用向量数据库 Vector Database和集合 Collection 来存储和组织向量数据。
Vector Database 是一种专门用于存储向量数据的数据库,一个 Vector Database 可以包含多个 Collection,每个 Collection 都是一个独立的向量数据集合,每个Collection可以包含多个向量数据。
接下来,我们就在向量数据库中创建一个可以存储向量数据的 Collection 集合,其创建步骤如下:
- 定义 Collection 的字段信息
- 定义 Collection 的配置信息
- 创建 Collection 集合到向量数据库中
支持的字段类型:
For primary key field:
DataType.INT64 (numpy.int64)
DataType.VARCHAR (VARCHAR)
For scalar field:
DataType.BOOL (Boolean)
DataType.INT64 (numpy.int64)
DataType.FLOAT (numpy.float32)
DataType.DOUBLE (numpy.double)
For vector field:
BINARY_VECTOR (Binary vector)
FLOAT_VECTOR (Float vector)
示例代码:
from pymilvus import connections
from pymilvus import CollectionSchema
from pymilvus import Collection
from pymilvus import FieldSchema
from pymilvus import DataType
from pymilvus import list_collections
from pymilvus import has_collection
from pymilvus import drop_collection
def test():
# 连接数据库
connections.connect(alias='main', host='localhost', port=19530)
# 1. 定义 collection 字段信息
field1 = FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, description='主键')
field2 = FieldSchema(name='name', dtype=DataType.VARCHAR, max_length=100, description='名字')
field3 = FieldSchema(name='vector', dtype=DataType.FLOAT_VECTOR, dim=328, description='向量')
# 2. 定义 collection 配置信息
collection_schema = CollectionSchema(fields=[field1, field2, field3], description='数据库')
# 3. 创建数据库和表
# using 如果不指定,则使用 default 连接
collection = Collection(name='my_collection', # collection 的名字
schema=collection_schema, # collection 的配置信息
using='main') # 向哪个连接中创建数据库和表
# 4. 其他关于 collection 的操作
# 获得指定连接中所有的 collection 名字
print('获得所有的集合:', list_collections(using='main'))
# 判断是否存在指定名字的 collection 集合
print('是否存在某集合:', has_collection(collection_name='my_collection', using='main'))
# 删除指定名字的 collection 集合
drop_collection(collection_name='my_collection', using='main')
print('是否存在某集合:', has_collection(collection_name='my_collection', using='main'))
# 断开数据库
connections.disconnect('main')
if __name__ == '__main__':
test()
程序输出内容:
获得所有的集合: ['first_milvus', 'my_collection']
是否存在某集合: True
是否存在某集合: False
- 集合插入操作 {#title-3} ====================
在进行插入操作时,重复插入 PK 相同的数据,insert_count 始终返回 10,它表示插入了数据 10 条,但是可能这 10 条数据在真正 flush 到 collection 时,由于 PK 重复被过滤掉。
from pymilvus import connections
from pymilvus import Collection
from pymilvus import list_collections
import numpy as np
def test():
# 连接数据库
connections.connect(alias='main', host='localhost', port=19530)
# 打印所有的 collection 集合
print('获得所有的集合:', list_collections(using='main'))
# 获得已存在集合对象
collection = Collection(name='my_collection',using='main')
# 构建要插入的数据(id, name, vector)
# 注意: 数据需要按照字段顺序构建
data_num = 10
user_data = [np.arange(data_num), ['name_' + str(i) for i in range(data_num)], np.random.randn(data_num, 328)]
# 向默认的 default 分区中插入 data_num 条数据
# 如果 id 存在则进行更新操作,需要 flush 将内存中的更新写入到数据库
res = collection.insert(data=user_data, partition_name='_default')
collection.flush()
print('插入行数:', res.insert_count)
print('插入主键:', res.primary_keys)
# 删除数据, Milvus 仅支持删除具有明确指定主键的实体,什么主键大于几、小于几都是不支持的
expr = 'id > 40'
res = collection.delete(expr=expr)
print('成功操作:', res.succ_count)
print('删除行数:', res.delete_count)
print('删除主键', res.primary_keys)
# 断开数据库
connections.disconnect('main')
if __name__ == '__main__':
test()
程序输出内容:
获得所有的集合: ['my_collection']
插入行数: 10
插入主键: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- 集合查询操作 {#title-4} ====================
在 Milvus 中进行查询操作,必须对要进行查询的字段、以及向量字段构建索引(及时不搜索该字段,仍然需要构建索引),并且要将 collection load 到内存中,因为 Milvus 的所有查询操作都是在内存中进行的。
另外,一个 collection 可以存在多个分区 Partition。例如:我们有 1 万条数据,插入时可以将这些数据按照某些条件分成 5 个分区 p1、p2、p3、p4、p5,插入时候就可以指定数据的插入分区。
当 load 数据时,可以只 load 某个分区数据,加快搜索速度。
距离度量和索引类型:https://milvus.io/docs/metric.md#floating
示例代码:
from pymilvus import connections
from pymilvus import Collection
def test():
# 连接数据库
connections.connect(alias='main', host='localhost', port=19530)
# 获得 collection 对象
collection = Collection(name='my_collection',using='main')
collection.release()
# 我们要查询 ID 字段,给其构建索引
# 构建索引需要在 collection 未加载到内存时进行
collection.create_index(field_name='id', index_name='PK_index')
# 给向量字段构建索引,并指定索引类型,以及相似度度量方式
# nlist 表示簇的个数,该参数可以将向量划分成多个区域,有利于加快搜索
index_params = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": 1024}
}
collection.create_index(field_name='vector', index_name='vector_index', index_params=index_params)
# 将整个 collection 加载到内存中,也可以只加载某个 Partition
collection.load()
# 根据某个字段查询
# query 函数的 output_fields 字段可以返回任意字段
res = collection.query(expr='id > 0', output_fields=['id', 'name'])
print(len(res), res)
# 查询相似的向量
data = np.random.randn(1, 328)
res = collection.search(data=data, # 要搜索的向量
limit=3, # 返回记录数量
anns_field='vector', # 要搜索的字段
param={'nprobe': 10, 'metric_type': 'L2'}, # nprobe 在最近的10个簇中搜索
output_fields=['id']) # 输出字段名字,只能是标量字段(数字或者小数字段,字符串无法返回)
print(res)
# 断开数据库
connections.disconnect('main')
if __name__ == '__main__':
test()
程序输出结果:
# query 函数根据 ID 字段的搜索结果
9 [{'id': 1, 'name': 'name_1'}, {'id': 2, 'name': 'name_%s2'}, {'id': 3, 'name': 'name_3'}, {'id': 4, 'name': 'name_%s4'}, {'id': 5, 'name': 'name_5'}, {'id': 6, 'name': 'name_6'}, {'id': 7, 'name': 'name_7'}, {'id': 8, 'name': 'name_8'}, {'id': 9, 'name': 'name_9'}]
# search 函数搜索的最近向量
["['(distance: 535.2747802734375, id: 2)', '(distance: 552.648681640625, id: 1)', '(distance: 558.37158203125, id: 7)']"]