Netty-复习
浅谈Stream
在计算和编程领域,stream(流)通常指的是一系列数据的传输方式。流是数据的持续序列,允许程序在数据没有完全加载或没有完全存储的情况下进行处理。流的使用在处理大数据集、实时数据传输或资源有限的环境下尤为重要。
本质上,流只是一种对于数据的处理手段,相当于给出了接口,没有给出具体的实现。
常见的流有以下几种
输入输出流(I/O Stream)
I/O流是编程中常见的一种流操作,用于数据的输入和输出。数据可以是文件、网络通信、设备、内存等。I/O流分为两类:
- 字节流(Byte Stream):以字节为单位处理数据,适合处理二进制文件,如图片、音频等。常见类:
InputStream
、OutputStream
。 - 字符流(Character Stream):以字符为单位处理数据,适合处理文本数据。常见类:
Reader
、Writer
。
- 字节流(Byte Stream):以字节为单位处理数据,适合处理二进制文件,如图片、音频等。常见类:
网络流(Network Stream)
网络流用于在网络中传输数据。它通常涉及传输控制协议(TCP)或用户数据报协议(UDP)。通过使用网络流,可以实时传输数据,如视频、音频或文件。
视频和音频流
在多媒体应用中,视频和音频流是连续的数据流。通过流媒体技术,用户可以在数据还在传输的过程中观看视频或听音乐,而无需等待整个文件下载完毕。常见协议包括RTSP(实时流协议)、HLS(HTTP直播流)等。
数据流处理(Data Stream Processing)
数据流处理是指在数据流动的过程中实时分析和处理数据。这种技术特别适合处理海量数据或实时数据。
- 批处理 vs 流处理:批处理一次处理大批量的数据,而流处理是实时的、增量的处理数据。
- 工具和框架:常见的数据流处理框架包括Apache Kafka、Apache Flink、Apache Storm等。
Java 8 Stream API
在Java 8中,
Stream
是一个用于操作数据集合的API。它允许对集合(如List、Set等)进行过滤、排序、映射和归约等操作,通常与Lambda表达式一起使用。- 中间操作:如
filter
、map
,这些操作不会立即执行,而是生成新的流。 - 终端操作:如
forEach
、collect
,这些操作会触发流的处理,并得到结果。
- 中间操作:如
…
按照我的理解,流的实现,本质上就是一种对于数据的动态持续处理方式的实现,或许可以把最简单的流理解为一个队列,发送方可以发送任意的数据,接收方可以消费任意的数据,为了实现,需要保证接收方可以高效的分离出待处理的各项数据等等。
浅谈Nio
Nio(Non-blocking io:非阻塞IO):NIO的模型本质上是一种事件驱动的架构,通过选择器监听事件变化,使得应用程序在高并发环境中更加高效。
关键特性:
- NIO允许通道(Channel)在执行I/O操作时不阻塞线程,可以继续处理其他任务。线程可以轮询多个通道,避免一个连接阻塞影响整个服务器。
- NIO中的数据处理是基于缓冲区的。数据读写不直接作用于流(Stream),而是通过缓冲区(如
ByteBuffer
)。应用程序将数据写入缓冲区,或者从缓冲区中读取数据,从而提供了更高效的内存管理机制。
组成部分:
- 通道(Channel)
- 选择器(Selector)
- 缓冲区(ByteBuffer)
- 多路复用机制
- …
为什么要面向缓冲区?
传统的Stream在处理的时候 消费者 和 生产者的关系是不能改变的,同时,基于流的特性,对于流的处理必须要按照顺序,逐个元素的处理。
缓冲区的出现是为了拓展流的功能,思路是:将流传递的数据进行缓存,因为数据被缓存了,就不需要按照顺序,逐个元素的处理,同时也相当于解放了CPU,那么现在我的问题是,数据从到达到被放入缓冲区的过程是怎么样的?
- 硬件接收数据:硬件层(如网卡、磁盘等)接收到数据。
- 操作系统处理数据:通过中断处理机制,操作系统内核将数据包重组或读取文件数据,存储在内核缓冲区中。
- 应用程序请求数据:应用程序通过系统调用获取数据。
- 数据从内核态复制到用户态:操作系统将内核缓冲区中的数据复制到用户态的缓冲区(如
ByteBuffer
)。 - 应用程序处理缓冲区中的数据:应用程序在用户态缓冲区中读取、写入或处理数据。
其实,NIO相比较于BIO优化主要在于用户态,对于内核态操作系统对于数据的接收,两者是一样的,相较于BIO,NIO使用了Channel来抽象对于内核缓冲区数据的处理,使用一个Selector线程来循环查看各个Channel的情况,如果Channel可以读出数据,就读取数据,如果Channel不能读出数据,就去查看其他的Channel,而对于BIO来说,BIO没有Channel这一层抽象,对于缓冲区的数据,当BIO尝试获取却获取不到的时候,BIO进程会等待数据的到达,这也就是阻塞。
NIO的非阻塞机制本质上是通过减少无效的等待时间来优化资源利用,而在内核态层面,BIO和NIO的底层数据接收机制是相同的,都依赖于操作系统的中断、协议栈和缓冲区机制。NIO的核心优化点就是在用户态的I/O操作方式,使得应用程序可以更灵活地处理大量并发连接。
浅谈操作系统层面的数据接收与多路复用机制
在Linux下使用NIO配合操作系统接收数据的全流程如下:
- 内核通过 硬件设备(网卡,其他IO设备, …) 经由DMA技术,将数据读取到Ring Buffer,并触发操作系统硬中断
- 内核暂时屏蔽硬中断,触发软中断,调用专用的内核线程来将数据从Ring Buffer读取数据,并将数据交给 网络协议栈(或者其他可以解析数据的技术)来处理
- 在处理硬中断时,内核会尽量快速完成最低限度的操作,以便释放CPU资源,并将更多复杂的工作推给软中断。在这个阶段,内核通常不会“屏蔽”硬中断,而是确保中断处理程序尽快返回,使得系统能够处理其他中断请求。硬中断只会执行非常小的一部分工作,主要是将数据转移到缓冲区,然后立即返回。
- 软中断会通过
ksoftirqd
线程或者直接在内核上下文中执行。软中断负责从Ring Buffer中读取数据并将其交给协议栈处理。该过程可能直接通过软中断完成,也可能在多核CPU情况下由不同的ksoftirqd
线程并行处理。
- 负责处理网络协议栈的各个线程经过处理后,将数据放到内核缓冲区中,并将对应的文件描述符与内核缓冲区进行绑定
- 操作系统会将内核中的各个文件描述符组织起来,(select、poll、epoll),并对外暴露多路复用接口
- **
select()
**:使用的是一个固定大小的文件描述符集合(FD集合),其效率相对较低,特别是当文件描述符数量较多时。 - **
poll()
**:使用的是动态数组来管理文件描述符,没有固定的上限,但与select
一样,每次调用都要遍历整个数组,性能并不出色。 - **
epoll()
**:使用的是基于红黑树的高效事件通知机制,支持大规模并发连接,且不会像select
和poll
那样每次遍历所有描述符,只处理活跃的描述符,效率更高。
- **
- 用户态的应用调用对应的获取数据的接口(rede()、recv()…),内核会检测对应文件描述符对应的内核缓冲区中是否存在数据,若是存在,则拷贝到用户态
- 在应用程序调用
read()
或recv()
时,内核通过多路复用机制(如epoll
)会检测哪个文件描述符上有数据可读。内核并不主动将数据推送到用户态,而是等待用户态程序通过系统调用来获取。当数据准备好后,内核将从对应的socket buffer
中取出数据,并将其拷贝到用户态的缓冲区中。
- 在应用程序调用
- 用户态对数据进行处理
- …
NIO的三大组件,Selector、Channel、ByteBuffer,其实是模仿操作系统的多路复用机制。Selector相当于操作系统中对应epoll、poll、select,Channle相当于操作系统对应的各个文件描述符,ByteBuffer相当于内核缓冲区。
NIO中的这三大组件实际上是对操作系统底层机制的抽象,提供了更高层次的、面向对象的API,使得开发者可以高效地管理I/O操作,并利用操作系统提供的多路复用能力来处理大量并发连接。
Netty
NIO解决了如何高效的处理多并发连接和读取数据的问题,Netty则是基于NIO,给出一套通用的高性能、可维护的网络客户端和服务端的框架
Netty 的优势
- Netty vs NIO,工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- epoll 空轮询导致 CPU 100%
- 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
- Netty vs 其它网络应用框架
- Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
- 久经考验,16年,Netty 版本
- 2.x 2004
- 3.x 2008
- 4.x 2013
- 5.x 已废弃(没有明显的性能提升,维护成本高)
组件
EventLoop
事件循环对象,EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
Channel
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush() 方法将数据写入并刷出
Future & Promise
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
这个是Netty的灵魂,也是Netty可以高拓展性的来源。
ByteBuf
是对字节数据的封装,相较于ByteBuffer,提供了更加灵活的数据管理,提供了池化、直接内存等选择。
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
看看Netty源码
我得先复习复习Netty的各项操作,待定ing