学习目标

  • 理解LLM应用的主要性能瓶颈及优化方向
  • 掌握多层缓存架构设计和实现
  • 实现异步并行调用优化LLM性能
  • 使用流式输出降低首Token延迟
  • 配置HTTP连接池提升吞吐量
  • 优化Token使用降低响应时间和成本
  • 实现批量处理提升系统效率
  • 建立性能监控和基准测试体系

LLM应用性能瓶颈分析

典型性能瓶颈

LLM应用的性能瓶颈主要来自以下几个方面:

  1. LLM API调用延迟 - 通常占总响应时间的70-90%
  2. 向量检索延迟 - RAG场景下的Embedding计算和相似度搜索
  3. 串行调用瓶颈 - 多次顺序调用LLM导致延迟叠加
  4. 内存和网络开销 - 大Prompt和响应的传输成本
  5. 并发限制 - 连接池配置不当导致的吞吐量限制

性能延迟分解表

操作类型典型延迟占比优化空间
LLM API调用1000-5000ms70-90%缓存、流式、并行
Embedding计算50-200ms5-10%批量、缓存
向量相似度搜索10-50ms2-5%索引优化、缓存
业务逻辑处理5-20ms1-3%代码优化
网络传输20-100ms2-5%连接池、压缩

性能优化策略矩阵

/**
 * 性能优化策略分析器
 */
@Service
public class PerformanceAnalyzer {

    @Autowired
    private MeterRegistry meterRegistry;

    /**
     * 分析请求性能瓶颈
     */
    public PerformanceReport analyzeRequest(String requestId) {
        Timer.Sample sample = Timer.start(meterRegistry);

        PerformanceReport report = PerformanceReport.builder()
            .requestId(requestId)
            .build();

        // 收集各阶段耗时
        report.setLlmCallTime(getLlmCallTime(requestId));
        report.setEmbeddingTime(getEmbeddingTime(requestId));
        report.setVectorSearchTime(getVectorSearchTime(requestId));
        report.setBusinessLogicTime(getBusinessLogicTime(requestId));

        // 计算优化建议
        report.setOptimizationSuggestions(generateSuggestions(report));

        sample.stop(meterRegistry.timer("performance.analysis"));
        return report;
    }

    private List<String> generateSuggestions(PerformanceReport report) {
        List<String> suggestions = new ArrayList<>();

        if (report.getLlmCallTime() > 3000) {
            suggestions.add("LLM调用延迟过高,建议启用缓存或流式输出");
        }

        if (report.getEmbeddingTime() > 100) {
            suggestions.add("Embedding计算耗时较长,建议使用批量处理");
        }

        if (report.getVectorSearchTime() > 30) {
            suggestions.add("向量搜索较慢,建议优化索引或启用缓存");
        }

        return suggestions;
    }
}

️ 多层缓存架构

三层缓存设计

为了最大化缓存命中率和性能提升,我们设计了三层缓存架构:

  • L1缓存: 本地精确匹配缓存 (Caffeine) - 毫秒级
  • L2缓存: 语义相似度缓存 (Embedding相似度>0.95) - 10-50ms
  • L3缓存: 分布式缓存 (Redis) - 50-100ms

L1 精确匹配缓存

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * L1本地精确匹配缓存
 */
@Component
public class L1ExactMatchCache {

    private final Cache<String, CachedResponse> cache;

    public L1ExactMatchCache() {
        this.cache = Caffeine.newBuilder()
            .maximumSize(10_000) // 最大缓存条目数
            .expireAfterWrite(Duration.ofMinutes(30)) // 写入后30分钟过期
            .recordStats() // 启用统计
            .build();
    }

    /**
     * 获取缓存的响应
     */
    public CachedResponse get(String prompt) {
        String cacheKey = generateCacheKey(prompt);
        return cache.getIfPresent(cacheKey);
    }

    /**
     * 缓存响应
     */
    public void put(String prompt, String response) {
        String cacheKey = generateCacheKey(prompt);
        CachedResponse cached = CachedResponse.builder()
            .response(response)
            .timestamp(System.currentTimeMillis())
            .hitCount(0)
            .build();
        cache.put(cacheKey, cached);
    }

    /**
     * 生成缓存键 (使用SHA-256哈希)
     */
    private String generateCacheKey(String prompt) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(prompt.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("SHA-256 algorithm not found", e);
        }
    }

    /**
     * 获取缓存统计信息
     */
    public CacheStats getStats() {
        com.github.benmanes.caffeine.cache.stats.CacheStats stats = cache.stats();
        return CacheStats.builder()
            .hitRate(stats.hitRate())
            .missRate(stats.missRate())
            .hitCount(stats.hitCount())
            .missCount(stats.missCount())
            .loadSuccessCount(stats.loadSuccessCount())
            .totalLoadTime(stats.totalLoadTime())
            .evictionCount(stats.evictionCount())
            .build();
    }
}

L2 语义缓存

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.model.embedding.EmbeddingModel;
import dev.langchain4j.store.embedding.EmbeddingMatch;
import dev.langchain4j.store.embedding.EmbeddingSearchRequest;
import dev.langchain4j.store.embedding.EmbeddingSearchResult;
import dev.langchain4j.store.embedding.EmbeddingStore;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * L2语义相似度缓存
 */
@Component
public class L2SemanticCache {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingStore<CachedResponse> embeddingStore;
    private static final double SIMILARITY_THRESHOLD = 0.95; // 相似度阈值

    public L2SemanticCache(EmbeddingModel embeddingModel,
                          EmbeddingStore<CachedResponse> embeddingStore) {
        this.embeddingModel = embeddingModel;
        this.embeddingStore = embeddingStore;
    }

