百万并发第一步:高并发ToC系统落地基石

Java教程 2025-09-30

引言

众所周知,构建高用户量的ToC系统时,需要关注:高并发、高可用、低延迟、海量用户、用户体验、快速迭代等。

让我们来一起看看高并发、低延迟思想下的经典代码范例。

本篇涵盖并行调用、高性能计数、批处理、缓存等核心模式。

1 CompletableFuture并行调用

在ToC应用中,一个页面(如“我的主页”)的数据往往来自多个下游服务(用户信息、推荐商品、我的待办)。串行调用会使延迟叠加,是性能杀手。

场景:一个请求需要聚合用户信息、用户积分、用户优惠券三个服务的数据。每个服务耗时约100ms。

@Service
public class AggregationService {

    @Autowired
    private UserInfoService userInfoService;
    @Autowired
    private PointService pointService;
    @Autowired
    private CouponService couponService;

    public UserHomePageDTO getHomePageData(Long userId) {
        // 1. 串行调用,耗时叠加
        UserInfo userInfo = userInfoService.getUserInfo(userId); // 耗时100ms
        UserPoint point = pointService.getUserPoint(userId);     // 耗时100ms
        List coupons = couponService.getCoupons(userId); // 耗时100ms

        // 2. 组装数据
        return assembleDTO(userInfo, point, coupons);
    }
    // ... assembleDTO a
}

使用CompletableFuture进行并发操作:

@Service
public class AggregationService {

    @Autowired
    private UserInfoService userInfoService;
    @Autowired
    private PointService pointService;
    @Autowired
    private CouponService couponService;

    // 自定义线程池,隔离不同业务,避免使用默认的ForkJoinPool处理I/O密集型任务
    @Bean(destroyMethod = "shutdownNow")
    public ExecutorService bizExecutor() {
        return new ThreadPoolExecutor(
            20, 50, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("biz-aggr-pool-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:主线程执行,避免丢任务
        );
    }

    public UserHomePageDTO getHomePageData(Long userId) {
        ExecutorService executor = bizExecutor(); // 实际应 @Autowired,不然每次调用都创建新线程池

        CompletableFuture userInfoFuture = CompletableFuture
            .supplyAsync(() -> userInfoService.getUserInfo(userId), executor)
            .orTimeout(150, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> {
                log.warn("UserInfoService timeout or error for userId: {}", userId, ex);
                return UserInfo.EMPTY; // 定义 EMPTY 常量,避免 null
            });

        CompletableFuture pointFuture = CompletableFuture
            .supplyAsync(() -> pointService.getUserPoint(userId), executor)
            .orTimeout(150, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> {
                log.warn("PointService error for userId: {}", userId, ex);
                return UserPoint.ZERO;
            });

        CompletableFuture> couponsFuture = CompletableFuture
            .supplyAsync(() -> couponService.getCoupons(userId), executor)
            .orTimeout(150, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> {
                log.warn("CouponService error for userId: {}", userId, ex);
                return Collections.emptyList();
            });

        // allOf 不返回结果,只等待完成
        CompletableFuture.allOf(userInfoFuture, pointFuture, couponsFuture).join();

        // 此时所有 future 已完成(成功或降级),直接 join() 安全
        return assembleDTO(
            userInfoFuture.join(),
            pointFuture.join(),
            couponsFuture.join()
        );
    }
}

将多个独立的、耗时的I/O操作并行化,用总耗时最长的单个操作时间来近似替代总耗时。

当然,对于对于I/O密集型任务,必须使用独立的、线程数合理的线程池。耗尽默认的ForkJoinPool会导致整个JVM的其他异步任务(如parallelStream)瘫痪。

1.1 注意事项

注意,生产中一定要设置超时+异常处理(降级或补偿)。

// 设置超时 + 异常降级
CompletableFuture userInfoFuture = CompletableFuture
    .supplyAsync(() -> userInfoService.getUserInfo(userId), BIZ_EXECUTOR)
    .orTimeout(200, TimeUnit.MILLISECONDS) // 超时熔断
    .exceptionally(ex -> {
        log.warn("UserInfoService failed for userId: {}", userId, ex);
        return DEFAULT_USER_INFO; // 降级兜底
    });

2 本地高并发计数器:LongAdder

统计一个热门商品详情页的实时浏览次数时,并发量极高。使用AtomicLong和synchronized性能较差:

public class ViewCounter {
    // 方案A: synchronized,锁粒度太粗,所有线程串行执行,并发越高,性能越差
    private long count = 0;
    public synchronized void increment() {
        count++;
    }

