冷热钱包系统设计实战(下):核心代码实现与部署


本文分为上下两篇,这是下篇。

上篇回顾:

  1. 引言:为什么需要冷热钱包?
  2. 冷热钱包的由来:从比特币说起
  3. 问题场景:一次真实的线上事故
  4. 冷热钱包架构:小白也能懂的方案
  5. 系统架构设计

本篇内容: 6. 核心代码实现

  1. 完整交易流程
  2. 并发控制机制
  3. 部署与运维
  4. 总结与展望

六、核心代码实现

6.1 核心常量配置

/**
 * 用户账户服务实现
 * 核心逻辑:冷热钱包扣款机制
 */
@Service
public class UserAccountServiceImpl implements UserAccountService {

    /**
     * 热钱包阈值
     * 当热钱包余额低于此值时,自动从冷钱包补充
     */
    private static final BigDecimal HOT_WALLET_THRESHOLD = new BigDecimal("10000");

    /**
     * 热钱包补充金额
     * 从冷钱包调拨到热钱包的固定金额
     */
    private static final BigDecimal HOT_WALLET_RECHARGE_AMOUNT = new BigDecimal("50000");
    
}

6.2 处理充值

充值是用户资金流入的主要方式,所有充值直接进入冷钱包:

/**
 * 处理充值
 * 充值金额直接进入冷钱包
 */
private void processRecharge(UserAccount account, TransactionRequest request,
                            TransactionResponse response) {
    BigDecimal amount = request.getAmount();

    // 增加冷钱包余额(使用乐观锁)
    int rows = userAccountMapper.addColdBalance(
        account.getUserId(),
        amount.longValue(),
        account.getVersion()
    );

    if (rows == 0) {
        throw new WalletException("充值失败,并发冲突");
    }

    // 更新账户对象
    account.setColdBalance(account.getColdBalance().add(amount));

    // 记录交易
    saveTransactionRecord(account, request, response,
                         TransactionType.RECHARGE,
                         account.getColdBalance(),
                         account.getHotBalance());
}

6.3 处理消费(核心逻辑)

这是整个系统最核心的逻辑:优先从热钱包扣款,热钱包不足时自动从冷钱包调拨

/**
 * 处理消费(核心逻辑)
 * 优先从热钱包扣款,热钱包不足时自动从冷钱包调拨
 */
private void processConsume(UserAccount account, TransactionRequest request,
                           TransactionResponse response) {
    BigDecimal amount = request.getAmount();

    // 1. 检查总余额是否足够
    BigDecimal totalBalance = account.getColdBalance().add(account.getHotBalance());
    if (totalBalance.compareTo(amount) < 0) {
        throw new WalletException("余额不足");
    }

    // 2. 如果热钱包余额不足,先从冷钱包调拨
    if (account.getHotBalance().compareTo(amount) < 0) {
        BigDecimal needTransfer = amount.subtract(account.getHotBalance());
        transferColdToHot(account, needTransfer);

        // 重新查询账户以获取最新的版本号
        LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(UserAccount::getUserId, account.getUserId());
        UserAccount freshAccount = userAccountMapper.selectOne(wrapper);
        account.setVersion(freshAccount.getVersion());
        account.setColdBalance(freshAccount.getColdBalance());
        account.setHotBalance(freshAccount.getHotBalance());
    }

    // 3. 从热钱包扣款(使用乐观锁)
    int rows = userAccountMapper.deductHotBalance(
        account.getUserId(),
        amount.longValue(),
        account.getVersion()
    );

    if (rows == 0) {
        throw new WalletException("扣款失败,并发冲突");
    }

    // 更新账户对象
    account.setHotBalance(account.getHotBalance().subtract(amount));

    // 记录交易
    saveTransactionRecord(account, request, response, TransactionType.CONSUME,
                         account.getColdBalance(), account.getHotBalance());
}

6.4 冷热钱包调拨

/**
 * 冷钱包转热钱包(内部调拨)
 */
