架构原理详解
大模型模块架构原理详解
架构概述
BladeX AI大模型模块采用分层架构设计,通过抽象模板模式和工厂模式实现了对多种大模型提供商的统一封装。本文档深入解析模块的核心架构原理、调用流程和关键技术实现。
一、整体架构设计
1.1 分层架构图
1.2 核心设计原则
设计原则
- 单一职责:每个组件专注于特定功能
- 开闭原则:对扩展开放,对修改封闭
- 依赖倒置:依赖抽象而非具体实现
- 接口隔离:提供最小化的接口定义
- 组合优于继承:通过组合实现功能扩展
二、核心组件详解
2.1 LlmFactory 工厂模式
工厂实现原理
LlmFactory采用工厂模式创建不同类型的模型适配器,通过模型名称自动识别模型类型,并创建对应的Template实例。支持参数验证、类型解析和实例创建的完整流程。
public class LlmFactory {
public LlmTemplate createTemplate(String model, ModelConfig config) {
// 1. 参数验证
validateConfig(config);
// 2. 模型类型解析
String modelType = parseModelType(model, config);
// 3. 创建对应的Template实例
return switch (modelType.toLowerCase()) {
case MODEL_TYPE_OPENAI -> new OpenAITemplate(model, config);
case MODEL_TYPE_ANTHROPIC -> new AnthropicTemplate(model, config);
case MODEL_TYPE_DEEPSEEK -> new DeepSeekTemplate(model, config);
// ... 其他模型类型
default -> throw LlmException.unsupportedModel(modelType);
};
}
private String parseModelType(String modelName, ModelConfig config) {
// 优先使用配置中的模型类型
if (StringUtil.isNotBlank(config.getModelType())) {
return config.getModelType();
}
// 根据模型名称前缀自动识别
modelName = modelName.toLowerCase();
if (modelName.startsWith("gpt-")) return MODEL_TYPE_OPENAI;
if (modelName.startsWith("claude-")) return MODEL_TYPE_ANTHROPIC;
if (modelName.startsWith("deepseek-")) return MODEL_TYPE_DEEPSEEK;
// 默认使用OpenAI兼容格式
return MODEL_TYPE_OPENAI;
}
}
2.2 AbstractLlmTemplate 模板方法
模板方法模式
AbstractLlmTemplate定义了大模型调用的标准流程,子类只需实现特定的抽象方法即可适配不同的模型提供商。
2.3 关键抽象方法
抽象方法定义
AbstractLlmTemplate通过定义抽象方法,让子类实现特定的模型适配逻辑,包括API地址、提供商名称、响应构建等核心方法。
public abstract class AbstractLlmTemplate implements LlmTemplate {
// 子类必须实现的抽象方法
protected abstract String getDefaultApiUrl();
protected abstract String getProviderName();
protected abstract BladeChatResponse buildResponse(JsonNode responseNode);
protected abstract BladeChatResponse buildStreamResponse(String responseLine);
protected abstract void addModelSpecificParams(Map<String, Object> requestBody, BladeChatRequest request);
// 模板方法 - 定义标准流程
@Override
public BladeChatResponse chat(BladeChatRequest request) {
return retryTemplate.execute(context -> {
return doChatRequest(request);
});
}
protected BladeChatResponse doChatRequest(BladeChatRequest request) {
// 1. 构建请求体
Map<String, Object> requestBody = buildRequestBody(request);
// 2. 发送HTTP请求
ResponseEntity<String> response = restTemplate.exchange(
getApiUrl(), HttpMethod.POST,
new HttpEntity<>(requestBody, buildHeaders()),
String.class
);
// 3. 解析响应
JsonNode responseNode = objectMapper.readTree(response.getBody());
return buildResponse(responseNode);
}
}
2.4 LlmProcessor 流式处理
流式处理机制
LlmProcessor负责处理流式数据,包括数据过滤、JSON提取、响应构建和错误处理等关键步骤。
public class LlmProcessor {
public Flux<BladeChatResponse> process(Flux<String> dataStream) {
return dataStream
.filter(this::isValidLine) // 过滤有效数据行
.map(this::extractJsonData) // 提取JSON数据
.filter(Objects::nonNull) // 过滤空数据
.map(responseBuilder::build) // 构建响应对象
.onErrorResume(this::handleError); // 错误处理
}
private boolean isValidLine(String line) {
return StringUtil.isNotBlank(line)
&& line.startsWith("data: ")
&& !line.equals("data: [DONE]");
}
private String extractJsonData(String line) {
return line.substring(6).trim(); // 移除"data: "前缀
}
}
三、调用流程详解
3.1 同步调用时序图
3.2 流式调用时序图
3.3 错误处理流程
错误处理机制
错误处理流程包括重试策略配置和异常分类处理,通过指数退避算法和智能重试机制提升系统的稳定性。
// 重试策略配置
private RetryTemplate llmRetryTemplate() {
RetryTemplate template = new RetryTemplate();
// 重试策略:最多重试3次
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
// 退避策略:指数退避
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000); // 初始间隔1秒
backOffPolicy.setMultiplier(2.0); // 倍数2
backOffPolicy.setMaxInterval(10000); // 最大间隔10秒
template.setRetryPolicy(retryPolicy);
template.setBackOffPolicy(backOffPolicy);
return template;
}
// 异常分类处理
public boolean shouldRetry(Throwable throwable) {
if (throwable instanceof HttpServerErrorException) {
return true; // 5xx错误重试
}
if (throwable instanceof ResourceAccessException) {
return true; // 网络异常重试
}
if (throwable instanceof HttpClientErrorException) {
HttpClientErrorException ex = (HttpClientErrorException) throwable;
return ex.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS; // 429限流重试
}
return false;
}
四、关键技术实现
4.1 参数适配机制
参数映射策略
参数适配机制通过通用参数映射和模型特定参数处理,实现了不同模型API的统一调用接口。
protected Map<String, Object> buildRequestBody(BladeChatRequest request) {
Map<String, Object> requestBody = new HashMap<>();
// 通用参数映射
requestBody.put("model", request.getModel());
requestBody.put("messages", convertMessages(request.getMessages()));
addIfNotNull(requestBody, "temperature", request.getTemperature());
addIfNotNull(requestBody, "max_tokens", request.getMaxTokens());
addIfNotNull(requestBody, "stream", request.getStream());
// 调用子类实现的模型特定参数
addModelSpecificParams(requestBody, request);
// 扩展参数处理
if (request.getExtraParams() != null) {
requestBody.putAll(request.getExtraParams());
}
return requestBody;
}
// OpenAI特定参数实现
@Override
protected void addModelSpecificParams(Map<String, Object> requestBody, BladeChatRequest request) {
addIfNotNull(requestBody, "frequency_penalty", request.getFrequencyPenalty());
addIfNotNull(requestBody, "presence_penalty", request.getPresencePenalty());
if (request.getFunctions() != null) {
requestBody.put("functions", request.getFunctions());
}
}
4.2 响应标准化
响应统一化
响应标准化处理不同模型的响应格式差异,将所有模型的响应统一转换为BladeChatResponse格式。
// 不同模型的响应格式差异处理
protected BladeChatResponse buildResponse(JsonNode responseNode) {
try {
// 提取通用字段
String id = extractId(responseNode);
String model = extractModel(responseNode);
Integer created = extractCreated(responseNode);
// 提取选择项
List<ChatChoice> choices = extractChoices(responseNode);
// 提取使用统计
ChatUsage usage = extractUsage(responseNode);
// 构建标准响应
return BladeChatResponse.builder()
.id(id)
.model(model)
.created(created)
.choices(choices)
.usage(usage)
.result(ChatResult.builder().done(true).build())
.build();
} catch (Exception e) {
throw LlmException.apiError("解析响应失败: " + e.getMessage());
}
}
4.3 连接池优化
连接池配置
HTTP连接池配置优化了网络连接的复用和管理,提升了API调用的性能和稳定性。
private RestTemplate llmRestTemplate() {
// HTTP连接池配置
PoolingHttpClientConnectionManager connectionManager =
new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(200); // 最大连接数
connectionManager.setDefaultMaxPerRoute(50); // 每个路由最大连接数
// HTTP客户端配置
CloseableHttpClient httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.setConnectionTimeToLive(30, TimeUnit.SECONDS) // 连接存活时间
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(5000) // 连接超时
.setSocketTimeout(30000) // 读取超时
.build())
.build();
// RestTemplate配置
HttpComponentsClientHttpRequestFactory factory =
new HttpComponentsClientHttpRequestFactory(httpClient);
RestTemplate restTemplate = new RestTemplate(factory);
restTemplate.setErrorHandler(new LlmResponseErrorHandler());
return restTemplate;
}
4.4 流式数据处理
背压控制
流式数据处理采用背压控制机制,确保在高并发场景下的稳定性和性能。
private WebClient llmWebClient() {
// 连接器配置
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(
HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofSeconds(30))
.doOnConnected(conn ->
conn.addHandlerLast(new ReadTimeoutHandler(30))
.addHandlerLast(new WriteTimeoutHandler(30)))
);
return WebClient.builder()
.clientConnector(connector)
.codecs(configurer -> {
configurer.defaultCodecs().maxInMemorySize(1024 * 1024); // 1MB缓冲区
})
.build();
}
// 流式处理背压控制
public Flux<BladeChatResponse> processStream(Flux<String> dataStream) {
return dataStream
.onBackpressureBuffer(1000) // 缓冲区大小
.publishOn(Schedulers.boundedElastic()) // 异步处理
.map(this::parseStreamLine)
.filter(Objects::nonNull)
.onErrorContinue((error, item) -> {
log.warn("处理流式数据异常: {}", error.getMessage());
});
}
五、性能优化策略
5.1 缓存机制
多级缓存
多级缓存机制通过配置缓存和模板缓存,显著提升了模型调用的性能。
@Component
public class ModelConfigCache {
private final Cache<String, ModelConfig> configCache =
Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofMinutes(30))
.build();
private final Cache<String, LlmTemplate> templateCache =
Caffeine.newBuilder()
.maximumSize(100)
.expireAfterAccess(Duration.ofMinutes(10))
.build();
public LlmTemplate getTemplate(String model, ModelConfig config) {
String cacheKey = model + ":" + config.hashCode();
return templateCache.get(cacheKey, key ->
llmFactory.createTemplate(model, config));
}
}
5.2 异步处理
异步优化
异步处理机制通过线程池配置,提升了系统的并发处理能力。
@Async("llmExecutor")
public CompletableFuture<BladeChatResponse> chatAsync(BladeChatRequest request) {
return CompletableFuture.supplyAsync(() -> {
LlmTemplate template = getTemplate(request.getModel());
return template.chat(request);
}, llmExecutor);
}
// 线程池配置
@Bean("llmExecutor")
public Executor llmExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("llm-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
5.3 监控指标
性能监控
性能监控组件提供了请求计数、延迟统计、Token使用量等关键指标的监控能力。
@Component
public class LlmMetrics {
private final MeterRegistry meterRegistry;
private final Timer.Sample requestTimer;
public void recordRequest(String model, String provider) {
Counter.builder("llm.requests.total")
.tag("model", model)
.tag("provider", provider)
.register(meterRegistry)
.increment();
}
public void recordLatency(String model, Duration duration) {
Timer.builder("llm.request.duration")
.tag("model", model)
.register(meterRegistry)
.record(duration);
}
public void recordTokenUsage(String model, int tokens) {
Gauge.builder("llm.tokens.usage")
.tag("model", model)
.register(meterRegistry, tokens);
}
}
通过这种分层架构和模板方法模式的设计,BladeX AI大模型模块实现了高度的可扩展性和可维护性,为企业级AI应用提供了稳定可靠的大模型接入能力。