    // 方案B: AtomicLong,在高并发下,大量线程CAS自旋失败,消耗CPU
    private AtomicLong atomicCount = new AtomicLong(0);
    public void incrementWithAtomic() {
        atomicCount.incrementAndGet();
    }
}

可以使用LongAdder替代AtomicLong和synchronized:

import java.util.concurrent.atomic.LongAdder;

public class ViewCounter {
    // LongAdder是分段锁思想的实现,内部维护一个Cell数组
    // 并发不高时,直接操作base值,和AtomicLong类似
    // 并发变高时,线程会hash到不同的Cell上进行累加,大大减少了冲突
    private final LongAdder longAdder = new LongAdder();

    public void increment() {
        longAdder.increment();
    }

    public long getCount() {
        // sum()方法会累加所有Cell的值和base值,在读多写少的场景下可能会有数据不一致
        // 但在计数器这种场景下,最终一致性是可以接受的
        return longAdder.sum();
    }
}

LongAdder采用了空间换时间,分散热点的思想。 通过将一个热点计数器分散到多个“槽”(Cell)中,让不同的线程去更新不同的槽,最后再汇总,从而极大地降低了锁竞争。

适合读操作(sum())的频率远低于写操作(increment())的场景,用来统计、计数。

2.1 注意事项

LongAdder 内部的 Cell[] 在高并发下会扩容,极端情况下可能占用大量堆内存(比如 10w 线程持续写入)。 因此,LongAdder适合写远多于读的场景,且在堆内存有限的场景,需要监控LongAdder的实例数量。 或者考虑 Striped64 的变种或 Disruptor 环形缓冲。

此外,如果需要分布式的方案,Redis是主流方案,在实际中可以采用LongAdder+Redis的混合方案:

// 1. 本地 LongAdder 缓冲计数(高性能)
private final LongAdder localCounter = new LongAdder();

// 2. 定时/批量 flush 到 Redis(减少网络调用)
@Scheduled(fixedDelay = 1000)
public void flushToRedis() {
    long delta = localCounter.sumThenReset();
    if (delta > 0) {
        redisTemplate.opsForValue().increment("global:counter", delta);
    }
}
// sumThenReset() 不是原子的,在多线程调用 flush 时可能重复计数。加锁或使用单线程调度,比如上面的@Scheduled
  • 本地计数无锁高性能
  • Redis 写入频率大幅降低(比如 1000 次本地计数 → 1 次 Redis INCR)
  • 适合 PV/日志类计数(允许短暂不一致)

不过还需要注意,混合方案存在数据丢失风险(未 flush 的 delta 丢失)。如果需要强一致性(如金融计数),应直接使用Redis INCR,或引入本地WAL(Write-Ahead-Log)保证flush前数据可恢复。

3 优先考虑请求批处理

比如现在有一批商品ID。我们需要根据一批商品ID,查询每个商品的库存信息。不好的示例如下:

@Service
public class StockService {
    @Autowired
    private StockRpcService stockRpcService; // 假设这是一个RPC服务

    public Map getStocks(List productIds) {
        Map resultMap = new HashMap<>();
        if (CollectionUtils.isEmpty(productIds)) {
            return resultMap;
        }

        for (Long productId : productIds) {
            // 问题点:在循环中调用RPC,有多少个ID,就发起多少次调用
            Integer stock = stockRpcService.getStockByProductId(productId);
            resultMap.put(productId, stock);
        }
        return resultMap;
    }
}

这样的处理性能问题严重,在循环中进行网络调用(RPC或DB查询)是严重的反模式。如果productIds列表有100个元素,就会发起100次独立的网络请求。网络开销和下游服务的压力会非常大,导致接口延迟随输入规模线性增长。 且容错性差,任何一次RPC调用失败,都可能导致整个方法抛出异常而中断,已经查询到的数据也丢失了。

需要采取批处理进行优化:

@Service
public class StockService {
    @Autowired
    private StockRpcService stockRpcService;