private void transferColdToHot(UserAccount account, BigDecimal amount) {
    log.info("开始调拨: userId={}, amount=冷转热{}", account.getUserId(), amount);

    // 1. 检查冷钱包余额
    if (account.getColdBalance().compareTo(amount) < 0) {
        throw new WalletException("冷钱包余额不足");
    }

    // 2. 扣减冷钱包
    int rows1 = userAccountMapper.deductColdBalance(
        account.getUserId(),
        amount.longValue(),
        account.getVersion()
    );
    if (rows1 == 0) {
        throw new WalletException("调拨失败,并发冲突");
    }
    account.setColdBalance(account.getColdBalance().subtract(amount));

    // 重新查询获取最新版本号
    LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(UserAccount::getUserId, account.getUserId());
    UserAccount freshAccount = userAccountMapper.selectOne(wrapper);
    account.setVersion(freshAccount.getVersion());
    account.setColdBalance(freshAccount.getColdBalance());
    account.setHotBalance(freshAccount.getHotBalance());

    // 3. 增加热钱包
    int rows2 = userAccountMapper.addHotBalance(
        account.getUserId(),
        amount.longValue(),
        account.getVersion()
    );
    if (rows2 == 0) {
        throw new WalletException("调拨失败,并发冲突");
    }
    account.setHotBalance(account.getHotBalance().add(amount));

    log.info("调拨完成: coldBalance={}, hotBalance={}",
             account.getColdBalance(), account.getHotBalance());
}

6.5 自动充值热钱包

/**
 * 自动充值热钱包
 * 当热钱包余额低于阈值时,自动从冷钱包补充
 */
private void autoRechargeHotWallet(UserAccount account) {
    // 重新查询获取最新余额(在消费后)
    LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(UserAccount::getUserId, account.getUserId());
    UserAccount freshAccount = userAccountMapper.selectOne(wrapper);
    account.setColdBalance(freshAccount.getColdBalance());
    account.setHotBalance(freshAccount.getHotBalance());
    account.setVersion(freshAccount.getVersion());

    if (account.getHotBalance().compareTo(HOT_WALLET_THRESHOLD) < 0) {
        if (account.getColdBalance().compareTo(HOT_WALLET_RECHARGE_AMOUNT) >= 0) {
            try {
                transferColdToHot(account, HOT_WALLET_RECHARGE_AMOUNT);
                log.info("自动补充热钱包: userId={}, amount={}",
                        account.getUserId(), HOT_WALLET_RECHARGE_AMOUNT);
            } catch (Exception e) {
                log.warn("自动补充热钱包失败: {}", e.getMessage());
            }
        }
    }
}