    /**
     * 语义搜索缓存
     */
    public CachedResponse get(String prompt) {
        // 计算prompt的embedding
        Embedding promptEmbedding = embeddingModel.embed(prompt).content();

        // 在embedding store中搜索相似的缓存项
        EmbeddingSearchRequest searchRequest = EmbeddingSearchRequest.builder()
            .queryEmbedding(promptEmbedding)
            .maxResults(1)
            .minScore(SIMILARITY_THRESHOLD)
            .build();

        EmbeddingSearchResult<CachedResponse> searchResult =
            embeddingStore.search(searchRequest);

        if (searchResult.matches().isEmpty()) {
            return null;
        }

        EmbeddingMatch<CachedResponse> bestMatch = searchResult.matches().get(0);

        // 如果相似度足够高,返回缓存的响应
        if (bestMatch.score() >= SIMILARITY_THRESHOLD) {
            CachedResponse cached = bestMatch.embedded();
            cached.incrementHitCount();
            cached.setSimilarityScore(bestMatch.score());
            return cached;
        }

        return null;
    }

    /**
     * 缓存响应及其embedding
     */
    public void put(String prompt, String response) {
        // 计算prompt的embedding
        Embedding promptEmbedding = embeddingModel.embed(prompt).content();

        // 创建缓存对象
        CachedResponse cached = CachedResponse.builder()
            .prompt(prompt)
            .response(response)
            .timestamp(System.currentTimeMillis())
            .hitCount(0)
            .build();

        // 存储到embedding store
        embeddingStore.add(promptEmbedding, cached);
    }

    /**
     * 批量预热缓存
     */
    public void warmUp(List<PromptResponsePair> pairs) {
        pairs.forEach(pair -> put(pair.getPrompt(), pair.getResponse()));
    }
}

L3 分布式缓存

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * L3分布式Redis缓存
 */
@Component
public class L3DistributedCache {

    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;
    private static final String CACHE_PREFIX = "llm:cache:";
    private static final Duration DEFAULT_TTL = Duration.ofHours(24);

    public L3DistributedCache(RedisTemplate<String, String> redisTemplate,
                             ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * 获取缓存
     */
    public CachedResponse get(String prompt) {
        String cacheKey = CACHE_PREFIX + generateCacheKey(prompt);
        String json = redisTemplate.opsForValue().get(cacheKey);

        if (json == null) {
            return null;
        }

        try {
            return objectMapper.readValue(json, CachedResponse.class);
        } catch (Exception e) {
            // 反序列化失败,删除无效缓存
            redisTemplate.delete(cacheKey);
            return null;
        }
    }

    /**
     * 缓存响应
     */
    public void put(String prompt, String response, Duration ttl) {
        String cacheKey = CACHE_PREFIX + generateCacheKey(prompt);

        CachedResponse cached = CachedResponse.builder()
            .response(response)
            .timestamp(System.currentTimeMillis())
            .ttl(ttl.toMillis())
            .build();

        try {
            String json = objectMapper.writeValueAsString(cached);
            redisTemplate.opsForValue().set(cacheKey, json, ttl);
        } catch (Exception e) {
            throw new RuntimeException("Failed to cache response", e);
        }
    }

    public void put(String prompt, String response) {
        put(prompt, response, DEFAULT_TTL);
    }

    /**
     * 删除缓存
     */
    public void evict(String prompt) {
        String cacheKey = CACHE_PREFIX + generateCacheKey(prompt);
        redisTemplate.delete(cacheKey);
    }

    /**
     * 清空所有缓存
     */
    public void clear() {
        redisTemplate.keys(CACHE_PREFIX + "*").forEach(redisTemplate::delete);
    }

    /**
     * 主动刷新缓存
     */
    public void refresh(String prompt, String newResponse) {
        evict(prompt);
        put(prompt, newResponse);
    }

    private String generateCacheKey(String prompt) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(prompt.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("SHA-256 algorithm not found", e);
        }
    }
}

统一缓存管理器

import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;

/**
 * 统一的多层缓存管理器
 */
@Slf4j
@Service
public class MultiLevelCacheManager {

    private final L1ExactMatchCache l1Cache;
    private final L2SemanticCache l2Cache;
    private final L3DistributedCache l3Cache;
    private final MeterRegistry meterRegistry;

    public MultiLevelCacheManager(L1ExactMatchCache l1Cache,
                                 L2SemanticCache l2Cache,
                                 L3DistributedCache l3Cache,
                                 MeterRegistry meterRegistry) {
        this.l1Cache = l1Cache;
        this.l2Cache = l2Cache;
        this.l3Cache = l3Cache;
        this.meterRegistry = meterRegistry;
    }

    /**
     * 从多层缓存中获取响应
     */
    public CachedResponse get(String prompt) {
        Timer.Sample sample = Timer.start(meterRegistry);

        // L1: 精确匹配缓存
        CachedResponse response = l1Cache.get(prompt);
        if (response != null) {
            sample.stop(meterRegistry.timer("cache.hit", "level", "L1"));
            meterRegistry.counter("cache.hit", "level", "L1").increment();
            log.debug("L1 cache hit for prompt: {}", truncate(prompt));
            return response;
        }

        // L2: 语义缓存
        response = l2Cache.get(prompt);
        if (response != null) {
            sample.stop(meterRegistry.timer("cache.hit", "level", "L2"));
            meterRegistry.counter("cache.hit", "level", "L2").increment();
            log.debug("L2 cache hit for prompt: {} (similarity: {})",
                     truncate(prompt), response.getSimilarityScore());

            // 将L2的结果提升到L1
            l1Cache.put(prompt, response.getResponse());
            return response;
        }

        // L3: 分布式缓存
        response = l3Cache.get(prompt);
        if (response != null) {
            sample.stop(meterRegistry.timer("cache.hit", "level", "L3"));
            meterRegistry.counter("cache.hit", "level", "L3").increment();
            log.debug("L3 cache hit for prompt: {}", truncate(prompt));

            // 将L3的结果提升到L2和L1
            l2Cache.put(prompt, response.getResponse());
            l1Cache.put(prompt, response.getResponse());
            return response;
        }

        // 所有缓存未命中
        sample.stop(meterRegistry.timer("cache.miss"));
        meterRegistry.counter("cache.miss").increment();
        log.debug("Cache miss for prompt: {}", truncate(prompt));
        return null;
    }

    /**
     * 缓存响应到所有层级
     */
    public void put(String prompt, String response) {
        l1Cache.put(prompt, response);
        l2Cache.put(prompt, response);
        l3Cache.put(prompt, response);

        meterRegistry.counter("cache.put").increment();
        log.debug("Cached response for prompt: {}", truncate(prompt));
    }

