浅谈Stream

在计算和编程领域,stream(流)通常指的是一系列数据的传输方式。流是数据的持续序列,允许程序在数据没有完全加载或没有完全存储的情况下进行处理。流的使用在处理大数据集、实时数据传输或资源有限的环境下尤为重要。

本质上,流只是一种对于数据的处理手段,相当于给出了接口,没有给出具体的实现。

常见的流有以下几种

  • 输入输出流(I/O Stream)

    I/O流是编程中常见的一种流操作,用于数据的输入和输出。数据可以是文件、网络通信、设备、内存等。I/O流分为两类:

    • 字节流(Byte Stream):以字节为单位处理数据,适合处理二进制文件,如图片、音频等。常见类:InputStreamOutputStream
    • 字符流(Character Stream):以字符为单位处理数据,适合处理文本数据。常见类:ReaderWriter
  • 网络流(Network Stream)

    网络流用于在网络中传输数据。它通常涉及传输控制协议(TCP)或用户数据报协议(UDP)。通过使用网络流,可以实时传输数据,如视频、音频或文件。

  • 视频和音频流

    在多媒体应用中,视频和音频流是连续的数据流。通过流媒体技术,用户可以在数据还在传输的过程中观看视频或听音乐,而无需等待整个文件下载完毕。常见协议包括RTSP(实时流协议)、HLS(HTTP直播流)等。

  • 数据流处理(Data Stream Processing)

    数据流处理是指在数据流动的过程中实时分析和处理数据。这种技术特别适合处理海量数据或实时数据。

    • 批处理 vs 流处理:批处理一次处理大批量的数据,而流处理是实时的、增量的处理数据。
    • 工具和框架:常见的数据流处理框架包括Apache Kafka、Apache Flink、Apache Storm等。
  • Java 8 Stream API

    在Java 8中,Stream是一个用于操作数据集合的API。它允许对集合(如List、Set等)进行过滤、排序、映射和归约等操作,通常与Lambda表达式一起使用。

    • 中间操作:如filtermap,这些操作不会立即执行,而是生成新的流。
    • 终端操作:如forEachcollect,这些操作会触发流的处理,并得到结果。

按照我的理解,流的实现,本质上就是一种对于数据的动态持续处理方式的实现,或许可以把最简单的流理解为一个队列,发送方可以发送任意的数据,接收方可以消费任意的数据,为了实现,需要保证接收方可以高效的分离出待处理的各项数据等等。

浅谈Nio

Nio(Non-blocking io:非阻塞IO):NIO的模型本质上是一种事件驱动的架构,通过选择器监听事件变化,使得应用程序在高并发环境中更加高效

关键特性

  • NIO允许通道(Channel)在执行I/O操作时不阻塞线程,可以继续处理其他任务。线程可以轮询多个通道,避免一个连接阻塞影响整个服务器。
  • NIO中的数据处理是基于缓冲区的。数据读写不直接作用于流(Stream),而是通过缓冲区(如ByteBuffer)。应用程序将数据写入缓冲区,或者从缓冲区中读取数据,从而提供了更高效的内存管理机制。

组成部分:

  • 通道(Channel)
  • 选择器(Selector)
  • 缓冲区(ByteBuffer)
  • 多路复用机制

为什么要面向缓冲区?

传统的Stream在处理的时候 消费者 和 生产者的关系是不能改变的,同时,基于流的特性,对于流的处理必须要按照顺序,逐个元素的处理。

缓冲区的出现是为了拓展流的功能,思路是:将流传递的数据进行缓存,因为数据被缓存了,就不需要按照顺序,逐个元素的处理,同时也相当于解放了CPU,那么现在我的问题是,数据从到达到被放入缓冲区的过程是怎么样的?

  • 硬件接收数据:硬件层(如网卡、磁盘等)接收到数据。
  • 操作系统处理数据:通过中断处理机制,操作系统内核将数据包重组或读取文件数据,存储在内核缓冲区中。
  • 应用程序请求数据:应用程序通过系统调用获取数据。
  • 数据从内核态复制到用户态:操作系统将内核缓冲区中的数据复制到用户态的缓冲区(如ByteBuffer)。
  • 应用程序处理缓冲区中的数据:应用程序在用户态缓冲区中读取、写入或处理数据。

其实,NIO相比较于BIO优化主要在于用户态,对于内核态操作系统对于数据的接收,两者是一样的,相较于BIO,NIO使用了Channel来抽象对于内核缓冲区数据的处理,使用一个Selector线程来循环查看各个Channel的情况,如果Channel可以读出数据,就读取数据,如果Channel不能读出数据,就去查看其他的Channel,而对于BIO来说,BIO没有Channel这一层抽象,对于缓冲区的数据,当BIO尝试获取却获取不到的时候,BIO进程会等待数据的到达,这也就是阻塞。