6.6 开始调拨

 @Override
    @DistributedLock(key = "#request.userId", waitTime = 3, leaseTime = 10)
    @Transactional(rollbackFor = Exception.class)
    public TransactionResponse transfer(TransferRequest request) {
        log.info("开始调拨: userId={}, amount={}, type={}",
                request.getUserId(), request.getAmount(), request.getTransferType());

        // 1. 查询账户
        LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(UserAccount::getUserId, request.getUserId());
        UserAccount account = userAccountMapper.selectOne(wrapper);

        if (account == null) {
            throw new WalletException("账户不存在");
        }

        // 保存调拨前余额
        BigDecimal beforeColdBalance = account.getColdBalance();
        BigDecimal beforeHotBalance = account.getHotBalance();

        // 2. 生成调拨流水号
        String transferNo = "TF" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))
                + String.format("%04d", (int)(Math.random() * 10000));

        // 3. 执行调拨
        TransferType type = TransferType.getByCode(request.getTransferType());
        BigDecimal amount = request.getAmount();

        if (type == TransferType.COLD_TO_HOT) {
            // 冷转热
            if (account.getColdBalance().compareTo(amount) < 0) {
                throw new WalletException("冷钱包余额不足");
            }
            transferColdToHot(account, amount);
        } else {
            // 热转冷
            if (account.getHotBalance().compareTo(amount) < 0) {
                throw new WalletException("热钱包余额不足");
            }
            // 扣减热钱包
            int rows1 = userAccountMapper.deductHotBalance(
                    account.getUserId(),
                    amount.longValue(),
                    account.getVersion()
            );
            if (rows1 == 0) {
                throw new WalletException("调拨失败");
            }
            account.setHotBalance(account.getHotBalance().subtract(amount));
            account.setVersion(account.getVersion() + 1);

            // 增加冷钱包
            int rows2 = userAccountMapper.addColdBalance(
                    account.getUserId(),
                    amount.longValue(),
                    account.getVersion()
            );
            if (rows2 == 0) {
                throw new WalletException("调拨失败");
            }
            account.setColdBalance(account.getColdBalance().add(amount));
        }

        // 4. 记录调拨
        WalletTransfer transfer = new WalletTransfer();
        transfer.setTransferNo(transferNo);
        transfer.setUserId(request.getUserId());
        transfer.setTransferType(request.getTransferType());
        transfer.setAmount(amount);
        transfer.setBeforeColdBalance(BigDecimal.ZERO);
        transfer.setAfterColdBalance(account.getColdBalance());
        transfer.setBeforeHotBalance(BigDecimal.ZERO);
        transfer.setAfterHotBalance(account.getHotBalance());
        transfer.setStatus(TransactionStatus.SUCCESS.getCode());
        transfer.setCreateTime(LocalDateTime.now());
        transfer.setUpdateTime(LocalDateTime.now());
        walletTransferMapper.insert(transfer);

        // 5. 发送调拨审计消息
        sendTransferAuditMessage(request.getUserId(), transferNo, type,
                amount, beforeColdBalance, beforeHotBalance,
                account.getColdBalance(), account.getHotBalance(),
                request.getOperatorId(), request.getOperatorName());

        // 6. 返回结果
        TransactionResponse response = new TransactionResponse();
        response.setTransactionNo(transferNo);
        response.setUserId(request.getUserId());
        response.setAmount(amount);
        response.setStatus(TransactionStatus.SUCCESS.getCode());
        response.setStatusDesc(TransactionStatus.SUCCESS.getDesc());
        response.setColdBalance(account.getColdBalance());
        response.setHotBalance(account.getHotBalance());
        response.setMessage("调拨成功");
        response.setCreateTime(LocalDateTime.now());

        return response;
    }

七、完整交易流程

7.1 交易流程图

7.2 时序图

7.3 典型交易场景

场景1:正常消费(热钱包充足)
用户余额:冷钱包=100000,热钱包=20000
消费金额:5000

流程:
1. 检查热钱包余额(20000) >= 消费金额(5000)?  是
2. 直接从热钱包扣款
3. 最终:冷钱包=100000,热钱包=15000
场景2:热钱包不足
用户余额:冷钱包=100000,热钱包=5000
消费金额:15000

流程:
1. 检查热钱包余额(5000) >= 消费金额(15000)?  否
2. 计算需要调拨:15000 - 5000 = 10000
3. 从冷钱包调拨10000到热钱包
4. 从热钱包扣款15000
5. 最终:冷钱包=90000,热钱包=0
场景3:触发自动补充
用户余额:冷钱包=100000,热钱包=5000
消费金额:1000
热钱包阈值:10000
自动补充金额:50000

流程:
1. 从热钱包扣款1000
2. 扣款后热钱包=4000,低于阈值(10000)
3. 触发自动补充:从冷钱包调拨50000到热钱包
4. 最终:冷钱包=50000,热钱包=54000

八、并发控制机制

8.1 并发控制机制图

8.2 双重并发保护

本系统采用双重并发保护机制

第一层:Redis分布式锁
package com.wallet.aspect;

import java.lang.annotation.*;

/**
 * 分布式锁注解
 * 用于防止同一用户的并发操作
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {

    /**
     * 锁的key前缀
     */
    String prefix() default "wallet:lock:";

    /**
     * 锁的key表达式,支持SpEL表达式
     */
    String key() default "";

    /**
     * 等待时间(秒)
     */
    int waitTime() default 3;

    /**
     * 锁超时时间(秒)
     */
    int leaseTime() default 10;
}


package com.wallet.aspect;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 分布式锁切面
 * 使用Redis实现分布式锁,防止同一用户的并发操作
 */
