From 42b3e23426b9af61cbbcb695bd1498e322a1cdd7 Mon Sep 17 00:00:00 2001
From: chenqingxiang04
Date: Wed, 15 Jan 2025 20:42:20 +0800
Subject: [PATCH] add baidu mochow vdb

Signed-off-by: chenqingxiang04
---
 .../service/common/vectorStore/constants.ts   |   5 +
 .../service/common/vectorStore/controller.ts  |   4 +-
 .../common/vectorStore/mochow/class.ts        | 427 ++++++++++++++++++
 packages/service/common/vectorStore/type.d.ts |   2 +
 packages/service/package.json                 |   1 +
 projects/app/.env.template                    |   5 +
 6 files changed, 443 insertions(+), 1 deletion(-)
 create mode 100644 packages/service/common/vectorStore/mochow/class.ts

diff --git a/packages/service/common/vectorStore/constants.ts b/packages/service/common/vectorStore/constants.ts
index 5b9206eb5a9a..ffa18b5409f2 100644
--- a/packages/service/common/vectorStore/constants.ts
+++ b/packages/service/common/vectorStore/constants.ts
@@ -4,3 +4,8 @@ export const DatasetVectorTableName = 'modeldata';
 export const PG_ADDRESS = process.env.PG_URL;
 export const MILVUS_ADDRESS = process.env.MILVUS_ADDRESS;
 export const MILVUS_TOKEN = process.env.MILVUS_TOKEN;
+
+export const MOCHOW_ADDRESS = process.env.MOCHOW_ADDRESS;
+export const MOCHOW_ACCOUNT = process.env.MOCHOW_ACCOUNT;
+export const MOCHOW_APIKEY = process.env.MOCHOW_APIKEY;
+export const MOCHOW_REPLICA_NUM = process.env.MOCHOW_REPLICA_NUM;
diff --git a/packages/service/common/vectorStore/controller.ts b/packages/service/common/vectorStore/controller.ts
index 156ce70d114a..bc2f87d63445 100644
--- a/packages/service/common/vectorStore/controller.ts
+++ b/packages/service/common/vectorStore/controller.ts
@@ -3,12 +3,14 @@ import { PgVectorCtrl } from './pg/class';
 import { getVectorsByText } from '../../core/ai/embedding';
 import { InsertVectorProps } from './controller.d';
 import { VectorModelItemType } from '@fastgpt/global/core/ai/model.d';
-import { MILVUS_ADDRESS, PG_ADDRESS } from './constants';
+import { MILVUS_ADDRESS, MOCHOW_ADDRESS, PG_ADDRESS } from './constants';
 import { MilvusCtrl } from './milvus/class';
+import { MochowCtrl } from './mochow/class';
 
 const getVectorObj = () => {
   if (PG_ADDRESS) return new PgVectorCtrl();
   if (MILVUS_ADDRESS) return new MilvusCtrl();
+  if (MOCHOW_ADDRESS) return new MochowCtrl();
   return new PgVectorCtrl();
 };
 