NIO的非阻塞机制本质上是通过减少无效的等待时间来优化资源利用,而在内核态层面,BIO和NIO的底层数据接收机制是相同的,都依赖于操作系统的中断、协议栈和缓冲区机制。NIO的核心优化点就是在用户态的I/O操作方式,使得应用程序可以更灵活地处理大量并发连接。

浅谈操作系统层面的数据接收与多路复用机制

在Linux下使用NIO配合操作系统接收数据的全流程如下:

  • 内核通过 硬件设备(网卡,其他IO设备, …) 经由DMA技术,将数据读取到Ring Buffer,并触发操作系统硬中断
  • 内核暂时屏蔽硬中断,触发软中断,调用专用的内核线程来将数据从Ring Buffer读取数据,并将数据交给 网络协议栈(或者其他可以解析数据的技术)来处理
    • 在处理硬中断时,内核会尽量快速完成最低限度的操作,以便释放CPU资源,并将更多复杂的工作推给软中断。在这个阶段,内核通常不会“屏蔽”硬中断,而是确保中断处理程序尽快返回,使得系统能够处理其他中断请求。硬中断只会执行非常小的一部分工作,主要是将数据转移到缓冲区,然后立即返回。
    • 软中断会通过ksoftirqd线程或者直接在内核上下文中执行。软中断负责从Ring Buffer中读取数据并将其交给协议栈处理。该过程可能直接通过软中断完成,也可能在多核CPU情况下由不同的ksoftirqd线程并行处理。
  • 负责处理网络协议栈的各个线程经过处理后,将数据放到内核缓冲区中,并将对应的文件描述符与内核缓冲区进行绑定
  • 操作系统会将内核中的各个文件描述符组织起来,(select、poll、epoll),并对外暴露多路复用接口
    • **select()**:使用的是一个固定大小的文件描述符集合(FD集合),其效率相对较低,特别是当文件描述符数量较多时。
    • **poll()**:使用的是动态数组来管理文件描述符,没有固定的上限,但与select一样,每次调用都要遍历整个数组,性能并不出色。
    • **epoll()**:使用的是基于红黑树的高效事件通知机制,支持大规模并发连接,且不会像selectpoll那样每次遍历所有描述符,只处理活跃的描述符,效率更高。
  • 用户态的应用调用对应的获取数据的接口(rede()、recv()…),内核会检测对应文件描述符对应的内核缓冲区中是否存在数据,若是存在,则拷贝到用户态
    • 在应用程序调用read()recv()时,内核通过多路复用机制(如epoll)会检测哪个文件描述符上有数据可读。内核并不主动将数据推送到用户态,而是等待用户态程序通过系统调用来获取。当数据准备好后,内核将从对应的socket buffer中取出数据,并将其拷贝到用户态的缓冲区中。
  • 用户态对数据进行处理

NIO的三大组件,Selector、Channel、ByteBuffer,其实是模仿操作系统的多路复用机制。Selector相当于操作系统中对应epoll、poll、select,Channle相当于操作系统对应的各个文件描述符,ByteBuffer相当于内核缓冲区。

NIO中的这三大组件实际上是对操作系统底层机制的抽象,提供了更高层次的、面向对象的API,使得开发者可以高效地管理I/O操作,并利用操作系统提供的多路复用能力来处理大量并发连接。

Netty

NIO解决了如何高效的处理多并发连接和读取数据的问题,Netty则是基于NIO,给出一套通用的高性能、可维护的网络客户端和服务端的框架

Netty 的优势

  • Netty vs NIO,工作量大,bug 多
    • 需要自己构建协议
    • 解决 TCP 传输问题,如粘包、半包
    • epoll 空轮询导致 CPU 100%
    • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
  • Netty vs 其它网络应用框架
    • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
    • 久经考验,16年,Netty 版本
      • 2.x 2004
      • 3.x 2008
      • 4.x 2013
      • 5.x 已废弃(没有明显的性能提升,维护成本高)

组件

EventLoop

事件循环对象,EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

image-20240908171454571

Channel

channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

Future & Promise

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

Handler & Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

这个是Netty的灵魂,也是Netty可以高拓展性的来源。

image-20240908171329370

image-20240908171348243

ByteBuf

是对字节数据的封装,相较于ByteBuffer,提供了更加灵活的数据管理,提供了池化、直接内存等选择。

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串

看看Netty源码

我得先复习复习Netty的各项操作,待定ing