    public Map getStocks(List productIds) {
        if (CollectionUtils.isEmpty(productIds)) {
            return Collections.emptyMap();
        }

        // 核心改动:调用下游服务提供的批量接口
        // 假设stockRpcService提供了一个批量查询接口
        Map stockMap = stockRpcService.getStocksByProductIds(productIds);

        // 如果下游不支持批量接口,也应该在业务层做聚合,然后分组分批调用单次查询接口
        // List> partitions = Lists.partition(productIds, 200); // e.g., 每200个ID一批
        // for (List partition : partitions) { ... }

        return stockMap;
    }
}

无论是数据库查询还是RPC调用,都应该将多次单点操作聚合成一次批量操作。

批处理可以大幅减少网络往返次数(RTT),降低数据库或下游服务的连接开销和负载,接口性能通常有数量级的提升。

3.1 注意事项

批量太大可能压垮下游(如 DB 一次查 10w 条),批量太小又失去意义。一般来说根据实际资源情况进行调整。

此外,还需要注意分片失败的处理策略,是否重试?是否跳过?

List> partitions = Lists.partition(productIds, 100); // 经验值:100~500
Map result = new HashMap<>();
for (List batch : partitions) {
    try {
        Map batchResult = stockRpcService.getStocksByProductIds(batch);
        result.putAll(batchResult);
    } catch (Exception e) {
        // 可选:记录失败 batch,异步重试 or 降级返回默认库存
        log.error("Batch stock query failed for: {}", batch, e);
        batch.forEach(id -> result.put(id, DEFAULT_STOCK));
    }
}

4 缓存:高并发的基石

在商品查询、用户信息等高频读场景中,缓存是降低数据库和下游服务压力的核心手段。往往采用 本地缓存(Caffeine) + 分布式缓存(Redis) 的多级架构。

@Service
public class UserInfoService {

	// Caffeine 本地缓存 + Redis 二级缓存
    private final LoadingCache localCache = Caffeine.newBuilder()
        .maximumSize(10_000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build(this::loadFromRedis);

    private UserInfo loadFromRedis(Long userId) {
        // 1. 先查 Redis
        UserInfo user = redisTemplate.opsForValue().get("user:info:" + userId);
        if (user != null) {
            return user;
        }

        // 2. Redis 未命中,查 DB(防穿透)
        user = userMapper.selectById(userId);
        if (user != null) {
            // 写回 Redis,设置合理过期时间
            redisTemplate.opsForValue().set("user:info:" + userId, user, 30, TimeUnit.MINUTES);
        } else {
            // 3. 防缓存穿透:空值也缓存(短 TTL)
            redisTemplate.opsForValue().set("user:info:" + userId, UserInfo.EMPTY, 2, TimeUnit.MINUTES);
        }
        return user;
    }

    public UserInfo getUserInfo(Long userId) {
        return localCache.get(userId);
    }
}

4.1 缓存问题处理

使用缓存需要注意防备缓存穿透、缓存雪崩问题,处理好缓存一致性等...

4.1.1 缓存穿透

查询不存在的数据 → 缓存空值(短 TTL)或使用布隆过滤器过滤非法ID

@Service
public class UserService {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private UserMapper userMapper;

    // 布隆过滤器(启动时加载已存在用户 ID)
    private volatile BloomFilter userBloomFilter = BloomFilter.create(Funnels.longFunnel(), 10_000_000, 0.01); // 1% 误判率

    @PostConstruct
    public void initBloomFilter() {
        // 实际可从 DB 或离线任务加载活跃用户 ID
        // userBloomFilter.putAll(existsUserIds);
    }

    public User getUserById(Long userId) {
        // 1. 布隆过滤器拦截明显非法 ID
        if (userId == null || userId <= 0 || !userBloomFilter.mightContain(userId)) {
            return User.EMPTY; // 直接返回空对象,不查缓存/DB
        }

        String key = "user:info:" + userId;

        // 2. 查 Redis
        User user = (User) redisTemplate.opsForValue().get(key);
        if (user != null) {
            if (user == User.EMPTY) {
                return User.EMPTY; // 空值缓存命中
            }
            return user;
        }

        // 3. Redis 未命中,查 DB
        user = userMapper.selectById(userId);
        if (user != null) {
            // 正常数据:缓存 30 分钟
            redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
        } else {
            // 空值:缓存 2 分钟,防止穿透
            redisTemplate.opsForValue().set(key, User.EMPTY, 2, TimeUnit.MINUTES);
        }
        return user;
    }
}

注意:布隆过滤器需定期更新(如监听用户注册事件),否则新用户会被误判为“不存在”。

4.1.2 缓存击穿

如“双 11 热门商品”缓存过期,10w 请求同时打到 DB,导致 DB 瞬间过载。

解决方案

  • 本地缓存(Caffeine)天然防击穿get(key, callable) 内部加锁,只允许一个线程加载
  • Redis 互斥锁:用 SET key lock EX 5 NX (SETNX)实现分布式锁,仅一个线程回源
@Service
public class ProductService {