    /**
     * 从所有层级清除缓存
     */
    public void evict(String prompt) {
        l3Cache.evict(prompt);
        // L1和L2会自动过期,不需要手动清除

        meterRegistry.counter("cache.evict").increment();
    }

    /**
     * 获取缓存统计信息
     */
    public CacheStatistics getStatistics() {
        return CacheStatistics.builder()
            .l1Stats(l1Cache.getStats())
            .l1HitRate(getHitRate("L1"))
            .l2HitRate(getHitRate("L2"))
            .l3HitRate(getHitRate("L3"))
            .overallHitRate(getOverallHitRate())
            .build();
    }

    private double getHitRate(String level) {
        Counter hits = meterRegistry.counter("cache.hit", "level", level);
        Counter misses = meterRegistry.counter("cache.miss");
        double total = hits.count() + misses.count();
        return total > 0 ? hits.count() / total : 0.0;
    }

    private double getOverallHitRate() {
        double l1 = getHitRate("L1");
        double l2 = getHitRate("L2");
        double l3 = getHitRate("L3");
        return l1 + l2 + l3;
    }

    private String truncate(String text) {
        return text.length() > 50 ? text.substring(0, 50) + "..." : text;
    }
}

缓存失效策略

/**
 * 缓存失效策略配置
 */
@Configuration
public class CacheEvictionConfig {

    /**
     * TTL (Time To Live) 策略
     */
    @Bean
    public CacheEvictionStrategy ttlStrategy() {
        return new TtlEvictionStrategy(Duration.ofHours(24));
    }

    /**
     * LRU (Least Recently Used) 策略
     */
    @Bean
    public CacheEvictionStrategy lruStrategy() {
        return new LruEvictionStrategy(10000); // 最多保留10000条
    }

    /**
     * 主动刷新策略
     */
    @Bean
    public CacheRefreshStrategy refreshStrategy() {
        return CacheRefreshStrategy.builder()
            .refreshInterval(Duration.ofHours(12))
            .refreshOnAccess(true) // 访问时触发刷新
            .refreshThreshold(0.8) // 当TTL剩余20%时刷新
            .build();
    }
}

/**
 * 定时缓存刷新任务
 */
@Component
@Slf4j
public class CacheRefreshScheduler {

    private final MultiLevelCacheManager cacheManager;
    private final ChatLanguageModel chatModel;

    /**
     * 每小时检查并刷新即将过期的热点缓存
     */
    @Scheduled(cron = "0 0 * * * ?")
    public void refreshHotCache() {
        log.info("Starting hot cache refresh...");

        // 获取访问频率最高的prompt列表
        List<String> hotPrompts = getHotPrompts();

        hotPrompts.forEach(prompt -> {
            try {
                // 重新生成响应
                String response = chatModel.generate(prompt);
                // 更新缓存
                cacheManager.put(prompt, response);
                log.debug("Refreshed cache for prompt: {}", prompt);
            } catch (Exception e) {
                log.error("Failed to refresh cache for prompt: {}", prompt, e);
            }
        });

        log.info("Hot cache refresh completed. Refreshed {} items", hotPrompts.size());
    }

    private List<String> getHotPrompts() {
        // 从监控系统获取访问频率最高的prompt
        // 这里简化处理,实际应该从metrics中获取
        return List.of();
    }
}

异步并行优化

CompletableFuture并行LLM调用

import dev.langchain4j.model.chat.ChatLanguageModel;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

/**
 * 异步并行LLM调用服务
 */
@Service
public class ParallelLlmService {

    private final ChatLanguageModel chatModel;
    private final Executor llmExecutor;

    public ParallelLlmService(ChatLanguageModel chatModel,
                             @Qualifier("llmTaskExecutor") Executor llmExecutor) {
        this.chatModel = chatModel;
        this.llmExecutor = llmExecutor;
    }

    /**
     * 并行执行多个LLM调用
     */
    public List<String> parallelGenerate(List<String> prompts) {
        // 创建多个异步任务
        List<CompletableFuture<String>> futures = prompts.stream()
            .map(prompt -> CompletableFuture.supplyAsync(
                () -> chatModel.generate(prompt),
                llmExecutor
            ))
            .collect(Collectors.toList());

        // 等待所有任务完成
        CompletableFuture<Void> allOf = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        // 收集结果
        return allOf.thenApply(v ->
            futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        ).join();
    }

    /**
     * 并行调用并汇总结果
     */
    public String parallelGenerateAndCombine(List<String> prompts,
                                            String combinePrompt) {
        // 并行获取各部分结果
        List<String> partialResults = parallelGenerate(prompts);

        // 构建汇总prompt
        String combinedInput = combinePrompt + "nn" +
            String.join("nn", partialResults);

        // 最终汇总
        return chatModel.generate(combinedInput);
    }

    /**
     * 带超时的并行调用
     */
    public List<String> parallelGenerateWithTimeout(List<String> prompts,
                                                    Duration timeout) {
        List<CompletableFuture<String>> futures = prompts.stream()
            .map(prompt -> CompletableFuture.supplyAsync(
                () -> chatModel.generate(prompt),
                llmExecutor
            ).orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
             .exceptionally(ex -> {
                 log.error("LLM call timeout for prompt: {}", prompt, ex);
                 return "Error: Timeout";
             }))
            .collect(Collectors.toList());

        CompletableFuture<Void> allOf = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );

        return allOf.thenApply(v ->
            futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        ).join();
    }
}

并行RAG检索

import dev.langchain4j.data.document.Document;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.store.embedding.EmbeddingMatch;
import dev.langchain4j.store.embedding.EmbeddingStore;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * 并行RAG检索服务
 */
@Service
public class ParallelRagService {

    private final EmbeddingStore<TextSegment> embeddingStore;
    private final EmbeddingModel embeddingModel;
    private final ChatLanguageModel chatModel;
    private final Executor ragExecutor;

