掘地攀登
100.61M · 2026-03-29
还记得你写的第一个Socket程序吗?大概长这样:
// 上古时代网络编程(请勿模仿!)
while(true) {
Socket client = serverSocket.accept(); // 阻塞!等连接
new Thread(() -> { // 来个线程伺候
InputStream in = client.getInputStream();
// 读取数据... 等等,数据没读完怎么办?
// 数据粘包了怎么办?
// 客户端突然掉线怎么办?
// (程序员逐渐崩溃)
}).start();
// 线程数爆炸,服务器卒。
}
恭喜你,喜提“C10K问题”体验卡! (C10K:1万个并发连接就把服务器搞崩)
这时候,Netty戴着墨镜闪亮登场:“哥们,异步非阻塞,了解一下?”
public class NettyServer {
public static void main(String[] args) throws Exception {
// 1. 创建两个“包工头”团队
// BossGroup:接待新客户(连接)
// WorkerGroup:处理客户请求(I/O)
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1个老板
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 一群工人(默认CPU核数×2)
try {
// 2. 创建服务器“施工图纸”
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 用NIO当“交通工具”
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 3. 给每个连接装配“流水线”
ChannelPipeline pipeline = ch.pipeline();
// 添加处理器,像流水线上的工人
pipeline.addLast(new StringDecoder()); // 解码工:字节转字符串
pipeline.addLast(new StringEncoder()); // 编码工:字符串转字节
pipeline.addLast(new MyBusinessHandler()); // 业务工:处理实际业务
}
});
// 4. 绑定端口,开业大吉!
ChannelFuture f = b.bind(8888).sync();
System.out.println("服务器启动在 8888 端口,可以接客了!");
// 5. 优雅地等待关机(有客户来别想关!)
f.channel().closeFuture().sync();
} finally {
// 6. 打烊,给工人们发工资(关闭线程池)
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
业务处理器长这样:
public class MyBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// msg已经是String了,感谢StringDecoder!
String request = (String) msg;
// 处理业务
String response = "Hello, " + request + "!";
// 写回给客户端
ctx.writeAndFlush(response);
// 注意:这里没有new Thread()!
// Netty的异步魔法让一个线程能处理N个连接
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 异常处理:记日志,关连接
cause.printStackTrace();
ctx.close();
}
}
发生了什么魔法?
// EventLoop的内心独白
public class NioEventLoop extends SingleThreadEventLoop {
private Selector selector; // 多个Channel
private Queue<Runnable> taskQueue; // 待办任务列表
public void run() {
while (!isTerminated()) {
// 第一步:检查哪些Channel有数据来了(I/O事件)
int readyChannels = selector.select(timeout);
if (readyChannels > 0) {
// 处理I/O事件
processSelectedKeys();
}
// 第二步:处理其他任务(比如定时任务、用户提交的任务)
runAllTasks();
// 第三步:如果没有活,适当睡一会儿
// 但不会睡死,有活马上醒!
}
}
}
打个比方:
EventLoop就像个“外卖小哥”,他负责一片区域(多个Channel):
这是Netty的“爱情观” :
// Channel对EventLoop说:“从你注册我的那一刻起,我就认定你了!”
channel.eventLoop().execute(() -> {
// 这个任务一定由绑定我的那个EventLoop执行
// 不会有线程安全问题,因为永远只有这一个线程操作我
channel.write("我只属于你~");
});
// 这避免了多线程的“三角恋”问题:
// 线程A在写,线程B也在写 → 数据混乱!
// 现在:一生一世一双“线程”,妥妥的!
想象你网购了一个包裹:
快递员(网络) → 快递柜(ByteBuf) → 拆箱工(Decoder)
→ 质检员(Handler1) → 组装工(Handler2)
→ 打包工(Encoder) → 发货(网络)
代码实现:
pipeline.addLast("decoder", new StringDecoder()); // 拆箱:字节转字符串
pipeline.addLast("handler1", new AuthHandler()); // 安检:检查权限
pipeline.addLast("handler2", new BusinessHandler()); // 处理:核心业务
pipeline.addLast("encoder", new StringEncoder()); // 打包:字符串转字节
数据流动方向:
接收数据: 网络 → decoder → handler1 → handler2 → 你的业务代码
发送数据: 你的业务代码 → encoder → 网络
场景:客户端发来协议:前4字节是长度,后面是实际数据
// 自定义解码器
public class LengthFieldDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 如果可读字节小于4,说明数据还没收全
if (in.readableBytes() < 4) {
return; // 继续等待
}
// 标记当前读位置,万一数据不够可以回退
in.markReaderIndex();
// 读取长度(前4字节)
int length = in.readInt();
// 如果实际数据长度不够
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 回退,假装没读过
return; // 继续等待
}
// 读取实际数据
ByteBuf data = in.readBytes(length);
out.add(data); // 交给下一个Handler
}
}
// 使用
pipeline.addLast(new LengthFieldDecoder());
pipeline.addLast(new StringDecoder()); // 现在可以转字符串了
Netty内置了很多现成的解码器:
LineBasedFrameDecoder:按行分割(n或rn)DelimiterBasedFrameDecoder:按自定义分隔符FixedLengthFrameDecoder:固定长度LengthFieldBasedFrameDecoder:最强大,各种长度字段协议// JDK ByteBuffer的“反人类设计”
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 写数据
buffer.put("Hello".getBytes());
// 想读?先flip!
buffer.flip(); // 切换模式,limit=position, position=0
// 读数据
byte[] dst = new byte[buffer.remaining()];
buffer.get(dst);
// 想再写?先clear或compact!
buffer.clear(); // position=0, limit=capacity
// 但clear会丢弃未读数据!用compact?更复杂!
// 结论:ByteBuffer是“精分患者”,读写模式切换让人崩溃
// Netty ByteBuf的“优雅”
ByteBuf buf = Unpooled.buffer(1024);
// 写数据
buf.writeBytes("Hello".getBytes()); // writerIndex移动
// 读数据
byte b = buf.readByte(); // readerIndex移动
// 看看指针位置
int readerIndex = buf.readerIndex(); // 当前读位置
int writerIndex = buf.writerIndex(); // 当前写位置
int readable = buf.readableBytes(); // 可读字节数
int writable = buf.writableBytes(); // 可写字节数
// 标记和重置(就像游戏存档)
buf.markReaderIndex(); // 存档读位置
buf.readInt(); // 读个int
buf.resetReaderIndex(); // 读位置回档!
// 切片(零拷贝!)
ByteBuf slice = buf.slice(0, 5); // 不复制数据,共享底层数组
ByteBuf copy = buf.copy(0, 5); // 复制数据,独立副本
// 自动扩容
buf.writeBytes(new byte[2000]); // 超过1024?自动扩!
// 还有各种便捷方法
buf.readChar();
buf.writeInt(42);
buf.getBytes(0, dst); // 不移动readerIndex的读
ByteBuf的“核心优势” :
// 没有内存池:每次分配新内存,用完就扔
ByteBuf buf = Unpooled.directBuffer(1024);
// 用完后被GC回收
// 问题:频繁GC,内存碎片,性能下降
// 有内存池:内存复用
ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
// 用完后...
pooledBuf.release(); // 放回池子,不是给GC
// 下次有人申请类似大小的,直接复用!
// 这就像:
// 没有池子:每次吃饭用一次性筷子,用完就扔
// 有池子:用消毒筷子,吃完回收消毒,下个人接着用
Netty的内存池像“精细化管理的仓库”:
// 内存分配策略
public class PoolArena {
// 小内存:Tiny(<512B)和Small(<8KB)
// 用SubpagePool管理,像“文具店”,卖铅笔橡皮
private final PoolSubpage<T>[] tinySubpagePools; // 16B, 32B, 48B...
private final PoolSubpage<T>[] smallSubpagePools; // 512B, 1KB, 2KB...
// 中等内存:Normal(8KB-16MB)
// 用PoolChunk管理,像“大卖场”,按页(8KB)分配
private final PoolChunkList<T> qInit; // 使用率 0-25%
private final PoolChunkList<T> q000; // 使用率 1-50%
private final PoolChunkList<T> q025; // 使用率 25-75%
private final PoolChunkList<T> q050; // 使用率 50-100%
private final PoolChunkList<T> q075; // 使用率 75-100%
private final PoolChunkList<T> q100; // 使用率 100%
// 大内存:Huge(>16MB)
// 直接分配,不池化,像“定制家具”
}
分配算法:伙伴系统(Buddy System)+ 位图
关键源码:NioEventLoop.run()
protected void run() {
for (;;) { // 无限循环,但很聪明
try {
// 1. 检查是否有普通任务
boolean hasTasks = hasTasks();
// 2. 根据是否有任务选择策略
int strategy = selectStrategy.calculateStrategy(hasTasks);
switch (strategy) {
case SelectStrategy.CONTINUE:
continue; // 继续循环
case SelectStrategy.SELECT:
// 没有任务,select可能阻塞(最多1秒)
strategy = selector.select(timeoutMillis);
break;
default:
// 有任务,不阻塞,立即处理
}
// 3. 处理I/O事件
if (strategy > 0) {
processSelectedKeys(); // 处理就绪的Channel
}
// 4. 处理任务(限制时间,避免饿死I/O)
long ioTime = System.nanoTime() - ioStartTime;
long ioRatio = this.ioRatio;
if (ioRatio == 100) {
// 全部时间给任务
runAllTasks();
} else {
// 按比例分配时间
long taskTime = ioTime * (100 - ioRatio) / ioRatio;
runAllTasks(taskTime);
}
} catch (Throwable t) {
// 异常处理
}
}
}
精妙之处:
ioRatio默认50:I/O和任务各一半时间JDK的Bug:epoll在某些情况下会立即返回,但实际没有事件,导致CPU 100%
Netty的修复:
// 在NioEventLoop中
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
for (;;) {
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
// 如果有事件,或者有任务,重置计数
if (selectedKeys != 0 || hasTasks() || ...) {
selectCnt = 0;
}
// 空轮询超过512次?重建Selector!
else if (selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 老Selector,你累了,退休吧
rebuildSelector();
selectCnt = 0;
}
}
翻译:
如果select()总是立即返回但没事件,说明Selector“精神失常”了。Netty数着次数,超过512次就送它去“精神病院”(重建),换个新的Selector。
症状:内存缓慢增长,最终OOM
错误代码:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
// 处理buf...
// 忘记 release()!内存泄漏!
}
正确代码:
// 方法1:手动释放
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
try {
// 处理buf...
} finally {
buf.release(); // 一定要释放!
}
}
// 方法2:让SimpleChannelInboundHandler自动释放
public class MyHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// 处理buf...
// 不用release(),父类会处理
}
}
// 方法3:引用计数
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
ByteBuf sliced = buf.slice(); // 切片,引用计数+1
// 要传递下去?调用retain()
ctx.fireChannelRead(sliced.retain()); // 引用计数+1
// 现在有责任释放原始buf
buf.release();
}
记住规则:谁最后使用,谁负责释放!
错误代码:
// 全局单例Handler
public class GlobalHandler extends ChannelInboundHandlerAdapter {
private int count = 0; // 多个Channel共享,线程不安全!
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
count++; // 竞态条件!
}
}
// 多个Channel共享同一个实例
pipeline.addLast(globalHandler); // 危险!
正确做法:
// 要么每次new新的
pipeline.addLast(new MyHandler()); // 每个Channel独立实例
// 要么标记@Sharable,并确保线程安全
@Sharable
public class SafeHandler extends ChannelInboundHandlerAdapter {
// 无状态,或用AtomicInteger等线程安全类
private AtomicInteger count = new AtomicInteger(0);
}
错误代码:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 在EventLoop线程中执行耗时操作
Thread.sleep(5000); // 阻塞5秒!
// 这个EventLoop上的所有Channel都卡住了!
}
正确做法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 提交到业务线程池
businessExecutor.execute(() -> {
// 耗时操作,比如查数据库
String result = queryFromDatabase(msg);
// 写回结果(必须回到Channel的EventLoop)
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(result);
});
});
}
Netty 5的“雄心壮志” :
Netty 4.x的“哲学” :简单、稳定、高性能
建议:生产环境用Netty 4.x最新稳定版。Netty 5?等它成熟再说。
Netty成功的秘诀:
用一句话总结Netty:
最后,记住Netty的三条“军规”:
一天一个开源项目(第57篇):Unsloth - 2x 更快、70% 更省显存的 LLM 微调库
活用 Claude Code : 从协作者变成可编程的智能基础设施
2026-03-29
2026-03-29