    private final LoadingCache localCache = Caffeine.newBuilder()
        .maximumSize(10_000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build(this::loadProductFromRemote); // 自动加锁加载

    private Product loadProductFromRemote(Long productId) {
        // 1. 查 Redis(Redis 层也可能击穿,但压力远小于 DB)
        String redisKey = "product:detail:" + productId;
        Product product = (Product) redisTemplate.opsForValue().get(redisKey);
        if (product != null) {
            return product;
        }

        // 2. Redis 未命中,尝试加分布式锁回源
        String lockKey = "lock:product:" + productId;
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 3, TimeUnit.SECONDS);
        if (Boolean.TRUE.equals(locked)) {
            try {
                // 双重检查(防止锁释放后又被其他线程加载)
                product = (Product) redisTemplate.opsForValue().get(redisKey);
                if (product == null) {
                    product = productMapper.selectById(productId);
                    if (product != null) {
                        redisTemplate.opsForValue().set(redisKey, product, 30, TimeUnit.MINUTES);
                    } else {
                        redisTemplate.opsForValue().set(redisKey, Product.EMPTY, 2, TimeUnit.MINUTES);
                    }
                }
            } finally {
                redisTemplate.delete(lockKey); // 释放锁
            }
        } else {
            // 未抢到锁,短暂等待后重试(或直接返回旧数据/降级)
            try { Thread.sleep(50); } catch (InterruptedException e) { /* ignore */ }
            Product cached = (Product) redisTemplate.opsForValue().get(redisKey);
            return cached != null ? cached : Product.EMPTY;
        }
        return product;
    }

    public Product getProduct(Long productId) {
        return localCache.get(productId); // Caffeine 自动处理并发加载
    }
}

Caffeine 本地缓存已解决 90% 的击穿问题,Redis 层锁仅作为兜底。

4.1.3 缓存雪崩

问题:系统重启或缓存预热失败,导致所有 Key TTL 相同,同时失效。

解决方案

  • 随机过期时间:基础 TTL + 随机偏移(如 30 分钟 ± 5 分钟)
  • 永不过期 + 后台刷新:适用于核心热点数据(如首页 Banner)
// 设置 30 分钟基础过期 + 0~300 秒随机偏移
long baseTtl = 30 * 60; // 30 分钟
long randomTtl = ThreadLocalRandom.current().nextLong(0, 300); // 0~5 分钟
redisTemplate.opsForValue().set("product:detail:" + productId, product, baseTtl + randomTtl, TimeUnit.SECONDS);

对核心数据使用 CaffeinerefreshAfterWrite,后台异步刷新,前台永远读缓存。

4.1.4 缓存一致性

问题:更新用户昵称后,缓存仍是旧值,导致用户看到不一致数据。

解决方案

采用 “先更新 DB,再删除缓存”(Cache-Aside 模式),并考虑延迟双删应对主从延迟:

  1. 先更新数据库
  2. 删除缓存
  3. 延迟 N 毫秒后,再次删除缓存(应对主从复制延迟)
@Service
@Transactional
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private TaskExecutor taskExecutor; // 异步线程池

    public void updateUser(User user) {
        // 1. 更新 DB
        userMapper.updateById(user);

        String cacheKey = "user:info:" + user.getId();

        // 2. 第一次删缓存
        redisTemplate.delete(cacheKey);

        // 3. 延迟 500ms 再删一次(覆盖主从延迟窗口)
        taskExecutor.execute(() -> {
            try {
                Thread.sleep(500);
                redisTemplate.delete(cacheKey);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

不过还需要注意,这种延迟双删的模式是仅适用于中小规模系统、允许短暂不一致的轻量方案。

对于大型项目,需要更可靠的缓存一致性可以采用 Cannal+Binlog监听的中间件级可靠方案。简单流程如下:

  1. Canal 伪装成 MySQL 从库,实时监听 Binlog
  2. 解析 UPDATE/DELETE 事件,精准提取变更的主键
  3. 异步发送 MQ 或直接调用,删除对应缓存
// Canal 监听到 Binlog 事件
{
  "table": "user",
  "type": "UPDATE",
  "data": { "id": 123, "name": "new_name" }
}

// 缓存服务消费事件
redis.delete("user:info:123");

上了 Canal,架构复杂性会提升一些,但不需要在业务代码里写延迟双删了

阶段方案特点
初期延迟双删代码简单,快速上线
中期延迟双删 + 本地缓存降级减少对 Redis 一致性的依赖
成熟期Canal / DTS / MQ + Binlog专业中间件保障一致性
高级缓存预热 + 版本号/逻辑时钟user_v2,彻底规避不一致

5 监控与可观测性

高并发系统不能只看代码,还要能观测、诊断、预警

虽然本篇不展开监控细节,但所有高并发接口都应具备:

  • 关键路径打点(如各服务调用耗时)
  • 异常率、超时率监控
  • 线程池活跃度、队列堆积告