    /**
     * 并行多数据源检索
     */
    public String retrieveFromMultipleSources(String query,
                                             List<String> dataSources) {
        // 并行计算query的embedding
        CompletableFuture<Embedding> embeddingFuture =
            CompletableFuture.supplyAsync(
                () -> embeddingModel.embed(query).content(),
                ragExecutor
            );

        // 并行从多个数据源检索
        List<CompletableFuture<List<EmbeddingMatch<TextSegment>>>> searchFutures =
            dataSources.stream()
                .map(source -> CompletableFuture.supplyAsync(() ->
                    searchInSource(embeddingFuture.join(), source),
                    ragExecutor
                ))
                .collect(Collectors.toList());

        // 等待所有检索完成
        CompletableFuture<Void> allSearches = CompletableFuture.allOf(
            searchFutures.toArray(new CompletableFuture[0])
        );

        // 合并所有检索结果
        List<TextSegment> allResults = allSearches.thenApply(v ->
            searchFutures.stream()
                .flatMap(future -> future.join().stream())
                .sorted((a, b) -> Double.compare(b.score(), a.score()))
                .limit(10)
                .map(EmbeddingMatch::embedded)
                .collect(Collectors.toList())
        ).join();

        // 生成最终答案
        String context = allResults.stream()
            .map(TextSegment::text)
            .collect(Collectors.joining("nn"));

        String prompt = String.format(
            "基于以下上下文回答问题:nn%snn问题: %s",
            context, query
        );

        return chatModel.generate(prompt);
    }

    /**
     * 并行检索和生成
     */
    public String parallelRetrieveAndGenerate(String query) {
        // 同时启动检索和初步分析
        CompletableFuture<List<TextSegment>> retrievalFuture =
            CompletableFuture.supplyAsync(() -> retrieve(query), ragExecutor);

        CompletableFuture<String> analysisFuture =
            CompletableFuture.supplyAsync(
                () -> chatModel.generate("分析这个问题: " + query),
                ragExecutor
            );

        // 等待检索完成
        List<TextSegment> segments = retrievalFuture.join();
        String analysis = analysisFuture.join();

        // 生成最终答案
        String context = segments.stream()
            .map(TextSegment::text)
            .collect(Collectors.joining("nn"));

        String finalPrompt = String.format(
            "初步分析:n%snn参考资料:n%snn请给出详细回答",
            analysis, context
        );

        return chatModel.generate(finalPrompt);
    }

    private List<EmbeddingMatch<TextSegment>> searchInSource(
        Embedding embedding, String source) {
        // 在特定数据源中搜索
        // 实际实现需要根据数据源类型调用不同的embeddingStore
        return embeddingStore.search(
            EmbeddingSearchRequest.builder()
                .queryEmbedding(embedding)
                .maxResults(5)
                .minScore(0.7)
                .build()
        ).matches();
    }

    private List<TextSegment> retrieve(String query) {
        Embedding embedding = embeddingModel.embed(query).content();
        return embeddingStore.search(
            EmbeddingSearchRequest.builder()
                .queryEmbedding(embedding)
                .maxResults(5)
                .build()
        ).matches().stream()
            .map(EmbeddingMatch::embedded)
            .collect(Collectors.toList());
    }
}

Spring异步配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * 异步任务线程池配置
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * LLM调用专用线程池
     */
    @Bean(name = "llmTaskExecutor")
    public Executor llmTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 核心线程数
        executor.setMaxPoolSize(50); // 最大线程数
        executor.setQueueCapacity(100); // 队列容量
        executor.setThreadNamePrefix("llm-async-");
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        executor.initialize();
        return executor;
    }

    /**
     * RAG检索专用线程池
     */
    @Bean(name = "ragTaskExecutor")
    public Executor ragTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(200);
        executor.setThreadNamePrefix("rag-async-");
        executor.setKeepAliveSeconds(60);
        executor.initialize();
        return executor;
    }

    /**
     * 通用异步任务线程池
     */
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("async-");
        executor.initialize();
        return executor;
    }
}

使用@Async注解

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * 使用@Async注解的异步服务
 */
@Service
public class AsyncLlmService {

    private final ChatLanguageModel chatModel;
    private final MultiLevelCacheManager cacheManager;

    /**
     * 异步生成响应
     */
    @Async("llmTaskExecutor")
    public CompletableFuture<String> generateAsync(String prompt) {
        // 先检查缓存
        CachedResponse cached = cacheManager.get(prompt);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached.getResponse());
        }

        // 调用LLM
        String response = chatModel.generate(prompt);

        // 缓存结果
        cacheManager.put(prompt, response);

        return CompletableFuture.completedFuture(response);
    }

    /**
     * 异步批量生成
     */
    @Async("llmTaskExecutor")
    public CompletableFuture<List<String>> batchGenerateAsync(List<String> prompts) {
        List<String> responses = prompts.stream()
            .map(prompt -> {
                CachedResponse cached = cacheManager.get(prompt);
                if (cached != null) {
                    return cached.getResponse();
                }
                String response = chatModel.generate(prompt);
                cacheManager.put(prompt, response);
                return response;
            })
            .collect(Collectors.toList());

        return CompletableFuture.completedFuture(responses);
    }
}

流式输出优化

StreamingChatLanguageModel降低首Token延迟

import dev.langchain4j.model.chat.StreamingChatLanguageModel;
import dev.langchain4j.model.output.Response;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

/**
 * 流式输出服务
 */
@Service
public class StreamingLlmService {

    private final StreamingChatLanguageModel streamingModel;

    public StreamingLlmService(StreamingChatLanguageModel streamingModel) {
        this.streamingModel = streamingModel;
    }

    /**
     * 流式生成响应
     */
    public Flux<String> generateStream(String prompt) {
        return Flux.create(sink -> {
            streamingModel.generate(prompt, new StreamingResponseHandler<String>() {
                @Override
                public void onNext(String token) {
                    sink.next(token);
                }

                @Override
                public void onComplete(Response<String> response) {
                    sink.complete();
                }

                @Override
                public void onError(Throwable error) {
                    sink.error(error);
                }
            });
        });
    }

    /**
     * 带缓冲的流式输出
     */
    public Flux<String> generateStreamWithBuffer(String prompt, int bufferSize) {
        return generateStream(prompt)
            .buffer(bufferSize)
            .map(tokens -> String.join("", tokens));
    }
}

WebFlux集成

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

/**
 * 流式API控制器
 */
@RestController
@RequestMapping("/api/stream")
public class StreamingController {

