feat: 增强工作台趋势概览与聊天排行

- 支持用户活跃与智能体活跃趋势统计及自定义时间范围

- 增加用户活跃榜与智能体趋势数据结构及查询实现

- 同步补齐工作台页面展示与定向测试
This commit is contained in:
2026-05-06 19:22:09 +08:00
parent 5827ecde42
commit 31b0e21d3d
20 changed files with 2087 additions and 146 deletions

View File

@@ -0,0 +1,19 @@
package tech.easyflow.chatlog.domain.dto;
import java.math.BigInteger;
/**
* 聊天活跃用户排行项。
*
* @param userId 用户 ID
* @param userAccount 用户账号快照
* @param sessionTotal 会话总数
* @param messageTotal 消息总数
* @param assistantTotal 使用智能体数
*/
public record ChatActiveUserRank(BigInteger userId,
String userAccount,
long sessionTotal,
long messageTotal,
long assistantTotal) {
}

View File

@@ -0,0 +1,17 @@
package tech.easyflow.chatlog.domain.dto;
import java.math.BigInteger;
/**
* 智能体会话趋势项。
*
* @param assistantId 智能体 ID
* @param assistantName 智能体名称
* @param bucketKey 趋势桶标识,日趋势为 yyyy-MM-dd小时趋势为 yyyy-MM-dd HH:00:00
* @param sessionTotal 会话总数
*/
public record ChatAssistantSessionTrend(BigInteger assistantId,
String assistantName,
String bucketKey,
long sessionTotal) {
}

View File

@@ -7,11 +7,13 @@ import java.math.BigInteger;
*
* @param assistantId 智能体 ID
* @param assistantName 智能体名称
* @param userTotal 使用用户数
* @param sessionTotal 会话总数
* @param messageTotal 消息总数
*/
public record ChatAssistantUsageRank(BigInteger assistantId,
String assistantName,
long userTotal,
long sessionTotal,
long messageTotal) {
}

View File