diff --git a/packages/service/common/vectorStore/mochow/class.ts b/packages/service/common/vectorStore/mochow/class.ts
new file mode 100644
index 000000000000..eb549466ed75
--- /dev/null
+++ b/packages/service/common/vectorStore/mochow/class.ts
@@ -0,0 +1,427 @@
+import {
+  FieldSchema,
+  IndexSchema,
+  TableSchema,
+  FieldType,
+  IndexType,
+  MetricType,
+  PartitionType,
+  CreateTableArgs,
+  InsertArgs,
+  SelectArgs,
+  DeleteArgs,
+  VectorSearchArgs,
+  VectorTopkSearchRequest,
+  VectorSearchConfig,
+  SearchResponse,
+  Vector,
+  Row,
+  ServerErrCode,
+  ClientConfiguration,
+  MochowClient
+} from '@mochow/mochow-sdk-node';
+import {
+  DatasetVectorDbName,
+  DatasetVectorTableName,
+  MOCHOW_ADDRESS,
+  MOCHOW_ACCOUNT,
+  MOCHOW_APIKEY,
+  MOCHOW_REPLICA_NUM
+} from '../constants';
+import type {
+  DelDatasetVectorCtrlProps,
+  EmbeddingRecallCtrlProps,
+  EmbeddingRecallResponse,
+  InsertVectorControllerProps
+} from '../controller.d';
+import { delay } from '@fastgpt/global/common/system/utils';
+import { addLog } from '../../system/log';
+import { customNanoid } from '@fastgpt/global/common/string/tools';
+
+// Replica count used when MOCHOW_REPLICA_NUM is missing or not a positive integer.
+const DEFAULT_REPLICA_NUM = 3;
+// Number of rows requested per `select` page.
+const SELECT_PAGE_SIZE = 1000;
+
+// Render ids as a comma-separated list of quoted string literals for a filter expression.
+const toQuotedList = (ids: readonly unknown[]) => ids.map((id) => `'${String(id)}'`).join(',');
+
+/**
+ * Vector store controller backed by Mochow (Baidu VectorDB).
+ * All vectors are stored in the single table `DatasetVectorTableName`.
+ */
+export class MochowCtrl {
+  constructor() {}
+
+  /** Return the process-wide Mochow client, creating it on first use. */
+  getClient = async () => {
+    if (!MOCHOW_ADDRESS) {
+      return Promise.reject('MOCHOW_ADDRESS is not set');
+    }
+    if (global.mochowClient) return global.mochowClient;
+
+    // Fail early with a clear message instead of building a client without credentials.
+    if (!MOCHOW_ACCOUNT || !MOCHOW_APIKEY) {
+      return Promise.reject('MOCHOW_ACCOUNT or MOCHOW_APIKEY is not set');
+    }
+
+    const config: ClientConfiguration = {
+      endpoint: MOCHOW_ADDRESS,
+      credential: {
+        account: MOCHOW_ACCOUNT,
+        apiKey: MOCHOW_APIKEY
+      }
+    };
+    global.mochowClient = new MochowClient(config);
+
+    addLog.info(`Mochow connected`);
+
+    return global.mochowClient;
+  };
+
+  /** Create the dataset database and the vector table when they do not exist yet. */
+  init = async () => {
+    const client = await this.getClient();
+
+    const listDatabaseResp = await client.listDatabases();
+    if (listDatabaseResp.code != ServerErrCode.OK) {
+      return Promise.reject('list databases error: ' + listDatabaseResp.msg);
+    }
+
+    if (!listDatabaseResp.databases.includes(DatasetVectorDbName)) {
+      const createDatabaseResp = await client.createDatabase(DatasetVectorDbName);
+      if (createDatabaseResp.code != ServerErrCode.OK) {
+        return Promise.reject('create databases error: ' + createDatabaseResp.msg);
+      }
+    }
+
+    const listTableResp = await client.listTables(DatasetVectorDbName);
+    if (listTableResp.code != ServerErrCode.OK) {
+      return Promise.reject('list tables error: ' + listTableResp.msg);
+    }
+    if (listTableResp.tables.includes(DatasetVectorTableName)) return;
+
+    const fields: FieldSchema[] = [
+      {
+        fieldName: 'id',
+        fieldType: FieldType.Int64,
+        primaryKey: true,
+        partitionKey: true,
+        autoIncrement: false,
+        notNull: true
+      },
+      { fieldName: 'teamId', fieldType: FieldType.String, notNull: true },
+      { fieldName: 'datasetId', fieldType: FieldType.String, notNull: true },
+      { fieldName: 'collectionId', fieldType: FieldType.String, notNull: true },
+      { fieldName: 'createTime', fieldType: FieldType.Int64, notNull: true },
+      { fieldName: 'vector', fieldType: FieldType.FloatVector, notNull: true, dimension: 1536 }
+    ];
+
+    const indexes: IndexSchema[] = [
+      { indexName: 'teamId_idx', field: 'teamId', indexType: IndexType.SecondaryIndex },
+      { indexName: 'datasetId_idx', field: 'datasetId', indexType: IndexType.SecondaryIndex },
+      { indexName: 'collectionId_idx', field: 'collectionId', indexType: IndexType.SecondaryIndex },
+      { indexName: 'createTime_idx', field: 'createTime', indexType: IndexType.SecondaryIndex },
+      {
+        indexName: 'vector_idx',
+        field: 'vector',
+        indexType: IndexType.HNSW,
+        metricType: MetricType.IP,
+        params: {
+          M: 64,
+          efConstruction: 32
+        },
+        autoBuild: false
+      }
+    ];
+
+    // Number(undefined) is NaN, so fall back to the default when the env var is unusable.
+    const parsedReplicaNum = Number(MOCHOW_REPLICA_NUM);
+    const replication =
+      Number.isInteger(parsedReplicaNum) && parsedReplicaNum > 0
+        ? parsedReplicaNum
+        : DEFAULT_REPLICA_NUM;
+
+    const schema: TableSchema = { fields, indexes };
+    const createTableReq: CreateTableArgs = {
+      database: DatasetVectorDbName,
+      table: DatasetVectorTableName,
+      description: 'fastgpt',
+      replication,
+      partition: {
+        partitionType: PartitionType.HASH,
+        partitionNum: 3
+      },
+      enableDynamicField: false,
+      schema
+    };
+    const createTableResp = await client.createTable(createTableReq);
+    if (createTableResp.code != ServerErrCode.OK) {
+      return Promise.reject('create table error: ' + createTableResp.msg);
+    }
+    addLog.info(`Create mochow table success`);
+  };
+
+  /**
+   * Insert one vector row and return its generated id.
+   * Thrown errors are retried up to `retry` times; a non-OK response rejects immediately.
+   */
+  insert = async (props: InsertVectorControllerProps): Promise<{ insertId: string }> => {
+    const client = await this.getClient();
+    const { teamId, datasetId, collectionId, vector, retry = 3 } = props;
+
+    // Number.MAX_SAFE_INTEGER is 9007199254740991 (16 digits), so a leading digit of 1-8
+    // followed by 15 random digits always stays a safe integer.
+    const id = Number(`${customNanoid('12345678', 1)}${customNanoid('1234567890', 15)}`);
+    const insertArgs: InsertArgs = {
+      database: DatasetVectorDbName,
+      table: DatasetVectorTableName,
+      rows: [
+        {
+          id,
+          teamId: String(teamId),
+          datasetId: String(datasetId),
+          collectionId: String(collectionId),
+          createTime: Date.now(),
+          vector
+        }
+      ]
+    };
+
+    try {
+      const insertResp = await client.insert(insertArgs);
+      if (insertResp.code != ServerErrCode.OK) {
+        return Promise.reject('insert tables error: ' + insertResp.msg);
+      }
+      if (insertResp.affectedCount != 1) {
+        return Promise.reject('insertDatasetData: unknown error');
+      }
+      return { insertId: String(id) };
+    } catch (error) {
+      if (retry <= 0) {
+        return Promise.reject(error);
+      }
+      await delay(500);
+      return this.insert({
+        ...props,
+        retry: retry - 1
+      });
+    }
+  };
+
+  /**
+   * Delete rows of one team, selected by `id`, by `datasetIds` (optionally narrowed by
+   * `collectionIds`) or by `idList`. An empty `idList` is a no-op.
+   */
+  delete = async (props: DelDatasetVectorCtrlProps): Promise<void> => {
+    const { teamId, retry = 2 } = props;
+    const client = await this.getClient();
+
+    const where = await (() => {
+      if ('id' in props && props.id) return `(id==${props.id})`;
+
+      if ('datasetIds' in props && props.datasetIds) {
+        // datasetId/collectionId are String columns, so their literals must be quoted.
+        const datasetIdWhere = `(datasetId in [${toQuotedList(props.datasetIds)}])`;
+
+        if ('collectionIds' in props && props.collectionIds) {
+          return `${datasetIdWhere} and (collectionId in [${toQuotedList(props.collectionIds)}])`;
+        }
+
+        return datasetIdWhere;
+      }
+
+      if ('idList' in props && Array.isArray(props.idList)) {
+        if (props.idList.length === 0) return;
+        return `(id in [${props.idList.map((id) => String(id)).join(',')}])`;
+      }
+      return Promise.reject('deleteDatasetData: no where');
+    })();
+
+    if (!where) return;
+
+    const deleteReq: DeleteArgs = {
+      database: DatasetVectorDbName,
+      table: DatasetVectorTableName,
+      filter: `(teamId=='${String(teamId)}') and ${where}`
+    };
+
+    try {
+      const deleteResp = await client.delete(deleteReq);
+      if (deleteResp.code != ServerErrCode.OK) {
+        return Promise.reject('delete rows error: ' + deleteResp.msg);
+      }
+    } catch (error) {
+      if (retry <= 0) {
+        return Promise.reject(error);
+      }
+      await delay(500);
+      return this.delete({
+        ...props,
+        retry: retry - 1
+      });
+    }
+  };
+
+  /**
+   * Top-k vector search within one team and a set of datasets.
+   * `filterCollectionIdList` restricts the search to those collections and
+   * `forbidCollectionIdList` excludes collections from it.
+   */
+  embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
+    const client = await this.getClient();
+    const {
+      teamId,
+      datasetIds,
+      vector,
+      limit,
+      forbidCollectionIdList,
+      filterCollectionIdList,
+      retry = 2
+    } = props;
+
+    // Forbid collection
+    const formatForbidCollectionIdList = (() => {
+      if (!filterCollectionIdList) return forbidCollectionIdList;
+      return forbidCollectionIdList
+        .map((id) => String(id))
+        .filter((id) => !filterCollectionIdList.includes(id));
+    })();
+
+    // filter collection id
+    const formatFilterCollectionId = (() => {
+      if (!filterCollectionIdList) return;
+      return filterCollectionIdList
+        .map((id) => String(id))
+        .filter((id) => !forbidCollectionIdList.includes(id));
+    })();
+    // Empty data
+    if (formatFilterCollectionId && formatFilterCollectionId.length === 0) {
+      return { results: [] };
+    }
+
+    // String columns are compared against quoted literals, as for teamId and datasetId.
+    const conditions = [
+      `(teamId == '${teamId}')`,
+      `(datasetId in [${toQuotedList(datasetIds)}])`
+    ];
+    if (formatFilterCollectionId) {
+      conditions.push(`(collectionId in [${toQuotedList(formatFilterCollectionId)}])`);
+    }
+    if (formatForbidCollectionIdList.length > 0) {
+      conditions.push(`(collectionId not in [${toQuotedList(formatForbidCollectionIdList)}])`);
+    }
+
+    try {
+      // `id` is read from every result row below, so it has to be projected as well.
+      const searchArgs: VectorSearchArgs = {
+        database: DatasetVectorDbName,
+        table: DatasetVectorTableName,
+        request: new VectorTopkSearchRequest('vector', new Vector(vector), limit)
+          .Filter(conditions.join(' and '))
+          .Projections(['id', 'collectionId'])
+          .Config(new VectorSearchConfig().Ef(100))
+      };
+      const vectorSearchResp = await client.vectorSearch(searchArgs);
+      if (vectorSearchResp.code != ServerErrCode.OK) {
+        return Promise.reject('search row error: ' + vectorSearchResp.msg);
+      }
+      // NOTE(review): assumes projected fields sit at the top level of each row - confirm.
+      const rows = (vectorSearchResp as SearchResponse).rows as Row[];
+
+      return {
+        results: rows.map((item) => ({
+          id: item['id'],
+          collectionId: item['collectionId'],
+          score: item.score
+        }))
+      };
+    } catch (error) {
+      if (retry <= 0) {
+        return Promise.reject(error);
+      }
+      return this.embRecall({
+        ...props,
+        retry: retry - 1
+      });
+    }
+  };
+
+  /**
+   * Page through `select` until the server reports no more data, passing each page of
+   * rows to `onPage`. Rejects when a page request returns a non-OK code.
+   */
+  private forEachSelectPage = async (
+    filter: string,
+    projections: string[],
+    onPage: (rows: Row[]) => void
+  ): Promise<void> => {
+    const client = await this.getClient();
+    let marker: SelectArgs['marker'] = undefined;
+    let isTruncated = true;
+
+    while (isTruncated) {
+      const selectArgs: SelectArgs = {
+        database: DatasetVectorDbName,
+        table: DatasetVectorTableName,
+        filter,
+        projections,
+        marker,
+        limit: SELECT_PAGE_SIZE
+      };
+      const resp = await client.select(selectArgs);
+      if (resp.code != ServerErrCode.OK) {
+        return Promise.reject('fail to select data due to: ' + resp.msg);
+      }
+      onPage(resp.rows);
+      isTruncated = resp.isTruncated;
+      marker = resp.nextMarker;
+    }
+  };
+
+  /** Count the vectors stored for a team. */
+  getVectorCountByTeamId = async (teamId: string) => {
+    let total = 0;
+    await this.forEachSelectPage(`teamId == '${String(teamId)}'`, ['id'], (rows) => {
+      total += rows.length;
+    });
+    return total;
+  };
+
+  /** Count the vectors stored for one dataset of a team. */
+  getVectorCountByDatasetId = async (teamId: string, datasetId: string) => {
+    let total = 0;
+    await this.forEachSelectPage(
+      // Filter on the `datasetId` column declared in `init`.
+      `(teamId == '${String(teamId)}') and (datasetId == '${String(datasetId)}')`,
+      ['id'],
+      (rows) => {
+        total += rows.length;
+      }
+    );
+    return total;
+  };
+
+  /** List id, teamId and datasetId of vectors created in [start, end]. */
+  getVectorDataByTime = async (start: Date, end: Date) => {
+    const startTimestamp = new Date(start).getTime();
+    const endTimestamp = new Date(end).getTime();
+
+    const selectRows: Row[] = [];
+    await this.forEachSelectPage(
+      `(createTime >= ${startTimestamp}) and (createTime <= ${endTimestamp})`,
+      ['id', 'teamId', 'datasetId'],
+      (rows) => {
+        // Spread the page: pushing the array itself would nest it as a single element.
+        selectRows.push(...rows);
+      }
+    );
+
+    // Return exactly the projected columns; collectionId and score are never selected here.
+    return selectRows.map((item) => ({
+      id: String(item['id']),
+      teamId: item['teamId'],
+      datasetId: item['datasetId']
+    }));
+  };
+}
diff --git a/packages/service/common/vectorStore/type.d.ts b/packages/service/common/vectorStore/type.d.ts
index 2ccc1f4a69fa..a3d90fa62396 100644
--- a/packages/service/common/vectorStore/type.d.ts
+++ b/packages/service/common/vectorStore/type.d.ts
@@ -1,9 +1,11 @@
 import type { Pool } from 'pg';
 import { MilvusClient } from '@zilliz/milvus2-sdk-node';
