二开定制指南
智能体二次开发定制指南
定制概述
BladeX AI智能体编排模块提供了完善的扩展机制,支持开发者自定义节点类型、扩展工作流能力。本指南详细介绍如何进行二次开发定制,包括自定义节点开发、工厂集成、配置测试等完整流程。
一、开发环境准备
1.1 项目结构了解
项目结构
在开始自定义开发前,需要了解智能体模块的项目结构和核心组件。
flow/engine/
├── provider/ # 执行器提供者
│ ├── BladeFlowExecutor.java # 工作流执行器
│ ├── NodeExecutorFactory.java # 节点执行器工厂
│ ├── AbstractNodeExecutor.java # 节点执行器基类
│ └── node/ # 节点执行器实现
│ ├── StartNodeExecutor.java
│ ├── LLMNodeExecutor.java
│ └── ...
├── model/ # 数据模型
│ ├── FlowDsl.java # 工作流DSL模型
│ ├── FlowNode.java # 节点模型
│ └── FlowConnection.java # 连接模型
├── context/ # 执行上下文
│ └── FlowContext.java # 工作流上下文
├── constant/ # 常量定义
│ └── NodeConstant.java # 节点常量
├── exception/ # 异常处理
│ └── FlowException.java # 工作流异常
└── tool/ # 工具类
└── VariableTool.java # 变量处理工具
1.2 依赖配置
依赖管理
确保项目中包含必要的依赖配置,支持自定义节点的开发和集成。
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- BladeX Core -->
<dependency>
<groupId>org.springblade</groupId>
<artifactId>blade-core-tool</artifactId>
</dependency>
<!-- Jackson JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
二、自定义节点开发
2.1 节点执行器开发
开发规范
自定义节点执行器必须继承AbstractNodeExecutor
基类,并实现doExecute
方法。
开发步骤:
- 继承基类:继承
AbstractNodeExecutor
抽象类 - 实现逻辑:实现
doExecute
方法定义节点行为 - 参数验证:添加节点特有的参数验证逻辑
- 异常处理:处理节点执行过程中的异常
@Component
@RequiredArgsConstructor
public class CustomNodeExecutor extends AbstractNodeExecutor {
// 注入需要的服务
private final CustomService customService;
@Override
protected Kv doExecute(FlowNode node, FlowContext context) {
try {
// 1. 获取节点配置
CustomNode config = node.getCustomParams();
if (config == null) {
throw FlowException.invalidParam("自定义节点配置无效");
}
// 2. 参数验证
validateCustomParams(config);
// 3. 获取输入变量
Kv variables = getInputVariables(node, context);
// 4. 执行业务逻辑
String result = executeCustomLogic(config, variables);
// 5. 返回执行结果
return Kv.create()
.set(NodeConstant.CONTENT, result)
.set(NodeConstant.TIMESTAMP, System.currentTimeMillis());
} catch (Exception e) {
throw FlowException.nodeError(node.getId(),
"[自定义节点] -> " + e.getMessage());
}
}
/**
* 验证自定义参数
*/
private void validateCustomParams(CustomNode config) {
if (StringUtil.isBlank(config.getCustomParam())) {
throw FlowException.invalidParam("自定义参数不能为空");
}
// 其他验证逻辑...
}
/**
* 执行自定义业务逻辑
*/
private String executeCustomLogic(CustomNode config, Kv variables) {
// 处理变量替换
String processedParam = VariableTool.resolve(config.getCustomParam(), variables);
// 调用自定义服务
return customService.processCustomLogic(processedParam);
}
}
2.2 节点配置模型
配置模型
为自定义节点定义配置模型,用于存储节点的参数配置。
@Data
@Schema(name = "CustomNode", description = "自定义节点配置")
public class CustomNode {
/**
* 自定义参数
*/
@Schema(description = "自定义参数")
private String customParam;
/**
* 处理模式
*/
@Schema(description = "处理模式")
private String processMode;
/**
* 超时时间(秒)
*/
@Schema(description = "超时时间")
private Integer timeout;
/**
* 是否启用缓存
*/
@Schema(description = "是否启用缓存")
private Boolean enableCache;
/**
* 扩展配置
*/
@Schema(description = "扩展配置")
private Map<String, Object> extendConfig;
}
2.3 FlowNode模型扩展
模型扩展
在FlowNode模型中添加自定义节点的配置字段,支持DSL解析。
public class FlowNode {
// ... 现有字段
/**
* 自定义节点参数
*/
@Schema(description = "自定义节点参数")
private CustomNode customParams;
// getter和setter方法
public CustomNode getCustomParams() {
return customParams;
}
public void setCustomParams(CustomNode customParams) {
this.customParams = customParams;
}
}
三、工厂类集成
3.1 注册节点执行器
工厂注册
在NodeExecutorFactory
中注册新的节点执行器,使其能够被工作流引擎识别和调用。
@Component
public class NodeExecutorFactory {
private final Map<String, NodeExecutor> executors = new HashMap<>();
public NodeExecutorFactory(
// ... 现有执行器
CustomNodeExecutor customNodeExecutor // 添加自定义执行器
) {
// ... 现有注册
// 注册自定义节点执行器
executors.put("custom", customNodeExecutor);
}
public NodeExecutor getExecutor(String type) {
NodeExecutor executor = executors.get(type);
if (executor == null) {
throw new RuntimeException("未知的节点类型: " + type);
}
return executor;
}
/**
* 动态注册节点执行器
*/
public void registerExecutor(String type, NodeExecutor executor) {
executors.put(type, executor);
}
/**
* 获取所有支持的节点类型
*/
public Set<String> getSupportedTypes() {
return executors.keySet();
}
}
3.2 节点常量定义
常量管理
在NodeConstant
中定义自定义节点的常量,保持代码的一致性。
public interface NodeConstant {
// ... 现有常量
/**
* 自定义节点类型
*/
String CUSTOM = "custom";
/**
* 自定义节点输出字段
*/
String CUSTOM_RESULT = "customResult";
String CUSTOM_STATUS = "customStatus";
String CUSTOM_MESSAGE = "customMessage";
/**
* 自定义节点配置字段
*/
String CUSTOM_PARAM = "customParam";
String PROCESS_MODE = "processMode";
String TIMEOUT = "timeout";
String ENABLE_CACHE = "enableCache";
}
四、配置和测试
4.1 Spring配置
配置管理
确保自定义节点执行器能够被Spring容器正确管理和注入。
@Configuration
@ComponentScan(basePackages = "org.springblade.modules.aigc.flow.engine.provider.node")
public class FlowEngineConfig {
/**
* 自定义服务配置
*/
@Bean
@ConditionalOnMissingBean
public CustomService customService() {
return new CustomServiceImpl();
}
/**
* 节点执行器工厂配置
*/
@Bean
public NodeExecutorFactory nodeExecutorFactory(List<NodeExecutor> executors) {
return new NodeExecutorFactory(executors);
}
}
4.2 单元测试
测试覆盖
为自定义节点执行器编写完整的单元测试,确保功能的正确性和稳定性。
@ExtendWith(MockitoExtension.class)
class CustomNodeExecutorTest {
@Mock
private CustomService customService;
@InjectMocks
private CustomNodeExecutor customNodeExecutor;
private FlowContext context;
private FlowNode node;
@BeforeEach
void setUp() {
context = new FlowContext(1L, "test");
node = new FlowNode();
node.setId("custom_1");
node.setType("custom");
CustomNode customParams = new CustomNode();
customParams.setCustomParam("test param");
customParams.setProcessMode("sync");
customParams.setTimeout(30);
node.setCustomParams(customParams);
}
@Test
void testExecuteSuccess() {
// Given
when(customService.processCustomLogic(anyString()))
.thenReturn("处理成功");
// When
Kv result = customNodeExecutor.execute(node, context);
// Then
assertThat(result).isNotNull();
assertThat(result.getStr(NodeConstant.CONTENT)).isEqualTo("处理成功");
assertThat(result.getLong(NodeConstant.TIMESTAMP)).isNotNull();
verify(customService).processCustomLogic("test param");
}
@Test
void testExecuteWithInvalidParams() {
// Given
node.setCustomParams(null);
// When & Then
assertThatThrownBy(() -> customNodeExecutor.execute(node, context))
.isInstanceOf(FlowException.class)
.hasMessageContaining("自定义节点配置无效");
}
@Test
void testExecuteWithVariableReplacement() {
// Given
context.setVariable("input", "dynamic value");
CustomNode customParams = node.getCustomParams();
customParams.setCustomParam("{{input}}");
when(customService.processCustomLogic("dynamic value"))
.thenReturn("动态处理结果");
// When
Kv result = customNodeExecutor.execute(node, context);
// Then
assertThat(result.getStr(NodeConstant.CONTENT)).isEqualTo("动态处理结果");
verify(customService).processCustomLogic("dynamic value");
}
}
4.3 集成测试
集成验证
编写集成测试验证自定义节点在完整工作流中的执行效果。
@SpringBootTest
@TestPropertySource(properties = {
"spring.datasource.url=jdbc:h2:mem:testdb",
"spring.jpa.hibernate.ddl-auto=create-drop"
})
class CustomNodeIntegrationTest {
@Autowired
private BladeFlowExecutor flowExecutor;
@Test
void testCustomNodeInWorkflow() {
// Given
AiFlow flow = createTestFlow();
FlowDsl dsl = createTestDsl();
Kv params = Kv.create().set("input", "测试输入");
// When
FlowExecuteResult result = flowExecutor.execute(flow, dsl, params);
// Then
assertThat(result).isNotNull();
assertThat(result.getExecuted()).isEqualTo(ExecutionStatus.SUCCESS.getValue());
assertThat(result.getNodes()).hasSize(3); // start -> custom -> end
// 验证自定义节点执行结果
FlowNode customNode = result.getNodes().stream()
.filter(node -> "custom".equals(node.getType()))
.findFirst()
.orElseThrow();
assertThat(customNode.getStatus()).isEqualTo(ExecutionStatus.SUCCESS);
assertThat(customNode.getOutput()).isNotNull();
}
private FlowDsl createTestDsl() {
FlowDsl dsl = new FlowDsl();
// 创建节点
List<FlowNode> nodes = Arrays.asList(
createStartNode(),
createCustomNode(),
createEndNode()
);
// 创建连接
List<FlowConnection> connections = Arrays.asList(
createConnection("start_1", "custom_1"),
createConnection("custom_1", "end_1")
);
dsl.setNodes(nodes);
dsl.setConnections(connections);
return dsl;
}
private FlowNode createCustomNode() {
FlowNode node = new FlowNode();
node.setId("custom_1");
node.setType("custom");
node.setName("自定义节点");
CustomNode customParams = new CustomNode();
customParams.setCustomParam("{{start_1.input}}");
customParams.setProcessMode("sync");
node.setCustomParams(customParams);
return node;
}
}
五、高级定制功能
5.1 自定义认证机制
认证扩展
为自定义节点添加特殊的认证机制,支持不同的安全要求。
@Component
public class CustomAuthenticationProvider {
/**
* 验证自定义节点的访问权限
*/
public boolean validateAccess(FlowNode node, FlowContext context) {
CustomNode config = node.getCustomParams();
// 检查API密钥
String apiKey = config.getExtendConfig().get("apiKey").toString();
if (StringUtil.isBlank(apiKey)) {
return false;
}
// 验证权限
return validateApiKey(apiKey, context.getFlowId());
}
private boolean validateApiKey(String apiKey, Long flowId) {
// 实现API密钥验证逻辑
return true;
}
}
5.2 自定义缓存策略
缓存设计
为自定义节点实现缓存机制,提高执行效率和性能。
@Component
@RequiredArgsConstructor
public class CustomCacheManager {
private final RedisTemplate<String, Object> redisTemplate;
/**
* 获取缓存结果
*/
public Kv getCachedResult(String cacheKey) {
Object cached = redisTemplate.opsForValue().get(cacheKey);
return cached != null ? (Kv) cached : null;
}
/**
* 缓存执行结果
*/
public void cacheResult(String cacheKey, Kv result, Duration expiration) {
redisTemplate.opsForValue().set(cacheKey, result, expiration);
}
/**
* 生成缓存键
*/
public String generateCacheKey(FlowNode node, Kv variables) {
CustomNode config = node.getCustomParams();
String paramHash = DigestUtils.md5Hex(
config.getCustomParam() + variables.toString()
);
return String.format("custom_node:%s:%s", node.getId(), paramHash);
}
}
5.3 自定义监控集成
监控扩展
为自定义节点添加监控指标,支持性能分析和问题诊断。
@Component
@RequiredArgsConstructor
public class CustomNodeMonitor {
private final MeterRegistry meterRegistry;
/**
* 记录节点执行指标
*/
public void recordExecution(FlowNode node, long duration, boolean success) {
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(Timer.builder("custom_node_execution")
.tag("node_id", node.getId())
.tag("success", String.valueOf(success))
.register(meterRegistry));
// 记录执行次数
Counter.builder("custom_node_count")
.tag("node_id", node.getId())
.tag("status", success ? "success" : "failure")
.register(meterRegistry)
.increment();
}
/**
* 记录业务指标
*/
public void recordBusinessMetric(String metricName, double value, String... tags) {
Gauge.builder("custom_node_business_metric")
.tag("metric_name", metricName)
.tags(tags)
.register(meterRegistry, () -> value);
}
}
六、部署和运维
6.1 配置管理
配置策略
为自定义节点提供灵活的配置管理,支持不同环境的配置需求。
# application.yml
bladex:
flow:
custom:
# 自定义节点全局配置
enabled: true
default-timeout: 30
max-retry-count: 3
cache:
enabled: true
ttl: 3600
# 自定义服务配置
service:
endpoint: "https://api.custom-service.com"
api-key: "${CUSTOM_API_KEY:}"
connection-timeout: 5000
read-timeout: 30000
# 监控配置
monitoring:
enabled: true
metrics-prefix: "custom_node"
6.2 健康检查
健康监控
实现自定义节点的健康检查机制,确保服务的可用性。
@Component
public class CustomNodeHealthIndicator implements HealthIndicator {
@Autowired
private CustomService customService;
@Override
public Health health() {
try {
// 检查自定义服务状态
boolean serviceAvailable = customService.isAvailable();
if (serviceAvailable) {
return Health.up()
.withDetail("custom_service", "Available")
.withDetail("last_check", Instant.now())
.build();
} else {
return Health.down()
.withDetail("custom_service", "Unavailable")
.withDetail("last_check", Instant.now())
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.withDetail("last_check", Instant.now())
.build();
}
}
}
6.3 日志配置
日志管理
配置自定义节点的日志输出,支持问题排查和性能分析。
<!-- logback-spring.xml -->
<configuration>
<!-- 自定义节点日志配置 -->
<logger name="org.springblade.modules.aigc.flow.engine.provider.node.CustomNodeExecutor"
level="INFO" additivity="false">
<appender-ref ref="CUSTOM_NODE_FILE"/>
<appender-ref ref="CONSOLE"/>
</logger>
<!-- 自定义节点文件输出 -->
<appender name="CUSTOM_NODE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logs/custom-node.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/custom-node.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
</configuration>
七、最佳实践
7.1 开发规范
开发建议
遵循以下开发规范,确保自定义节点的质量和可维护性。
代码规范:
- 命名规范:使用清晰、有意义的类名和方法名
- 注释完整:为关键方法和复杂逻辑添加详细注释
- 异常处理:统一使用FlowException处理业务异常
- 参数验证:严格验证输入参数的有效性
- 资源管理:正确管理外部资源的生命周期
7.2 性能优化
性能考虑
在开发自定义节点时,需要考虑性能因素,避免影响整体工作流的执行效率。
优化策略:
- 异步处理:对于耗时操作,考虑使用异步处理
- 连接池:使用连接池管理外部服务连接
- 缓存机制:合理使用缓存减少重复计算
- 资源限制:设置合理的超时时间和重试次数
- 批量处理:支持批量数据处理提高效率
7.3 安全考虑
安全建议
在自定义节点开发中,需要充分考虑安全因素,保护系统和数据安全。
安全措施:
- 输入验证:严格验证和过滤用户输入
- 权限控制:实现细粒度的权限控制机制
- 敏感数据:加密存储和传输敏感数据
- 审计日志:记录关键操作的审计日志
- 错误处理:避免在错误信息中泄露敏感信息
通过以上完整的二次开发定制指南,开发者可以轻松扩展BladeX AI智能体编排模块的功能,满足特定的业务需求和应用场景。