@Slf4j
@Aspect
@Component
public class DistributedLockAspect {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Around("@annotation(com.wallet.aspect.DistributedLock)")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);

        // 获取锁的key
        String lockKey = getLockKey(distributedLock, joinPoint);

        // 生成唯一的锁值
        String lockValue = UUID.randomUUID().toString();

        // 尝试获取锁
        boolean acquired = Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
                .setIfAbsent(lockKey, lockValue, distributedLock.leaseTime(), TimeUnit.SECONDS));

        if (!acquired) {
            // 尝试等待
            long startTime = System.currentTimeMillis();
            long timeout = distributedLock.waitTime() * 1000L;

            while (System.currentTimeMillis() - startTime < timeout) {
                acquired = Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
                        .setIfAbsent(lockKey, lockValue, distributedLock.leaseTime(), TimeUnit.SECONDS));
                if (acquired) {
                    break;
                }
                Thread.sleep(50);
            }

            if (!acquired) {
                throw new RuntimeException("操作频繁,请稍后重试");
            }
        }

        try {
            log.info("获取分布式锁成功: {}", lockKey);
            return joinPoint.proceed();
        } finally {
            // 释放锁
            String currentValue = stringRedisTemplate.opsForValue().get(lockKey);
            if (lockValue.equals(currentValue)) {
                stringRedisTemplate.delete(lockKey);
                log.info("释放分布式锁成功: {}", lockKey);
            }
        }
    }

    /**
     * 获取锁的key
     */
    private String getLockKey(DistributedLock distributedLock, ProceedingJoinPoint joinPoint) {
        String key = distributedLock.key();
        if (key.isEmpty()) {
            // 默认使用第一个参数作为key
            Object[] args = joinPoint.getArgs();
            if (args.length > 0 && args[0] != null) {
                key = String.valueOf(args[0]);
            } else {
                key = "default";
            }
        }
        return distributedLock.prefix() + key;
    }
}


// 使用示例
 @Override
    @DistributedLock(key = "#request.userId", waitTime = 3, leaseTime = 10)
    @Transactional(rollbackFor = Exception.class)
    public TransactionResponse processTransaction(TransactionRequest request) {
        // 记录交易开始时间
        long startTime = System.currentTimeMillis();
        TRANSACTION_START_TIME.set(startTime);

        log.info("开始处理交易: userId={}, amount={}, type={}",
                request.getUserId(), request.getAmount(), request.getTransactionType());

        // 1. 查询账户(加锁)
        LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(UserAccount::getUserId, request.getUserId());
        UserAccount account = userAccountMapper.selectOne(wrapper);

        if (account == null) {
            sendTransactionFailureNotification(request.getUserId(), null, request,
                    "账户不存在", startTime);
            throw new WalletException("账户不存在");
        }

        // 保存交易前余额
        BigDecimal beforeColdBalance = account.getColdBalance();
        BigDecimal beforeHotBalance = account.getHotBalance();

        // 2. 检查账户状态
        if (account.getStatus() != AccountStatus.NORMAL.getCode()) {
            sendTransactionFailureNotification(request.getUserId(), null, request,
                    "账户状态异常,无法进行交易", startTime);
            throw new WalletException("账户状态异常,无法进行交易");
        }
    }     
第二层:数据库乐观锁
package com.wallet.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 用户账户实体
 * 冷钱包 - 存储用户的主要余额
 */
@Data
@TableName("user_account")
public class UserAccount {

    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 用户ID
     */
    private Long userId;

    /**
     * 用户名
     */
    private String username;

    /**
     * 冷钱包余额(主账户余额)
     * 用户的主要资金存储
     */
    private BigDecimal coldBalance;

    /**
     * 热钱包余额(可用余额)
     * 用于日常交易和快速扣款
     */
    private BigDecimal hotBalance;

    /**
     * 账户状态:1-正常 2-冻结 3-注销
     */
    private Integer status;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    private LocalDateTime updateTime;

    /**
     * 版本号(乐观锁)
     */
    @Version
    private Integer version;

    /**
     * 逻辑删除标记
     */
    @TableLogic
    private Integer deleted;
}


/**
 * 扣款操作(使用乐观锁)
 */
UPDATE user_account
SET hot_balance = hot_balance - #{amount},
    version = version + 1,
    update_time = NOW()
WHERE user_id = #{userId}
  AND hot_balance >= #{amount}
  AND version = #{version}
  AND deleted = 0

8.3 为什么需要双重保护?