+import { MochowClient } from '@mochow/mochow-sdk-node';
 
 declare global {
   var pgClient: Pool | null;
   var milvusClient: MilvusClient | null;
+  var mochowClient: MochowClient | null;
 }
 
 export type EmbeddingRecallItemType = {
diff --git a/packages/service/package.json b/packages/service/package.json
index 3555248b3777..d551f4fdc732 100644
--- a/packages/service/package.json
+++ b/packages/service/package.json
@@ -6,6 +6,7 @@
     "@node-rs/jieba": "1.10.0",
     "@xmldom/xmldom": "^0.8.10",
     "@zilliz/milvus2-sdk-node": "2.4.2",
+    "@mochow/mochow-sdk-node": "2.1.3",
     "axios": "^1.5.1",
     "chalk": "^5.3.0",
     "cheerio": "1.0.0-rc.12",
diff --git a/projects/app/.env.template b/projects/app/.env.template
index b8f4ca0a8e7f..9ba69bc649ca 100644
--- a/projects/app/.env.template
+++ b/projects/app/.env.template
@@ -29,6 +29,11 @@ PG_URL=postgresql://username:password@host:port/postgres
 MILVUS_ADDRESS=
 MILVUS_TOKEN=
 
+MOCHOW_ADDRESS=
+MOCHOW_ACCOUNT=
+MOCHOW_APIKEY=
+MOCHOW_REPLICA_NUM=3
+
 # code sandbox url
 SANDBOX_URL=http://localhost:3001
 # 商业版地址