    private final StreamingLlmService streamingService;

    public StreamingController(StreamingLlmService streamingService) {
        this.streamingService = streamingService;
    }

    /**
     * SSE流式响应
     */
    @PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChat(@RequestBody ChatRequest request) {
        return streamingService.generateStream(request.getPrompt())
            .map(token -> "data: " + token + "nn");
    }

    /**
     * NDJSON流式响应
     */
    @PostMapping(value = "/chat/ndjson",
                produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<StreamChunk> streamChatNdjson(@RequestBody ChatRequest request) {
        return streamingService.generateStream(request.getPrompt())
            .map(token -> StreamChunk.builder()
                .content(token)
                .timestamp(System.currentTimeMillis())
                .build()
            );
    }
}

@Data
@Builder
class StreamChunk {
    private String content;
    private long timestamp;
}

SSE (Server-Sent Events) 实现

import org.springframework.stereotype.Controller;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

/**
 * SSE流式输出控制器
 */
@Controller
@RequestMapping("/api/sse")
public class SseController {

    private final StreamingChatLanguageModel streamingModel;
    private final ExecutorService executorService;

    /**
     * SSE端点
     */
    @GetMapping("/chat")
    public SseEmitter streamChat(@RequestParam String prompt) {
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);

        executorService.execute(() -> {
            try {
                streamingModel.generate(prompt, new StreamingResponseHandler<String>() {
                    @Override
                    public void onNext(String token) {
                        try {
                            emitter.send(SseEmitter.event()
                                .name("message")
                                .data(token));
                        } catch (IOException e) {
                            emitter.completeWithError(e);
                        }
                    }

                    @Override
                    public void onComplete(Response<String> response) {
                        emitter.complete();
                    }

                    @Override
                    public void onError(Throwable error) {
                        emitter.completeWithError(error);
                    }
                });
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}

连接池优化

OkHttp连接池配置

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * OkHttp连接池配置
 */
@Configuration
public class OkHttpConfig {

    @Bean
    public OkHttpClient okHttpClient() {
        // 配置连接池
        ConnectionPool connectionPool = new ConnectionPool(
            50, // 最大空闲连接数
            5, // 连接保持活跃时间
            TimeUnit.MINUTES
        );

        return new OkHttpClient.Builder()
            .connectionPool(connectionPool)
            .connectTimeout(Duration.ofSeconds(30)) // 连接超时
            .readTimeout(Duration.ofSeconds(60)) // 读取超时
            .writeTimeout(Duration.ofSeconds(60)) // 写入超时
            .retryOnConnectionFailure(true) // 连接失败重试
            .followRedirects(true)
            .followSslRedirects(true)
            .build();
    }
}

Apache HttpClient连接池配置

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Apache HttpClient连接池配置
 */
@Configuration
public class HttpClientConfig {

    @Bean
    public CloseableHttpClient httpClient() {
        // 配置连接池管理器
        PoolingHttpClientConnectionManager connectionManager =
            new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(200); // 最大连接数
        connectionManager.setDefaultMaxPerRoute(50); // 每个路由的最大连接数
        connectionManager.setValidateAfterInactivity(2000); // 验证空闲连接

        // 配置请求参数
        RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(30000) // 连接超时 30s
            .setSocketTimeout(60000) // 读取超时 60s
            .setConnectionRequestTimeout(10000) // 从连接池获取连接超时 10s
            .build();

        return HttpClients.custom()
            .setConnectionManager(connectionManager)
            .setDefaultRequestConfig(requestConfig)
            .evictExpiredConnections() // 自动清理过期连接
            .evictIdleConnections(60, TimeUnit.SECONDS) // 清理空闲连接
            .setKeepAliveStrategy((response, context) -> 60 * 1000) // Keep-Alive策略
            .build();
    }
}

Keep-Alive配置

import dev.langchain4j.model.openai.OpenAiChatModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

/**
 * LLM客户端Keep-Alive配置
 */
@Configuration
public class LlmClientConfig {

    @Bean
    public OpenAiChatModel openAiChatModel(OkHttpClient okHttpClient) {
        return OpenAiChatModel.builder()
            .apiKey(System.getenv("OPENAI_API_KEY"))
            .modelName("gpt-4")
            .timeout(Duration.ofSeconds(60))
            .maxRetries(3)
            .logRequests(true)
            .logResponses(true)
            // 使用配置好的OkHttpClient
            .build();
    }
}

Token优化

Prompt压缩

import dev.langchain4j.data.message.ChatMessage;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Prompt压缩器
 */
@Component
public class PromptCompressor {

    /**
     * 压缩对话历史
     */
    public List<ChatMessage> compressHistory(List<ChatMessage> messages,
                                            int maxTokens) {
        // 保留系统消息和最新的几条消息
        List<ChatMessage> systemMessages = messages.stream()
            .filter(msg -> msg instanceof SystemMessage)
            .collect(Collectors.toList());

        List<ChatMessage> recentMessages = messages.stream()
            .filter(msg -> !(msg instanceof SystemMessage))
            .skip(Math.max(0, messages.size() - 10))
            .collect(Collectors.toList());

        List<ChatMessage> compressed = new ArrayList<>();
        compressed.addAll(systemMessages);
        compressed.addAll(recentMessages);

        return compressed;
    }

    /**
     * 摘要压缩长对话
     */
    public List<ChatMessage> summarizeHistory(List<ChatMessage> messages,
                                             ChatLanguageModel summaryModel) {
        if (messages.size() <= 5) {
            return messages;
        }

        // 提取需要摘要的消息
        List<ChatMessage> toSummarize = messages.subList(1, messages.size() - 3);

        // 生成摘要
        String conversation = toSummarize.stream()
            .map(msg -> msg.text())
            .collect(Collectors.joining("n"));

        String summaryPrompt = "请用3-5句话概括以下对话:nn" + conversation;
        String summary = summaryModel.generate(summaryPrompt);

        // 构建压缩后的历史
        List<ChatMessage> compressed = new ArrayList<>();
        compressed.add(messages.get(0)); // System message
        compressed.add(UserMessage.from("(之前的对话摘要: " + summary + ")"));
        compressed.addAll(messages.subList(messages.size() - 3, messages.size()));

        return compressed;
    }

    /**
     * 去除冗余信息
     */
    public String removeRedundancy(String prompt) {
        return prompt
            .replaceAll("\s+", " ") // 多个空白符替换为单个空格
            .replaceAll("(?m)^\s+", "") // 删除行首空白
            .replaceAll("(?m)\s+$", "") // 删除行尾空白
            .replaceAll("\n{3,}", "nn") // 多个换行符替换为两个
            .trim();
    }
}

历史截断策略

import dev.langchain4j.memory.ChatMemory;
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 对话历史管理配置
 */
@Configuration
public class ChatMemoryConfig {

    /**
     * 固定窗口大小的历史记录
     */
    @Bean
    public ChatMemory fixedWindowMemory() {
        return MessageWindowChatMemory.builder()
            .maxMessages(10) // 最多保留10条消息
            .build();
    }

    /**
     * Token限制的历史记录
     */
    @Bean
    public ChatMemory tokenLimitedMemory() {
        return TokenWindowChatMemory.builder()
            .maxTokens(2000) // 最多2000个token
            .build();
    }
}

/**
 * 智能历史截断服务
 */
@Service
public class SmartHistoryTruncator {

    private final PromptCompressor compressor;
    private final ChatLanguageModel summaryModel;

    /**
     * 智能截断历史
     */
    public List<ChatMessage> truncate(List<ChatMessage> messages, int maxTokens) {
        int currentTokens = estimateTokens(messages);

        if (currentTokens <= maxTokens) {
            return messages;
        }

        // 策略1: 保留重要消息,删除次要消息
        List<ChatMessage> important = filterImportantMessages(messages);
        if (estimateTokens(important) <= maxTokens) {
            return important;
        }

        // 策略2: 摘要压缩
        List<ChatMessage> summarized = compressor.summarizeHistory(
            messages, summaryModel
        );
        if (estimateTokens(summarized) <= maxTokens) {
            return summarized;
        }

        // 策略3: 强制截断
        return compressor.compressHistory(messages, maxTokens);
    }

    private List<ChatMessage> filterImportantMessages(List<ChatMessage> messages) {
        return messages.stream()
            .filter(msg -> isImportant(msg))
            .collect(Collectors.toList());
    }

    private boolean isImportant(ChatMessage message) {
        // 判断消息重要性的逻辑
        // 例如: 包含关键词、用户明确指示、系统消息等
        return message instanceof SystemMessage ||
               message.text().contains("重要") ||
               message.text().contains("记住");
    }

    private int estimateTokens(List<ChatMessage> messages) {
        // 简单估算: 1个token约等于4个字符
        int totalChars = messages.stream()
            .mapToInt(msg -> msg.text().length())
            .sum();
        return totalChars / 4;
    }
}

System Prompt最小化

/**
 * System Prompt优化器
 */
@Component
public class SystemPromptOptimizer {

    /**
     * 优化System Prompt
     */
    public String optimize(String systemPrompt) {
        // 删除示例 (如果不必要)
        String optimized = removeExamples(systemPrompt);

        // 压缩指令
        optimized = compressInstructions(optimized);

        // 使用简洁语言
        optimized = simplifyLanguage(optimized);

        return optimized;
    }

    private String removeExamples(String prompt) {
        // 如果模型已经理解任务,可以删除示例
        return prompt.replaceAll("(?s)示例:.*?(?=\n\n|$)", "");
    }

    private String compressInstructions(String prompt) {
        // 用更简洁的方式表达相同的指令
        return prompt
            .replace("你需要", "")
            .replace("请务必", "必须")
            .replace("请注意", "注意");
    }

    private String simplifyLanguage(String prompt) {
        // 使用更简单的词汇
        return prompt
            .replace("进行分析", "分析")
            .replace("提供答案", "回答")
            .replace("给出建议", "建议");
    }

    /**
     * 动态System Prompt
     */
    public String buildDynamicSystemPrompt(String taskType) {
        return switch (taskType) {
            case "qa" -> "你是问答助手。简洁准确地回答问题。";
            case "summarization" -> "你是摘要助手。提取关键信息。";
            case "translation" -> "你是翻译助手。准确翻译文本。";
            default -> "你是AI助手。";
        };
    }
}

批量处理

请求合并

import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * 请求批量合并服务
 */
@Service
public class RequestBatchingService {

    private final ChatLanguageModel chatModel;
    private final BlockingQueue<BatchRequest> requestQueue;
    private final ScheduledExecutorService scheduler;

    private static final int BATCH_SIZE = 10;
    private static final int BATCH_TIMEOUT_MS = 100;

    public RequestBatchingService(ChatLanguageModel chatModel) {
        this.chatModel = chatModel;
        this.requestQueue = new LinkedBlockingQueue<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();

        // 启动批处理任务
        startBatchProcessor();
    }

    /**
     * 提交请求到批处理队列
     */
    public CompletableFuture<String> submit(String prompt) {
        CompletableFuture<String> future = new CompletableFuture<>();
        BatchRequest request = new BatchRequest(prompt, future);
        requestQueue.offer(request);
        return future;
    }

    /**
     * 启动批处理器
     */
    private void startBatchProcessor() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                processBatch();
            } catch (Exception e) {
                log.error("Error processing batch", e);
            }
        }, 0, BATCH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * 处理一批请求
     */
    private void processBatch() {
        List<BatchRequest> batch = new ArrayList<>();

        // 收集一批请求
        requestQueue.drainTo(batch, BATCH_SIZE);

        if (batch.isEmpty()) {
            return;
        }

        // 合并prompt
        String mergedPrompt = batch.stream()
            .map(req -> req.getPrompt())
            .collect(Collectors.joining("n---n"));

        // 批量调用LLM
        String mergedResponse = chatModel.generate(
            "请分别回答以下问题,用---分隔:nn" + mergedPrompt
        );

        // 分割响应
        String[] responses = mergedResponse.split("---");

        // 返回结果给各个请求
        for (int i = 0; i < batch.size(); i++) {
            String response = i < responses.length ?
                responses[i].trim() : "Error: No response";
            batch.get(i).getFuture().complete(response);
        }
    }

    @Data
    @AllArgsConstructor
    private static class BatchRequest {
        private String prompt;
        private CompletableFuture<String> future;
    }
}

批量Embedding计算

import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.model.embedding.EmbeddingModel;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 批量Embedding计算服务
 */
@Service
public class BatchEmbeddingService {