@@ -6,8 +6,12 @@ package tech.easyflow.chatlog.domain.dto;
* @param sessionTotal 会话总数
* @param messageTotal 消息总数
* @param activeAssistantTotal 活跃智能体数
* @param chatActiveUserTotal AI 活跃用户数
*/
public record ChatDashboardSummary(long sessionTotal, long messageTotal, long activeAssistantTotal) {
public record ChatDashboardSummary(long sessionTotal,
long messageTotal,
long activeAssistantTotal,
long chatActiveUserTotal) {
/**
* 创建空汇总结果。
@@ -15,6 +19,6 @@ public record ChatDashboardSummary(long sessionTotal, long messageTotal, long ac
* @return 空汇总结果
*/
public static ChatDashboardSummary empty() {
return new ChatDashboardSummary(0L, 0L, 0L);
return new ChatDashboardSummary(0L, 0L, 0L, 0L);
}
}

View File

@@ -6,6 +6,10 @@ package tech.easyflow.chatlog.domain.dto;
* @param bucketKey 趋势桶标识,日趋势为 yyyy-MM-dd小时趋势为 yyyy-MM-dd HH:00:00
* @param sessionTotal 会话总数
* @param messageTotal 消息总数
* @param activeUserTotal AI 活跃用户数
*/
public record ChatDashboardTrend(String bucketKey, long sessionTotal, long messageTotal) {
public record ChatDashboardTrend(String bucketKey,
long sessionTotal,
long messageTotal,
long activeUserTotal) {
}

View File

@@ -4,6 +4,8 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
import org.springframework.stereotype.Repository;
import org.springframework.util.StringUtils;
import tech.easyflow.chatlog.domain.dto.ChatActiveUserRank;
import tech.easyflow.chatlog.domain.dto.ChatAssistantSessionTrend;
import tech.easyflow.chatlog.domain.dto.ChatAssistantUsageRank;
import tech.easyflow.chatlog.domain.dto.ChatDashboardSummary;
import tech.easyflow.chatlog.domain.dto.ChatDashboardTrend;
@@ -26,6 +28,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -222,21 +225,24 @@ public class ChatAnalyticalDBRepository {
List<Object> args = new java.util.ArrayList<>();
StringBuilder sql = new StringBuilder();
sql.append("SELECT ")
.append("ifNull(sum(message_count), 0) AS message_total, ")
.append("ifNull(sum(session_count), 0) AS session_total, ")
.append("uniqExact(dimension_id) AS active_assistant_total ")
.append("FROM dws_chat_assistant_day ")
.append("WHERE stat_date >= toDate(?) AND stat_date < toDate(?)");
.append("uniqExact(agg.dimension_id) AS session_total, ")
.append("ifNull(sum(agg.message_count), 0) AS message_total, ")
.append("uniqExact(agg.assistant_id) AS active_assistant_total, ")
.append("uniqExact(agg.user_id) AS active_user_total ")
.append("FROM dws_chat_session_day agg ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = agg.dimension_id ")
.append("WHERE agg.stat_date >= toDate(?) AND agg.stat_date < toDate(?)");
args.add(startDate.toString());
args.add(endDate.toString());
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
appendOptionalTenantFilter(sql, args, tenantId, "s.tenant_id");
ChatDashboardSummary summary = analyticalDBOperations.queryOne(
sql.toString(),
(rs, rowNum) -> new ChatDashboardSummary(
rs.getLong("session_total"),
rs.getLong("message_total"),
rs.getLong("active_assistant_total")
rs.getLong("active_assistant_total"),
rs.getLong("active_user_total")
),
args.toArray()
);
@@ -255,22 +261,25 @@ public class ChatAnalyticalDBRepository {
assertAvailable();
List<Object> args = new java.util.ArrayList<>();
StringBuilder sql = new StringBuilder();
sql.append("SELECT toString(stat_date) AS bucket_key, ")
.append("ifNull(sum(message_count), 0) AS message_total, ")
.append("ifNull(sum(session_count), 0) AS session_total ")
.append("FROM dws_chat_assistant_day ")
.append("WHERE stat_date >= toDate(?) AND stat_date < toDate(?)");
sql.append("SELECT toString(agg.stat_date) AS bucket_key, ")
.append("uniqExact(agg.dimension_id) AS session_total, ")
.append("ifNull(sum(agg.message_count), 0) AS message_total, ")
.append("uniqExact(agg.user_id) AS active_user_total ")
.append("FROM dws_chat_session_day agg ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = agg.dimension_id ")
.append("WHERE agg.stat_date >= toDate(?) AND agg.stat_date < toDate(?)");
args.add(startDate.toString());
args.add(endDate.toString());
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
sql.append(" GROUP BY stat_date ORDER BY stat_date ASC");
appendOptionalTenantFilter(sql, args, tenantId, "s.tenant_id");
sql.append(" GROUP BY agg.stat_date ORDER BY agg.stat_date ASC");
return analyticalDBOperations.query(
sql.toString(),
(rs, rowNum) -> new ChatDashboardTrend(
rs.getString("bucket_key"),
rs.getLong("session_total"),
rs.getLong("message_total")
rs.getLong("message_total"),
rs.getLong("active_user_total")
),
args.toArray()
);
@@ -292,7 +301,8 @@ public class ChatAnalyticalDBRepository {
StringBuilder sql = new StringBuilder();
sql.append("SELECT formatDateTime(toStartOfHour(l.created), '%Y-%m-%d %H:00:00') AS bucket_key, ")
.append("count() AS message_total, ")
.append("uniqExact(l.session_id) AS session_total ")
.append("uniqExact(l.session_id) AS session_total, ")
.append("uniqExact(l.user_id) AS active_user_total ")
.append("FROM ods_chat_log l ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = l.session_id ")
.append("WHERE l.created >= toDateTime(?) AND l.created < toDateTime(?)");
@@ -306,7 +316,8 @@ public class ChatAnalyticalDBRepository {
(rs, rowNum) -> new ChatDashboardTrend(
rs.getString("bucket_key"),
rs.getLong("session_total"),
rs.getLong("message_total")
rs.getLong("message_total"),
rs.getLong("active_user_total")
),
args.toArray()
);
@@ -329,17 +340,21 @@ public class ChatAnalyticalDBRepository {
int safeLimit = Math.max(limit, 1);
List<Object> args = new java.util.ArrayList<>();
StringBuilder sql = new StringBuilder();
sql.append("SELECT agg.assistant_id, snapshot.assistant_name, agg.session_total, agg.message_total ")
sql.append("SELECT agg.assistant_id AS assistant_id, ")
.append("snapshot.assistant_name AS assistant_name, ")
.append("agg.user_total, agg.session_total, agg.message_total ")
.append("FROM (")
.append("SELECT dimension_id AS assistant_id, ")
.append("ifNull(sum(session_count), 0) AS session_total, ")
.append("ifNull(sum(message_count), 0) AS message_total ")
.append("FROM dws_chat_assistant_day ")
.append("WHERE stat_date >= toDate(?) AND stat_date < toDate(?)");
.append("SELECT agg.assistant_id, ")
.append("uniqExact(agg.user_id) AS user_total, ")
.append("uniqExact(agg.dimension_id) AS session_total, ")
.append("ifNull(sum(agg.message_count), 0) AS message_total ")
.append("FROM dws_chat_session_day agg ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = agg.dimension_id ")
.append("WHERE agg.stat_date >= toDate(?) AND agg.stat_date < toDate(?)");
args.add(startDate.toString());
args.add(endDate.toString());
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
sql.append(" GROUP BY dimension_id")
appendOptionalTenantFilter(sql, args, tenantId, "s.tenant_id");
sql.append(" GROUP BY agg.assistant_id")
.append(") agg ")
.append("LEFT JOIN (")
.append("SELECT assistant_id, argMax(assistant_name, modified) AS assistant_name ")
@@ -347,7 +362,7 @@ public class ChatAnalyticalDBRepository {
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
sql.append(" GROUP BY assistant_id")
.append(") snapshot ON snapshot.assistant_id = agg.assistant_id ")
.append("ORDER BY agg.message_total DESC, agg.session_total DESC, agg.assistant_id ASC ")
.append("ORDER BY agg.session_total DESC, agg.user_total DESC, agg.message_total DESC, agg.assistant_id ASC ")
.append("LIMIT ?");
args.add(safeLimit);
@@ -356,6 +371,7 @@ public class ChatAnalyticalDBRepository {
(rs, rowNum) -> new ChatAssistantUsageRank(
bigInteger(rs.getObject("assistant_id")),
rs.getString("assistant_name"),
rs.getLong("user_total"),
rs.getLong("session_total"),
rs.getLong("message_total")
),
@@ -363,6 +379,215 @@ public class ChatAnalyticalDBRepository {
);
}
/**
* 查询智能体日趋势。
*
* @param startDate 开始日期,包含
* @param endDate 结束日期,不包含
* @param tenantId 租户 ID空表示全局
* @param assistantIds 智能体 ID 列表
* @return 趋势列表
*/
public List<ChatAssistantSessionTrend> queryAssistantSessionTrends(LocalDate startDate,
LocalDate endDate,
BigInteger tenantId,
List<BigInteger> assistantIds) {
assertAvailable();
if (assistantIds == null || assistantIds.isEmpty()) {
return Collections.emptyList();
}
List<Object> args = new ArrayList<>();
StringBuilder sql = new StringBuilder();
sql.append("SELECT agg.assistant_id AS assistant_id, ")
.append("snapshot.assistant_name AS assistant_name, ")
.append("toString(agg.stat_date) AS bucket_key, ")
.append("uniqExact(agg.dimension_id) AS session_total ")
.append("FROM dws_chat_session_day agg ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = agg.dimension_id ")
.append("LEFT JOIN (")
.append("SELECT assistant_id, argMax(assistant_name, modified) AS assistant_name ")
.append("FROM ods_chat_session FINAL WHERE is_deleted = 0 ");
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
sql.append(" GROUP BY assistant_id")
.append(") snapshot ON snapshot.assistant_id = agg.assistant_id ")
.append("WHERE agg.stat_date >= toDate(?) AND agg.stat_date < toDate(?)");
args.add(startDate.toString());
args.add(endDate.toString());
appendOptionalTenantFilter(sql, args, tenantId, "s.tenant_id");
appendAssistantIdFilter(sql, args, "agg.assistant_id", assistantIds);
sql.append(" GROUP BY agg.assistant_id, snapshot.assistant_name, agg.stat_date ")
.append("ORDER BY agg.stat_date ASC, agg.assistant_id ASC");
return analyticalDBOperations.query(
sql.toString(),
(rs, rowNum) -> new ChatAssistantSessionTrend(
bigInteger(rs.getObject("assistant_id")),
rs.getString("assistant_name"),
rs.getString("bucket_key"),
rs.getLong("session_total")
),
args.toArray()
);
}
/**
* 查询智能体小时趋势。
*
* @param startTime 开始时间,包含
* @param endTime 结束时间,不包含
* @param tenantId 租户 ID空表示全局
* @param assistantIds 智能体 ID 列表
* @return 趋势列表
*/
public List<ChatAssistantSessionTrend> queryAssistantSessionHourlyTrends(LocalDateTime startTime,
LocalDateTime endTime,
BigInteger tenantId,
List<BigInteger> assistantIds) {
assertAvailable();
if (assistantIds == null || assistantIds.isEmpty()) {
return Collections.emptyList();
}
List<Object> args = new ArrayList<>();
StringBuilder sql = new StringBuilder();
sql.append("SELECT l.assistant_id AS assistant_id, ")
.append("snapshot.assistant_name AS assistant_name, ")
.append("formatDateTime(toStartOfHour(l.created), '%Y-%m-%d %H:00:00') AS bucket_key, ")
.append("uniqExact(l.session_id) AS session_total ")
.append("FROM ods_chat_log l ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = l.session_id ")
.append("LEFT JOIN (")
.append("SELECT assistant_id, argMax(assistant_name, modified) AS assistant_name ")
.append("FROM ods_chat_session FINAL WHERE is_deleted = 0 ");
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
sql.append(" GROUP BY assistant_id")
.append(") snapshot ON snapshot.assistant_id = l.assistant_id ")
.append("WHERE l.created >= toDateTime(?) AND l.created < toDateTime(?)");
args.add(CH_DATE_TIME_FORMATTER.format(startTime));
args.add(CH_DATE_TIME_FORMATTER.format(endTime));
appendOptionalTenantFilter(sql, args, tenantId, "s.tenant_id");
appendAssistantIdFilter(sql, args, "l.assistant_id", assistantIds);
sql.append(" GROUP BY l.assistant_id, snapshot.assistant_name, bucket_key ")
.append("ORDER BY bucket_key ASC, l.assistant_id ASC");
return analyticalDBOperations.query(
sql.toString(),
(rs, rowNum) -> new ChatAssistantSessionTrend(
bigInteger(rs.getObject("assistant_id")),
rs.getString("assistant_name"),
rs.getString("bucket_key"),
rs.getLong("session_total")
),
args.toArray()
);
}
/**
* 查询活跃用户排行。
*
* @param startDate 开始日期,包含
* @param endDate 结束日期,不包含
* @param tenantId 租户 ID空表示全局
* @param limit 返回条数
* @return 排行列表
*/
public List<ChatActiveUserRank> queryActiveUserRanks(LocalDate startDate,
LocalDate endDate,
BigInteger tenantId,
int limit) {
assertAvailable();
int safeLimit = Math.max(limit, 1);
List<Object> args = new java.util.ArrayList<>();
StringBuilder sql = new StringBuilder();
sql.append("SELECT agg.user_id AS user_id, ")
.append("snapshot.user_account AS user_account, ")
.append("agg.session_total, agg.message_total, agg.assistant_total ")
.append("FROM (")
.append("SELECT agg.user_id, ")
.append("uniqExact(agg.dimension_id) AS session_total, ")
.append("ifNull(sum(agg.message_count), 0) AS message_total, ")
.append("uniqExact(agg.assistant_id) AS assistant_total ")
.append("FROM dws_chat_session_day agg ")
.append("LEFT JOIN (SELECT id, tenant_id FROM ods_chat_session FINAL) s ON s.id = agg.dimension_id ")
.append("WHERE agg.stat_date >= toDate(?) AND agg.stat_date < toDate(?)");
args.add(startDate.toString());
args.add(endDate.toString());
appendOptionalTenantFilter(sql, args, tenantId, "s.tenant_id");
sql.append(" GROUP BY agg.user_id")
.append(") agg ")
.append("LEFT JOIN (")
.append("SELECT user_id, argMax(user_account, modified) AS user_account ")
.append("FROM ods_chat_session FINAL WHERE is_deleted = 0 ");
appendOptionalTenantFilter(sql, args, tenantId, "tenant_id");
sql.append(" GROUP BY user_id")
.append(") snapshot ON snapshot.user_id = agg.user_id ")
.append("ORDER BY agg.session_total DESC, agg.message_total DESC, agg.user_id ASC ")
.append("LIMIT ?");
args.add(safeLimit);
return analyticalDBOperations.query(
sql.toString(),
(rs, rowNum) -> new ChatActiveUserRank(
bigInteger(rs.getObject("user_id")),
rs.getString("user_account"),
rs.getLong("session_total"),
rs.getLong("message_total"),
rs.getLong("assistant_total")
),
args.toArray()
);
}
/**
* 追加智能体 ID 过滤。
*
* @param sql SQL 构造器
* @param params 参数列表
* @param columnName 列名
* @param assistantIds 智能体 ID 列表
*/
private void appendAssistantIdFilter(StringBuilder sql,
List<Object> params,
String columnName,
List<BigInteger> assistantIds) {
if (assistantIds == null || assistantIds.isEmpty()) {
return;
}
List<BigInteger> nonNullAssistantIds = new ArrayList<>();
boolean containsNullAssistantId = false;
for (BigInteger assistantId : assistantIds) {
if (assistantId == null) {
containsNullAssistantId = true;
} else {
nonNullAssistantIds.add(assistantId);
}
}
if (nonNullAssistantIds.isEmpty() && !containsNullAssistantId) {
return;
}
sql.append(" AND (");
boolean hasPreviousCondition = false;
if (!nonNullAssistantIds.isEmpty()) {
sql.append(columnName).append(" IN (");
for (int i = 0; i < nonNullAssistantIds.size(); i++) {
if (i > 0) {
sql.append(", ");
}
sql.append("?");
params.add(nonNullAssistantIds.get(i));
}
sql.append(")");
hasPreviousCondition = true;
}
if (containsNullAssistantId) {
if (hasPreviousCondition) {
sql.append(" OR ");
}
sql.append(columnName).append(" IS NULL");
}
sql.append(")");
}
public void refreshDws(Set<LocalDate> dates) {
if (!enabled() || dates.isEmpty()) {
return;

View File

@@ -1,5 +1,7 @@
package tech.easyflow.chatlog.service;
import tech.easyflow.chatlog.domain.dto.ChatActiveUserRank;
import tech.easyflow.chatlog.domain.dto.ChatAssistantSessionTrend;
import tech.easyflow.chatlog.domain.dto.ChatAssistantUsageRank;
import tech.easyflow.chatlog.domain.dto.ChatDashboardSummary;
import tech.easyflow.chatlog.domain.dto.ChatDashboardTrend;
@@ -58,6 +60,48 @@ public interface ChatDashboardQueryService {
BigInteger tenantId,
int limit);
/**
* 查询智能体日趋势。
*
* @param startDate 开始日期,包含当天
* @param endDate 结束日期,不包含当天
* @param tenantId 租户 ID空表示全局
* @param assistantIds 智能体 ID 列表
* @return 智能体日趋势
*/
List<ChatAssistantSessionTrend> queryAssistantTrends(LocalDate startDate,
LocalDate endDate,
BigInteger tenantId,
List<BigInteger> assistantIds);
/**
* 查询智能体小时趋势。
*
* @param startTime 开始时间,包含
* @param endTime 结束时间,不包含
* @param tenantId 租户 ID空表示全局
* @param assistantIds 智能体 ID 列表
* @return 智能体小时趋势
*/
List<ChatAssistantSessionTrend> queryAssistantHourlyTrends(LocalDateTime startTime,
LocalDateTime endTime,
BigInteger tenantId,
List<BigInteger> assistantIds);
/**
* 查询活跃用户排行。
*
* @param startDate 开始日期,包含当天
* @param endDate 结束日期,不包含当天
* @param tenantId 租户 ID空表示全局
* @param limit 返回条数
* @return 活跃用户排行
*/
List<ChatActiveUserRank> queryActiveUserRanks(LocalDate startDate,
LocalDate endDate,
BigInteger tenantId,
int limit);
/**
* 当前分析库是否可用。
*

View File

@@ -1,6 +1,8 @@
package tech.easyflow.chatlog.service.impl;
import org.springframework.stereotype.Service;
import tech.easyflow.chatlog.domain.dto.ChatActiveUserRank;
import tech.easyflow.chatlog.domain.dto.ChatAssistantSessionTrend;
import tech.easyflow.chatlog.domain.dto.ChatAssistantUsageRank;
import tech.easyflow.chatlog.domain.dto.ChatDashboardSummary;
import tech.easyflow.chatlog.domain.dto.ChatDashboardTrend;
@@ -93,6 +95,66 @@ public class ChatDashboardQueryServiceImpl implements ChatDashboardQueryService
return analyticalDBRepository.queryAssistantUsageRanks(startDate, endDate, tenantId, limit);
}
/**
* 查询智能体日趋势。
*
* @param startDate 开始日期,包含当天
* @param endDate 结束日期,不包含当天
* @param tenantId 租户 ID空表示全局
* @param assistantIds 智能体 ID 列表
* @return 智能体日趋势
*/
@Override
public List<ChatAssistantSessionTrend> queryAssistantTrends(LocalDate startDate,
LocalDate endDate,
BigInteger tenantId,
List<BigInteger> assistantIds) {
if (!available() || assistantIds == null || assistantIds.isEmpty()) {
return Collections.emptyList();
}
return analyticalDBRepository.queryAssistantSessionTrends(startDate, endDate, tenantId, assistantIds);
}
/**
* 查询智能体小时趋势。
*
* @param startTime 开始时间,包含
* @param endTime 结束时间,不包含
* @param tenantId 租户 ID空表示全局
* @param assistantIds 智能体 ID 列表
* @return 智能体小时趋势
*/
@Override
public List<ChatAssistantSessionTrend> queryAssistantHourlyTrends(LocalDateTime startTime,
LocalDateTime endTime,
BigInteger tenantId,
List<BigInteger> assistantIds) {
if (!available() || assistantIds == null || assistantIds.isEmpty()) {
return Collections.emptyList();
}
return analyticalDBRepository.queryAssistantSessionHourlyTrends(startTime, endTime, tenantId, assistantIds);
}
/**
* 查询活跃用户排行。
*
* @param startDate 开始日期,包含当天
* @param endDate 结束日期,不包含当天
* @param tenantId 租户 ID空表示全局
* @param limit 返回条数
* @return 活跃用户排行
*/
@Override
public List<ChatActiveUserRank> queryActiveUserRanks(LocalDate startDate,
LocalDate endDate,
BigInteger tenantId,
int limit) {
if (!available()) {
return Collections.emptyList();
}
return analyticalDBRepository.queryActiveUserRanks(startDate, endDate, tenantId, limit);
}
/**
* 当前分析库是否可用。
*

View File

@@ -0,0 +1,188 @@
package tech.easyflow.chatlog.repository.analyticaldb;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.support.StaticListableBeanFactory;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;
import tech.easyflow.chatlog.domain.dto.ChatDashboardSummary;
import tech.easyflow.chatlog.support.ChatJsonSupport;
import tech.easyflow.common.analyticaldb.config.AnalyticalDBFlywayProperties;
import tech.easyflow.common.analyticaldb.core.AnalyticalDBOperations;
import tech.easyflow.common.analyticaldb.page.AnalyticalDBPageRequest;
import tech.easyflow.common.analyticaldb.page.AnalyticalDBPageResult;
import tech.easyflow.common.analyticaldb.support.AnalyticalDBHealthSupport;
import java.math.BigInteger;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* {@link ChatAnalyticalDBRepository} 测试。
*/
public class ChatAnalyticalDBRepositoryTest {
/**
* 验证工作台汇总使用跨天去重的 session 口径。
*/
@Test
public void shouldUseDistinctSessionSqlForDashboardSummary() {
RecordingAnalyticalDBOperations operations = new RecordingAnalyticalDBOperations();
operations.queryOneResult = new ChatDashboardSummary(2L, 5L, 1L, 1L);
ChatAnalyticalDBRepository repository = newRepository(operations);
repository.queryDashboardSummary(LocalDate.of(2026, 4, 1), LocalDate.of(2026, 4, 8), BigInteger.ONE);
Assert.assertNotNull(operations.lastQueryOneSql);
Assert.assertTrue(operations.lastQueryOneSql.contains("FROM dws_chat_session_day agg"));
Assert.assertTrue(operations.lastQueryOneSql.contains("uniqExact(agg.dimension_id) AS session_total"));
Assert.assertTrue(operations.lastQueryOneSql.contains("uniqExact(agg.user_id) AS active_user_total"));
}
/**
* 验证智能体使用榜按去重会话数排序,并同时统计用户数。
*/
@Test
public void shouldUseDistinctSessionSqlForAssistantUsageRanks() {
RecordingAnalyticalDBOperations operations = new RecordingAnalyticalDBOperations();
ChatAnalyticalDBRepository repository = newRepository(operations);
repository.queryAssistantUsageRanks(LocalDate.of(2026, 4, 1), LocalDate.of(2026, 4, 8), BigInteger.ONE, 5);
Assert.assertNotNull(operations.lastQuerySql);
Assert.assertTrue(operations.lastQuerySql.contains("FROM dws_chat_session_day agg"));
Assert.assertTrue(operations.lastQuerySql.contains("uniqExact(agg.user_id) AS user_total"));
Assert.assertTrue(operations.lastQuerySql.contains("uniqExact(agg.dimension_id) AS session_total"));
Assert.assertTrue(operations.lastQuerySql.contains(
"ORDER BY agg.session_total DESC, agg.user_total DESC, agg.message_total DESC, agg.assistant_id ASC"
));
Assert.assertTrue(operations.lastQuerySql.contains("agg.assistant_id AS assistant_id"));
Assert.assertTrue(operations.lastQuerySql.contains("snapshot.assistant_name AS assistant_name"));
}
/**
* 验证智能体趋势查询显式返回 assistant_id 别名,避免 ClickHouse JDBC 无法按列名映射。
*/
@Test
public void shouldAliasAssistantIdForAssistantTrendQueries() {
RecordingAnalyticalDBOperations operations = new RecordingAnalyticalDBOperations();
ChatAnalyticalDBRepository repository = newRepository(operations);
repository.queryAssistantSessionTrends(
LocalDate.of(2026, 4, 1),
LocalDate.of(2026, 4, 8),
BigInteger.ONE,
List.of(BigInteger.ONE, BigInteger.TWO)
);
Assert.assertNotNull(operations.lastQuerySql);
Assert.assertTrue(operations.lastQuerySql.contains("agg.assistant_id AS assistant_id"));
Assert.assertTrue(operations.lastQuerySql.contains("snapshot.assistant_name AS assistant_name"));
Assert.assertTrue(operations.lastQuerySql.contains("agg.assistant_id IN (?, ?)"));
}
/**
* 验证智能体趋势查询在 Top 列表包含空 assistant_id 时会补上 IS NULL 条件。
*/
@Test
public void shouldSupportNullAssistantIdInAssistantTrendQueries() {
RecordingAnalyticalDBOperations operations = new RecordingAnalyticalDBOperations();
ChatAnalyticalDBRepository repository = newRepository(operations);
repository.queryAssistantSessionTrends(
LocalDate.of(2026, 4, 1),
LocalDate.of(2026, 4, 8),
BigInteger.ONE,
Arrays.asList(BigInteger.ONE, null)
);
Assert.assertNotNull(operations.lastQuerySql);
Assert.assertTrue(operations.lastQuerySql.contains("(agg.assistant_id IN (?) OR agg.assistant_id IS NULL)"));
}
/**
* 构造仓储实例。
*
* @param operations 分析库操作桩
* @return 仓储实例
*/
private ChatAnalyticalDBRepository newRepository(RecordingAnalyticalDBOperations operations) {
StaticListableBeanFactory beanFactory = new StaticListableBeanFactory();
beanFactory.addBean("analyticalDBOperations", operations);
ObjectProvider<AnalyticalDBOperations> provider = beanFactory.getBeanProvider(AnalyticalDBOperations.class);
AnalyticalDBHealthSupport healthSupport =
new AnalyticalDBHealthSupport(provider, new AnalyticalDBFlywayProperties());
ChatJsonSupport jsonSupport = new ChatJsonSupport(new ObjectMapper());
return new ChatAnalyticalDBRepository(provider, healthSupport, jsonSupport);
}
/**
* 记录 SQL 的分析库桩实现。
*/
private static class RecordingAnalyticalDBOperations implements AnalyticalDBOperations {
private String lastQueryOneSql;
private String lastQuerySql;
private ChatDashboardSummary queryOneResult;
@Override
public boolean available() {
return true;
}
@Override
public void assertAvailable() {
}
@Override
public <T> List<T> query(String sql, RowMapper<T> rowMapper, Object... args) {
this.lastQuerySql = sql;
return Collections.emptyList();
}
@Override
public <T> T queryOne(String sql, Class<T> requiredType, Object... args) {
this.lastQueryOneSql = sql;
return null;
}
@SuppressWarnings("unchecked")
@Override
public <T> T queryOne(String sql, RowMapper<T> rowMapper, Object... args) {
this.lastQueryOneSql = sql;
return (T) queryOneResult;
}
@Override
public <T> List<T> queryForList(String sql, Class<T> elementType, Object... args) {
return Collections.emptyList();
}
@Override
public int update(String sql, Object... args) {
return 0;
}
@Override
public <T> int[][] batchUpdate(String sql,
List<T> items,
int batchSize,
ParameterizedPreparedStatementSetter<T> setter) {
return new int[0][];
}
@Override
public <T> AnalyticalDBPageResult<T> page(String countSql,
Object[] countArgs,
String dataSql,
Object[] dataArgs,
AnalyticalDBPageRequest pageRequest,
RowMapper<T> rowMapper) {
return null;
}
}
}