┌─────────────────────────────────────────────────────────────┐
│  双重保护的必要性                                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  单靠Redis锁的问题:                                          │
│  ├─ Redis宕机时锁失效                                        │
│  ├─ 网络分区时可能出现脑裂                                   │
│  └─ 无法防止DB层面的直接更新                                 │
│                                                             │
│  单靠数据库锁的问题:                                         │
│  ├─ 大量请求时DB连接耗尽                                     │
│  ├─ 事务等待时间长                                          │
│  └─ 系统吞吐量低                                            │
│                                                             │
│  双重保护的优势:                                            │
│  ├─ Redis锁:挡住99%的并发请求                               │
│  ├─ DB锁:挡住漏网的1%                                      │
│  ├─ 容错性强:任一层失效都能保证数据正确                      │
│  └─ 性能优异:大部分请求在Redis层就被拦截                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

8.4 系统交互图


九、部署与运维

9.1 生产环境部署图

9.2 Docker Compose 配置

version: '3'
services:
  # MySQL数据库
  mysql:
    image: mysql:8.0
    container_name: wallet-mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: wallet_db
    volumes:
      - mysql-data:/var/lib/mysql
      - ./src/main/resources/schema.sql:/docker-entrypoint-initdb.d/schema.sql

  # Redis缓存
  redis:
    image: redis:6.0
    container_name: wallet-redis
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

  # 应用服务
  wallet-app:
    build: .
    container_name: wallet-app
    ports:
      - "8080:8080"
    depends_on:
      - mysql
      - redis
    environment:
      SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/wallet_db
      SPRING_REDIS_HOST: redis
      SPRING_REDIS_PORT: 6379

volumes:
  mysql-data:
  redis-data:

9.3 关键配置

application.yml
spring:
  # 数据源配置
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/wallet_db?useUnicode=true&characterEncoding=utf8&useSSL=false
      username: root
      password: root
      # 连接池配置
      initial-size: 5
      min-idle: 5
      max-active: 20
      max-wait: 60000
      test-while-idle: true
      validation-query: SELECT 1

  # Redis配置
  redis:
    host: localhost
    port: 6379
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0
        max-wait: -1ms

  # 事务配置
  transaction:
    rollback-on-commit-failure: true

# MyBatis Plus配置
mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.wallet.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

9.4 性能优化建议

优化项具体措施预期效果
数据库索引user_id、transaction_no等字段建立索引查询速度提升10倍+
连接池优化合理设置初始大小、最大连接数避免连接创建开销
Redis缓存热点账户信息缓存减少DB查询80%
异步处理交易日志异步写入响应时间减少50%
SQL优化使用批量操作、避免SELECT *减少网络传输
JVM调优合理设置堆内存、GC策略减少STW时间

十、总结

10.1 核心要点回顾

┌─────────────────────────────────────────────────────────────┐
│              冷热钱包系统核心要点总结                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  一、设计思想                                                │
│  ├─ 冷热钱包分离:安全性与流动性平衡                          │
│  ├─ 空间换时间:用架构换性能                                  │
│  └─ 竞争变协作:把串行变并行                                  │
│                                                             │
│  二、核心技术                                                │
│  ├─ Redis分布式锁:应用层并发控制                            │
│  ├─ 数据库乐观锁:数据层最终保障                              │
│  ├─ 自动调拨机制:用户体验优化                                │
│  └─ 事务管理:数据一致性保证                                 │
│                                                             │
│  三、实施效果                                                │
│  ├─ 并发能力:提升10倍以上                                   │
│  ├─ 响应时间:降低到100ms以内                                │
│  ├─ 系统稳定性:99.9%可用性                                  │
│  └─ 用户体验:交易成功率接近100%                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

10.2 适用场景

冷热钱包架构适用于以下场景:

场景适用性原因
电商平台⭐⭐⭐⭐⭐高并发秒杀、订单支付
在线支付⭐⭐⭐⭐⭐实时转账、钱包余额
O2O服务⭐⭐⭐⭐预付款、会员余额
共享经济⭐⭐⭐押金、余额管理

10.3 写在最后

冷热钱包架构是支付系统设计中的经典方案,它巧妙地平衡了安全性、性能和用户体验三者之间的关系。


本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com