    private final EmbeddingModel embeddingModel;
    private final BlockingQueue<EmbeddingRequest> requestQueue;
    private final ScheduledExecutorService scheduler;

    private static final int BATCH_SIZE = 50;
    private static final int BATCH_TIMEOUT_MS = 200;

    public BatchEmbeddingService(EmbeddingModel embeddingModel) {
        this.embeddingModel = embeddingModel;
        this.requestQueue = new LinkedBlockingQueue<>();
        this.scheduler = Executors.newSingleThreadScheduledExecutor();

        startBatchProcessor();
    }

    /**
     * 提交文本到批量Embedding队列
     */
    public CompletableFuture<Embedding> embed(String text) {
        CompletableFuture<Embedding> future = new CompletableFuture<>();
        EmbeddingRequest request = new EmbeddingRequest(text, future);
        requestQueue.offer(request);
        return future;
    }

    /**
     * 启动批处理器
     */
    private void startBatchProcessor() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                processBatch();
            } catch (Exception e) {
                log.error("Error processing embedding batch", e);
            }
        }, 0, BATCH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    /**
     * 批量处理Embedding请求
     */
    private void processBatch() {
        List<EmbeddingRequest> batch = new ArrayList<>();
        requestQueue.drainTo(batch, BATCH_SIZE);

        if (batch.isEmpty()) {
            return;
        }

        // 提取所有文本
        List<String> texts = batch.stream()
            .map(EmbeddingRequest::getText)
            .collect(Collectors.toList());

        // 批量计算embedding
        List<Embedding> embeddings = embeddingModel.embedAll(texts).content();

        // 返回结果
        for (int i = 0; i < batch.size(); i++) {
            batch.get(i).getFuture().complete(embeddings.get(i));
        }
    }

