知识库功能增强,支持Milvus,并优化相关逻辑
This commit is contained in:
@@ -155,6 +155,10 @@
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-store-aliyun</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-store-milvus</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-store-chroma</artifactId>
|
||||
|
||||
@@ -15,6 +15,10 @@
|
||||
*/
|
||||
package com.easyagents.rerank;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.alibaba.fastjson2.JSONPath;
|
||||
import com.easyagents.core.document.Document;
|
||||
import com.easyagents.core.model.client.HttpClient;
|
||||
import com.easyagents.core.model.rerank.BaseRerankModel;
|
||||
@@ -22,15 +26,19 @@ import com.easyagents.core.model.rerank.RerankException;
|
||||
import com.easyagents.core.model.rerank.RerankOptions;
|
||||
import com.easyagents.core.util.Maps;
|
||||
import com.easyagents.core.util.StringUtil;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONArray;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.alibaba.fastjson2.JSONPath;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DefaultRerankModel extends BaseRerankModel<DefaultRerankModelConfig> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DefaultRerankModel.class);
|
||||
private static final int MAX_RESPONSE_LOG_LENGTH = 512;
|
||||
|
||||
private HttpClient httpClient = new HttpClient();
|
||||
|
||||
public DefaultRerankModel(DefaultRerankModelConfig config) {
|
||||
@@ -56,8 +64,18 @@ public class DefaultRerankModel extends BaseRerankModel<DefaultRerankModelConfig
|
||||
headers.put("Authorization", "Bearer " + config.getApiKey());
|
||||
|
||||
List<String> payloadDocuments = new ArrayList<>(documents.size());
|
||||
for (Document document : documents) {
|
||||
List<Integer> documentIndexMapping = new ArrayList<>(documents.size());
|
||||
for (int i = 0; i < documents.size(); i++) {
|
||||
Document document = documents.get(i);
|
||||
if (document == null || StringUtil.noText(document.getContent())) {
|
||||
continue;
|
||||
}
|
||||
payloadDocuments.add(document.getContent());
|
||||
documentIndexMapping.add(i);
|
||||
}
|
||||
|
||||
if (payloadDocuments.isEmpty()) {
|
||||
throw new RerankException("empty input documents");
|
||||
}
|
||||
|
||||
String payload = Maps.of("model", options.getModelOrDefault(config.getModel()))
|
||||
@@ -111,20 +129,69 @@ public class DefaultRerankModel extends BaseRerankModel<DefaultRerankModelConfig
|
||||
JSONArray results = (JSONArray) JSONPath.eval(jsonObject, config.getResultsJsonPath());
|
||||
|
||||
if (results == null || results.isEmpty()) {
|
||||
throw new RerankException("empty results");
|
||||
String error = extractErrorMessage(jsonObject);
|
||||
String detail = "empty results";
|
||||
if (StringUtil.hasText(error)) {
|
||||
detail = detail + ", error=" + error;
|
||||
}
|
||||
detail = detail + ", response=" + truncate(response, MAX_RESPONSE_LOG_LENGTH);
|
||||
LOG.warn("Rerank response has no results. query={}, response={}", query, truncate(response, MAX_RESPONSE_LOG_LENGTH));
|
||||
throw new RerankException(detail);
|
||||
}
|
||||
|
||||
|
||||
for (int i = 0; i < results.size(); i++) {
|
||||
JSONObject result = results.getJSONObject(i);
|
||||
int index = result.getIntValue(config.getIndexJsonKey());
|
||||
Document document = documents.get(index);
|
||||
document.setScore(result.getDoubleValue(config.getScoreJsonKey()));
|
||||
if (index < 0 || index >= documentIndexMapping.size()) {
|
||||
continue;
|
||||
}
|
||||
int originalIndex = documentIndexMapping.get(index);
|
||||
Document document = documents.get(originalIndex);
|
||||
if (document != null) {
|
||||
document.setScore(result.getDoubleValue(config.getScoreJsonKey()));
|
||||
}
|
||||
}
|
||||
|
||||
// 对 documents 排序, score 越大的越靠前
|
||||
documents.sort(Comparator.comparingDouble(Document::getScore).reversed());
|
||||
documents.sort((d1, d2) -> Double.compare(scoreOrMin(d2), scoreOrMin(d1)));
|
||||
|
||||
return documents;
|
||||
}
|
||||
|
||||
private double scoreOrMin(Document document) {
|
||||
if (document == null || document.getScore() == null) {
|
||||
return Double.NEGATIVE_INFINITY;
|
||||
}
|
||||
return document.getScore();
|
||||
}
|
||||
|
||||
private String extractErrorMessage(JSONObject jsonObject) {
|
||||
if (jsonObject == null || jsonObject.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Object nestedMessage = JSONPath.eval(jsonObject, "$.error.message");
|
||||
if (nestedMessage != null && StringUtil.hasText(String.valueOf(nestedMessage))) {
|
||||
return String.valueOf(nestedMessage);
|
||||
}
|
||||
Object message = jsonObject.get("message");
|
||||
if (message != null && StringUtil.hasText(String.valueOf(message))) {
|
||||
return String.valueOf(message);
|
||||
}
|
||||
Object msg = jsonObject.get("msg");
|
||||
if (msg != null && StringUtil.hasText(String.valueOf(msg))) {
|
||||
return String.valueOf(msg);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String truncate(String value, int maxLength) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value.length() <= maxLength) {
|
||||
return value;
|
||||
}
|
||||
return value.substring(0, maxLength) + "...";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (c) 2023-2026, Easy-Agents (fuhai999@gmail.com).
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.easyagents.rerank;
|
||||
|
||||
import com.easyagents.core.document.Document;
|
||||
import com.easyagents.core.model.client.HttpClient;
|
||||
import com.easyagents.core.model.rerank.RerankException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DefaultRerankModelBehaviorTest {
|
||||
|
||||
@Test
|
||||
public void testRerankShouldThrowDetailedErrorWhenResultsEmpty() {
|
||||
DefaultRerankModel model = newModel();
|
||||
model.setHttpClient(new MockHttpClient("{\"error\":{\"message\":\"invalid documents\"}}"));
|
||||
|
||||
try {
|
||||
model.rerank("query", Arrays.asList(Document.of("doc-1")));
|
||||
Assert.fail("Expected RerankException");
|
||||
} catch (RerankException e) {
|
||||
Assert.assertTrue(e.getMessage().contains("empty results"));
|
||||
Assert.assertTrue(e.getMessage().contains("invalid documents"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRerankShouldSkipNullContentAndMapOriginalIndex() {
|
||||
DefaultRerankModel model = newModel();
|
||||
model.setHttpClient(new MockHttpClient("{\"results\":[{\"index\":0,\"relevance_score\":0.8},{\"index\":1,\"relevance_score\":0.3}]}"));
|
||||
|
||||
Document empty = new Document();
|
||||
Document d1 = Document.of("doc-1");
|
||||
Document d2 = Document.of("doc-2");
|
||||
List<Document> rerankResult = model.rerank("query", Arrays.asList(empty, d1, d2));
|
||||
|
||||
Assert.assertNull(empty.getScore());
|
||||
Assert.assertEquals(0.8d, d1.getScore(), 0.0001d);
|
||||
Assert.assertEquals(0.3d, d2.getScore(), 0.0001d);
|
||||
Assert.assertEquals("doc-1", rerankResult.get(0).getContent());
|
||||
}
|
||||
|
||||
private DefaultRerankModel newModel() {
|
||||
DefaultRerankModelConfig config = new DefaultRerankModelConfig();
|
||||
config.setEndpoint("https://example.com");
|
||||
config.setRequestPath("/v1/rerank");
|
||||
config.setModel("test-rerank-model");
|
||||
config.setApiKey("test-key");
|
||||
return new DefaultRerankModel(config);
|
||||
}
|
||||
|
||||
private static class MockHttpClient extends HttpClient {
|
||||
private final String response;
|
||||
|
||||
private MockHttpClient(String response) {
|
||||
this.response = response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String post(String url, Map<String, String> headers, String payload) {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
}
|
||||
38
easy-agents-store/easy-agents-store-milvus/pom.xml
Normal file
38
easy-agents-store/easy-agents-store-milvus/pom.xml
Normal file
@@ -0,0 +1,38 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-store</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
|
||||
<name>easy-agents-store-milvus</name>
|
||||
<artifactId>easy-agents-store-milvus</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.compiler.source>8</maven.compiler.source>
|
||||
<maven.compiler.target>8</maven.compiler.target>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.milvus</groupId>
|
||||
<artifactId>milvus-sdk-java</artifactId>
|
||||
<version>2.4.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright (c) 2023-2026, Easy-Agents (fuhai999@gmail.com).
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.easyagents.store.milvus;
|
||||
|
||||
import com.easyagents.core.store.condition.Condition;
|
||||
import com.easyagents.core.store.condition.ConditionType;
|
||||
import com.easyagents.core.store.condition.ExpressionAdaptor;
|
||||
import com.easyagents.core.store.condition.Value;
|
||||
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class MilvusExpressionAdaptor implements ExpressionAdaptor {
|
||||
|
||||
public static final MilvusExpressionAdaptor DEFAULT = new MilvusExpressionAdaptor();
|
||||
|
||||
@Override
|
||||
public String toOperationSymbol(ConditionType type) {
|
||||
if (type == ConditionType.EQ) {
|
||||
return " == ";
|
||||
}
|
||||
return type.getDefaultSymbol();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toCondition(Condition condition) {
|
||||
if (condition.getType() == ConditionType.BETWEEN) {
|
||||
Object[] values = (Object[]) ((Value) condition.getRight()).getValue();
|
||||
return "(" + toLeft(condition.getLeft())
|
||||
+ toOperationSymbol(ConditionType.GE)
|
||||
+ values[0] + " && "
|
||||
+ toLeft(condition.getLeft())
|
||||
+ toOperationSymbol(ConditionType.LE)
|
||||
+ values[1] + ")";
|
||||
}
|
||||
|
||||
return ExpressionAdaptor.super.toCondition(condition);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toValue(Condition condition, Object value) {
|
||||
if (condition.getType() == ConditionType.IN) {
|
||||
Object[] values = (Object[]) value;
|
||||
StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
|
||||
for (Object v : values) {
|
||||
if (v != null) {
|
||||
stringJoiner.add("\"" + v + "\"");
|
||||
}
|
||||
}
|
||||
return stringJoiner.toString();
|
||||
}
|
||||
|
||||
return ExpressionAdaptor.super.toValue(condition, value);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,538 @@
|
||||
/*
|
||||
* Copyright (c) 2023-2026, Easy-Agents (fuhai999@gmail.com).
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.easyagents.store.milvus;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.easyagents.core.document.Document;
|
||||
import com.easyagents.core.store.DocumentStore;
|
||||
import com.easyagents.core.store.SearchWrapper;
|
||||
import com.easyagents.core.store.StoreOptions;
|
||||
import com.easyagents.core.store.StoreResult;
|
||||
import com.easyagents.core.util.CollectionUtil;
|
||||
import com.easyagents.core.util.Maps;
|
||||
import com.easyagents.core.util.StringUtil;
|
||||
import io.milvus.v2.client.ConnectConfig;
|
||||
import io.milvus.v2.client.MilvusClientV2;
|
||||
import io.milvus.v2.common.ConsistencyLevel;
|
||||
import io.milvus.v2.common.DataType;
|
||||
import io.milvus.v2.common.IndexParam;
|
||||
import io.milvus.v2.exception.MilvusClientException;
|
||||
import io.milvus.v2.service.collection.request.CreateCollectionReq;
|
||||
import io.milvus.v2.service.collection.request.GetLoadStateReq;
|
||||
import io.milvus.v2.service.collection.request.HasCollectionReq;
|
||||
import io.milvus.v2.service.collection.request.LoadCollectionReq;
|
||||
import io.milvus.v2.service.vector.request.*;
|
||||
import io.milvus.v2.service.vector.response.QueryResp;
|
||||
import io.milvus.v2.service.vector.response.SearchResp;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Milvus vector store based on Milvus Java SDK v2.
|
||||
*/
|
||||
public class MilvusVectorStore extends DocumentStore {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MilvusVectorStore.class);
|
||||
private static final long LOAD_TIMEOUT_MS = 30_000L;
|
||||
private static final long LOAD_POLL_INTERVAL_MS = 200L;
|
||||
|
||||
private static final String FIELD_ID = "id";
|
||||
private static final String FIELD_CONTENT = "content";
|
||||
private static final String FIELD_METADATA = "metadata";
|
||||
private static final String FIELD_VECTOR = "vector";
|
||||
|
||||
private final MilvusClientV2 client;
|
||||
private final MilvusVectorStoreConfig config;
|
||||
private final String defaultCollectionName;
|
||||
private final Set<String> initializedCollections = Collections.synchronizedSet(new HashSet<String>());
|
||||
private final Set<String> loadedCollections = Collections.synchronizedSet(new HashSet<String>());
|
||||
|
||||
public MilvusVectorStore(MilvusVectorStoreConfig config) {
|
||||
this.config = config;
|
||||
this.defaultCollectionName = config.getDefaultCollectionName();
|
||||
String uri = normalizeAndValidateUri(config.getUri());
|
||||
String dbName = StringUtil.hasText(config.getDatabaseName()) ? config.getDatabaseName().trim() : "default";
|
||||
|
||||
ConnectConfig.ConnectConfigBuilder<?, ?> builder = ConnectConfig.builder()
|
||||
.uri(uri)
|
||||
.dbName(dbName);
|
||||
|
||||
if (StringUtil.hasText(config.getToken())) {
|
||||
builder.token(config.getToken().trim());
|
||||
}
|
||||
|
||||
if (StringUtil.hasText(config.getUsername()) && StringUtil.hasText(config.getPassword())) {
|
||||
builder.username(config.getUsername().trim());
|
||||
builder.password(config.getPassword().trim());
|
||||
}
|
||||
|
||||
ConnectConfig connectConfig = builder.build();
|
||||
this.client = new MilvusClientV2(connectConfig);
|
||||
}
|
||||
|
||||
private String normalizeAndValidateUri(String uri) {
|
||||
if (StringUtil.noText(uri)) {
|
||||
throw new IllegalArgumentException("Milvus uri is required. Example: http://127.0.0.1:19530");
|
||||
}
|
||||
|
||||
String normalized = uri.trim();
|
||||
if (!normalized.contains("://")) {
|
||||
normalized = "http://" + normalized;
|
||||
}
|
||||
|
||||
URI parsed;
|
||||
try {
|
||||
parsed = URI.create(normalized);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("Invalid Milvus uri: " + uri + ". Example: http://127.0.0.1:19530", e);
|
||||
}
|
||||
|
||||
if (StringUtil.noText(parsed.getHost()) || parsed.getPort() <= 0) {
|
||||
throw new IllegalArgumentException("Invalid Milvus uri: " + uri + ". Example: http://127.0.0.1:19530");
|
||||
}
|
||||
|
||||
return normalized;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreResult doStore(List<Document> documents, StoreOptions options) {
|
||||
if (CollectionUtil.noItems(documents)) {
|
||||
return StoreResult.success();
|
||||
}
|
||||
String collectionName = options.getCollectionNameOrDefault(defaultCollectionName);
|
||||
if (StringUtil.noText(collectionName)) {
|
||||
throw new IllegalStateException("CollectionName is null or blank. please config the \"defaultCollectionName\" or store with designative collectionName.");
|
||||
}
|
||||
|
||||
int dimension = getDimension(documents);
|
||||
ensureCollectionExists(collectionName, dimension);
|
||||
|
||||
try {
|
||||
InsertReq.InsertReqBuilder<?, ?> builder = InsertReq.builder();
|
||||
if (StringUtil.hasText(options.getPartitionName())) {
|
||||
builder.partitionName(options.getPartitionName());
|
||||
}
|
||||
InsertReq insertReq = builder
|
||||
.collectionName(collectionName)
|
||||
.data(toMilvusDocuments(documents))
|
||||
.build();
|
||||
client.insert(insertReq);
|
||||
return StoreResult.successWithIds(documents);
|
||||
} catch (MilvusClientException e) {
|
||||
return StoreResult.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreResult doDelete(Collection<?> ids, StoreOptions options) {
|
||||
if (CollectionUtil.noItems(ids)) {
|
||||
return StoreResult.success();
|
||||
}
|
||||
String collectionName = options.getCollectionNameOrDefault(defaultCollectionName);
|
||||
if (StringUtil.noText(collectionName)) {
|
||||
throw new IllegalStateException("CollectionName is null or blank. please config the \"defaultCollectionName\" or store with designative collectionName.");
|
||||
}
|
||||
|
||||
try {
|
||||
DeleteReq.DeleteReqBuilder<?, ?> builder = DeleteReq.builder();
|
||||
if (StringUtil.hasText(options.getPartitionName())) {
|
||||
builder.partitionName(options.getPartitionName());
|
||||
}
|
||||
DeleteReq deleteReq = builder
|
||||
.collectionName(collectionName)
|
||||
.ids(new ArrayList<Object>(ids))
|
||||
.build();
|
||||
client.delete(deleteReq);
|
||||
return StoreResult.success();
|
||||
} catch (Exception e) {
|
||||
return StoreResult.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreResult doUpdate(List<Document> documents, StoreOptions options) {
|
||||
if (CollectionUtil.noItems(documents)) {
|
||||
return StoreResult.success();
|
||||
}
|
||||
String collectionName = options.getCollectionNameOrDefault(defaultCollectionName);
|
||||
if (StringUtil.noText(collectionName)) {
|
||||
throw new IllegalStateException("CollectionName is null or blank. please config the \"defaultCollectionName\" or store with designative collectionName.");
|
||||
}
|
||||
|
||||
int dimension = getDimension(documents);
|
||||
ensureCollectionExists(collectionName, dimension);
|
||||
|
||||
try {
|
||||
UpsertReq upsertReq = UpsertReq.builder()
|
||||
.collectionName(collectionName)
|
||||
.partitionName(options.getPartitionName())
|
||||
.data(toMilvusDocuments(documents))
|
||||
.build();
|
||||
client.upsert(upsertReq);
|
||||
return StoreResult.successWithIds(documents);
|
||||
} catch (Exception e) {
|
||||
return StoreResult.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> doSearch(SearchWrapper wrapper, StoreOptions options) {
|
||||
String collectionName = options.getCollectionNameOrDefault(defaultCollectionName);
|
||||
if (StringUtil.noText(collectionName)) {
|
||||
throw new IllegalStateException("CollectionName is null or blank. please config the \"defaultCollectionName\" or store with designative collectionName.");
|
||||
}
|
||||
ensureCollectionLoaded(collectionName);
|
||||
|
||||
if (wrapper.getVector() == null || wrapper.getVector().length == 0) {
|
||||
return queryByCondition(wrapper, options, collectionName);
|
||||
}
|
||||
return searchByVector(wrapper, options, collectionName);
|
||||
}
|
||||
|
||||
private List<Document> searchByVector(SearchWrapper wrapper, StoreOptions options, String collectionName) {
|
||||
SearchReq searchReq = buildSearchReq(wrapper, options, collectionName);
|
||||
try {
|
||||
SearchResp resp = client.search(searchReq);
|
||||
return parseSearchResults(resp, wrapper.getMinScore());
|
||||
} catch (Exception e) {
|
||||
if (isCollectionNotLoaded(e)) {
|
||||
loadedCollections.remove(collectionName);
|
||||
try {
|
||||
ensureCollectionLoaded(collectionName);
|
||||
SearchResp retryResp = client.search(searchReq);
|
||||
return parseSearchResults(retryResp, wrapper.getMinScore());
|
||||
} catch (Exception retryException) {
|
||||
LOG.warn("Milvus search retry failed after load. collection={}, message={}", collectionName, retryException.getMessage());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
LOG.warn("Milvus search failed. collection={}, message={}", collectionName, e.getMessage());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
private List<Document> queryByCondition(SearchWrapper wrapper, StoreOptions options, String collectionName) {
|
||||
QueryReq queryReq = buildQueryReq(wrapper, options, collectionName);
|
||||
try {
|
||||
QueryResp resp = client.query(queryReq);
|
||||
return parseQueryResults(resp);
|
||||
} catch (Exception e) {
|
||||
if (isCollectionNotLoaded(e)) {
|
||||
loadedCollections.remove(collectionName);
|
||||
try {
|
||||
ensureCollectionLoaded(collectionName);
|
||||
QueryResp retryResp = client.query(queryReq);
|
||||
return parseQueryResults(retryResp);
|
||||
} catch (Exception retryException) {
|
||||
LOG.warn("Milvus query retry failed after load. collection={}, message={}", collectionName, retryException.getMessage());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
LOG.warn("Milvus query failed. collection={}, message={}", collectionName, e.getMessage());
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
||||
private SearchReq buildSearchReq(SearchWrapper wrapper, StoreOptions options, String collectionName) {
|
||||
SearchReq.SearchReqBuilder<?, ?> builder = SearchReq.builder()
|
||||
.collectionName(collectionName)
|
||||
.consistencyLevel(ConsistencyLevel.STRONG)
|
||||
.outputFields(getOutputFields(wrapper))
|
||||
.topK(wrapper.getMaxResults())
|
||||
.annsField(FIELD_VECTOR)
|
||||
.data(Collections.singletonList(toFloatList(wrapper.getVector())))
|
||||
.searchParams(Maps.of("ef", 64));
|
||||
|
||||
if (CollectionUtil.hasItems(options.getPartitionNamesOrEmpty())) {
|
||||
builder.partitionNames(options.getPartitionNamesOrEmpty());
|
||||
}
|
||||
String filterExpression = wrapper.toFilterExpression(MilvusExpressionAdaptor.DEFAULT);
|
||||
if (StringUtil.hasText(filterExpression)) {
|
||||
builder.filter(filterExpression);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private QueryReq buildQueryReq(SearchWrapper wrapper, StoreOptions options, String collectionName) {
|
||||
QueryReq.QueryReqBuilder<?, ?> builder = QueryReq.builder()
|
||||
.collectionName(collectionName)
|
||||
.consistencyLevel(ConsistencyLevel.STRONG)
|
||||
.outputFields(getOutputFields(wrapper));
|
||||
|
||||
if (CollectionUtil.hasItems(options.getPartitionNamesOrEmpty())) {
|
||||
builder.partitionNames(options.getPartitionNamesOrEmpty());
|
||||
}
|
||||
String filterExpression = wrapper.toFilterExpression(MilvusExpressionAdaptor.DEFAULT);
|
||||
if (StringUtil.hasText(filterExpression)) {
|
||||
builder.filter(filterExpression);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private List<Document> parseSearchResults(SearchResp resp, Double minScore) {
|
||||
List<List<SearchResp.SearchResult>> results = resp.getSearchResults();
|
||||
if (CollectionUtil.noItems(results)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
List<Document> documents = new ArrayList<Document>();
|
||||
for (List<SearchResp.SearchResult> resultList : results) {
|
||||
if (CollectionUtil.noItems(resultList)) {
|
||||
continue;
|
||||
}
|
||||
for (SearchResp.SearchResult result : resultList) {
|
||||
Document document = toDocument(result.getEntity());
|
||||
if (document == null) {
|
||||
continue;
|
||||
}
|
||||
document.setId(result.getId());
|
||||
Float distance = result.getDistance();
|
||||
if (distance != null) {
|
||||
double score = (distance + 1.0d) / 2.0d;
|
||||
document.setScore(score);
|
||||
}
|
||||
if (minScore == null || document.getScore() == null || document.getScore() >= minScore) {
|
||||
documents.add(document);
|
||||
}
|
||||
}
|
||||
}
|
||||
return documents;
|
||||
}
|
||||
|
||||
private List<Document> parseQueryResults(QueryResp resp) {
|
||||
List<QueryResp.QueryResult> results = resp.getQueryResults();
|
||||
if (CollectionUtil.noItems(results)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<Document> documents = new ArrayList<Document>(results.size());
|
||||
for (QueryResp.QueryResult result : results) {
|
||||
Document document = toDocument(result.getEntity());
|
||||
if (document != null) {
|
||||
documents.add(document);
|
||||
}
|
||||
}
|
||||
return documents;
|
||||
}
|
||||
|
||||
private Document toDocument(Map<String, Object> entity) {
|
||||
if (entity == null || entity.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Document document = new Document();
|
||||
document.setId(entity.get(FIELD_ID));
|
||||
|
||||
Object vectorObj = entity.get(FIELD_VECTOR);
|
||||
if (vectorObj instanceof List) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<? extends Number> vectorList = (List<? extends Number>) vectorObj;
|
||||
document.setVector(vectorList);
|
||||
}
|
||||
|
||||
Object contentObj = entity.get(FIELD_CONTENT);
|
||||
if (contentObj != null) {
|
||||
document.setContent(contentObj.toString());
|
||||
}
|
||||
|
||||
Object metadataObj = entity.get(FIELD_METADATA);
|
||||
if (metadataObj instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> metadata = (Map<String, Object>) metadataObj;
|
||||
document.addMetadata(metadata);
|
||||
} else if (metadataObj != null) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> metadata = JSON.parseObject(JSON.toJSONString(metadataObj), Map.class);
|
||||
document.addMetadata(metadata);
|
||||
}
|
||||
|
||||
return document;
|
||||
}
|
||||
|
||||
private List<JSONObject> toMilvusDocuments(List<Document> documents) {
|
||||
List<JSONObject> rows = new ArrayList<JSONObject>(documents.size());
|
||||
for (Document doc : documents) {
|
||||
JSONObject row = new JSONObject();
|
||||
row.put(FIELD_ID, String.valueOf(doc.getId()));
|
||||
row.put(FIELD_CONTENT, doc.getContent());
|
||||
row.put(FIELD_VECTOR, toFloatList(doc.getVector()));
|
||||
Map<String, Object> metadatas = doc.getMetadataMap();
|
||||
row.put(FIELD_METADATA, metadatas == null ? new JSONObject() : new JSONObject(metadatas));
|
||||
rows.add(row);
|
||||
}
|
||||
return rows;
|
||||
}
|
||||
|
||||
private List<String> getOutputFields(SearchWrapper wrapper) {
|
||||
List<String> outputFields = wrapper.isOutputVector()
|
||||
? new ArrayList<String>(Arrays.asList(FIELD_ID, FIELD_VECTOR, FIELD_CONTENT, FIELD_METADATA))
|
||||
: new ArrayList<String>(Arrays.asList(FIELD_ID, FIELD_CONTENT, FIELD_METADATA));
|
||||
|
||||
if (CollectionUtil.hasItems(wrapper.getOutputFields())) {
|
||||
outputFields.addAll(wrapper.getOutputFields());
|
||||
}
|
||||
return outputFields;
|
||||
}
|
||||
|
||||
private List<Float> toFloatList(float[] vector) {
|
||||
List<Float> values = new ArrayList<Float>(vector.length);
|
||||
for (float item : vector) {
|
||||
values.add(item);
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
private int getDimension(List<Document> documents) {
|
||||
for (Document document : documents) {
|
||||
float[] vector = document.getVector();
|
||||
if (vector != null && vector.length > 0) {
|
||||
return vector.length;
|
||||
}
|
||||
}
|
||||
if (this.getEmbeddingModel() != null) {
|
||||
return this.getEmbeddingModel().dimensions();
|
||||
}
|
||||
throw new IllegalStateException("Unable to determine vector dimension for Milvus collection.");
|
||||
}
|
||||
|
||||
private void ensureCollectionExists(String collectionName, int dimension) {
|
||||
if (initializedCollections.contains(collectionName)) {
|
||||
return;
|
||||
}
|
||||
synchronized (initializedCollections) {
|
||||
if (initializedCollections.contains(collectionName)) {
|
||||
return;
|
||||
}
|
||||
Boolean exists = client.hasCollection(HasCollectionReq.builder().collectionName(collectionName).build());
|
||||
if (Boolean.TRUE.equals(exists)) {
|
||||
initializedCollections.add(collectionName);
|
||||
return;
|
||||
}
|
||||
if (!config.isAutoCreateCollection()) {
|
||||
throw new IllegalStateException("Milvus collection not found and autoCreateCollection is disabled: " + collectionName);
|
||||
}
|
||||
createCollection(collectionName, dimension);
|
||||
initializedCollections.add(collectionName);
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureCollectionLoaded(String collectionName) {
|
||||
if (loadedCollections.contains(collectionName)) {
|
||||
return;
|
||||
}
|
||||
synchronized (loadedCollections) {
|
||||
if (loadedCollections.contains(collectionName)) {
|
||||
return;
|
||||
}
|
||||
boolean loaded = false;
|
||||
try {
|
||||
loaded = Boolean.TRUE.equals(client.getLoadState(GetLoadStateReq.builder().collectionName(collectionName).build()));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Milvus getLoadState failed. collection={}, message={}", collectionName, e.getMessage());
|
||||
}
|
||||
|
||||
if (!loaded) {
|
||||
client.loadCollection(LoadCollectionReq.builder().collectionName(collectionName).build());
|
||||
waitForCollectionLoaded(collectionName);
|
||||
}
|
||||
loadedCollections.add(collectionName);
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForCollectionLoaded(String collectionName) {
|
||||
long deadline = System.currentTimeMillis() + LOAD_TIMEOUT_MS;
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
if (Boolean.TRUE.equals(client.getLoadState(GetLoadStateReq.builder().collectionName(collectionName).build()))) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(LOAD_POLL_INTERVAL_MS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException("Interrupted while loading Milvus collection: " + collectionName, e);
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("Timeout waiting for Milvus collection loaded: " + collectionName);
|
||||
}
|
||||
|
||||
private boolean isCollectionNotLoaded(Exception e) {
|
||||
Throwable current = e;
|
||||
while (current != null) {
|
||||
String message = current.getMessage();
|
||||
if (message != null && message.toLowerCase().contains("collection not loaded")) {
|
||||
return true;
|
||||
}
|
||||
current = current.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void createCollection(String collectionName, int dimension) {
|
||||
List<CreateCollectionReq.FieldSchema> fieldSchemaList = new ArrayList<CreateCollectionReq.FieldSchema>();
|
||||
fieldSchemaList.add(CreateCollectionReq.FieldSchema.builder()
|
||||
.name(FIELD_ID)
|
||||
.dataType(DataType.VarChar)
|
||||
.maxLength(64)
|
||||
.isPrimaryKey(true)
|
||||
.autoID(false)
|
||||
.build());
|
||||
fieldSchemaList.add(CreateCollectionReq.FieldSchema.builder()
|
||||
.name(FIELD_CONTENT)
|
||||
.dataType(DataType.VarChar)
|
||||
.maxLength(65535)
|
||||
.build());
|
||||
fieldSchemaList.add(CreateCollectionReq.FieldSchema.builder()
|
||||
.name(FIELD_METADATA)
|
||||
.dataType(DataType.JSON)
|
||||
.build());
|
||||
fieldSchemaList.add(CreateCollectionReq.FieldSchema.builder()
|
||||
.name(FIELD_VECTOR)
|
||||
.dataType(DataType.FloatVector)
|
||||
.dimension(dimension)
|
||||
.build());
|
||||
|
||||
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
|
||||
.fieldSchemaList(fieldSchemaList)
|
||||
.build();
|
||||
|
||||
List<IndexParam> indexParams = new ArrayList<IndexParam>();
|
||||
indexParams.add(IndexParam.builder()
|
||||
.fieldName(FIELD_VECTOR)
|
||||
.indexName(FIELD_VECTOR + "_hnsw_idx")
|
||||
.indexType(IndexParam.IndexType.HNSW)
|
||||
.metricType(IndexParam.MetricType.COSINE)
|
||||
.extraParams(Maps.of("M", 16).set("efConstruction", 200))
|
||||
.build());
|
||||
|
||||
CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
|
||||
.collectionName(collectionName)
|
||||
.collectionSchema(collectionSchema)
|
||||
.primaryFieldName(FIELD_ID)
|
||||
.vectorFieldName(FIELD_VECTOR)
|
||||
.description("Easy-Agents Milvus Vector Store")
|
||||
.indexParams(indexParams)
|
||||
.build();
|
||||
client.createCollection(createCollectionReq);
|
||||
ensureCollectionLoaded(collectionName);
|
||||
}
|
||||
|
||||
public MilvusClientV2 getClient() {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright (c) 2023-2026, Easy-Agents (fuhai999@gmail.com).
|
||||
* <p>
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.easyagents.store.milvus;
|
||||
|
||||
import com.easyagents.core.store.DocumentStoreConfig;
|
||||
import com.easyagents.core.util.StringUtil;
|
||||
|
||||
/**
|
||||
* https://milvus.io/docs/install-java.md
|
||||
*/
|
||||
public class MilvusVectorStoreConfig implements DocumentStoreConfig {
|
||||
|
||||
private String uri;
|
||||
private String token;
|
||||
private String databaseName = "default";
|
||||
private String username;
|
||||
private String password;
|
||||
private String defaultCollectionName;
|
||||
private boolean autoCreateCollection = true;
|
||||
|
||||
public String getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
public void setUri(String uri) {
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
public String getToken() {
|
||||
return token;
|
||||
}
|
||||
|
||||
public void setToken(String token) {
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
public String getDatabaseName() {
|
||||
return databaseName;
|
||||
}
|
||||
|
||||
public void setDatabaseName(String databaseName) {
|
||||
this.databaseName = databaseName;
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
public void setUsername(String username) {
|
||||
this.username = username;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public String getDefaultCollectionName() {
|
||||
return defaultCollectionName;
|
||||
}
|
||||
|
||||
public void setDefaultCollectionName(String defaultCollectionName) {
|
||||
this.defaultCollectionName = defaultCollectionName;
|
||||
}
|
||||
|
||||
public boolean isAutoCreateCollection() {
|
||||
return autoCreateCollection;
|
||||
}
|
||||
|
||||
public void setAutoCreateCollection(boolean autoCreateCollection) {
|
||||
this.autoCreateCollection = autoCreateCollection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAvailable() {
|
||||
return StringUtil.hasText(this.uri);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.easyagents.store.milvus;
|
||||
|
||||
import com.easyagents.core.store.SearchWrapper;
|
||||
import com.easyagents.core.store.condition.Connector;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class MilvusExpressionAdaptorTest {
|
||||
|
||||
@Test
|
||||
public void testGroupAndInExpression() {
|
||||
SearchWrapper wrapper = new SearchWrapper();
|
||||
wrapper.eq("akey", "avalue").eq(Connector.OR, "bkey", "bvalue").group(w -> {
|
||||
w.eq("ckey", "cvalue").in(Connector.AND_NOT, "dkey", Arrays.asList("aa", "bb"));
|
||||
}).eq("a", "b");
|
||||
|
||||
String expr = "akey == \"avalue\" OR bkey == \"bvalue\" AND (ckey == \"cvalue\" AND NOT dkey IN [\"aa\",\"bb\"]) AND a == \"b\"";
|
||||
Assert.assertEquals(expr, wrapper.toFilterExpression(MilvusExpressionAdaptor.DEFAULT));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBetweenExpression() {
|
||||
SearchWrapper wrapper = new SearchWrapper();
|
||||
wrapper.eq("akey", "avalue").between(Connector.OR, "bkey", "1", "100").in("ckey", Arrays.asList("aa", "bb"));
|
||||
|
||||
String expr = "akey == \"avalue\" OR (bkey >= 1 && bkey <= 100) AND ckey IN [\"aa\",\"bb\"]";
|
||||
Assert.assertEquals(expr, wrapper.toFilterExpression(MilvusExpressionAdaptor.DEFAULT));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.easyagents.store.milvus;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MilvusVectorStoreConfigTest {
|
||||
|
||||
@Test
|
||||
public void testUriIsRequired() {
|
||||
MilvusVectorStoreConfig config = new MilvusVectorStoreConfig();
|
||||
Assert.assertFalse(config.checkAvailable());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUriOnlyIsAvailable() {
|
||||
MilvusVectorStoreConfig config = new MilvusVectorStoreConfig();
|
||||
config.setUri("http://127.0.0.1:19530");
|
||||
Assert.assertTrue(config.checkAvailable());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTokenModeIsAvailable() {
|
||||
MilvusVectorStoreConfig config = new MilvusVectorStoreConfig();
|
||||
config.setUri("http://127.0.0.1:19530");
|
||||
config.setToken("root:Milvus");
|
||||
Assert.assertTrue(config.checkAvailable());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUsernamePasswordModeIsAvailable() {
|
||||
MilvusVectorStoreConfig config = new MilvusVectorStoreConfig();
|
||||
config.setUri("http://127.0.0.1:19530");
|
||||
config.setUsername("root");
|
||||
config.setPassword("Milvus");
|
||||
Assert.assertTrue(config.checkAvailable());
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@
|
||||
<modules>
|
||||
<module>easy-agents-store-qcloud</module>
|
||||
<module>easy-agents-store-aliyun</module>
|
||||
<module>easy-agents-store-milvus</module>
|
||||
<module>easy-agents-store-chroma</module>
|
||||
<module>easy-agents-store-redis</module>
|
||||
<module>easy-agents-store-opensearch</module>
|
||||
|
||||
5
pom.xml
5
pom.xml
@@ -286,6 +286,11 @@
|
||||
<artifactId>easy-agents-store-aliyun</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-store-milvus</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.easyagents</groupId>
|
||||
<artifactId>easy-agents-store-chroma</artifactId>
|
||||
|
||||
Reference in New Issue
Block a user