狩猎摸拟器
52.83M · 2026-02-06
本文分为上下两篇,这是下篇。
上篇回顾:
本篇内容: 6. 核心代码实现
/**
* 用户账户服务实现
* 核心逻辑:冷热钱包扣款机制
*/
@Service
public class UserAccountServiceImpl implements UserAccountService {
/**
* 热钱包阈值
* 当热钱包余额低于此值时,自动从冷钱包补充
*/
private static final BigDecimal HOT_WALLET_THRESHOLD = new BigDecimal("10000");
/**
* 热钱包补充金额
* 从冷钱包调拨到热钱包的固定金额
*/
private static final BigDecimal HOT_WALLET_RECHARGE_AMOUNT = new BigDecimal("50000");
}
充值是用户资金流入的主要方式,所有充值直接进入冷钱包:
/**
* 处理充值
* 充值金额直接进入冷钱包
*/
private void processRecharge(UserAccount account, TransactionRequest request,
TransactionResponse response) {
BigDecimal amount = request.getAmount();
// 增加冷钱包余额(使用乐观锁)
int rows = userAccountMapper.addColdBalance(
account.getUserId(),
amount.longValue(),
account.getVersion()
);
if (rows == 0) {
throw new WalletException("充值失败,并发冲突");
}
// 更新账户对象
account.setColdBalance(account.getColdBalance().add(amount));
// 记录交易
saveTransactionRecord(account, request, response,
TransactionType.RECHARGE,
account.getColdBalance(),
account.getHotBalance());
}
这是整个系统最核心的逻辑:优先从热钱包扣款,热钱包不足时自动从冷钱包调拨。
/**
* 处理消费(核心逻辑)
* 优先从热钱包扣款,热钱包不足时自动从冷钱包调拨
*/
private void processConsume(UserAccount account, TransactionRequest request,
TransactionResponse response) {
BigDecimal amount = request.getAmount();
// 1. 检查总余额是否足够
BigDecimal totalBalance = account.getColdBalance().add(account.getHotBalance());
if (totalBalance.compareTo(amount) < 0) {
throw new WalletException("余额不足");
}
// 2. 如果热钱包余额不足,先从冷钱包调拨
if (account.getHotBalance().compareTo(amount) < 0) {
BigDecimal needTransfer = amount.subtract(account.getHotBalance());
transferColdToHot(account, needTransfer);
// 重新查询账户以获取最新的版本号
LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(UserAccount::getUserId, account.getUserId());
UserAccount freshAccount = userAccountMapper.selectOne(wrapper);
account.setVersion(freshAccount.getVersion());
account.setColdBalance(freshAccount.getColdBalance());
account.setHotBalance(freshAccount.getHotBalance());
}
// 3. 从热钱包扣款(使用乐观锁)
int rows = userAccountMapper.deductHotBalance(
account.getUserId(),
amount.longValue(),
account.getVersion()
);
if (rows == 0) {
throw new WalletException("扣款失败,并发冲突");
}
// 更新账户对象
account.setHotBalance(account.getHotBalance().subtract(amount));
// 记录交易
saveTransactionRecord(account, request, response, TransactionType.CONSUME,
account.getColdBalance(), account.getHotBalance());
}
/**
* 冷钱包转热钱包(内部调拨)
*/
private void transferColdToHot(UserAccount account, BigDecimal amount) {
log.info("开始调拨: userId={}, amount=冷转热{}", account.getUserId(), amount);
// 1. 检查冷钱包余额
if (account.getColdBalance().compareTo(amount) < 0) {
throw new WalletException("冷钱包余额不足");
}
// 2. 扣减冷钱包
int rows1 = userAccountMapper.deductColdBalance(
account.getUserId(),
amount.longValue(),
account.getVersion()
);
if (rows1 == 0) {
throw new WalletException("调拨失败,并发冲突");
}
account.setColdBalance(account.getColdBalance().subtract(amount));
// 重新查询获取最新版本号
LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(UserAccount::getUserId, account.getUserId());
UserAccount freshAccount = userAccountMapper.selectOne(wrapper);
account.setVersion(freshAccount.getVersion());
account.setColdBalance(freshAccount.getColdBalance());
account.setHotBalance(freshAccount.getHotBalance());
// 3. 增加热钱包
int rows2 = userAccountMapper.addHotBalance(
account.getUserId(),
amount.longValue(),
account.getVersion()
);
if (rows2 == 0) {
throw new WalletException("调拨失败,并发冲突");
}
account.setHotBalance(account.getHotBalance().add(amount));
log.info("调拨完成: coldBalance={}, hotBalance={}",
account.getColdBalance(), account.getHotBalance());
}
/**
* 自动充值热钱包
* 当热钱包余额低于阈值时,自动从冷钱包补充
*/
private void autoRechargeHotWallet(UserAccount account) {
// 重新查询获取最新余额(在消费后)
LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(UserAccount::getUserId, account.getUserId());
UserAccount freshAccount = userAccountMapper.selectOne(wrapper);
account.setColdBalance(freshAccount.getColdBalance());
account.setHotBalance(freshAccount.getHotBalance());
account.setVersion(freshAccount.getVersion());
if (account.getHotBalance().compareTo(HOT_WALLET_THRESHOLD) < 0) {
if (account.getColdBalance().compareTo(HOT_WALLET_RECHARGE_AMOUNT) >= 0) {
try {
transferColdToHot(account, HOT_WALLET_RECHARGE_AMOUNT);
log.info("自动补充热钱包: userId={}, amount={}",
account.getUserId(), HOT_WALLET_RECHARGE_AMOUNT);
} catch (Exception e) {
log.warn("自动补充热钱包失败: {}", e.getMessage());
}
}
}
}
@Override
@DistributedLock(key = "#request.userId", waitTime = 3, leaseTime = 10)
@Transactional(rollbackFor = Exception.class)
public TransactionResponse transfer(TransferRequest request) {
log.info("开始调拨: userId={}, amount={}, type={}",
request.getUserId(), request.getAmount(), request.getTransferType());
// 1. 查询账户
LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(UserAccount::getUserId, request.getUserId());
UserAccount account = userAccountMapper.selectOne(wrapper);
if (account == null) {
throw new WalletException("账户不存在");
}
// 保存调拨前余额
BigDecimal beforeColdBalance = account.getColdBalance();
BigDecimal beforeHotBalance = account.getHotBalance();
// 2. 生成调拨流水号
String transferNo = "TF" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))
+ String.format("%04d", (int)(Math.random() * 10000));
// 3. 执行调拨
TransferType type = TransferType.getByCode(request.getTransferType());
BigDecimal amount = request.getAmount();
if (type == TransferType.COLD_TO_HOT) {
// 冷转热
if (account.getColdBalance().compareTo(amount) < 0) {
throw new WalletException("冷钱包余额不足");
}
transferColdToHot(account, amount);
} else {
// 热转冷
if (account.getHotBalance().compareTo(amount) < 0) {
throw new WalletException("热钱包余额不足");
}
// 扣减热钱包
int rows1 = userAccountMapper.deductHotBalance(
account.getUserId(),
amount.longValue(),
account.getVersion()
);
if (rows1 == 0) {
throw new WalletException("调拨失败");
}
account.setHotBalance(account.getHotBalance().subtract(amount));
account.setVersion(account.getVersion() + 1);
// 增加冷钱包
int rows2 = userAccountMapper.addColdBalance(
account.getUserId(),
amount.longValue(),
account.getVersion()
);
if (rows2 == 0) {
throw new WalletException("调拨失败");
}
account.setColdBalance(account.getColdBalance().add(amount));
}
// 4. 记录调拨
WalletTransfer transfer = new WalletTransfer();
transfer.setTransferNo(transferNo);
transfer.setUserId(request.getUserId());
transfer.setTransferType(request.getTransferType());
transfer.setAmount(amount);
transfer.setBeforeColdBalance(BigDecimal.ZERO);
transfer.setAfterColdBalance(account.getColdBalance());
transfer.setBeforeHotBalance(BigDecimal.ZERO);
transfer.setAfterHotBalance(account.getHotBalance());
transfer.setStatus(TransactionStatus.SUCCESS.getCode());
transfer.setCreateTime(LocalDateTime.now());
transfer.setUpdateTime(LocalDateTime.now());
walletTransferMapper.insert(transfer);
// 5. 发送调拨审计消息
sendTransferAuditMessage(request.getUserId(), transferNo, type,
amount, beforeColdBalance, beforeHotBalance,
account.getColdBalance(), account.getHotBalance(),
request.getOperatorId(), request.getOperatorName());
// 6. 返回结果
TransactionResponse response = new TransactionResponse();
response.setTransactionNo(transferNo);
response.setUserId(request.getUserId());
response.setAmount(amount);
response.setStatus(TransactionStatus.SUCCESS.getCode());
response.setStatusDesc(TransactionStatus.SUCCESS.getDesc());
response.setColdBalance(account.getColdBalance());
response.setHotBalance(account.getHotBalance());
response.setMessage("调拨成功");
response.setCreateTime(LocalDateTime.now());
return response;
}
用户余额:冷钱包=100000,热钱包=20000
消费金额:5000
流程:
1. 检查热钱包余额(20000) >= 消费金额(5000)? 是
2. 直接从热钱包扣款
3. 最终:冷钱包=100000,热钱包=15000
用户余额:冷钱包=100000,热钱包=5000
消费金额:15000
流程:
1. 检查热钱包余额(5000) >= 消费金额(15000)? 否
2. 计算需要调拨:15000 - 5000 = 10000
3. 从冷钱包调拨10000到热钱包
4. 从热钱包扣款15000
5. 最终:冷钱包=90000,热钱包=0
用户余额:冷钱包=100000,热钱包=5000
消费金额:1000
热钱包阈值:10000
自动补充金额:50000
流程:
1. 从热钱包扣款1000
2. 扣款后热钱包=4000,低于阈值(10000)
3. 触发自动补充:从冷钱包调拨50000到热钱包
4. 最终:冷钱包=50000,热钱包=54000
本系统采用双重并发保护机制:
package com.wallet.aspect;
import java.lang.annotation.*;
/**
* 分布式锁注解
* 用于防止同一用户的并发操作
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
/**
* 锁的key前缀
*/
String prefix() default "wallet:lock:";
/**
* 锁的key表达式,支持SpEL表达式
*/
String key() default "";
/**
* 等待时间(秒)
*/
int waitTime() default 3;
/**
* 锁超时时间(秒)
*/
int leaseTime() default 10;
}
package com.wallet.aspect;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁切面
* 使用Redis实现分布式锁,防止同一用户的并发操作
*/
@Slf4j
@Aspect
@Component
public class DistributedLockAspect {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Around("@annotation(com.wallet.aspect.DistributedLock)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);
// 获取锁的key
String lockKey = getLockKey(distributedLock, joinPoint);
// 生成唯一的锁值
String lockValue = UUID.randomUUID().toString();
// 尝试获取锁
boolean acquired = Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, distributedLock.leaseTime(), TimeUnit.SECONDS));
if (!acquired) {
// 尝试等待
long startTime = System.currentTimeMillis();
long timeout = distributedLock.waitTime() * 1000L;
while (System.currentTimeMillis() - startTime < timeout) {
acquired = Boolean.TRUE.equals(stringRedisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, distributedLock.leaseTime(), TimeUnit.SECONDS));
if (acquired) {
break;
}
Thread.sleep(50);
}
if (!acquired) {
throw new RuntimeException("操作频繁,请稍后重试");
}
}
try {
log.info("获取分布式锁成功: {}", lockKey);
return joinPoint.proceed();
} finally {
// 释放锁
String currentValue = stringRedisTemplate.opsForValue().get(lockKey);
if (lockValue.equals(currentValue)) {
stringRedisTemplate.delete(lockKey);
log.info("释放分布式锁成功: {}", lockKey);
}
}
}
/**
* 获取锁的key
*/
private String getLockKey(DistributedLock distributedLock, ProceedingJoinPoint joinPoint) {
String key = distributedLock.key();
if (key.isEmpty()) {
// 默认使用第一个参数作为key
Object[] args = joinPoint.getArgs();
if (args.length > 0 && args[0] != null) {
key = String.valueOf(args[0]);
} else {
key = "default";
}
}
return distributedLock.prefix() + key;
}
}
// 使用示例
@Override
@DistributedLock(key = "#request.userId", waitTime = 3, leaseTime = 10)
@Transactional(rollbackFor = Exception.class)
public TransactionResponse processTransaction(TransactionRequest request) {
// 记录交易开始时间
long startTime = System.currentTimeMillis();
TRANSACTION_START_TIME.set(startTime);
log.info("开始处理交易: userId={}, amount={}, type={}",
request.getUserId(), request.getAmount(), request.getTransactionType());
// 1. 查询账户(加锁)
LambdaQueryWrapper<UserAccount> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(UserAccount::getUserId, request.getUserId());
UserAccount account = userAccountMapper.selectOne(wrapper);
if (account == null) {
sendTransactionFailureNotification(request.getUserId(), null, request,
"账户不存在", startTime);
throw new WalletException("账户不存在");
}
// 保存交易前余额
BigDecimal beforeColdBalance = account.getColdBalance();
BigDecimal beforeHotBalance = account.getHotBalance();
// 2. 检查账户状态
if (account.getStatus() != AccountStatus.NORMAL.getCode()) {
sendTransactionFailureNotification(request.getUserId(), null, request,
"账户状态异常,无法进行交易", startTime);
throw new WalletException("账户状态异常,无法进行交易");
}
}
package com.wallet.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 用户账户实体
* 冷钱包 - 存储用户的主要余额
*/
@Data
@TableName("user_account")
public class UserAccount {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户ID
*/
private Long userId;
/**
* 用户名
*/
private String username;
/**
* 冷钱包余额(主账户余额)
* 用户的主要资金存储
*/
private BigDecimal coldBalance;
/**
* 热钱包余额(可用余额)
* 用于日常交易和快速扣款
*/
private BigDecimal hotBalance;
/**
* 账户状态:1-正常 2-冻结 3-注销
*/
private Integer status;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
/**
* 版本号(乐观锁)
*/
@Version
private Integer version;
/**
* 逻辑删除标记
*/
@TableLogic
private Integer deleted;
}
/**
* 扣款操作(使用乐观锁)
*/
UPDATE user_account
SET hot_balance = hot_balance - #{amount},
version = version + 1,
update_time = NOW()
WHERE user_id = #{userId}
AND hot_balance >= #{amount}
AND version = #{version}
AND deleted = 0
┌─────────────────────────────────────────────────────────────┐
│ 双重保护的必要性 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 单靠Redis锁的问题: │
│ ├─ Redis宕机时锁失效 │
│ ├─ 网络分区时可能出现脑裂 │
│ └─ 无法防止DB层面的直接更新 │
│ │
│ 单靠数据库锁的问题: │
│ ├─ 大量请求时DB连接耗尽 │
│ ├─ 事务等待时间长 │
│ └─ 系统吞吐量低 │
│ │
│ 双重保护的优势: │
│ ├─ Redis锁:挡住99%的并发请求 │
│ ├─ DB锁:挡住漏网的1% │
│ ├─ 容错性强:任一层失效都能保证数据正确 │
│ └─ 性能优异:大部分请求在Redis层就被拦截 │
│ │
└─────────────────────────────────────────────────────────────┘
version: '3'
services:
# MySQL数据库
mysql:
image: mysql:8.0
container_name: wallet-mysql
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: wallet_db
volumes:
- mysql-data:/var/lib/mysql
- ./src/main/resources/schema.sql:/docker-entrypoint-initdb.d/schema.sql
# Redis缓存
redis:
image: redis:6.0
container_name: wallet-redis
ports:
- "6379:6379"
command: redis-server --appendonly yes
volumes:
- redis-data:/data
# 应用服务
wallet-app:
build: .
container_name: wallet-app
ports:
- "8080:8080"
depends_on:
- mysql
- redis
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql:3306/wallet_db
SPRING_REDIS_HOST: redis
SPRING_REDIS_PORT: 6379
volumes:
mysql-data:
redis-data:
spring:
# 数据源配置
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/wallet_db?useUnicode=true&characterEncoding=utf8&useSSL=false
username: root
password: root
# 连接池配置
initial-size: 5
min-idle: 5
max-active: 20
max-wait: 60000
test-while-idle: true
validation-query: SELECT 1
# Redis配置
redis:
host: localhost
port: 6379
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: -1ms
# 事务配置
transaction:
rollback-on-commit-failure: true
# MyBatis Plus配置
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.wallet.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
| 优化项 | 具体措施 | 预期效果 |
|---|---|---|
| 数据库索引 | user_id、transaction_no等字段建立索引 | 查询速度提升10倍+ |
| 连接池优化 | 合理设置初始大小、最大连接数 | 避免连接创建开销 |
| Redis缓存 | 热点账户信息缓存 | 减少DB查询80% |
| 异步处理 | 交易日志异步写入 | 响应时间减少50% |
| SQL优化 | 使用批量操作、避免SELECT * | 减少网络传输 |
| JVM调优 | 合理设置堆内存、GC策略 | 减少STW时间 |
┌─────────────────────────────────────────────────────────────┐
│ 冷热钱包系统核心要点总结 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 一、设计思想 │
│ ├─ 冷热钱包分离:安全性与流动性平衡 │
│ ├─ 空间换时间:用架构换性能 │
│ └─ 竞争变协作:把串行变并行 │
│ │
│ 二、核心技术 │
│ ├─ Redis分布式锁:应用层并发控制 │
│ ├─ 数据库乐观锁:数据层最终保障 │
│ ├─ 自动调拨机制:用户体验优化 │
│ └─ 事务管理:数据一致性保证 │
│ │
│ 三、实施效果 │
│ ├─ 并发能力:提升10倍以上 │
│ ├─ 响应时间:降低到100ms以内 │
│ ├─ 系统稳定性:99.9%可用性 │
│ └─ 用户体验:交易成功率接近100% │
│ │
└─────────────────────────────────────────────────────────────┘
冷热钱包架构适用于以下场景:
| 场景 | 适用性 | 原因 |
|---|---|---|
| 电商平台 | ⭐⭐⭐⭐⭐ | 高并发秒杀、订单支付 |
| 在线支付 | ⭐⭐⭐⭐⭐ | 实时转账、钱包余额 |
| O2O服务 | ⭐⭐⭐⭐ | 预付款、会员余额 |
| 共享经济 | ⭐⭐⭐ | 押金、余额管理 |
冷热钱包架构是支付系统设计中的经典方案,它巧妙地平衡了安全性、性能和用户体验三者之间的关系。