    /**
     * 批量Embedding with自动分批
     */
    public List<Embedding> embedBatch(List<String> texts) {
        // 如果文本数量超过批大小,自动分批处理
        if (texts.size() <= BATCH_SIZE) {
            return embeddingModel.embedAll(texts).content();
        }

        // 分批处理
        List<Embedding> allEmbeddings = new ArrayList<>();
        for (int i = 0; i < texts.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, texts.size());
            List<String> subList = texts.subList(i, end);
            List<Embedding> embeddings = embeddingModel.embedAll(subList).content();
            allEmbeddings.addAll(embeddings);
        }

        return allEmbeddings;
    }

    @Data
    @AllArgsConstructor
    private static class EmbeddingRequest {
        private String text;
        private CompletableFuture<Embedding> future;
    }
}

性能监控和基准测试

JMH基准测试

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;

/**
 * LLM性能基准测试
 */
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 1, jvmArgs = {"-Xms2G", "-Xmx2G"})
@Warmup(iterations = 3, time = 5)
@Measurement(iterations = 5, time = 10)
public class LlmBenchmark {

    private ChatLanguageModel chatModel;
    private MultiLevelCacheManager cacheManager;
    private ParallelLlmService parallelService;

    private static final String TEST_PROMPT = "What is artificial intelligence?";

    @Setup
    public void setup() {
        // 初始化测试组件
        chatModel = OpenAiChatModel.builder()
            .apiKey(System.getenv("OPENAI_API_KEY"))
            .modelName("gpt-3.5-turbo")
            .build();

        cacheManager = createCacheManager();
        parallelService = createParallelService();
    }

    /**
     * 基准: 无缓存的LLM调用
     */
    @Benchmark
    public String benchmarkNoCache() {
        return chatModel.generate(TEST_PROMPT);
    }

    /**
     * 基准: 带L1缓存的LLM调用
     */
    @Benchmark
    public String benchmarkWithL1Cache() {
        CachedResponse cached = cacheManager.get(TEST_PROMPT);
        if (cached != null) {
            return cached.getResponse();
        }
        String response = chatModel.generate(TEST_PROMPT);
        cacheManager.put(TEST_PROMPT, response);
        return response;
    }

    /**
     * 基准: 并行LLM调用
     */
    @Benchmark
    public List<String> benchmarkParallelCalls() {
        List<String> prompts = List.of(
            "What is AI?",
            "What is ML?",
            "What is DL?"
        );
        return parallelService.parallelGenerate(prompts);
    }

    /**
     * 运行基准测试
     */
    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include(LlmBenchmark.class.getSimpleName())
            .build();

        new Runner(opt).run();
    }
}

P99延迟监控

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

/**
 * P99延迟监控
 */
@Component
public class LatencyMonitor {

    private final MeterRegistry meterRegistry;

    public LatencyMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * 记录LLM调用延迟
     */
    public <T> T measureLatency(String operation, Supplier<T> supplier) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            return supplier.get();
        } finally {
            sample.stop(Timer.builder("llm.latency")
                .tag("operation", operation)
                .publishPercentiles(0.5, 0.95, 0.99) // P50, P95, P99
                .publishPercentileHistogram()
                .register(meterRegistry));
        }
    }

    /**
     * 获取P99延迟
     */
    public double getP99Latency(String operation) {
        Timer timer = meterRegistry.find("llm.latency")
            .tag("operation", operation)
            .timer();

        if (timer != null) {
            return timer.percentile(0.99);
        }
        return 0.0;
    }

    /**
     * 监控缓存命中率
     */
    public void recordCacheHit(String level) {
        meterRegistry.counter("cache.hit", "level", level).increment();
    }

    public void recordCacheMiss() {
        meterRegistry.counter("cache.miss").increment();
    }

    /**
     * 获取缓存命中率
     */
    public double getCacheHitRate() {
        double hits = meterRegistry.counter("cache.hit").count();
        double misses = meterRegistry.counter("cache.miss").count();
        double total = hits + misses;
        return total > 0 ? hits / total : 0.0;
    }
}

性能仪表板

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 性能监控仪表板API
 */
@RestController
@RequestMapping("/api/metrics")
public class MetricsController {

    private final LatencyMonitor latencyMonitor;
    private final MultiLevelCacheManager cacheManager;
    private final MeterRegistry meterRegistry;

    /**
     * 获取性能指标概览
     */
    @GetMapping("/overview")
    public MetricsOverview getOverview() {
        return MetricsOverview.builder()
            .p99Latency(latencyMonitor.getP99Latency("generate"))
            .cacheHitRate(latencyMonitor.getCacheHitRate())
            .cacheStats(cacheManager.getStatistics())
            .requestCount(getRequestCount())
            .errorRate(getErrorRate())
            .avgResponseTime(getAvgResponseTime())
            .build();
    }

    /**
     * 获取详细的延迟分布
     */
    @GetMapping("/latency")
    public LatencyDistribution getLatencyDistribution() {
        Timer timer = meterRegistry.find("llm.latency").timer();

        return LatencyDistribution.builder()
            .p50(timer.percentile(0.50))
            .p95(timer.percentile(0.95))
            .p99(timer.percentile(0.99))
            .max(timer.max(TimeUnit.MILLISECONDS))
            .mean(timer.mean(TimeUnit.MILLISECONDS))
            .count(timer.count())
            .build();
    }

    /**
     * 获取缓存性能指标
     */
    @GetMapping("/cache")
    public CacheMetrics getCacheMetrics() {
        CacheStatistics stats = cacheManager.getStatistics();

        return CacheMetrics.builder()
            .l1HitRate(stats.getL1HitRate())
            .l2HitRate(stats.getL2HitRate())
            .l3HitRate(stats.getL3HitRate())
            .overallHitRate(stats.getOverallHitRate())
            .l1Stats(stats.getL1Stats())
            .build();
    }

    private long getRequestCount() {
        return meterRegistry.counter("llm.requests").count();
    }

    private double getErrorRate() {
        double errors = meterRegistry.counter("llm.errors").count();
        double total = getRequestCount();
        return total > 0 ? errors / total : 0.0;
    }

    private double getAvgResponseTime() {
        Timer timer = meterRegistry.find("llm.latency").timer();
        return timer != null ? timer.mean(TimeUnit.MILLISECONDS) : 0.0;
    }
}

@Data
@Builder
class MetricsOverview {
    private double p99Latency;
    private double cacheHitRate;
    private CacheStatistics cacheStats;
    private long requestCount;
    private double errorRate;
    private double avgResponseTime;
}

@Data
@Builder
class LatencyDistribution {
    private double p50;
    private double p95;
    private double p99;
    private double max;
    private double mean;
    private long count;
}

@Data
@Builder
class CacheMetrics {
    private double l1HitRate;
    private double l2HitRate;
    private double l3HitRate;
    private double overallHitRate;
    private CacheStats l1Stats;
}

性能告警

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * 性能告警监控
 */
@Component
@Slf4j
public class PerformanceAlerting {

    private final LatencyMonitor latencyMonitor;
    private final AlertService alertService;

    // 告警阈值
    private static final double P99_THRESHOLD = 5000.0; // 5秒
    private static final double CACHE_HIT_THRESHOLD = 0.7; // 70%
    private static final double ERROR_RATE_THRESHOLD = 0.05; // 5%

    /**
     * 每分钟检查性能指标
     */
    @Scheduled(fixedRate = 60000)
    public void checkPerformance() {
        // 检查P99延迟
        double p99 = latencyMonitor.getP99Latency("generate");
        if (p99 > P99_THRESHOLD) {
            alertService.sendAlert(
                "High P99 Latency",
                String.format("P99 latency is %.2fms, exceeds threshold %.2fms",
                             p99, P99_THRESHOLD)
            );
        }

        // 检查缓存命中率
        double hitRate = latencyMonitor.getCacheHitRate();
        if (hitRate < CACHE_HIT_THRESHOLD) {
            alertService.sendAlert(
                "Low Cache Hit Rate",
                String.format("Cache hit rate is %.2f%%, below threshold %.2f%%",
                             hitRate * 100, CACHE_HIT_THRESHOLD * 100)
            );
        }
    }
}

实战练习

练习1: 实现智能缓存策略

任务: 实现一个智能缓存决策系统,根据请求特征决定是否缓存:

@Service
public class SmartCacheStrategy {

    /**
     * 决定是否缓存响应
     */
    public boolean shouldCache(String prompt, String response) {
        // TODO: 实现缓存决策逻辑
        // 考虑因素:
        // 1. prompt的通用性 (是否包含时间相关、个性化内容)
        // 2. response的稳定性 (多次调用是否一致)
        // 3. 计算成本 (Token数量)
        // 4. 访问频率预测

        return false;
    }
}

提示:

  • 分析prompt中的时间相关词汇 ("今天"、"现在"等)
  • 检测个性化内容 (用户名、ID等)
  • 估算response的价值 (长度、复杂度)

练习2: 实现自适应批处理

任务: 实现一个自适应批处理系统,根据负载动态调整批大小:

@Service
public class AdaptiveBatchProcessor {

    private int currentBatchSize = 10;

    /**
     * 根据负载调整批大小
     */
    public void adjustBatchSize(double qps, double avgLatency) {
        // TODO: 实现自适应批大小逻辑
        // 考虑因素:
        // 1. 当前QPS (每秒查询数)
        // 2. 平均延迟
        // 3. 队列深度
        // 4. 错误率
    }
}

练习3: 实现性能优化建议系统

任务: 分析应用性能指标,自动生成优化建议:

@Service
public class PerformanceAdvisor {

    /**
     * 生成性能优化建议
     */
    public List<OptimizationSuggestion> generateSuggestions(
        MetricsOverview metrics) {

        List<OptimizationSuggestion> suggestions = new ArrayList<>();

        // TODO: 分析指标并生成建议
        // 1. 如果P99延迟过高 -> 建议启用缓存或流式输出
        // 2. 如果缓存命中率低 -> 建议优化缓存策略
        // 3. 如果错误率高 -> 建议增加重试或降级
        // 4. 如果并发度低 -> 建议增加并行处理

        return suggestions;
    }
}

最后更新:2026-03-09 字数统计:5,200 字 预计阅读时间:40 分钟

本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com