Okio中“神奇”的IO库
2018-02-06 10:58 阅读(297)

本节主要讲讲 Okhttp 底层使用的 IO 库--Okio,Okio 同样是 Square 公司推出的增强型 IO处理库,旨在增强原生 Java IO 流的处理,以更加简便,高效的方式处理 IO 流操作。接下来我们从以下方面来分析它。


Okio 的特点和优势


我们知道 Java 原生的 IO 处理已经很强大了,有针对字节和字符的输入输出接口,实现有缓存的处理,以及各种子类实现比如文件的(FileInputStream 和 FileOutputStream),数据的(DataInputStream 和 DataOutputStream),对象的(ObjectInputStream 和 ObjectOutputStream)等等。为什么 Square 还要搞出个 Okio 来呢?其实吧,我们要明白,Okio 不是用来完全取代原生 IO 的,事实上它本身也是基于原生 IO 之上的,比如要从文件得到一个文件输入流,还是得通过 FileIntputStream 来得到,所以 Okio 的用意不是取代,而是在某些场合作更加优化的使用,意思就是你原生 IO 有些地方没有做好,我要用我自己的方式得到更高效简便的 IO 处理。那么 Okio 具体有哪些优势呢?主要有以下:

精简的api接口

我们知道原生的 Java IO 流使用是比较复杂的,基础的字节流接口有 InpuStream 和 OutputStream,字符流接口有 Reader 和 Writer,每个接口都有很多实现的子类,里面大量使用了装饰着模式。假如我要创建一个 DataOutputStream 用于将一些数据类型数据输出到文件中,我可能需要经历 FileOutputStream->BufferedOutputStream->DataOutputStream的创建过程。而如果使用 Okio 来操作的话可以很简单。

File file = new File(Environment.getExternalStorageDirectory() + "/" + "output.txt");
    String name = "xiaoming";
    int age = 11;

    //使用原生IO
    DataOutputStream dos = null;
    try {
      dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
      dos.writeChars(name);
      dos.writeInt(age);
    }catch (IOException ex){
      ex.printStackTrace();
    }finally {
      try {
        if(dos != null){
          dos.close();
        }
      }catch (Exception ex){
        ex.printStackTrace();
      }
    }

    //使用Okio
    try {
      Okio.buffer(Okio.sink(file)).writeUtf8(name).writeInt(age).close();
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    }catch (IOException ex){
      ex.printStackTrace();
    }

从上面可以看出 Okio 可以很方便的使用链式调用,一句代码就可以完成流的创建->操作->关闭,是不是有一气呵成的感觉。

Okio 为了精简我们常用的字节字符流操作,抽象出了 BufferedSource 缓存流输入和BufferedSink 缓存流输出接口,它们具有处理字节和字符,和数据缓存的功能。这么看,是不是 Okio 一个接口实现了原生 IO 中三个接口的功能呢。

性能高效的缓存处理功能

这应该是推出 Okio 的关键原因吧,Okio 不满足于原生 IO 中 BufferedOutputStream 等缓存流的简单粗暴的缓存处理,转而自己使用更高效的方式处理处理流中数据缓存的操作。我们看看原生中缓存的处理。

原生中缓存的处理是这样的,每个 buffer 流中维护一个默认大小 8192 的 buf 数组,自己负责创建和回收,当缓存的数据快要满时(buf 剩余空间不足以存储这一次的数据时),就会将 buf 中的缓存数据全部输出,然后重新缓存。如果一次操作的数据大小大于缓存的大小(默认8192),那么缓存就没法使用了,因为缓存一次性存不了这么多数据。
然后原生中的缓存与缓存之间没有直接的交流,这样造成的影响是,输入流中的数据转移到输出流中是:输入 buf -> 临时 byte 数组 -> 输出 buf,经历两次拷贝。

原生中的缓存功能看起来并不高效,存在不少问题,Okio 要改变这种窘境。

可见 Okio 为了提高 IO 缓存的高效处理性能可谓是煞费苦心,从输入输出缓存的直接数据对接,到内部 Segment 结构的引入,以 Segment 为单位进行数据操作的高效,以及 Segment池的引入等等不一而足,为的就是尽可能快的完成 IO 操作。

TimeOut 超时的引入

我们知道原生 IO 中是没有超时这个机制的,如果在输入或输出过程中发生阻塞,那么在这个过程中就没有好的方式对它进行中断操作,在抛出 IO 异常前可能会一直阻塞下去,这显然不时我们想要的结果,我们希望如果如果它在阻塞到一定时间后能够抛出异常告诉我们发生 TimeOut 超时了。因此 Okio 推出了 TimeOut 机制,实现有 TimeOut(同步计时的超时处理)和 AsyncTimeOut(异步计时的超时处理)。

我想在描述了以上优点和它大致的实现原理之后,你已经对 Okio 已经有初步的了解了。大概知道了它有什么功能,是怎么样的设计理念。接下来对 Okio 中重要的类作一个简单的介绍,能让你快速熟悉类的结构。


 Okio结构分析


Okio 有一些重要类:

Source,Sink

Okio 中封装的输入流接口和输出流接口,对应原生 IO 的 InputStream 和 OutputStream。分别引入了 read 方法用于直接将数据读取到传入的 Sink 的 Buffer 缓存中,和引入了 write方法用于直接从传入的 Source 缓存中读取数据并写入到自己的 Buffer 缓存中。然后还有timeout 提供超时接口。

BufferedSource,BufferedSink

带有缓存功能的 Source 接口,Sink 接口,分别继承自 Source 和 Sink。同时提供一系列读写字节,字符数据的接口。

Okio

Okio 是 Okio 库的入口类,也是工厂类,它提供 source 方法得到一个 Source 输入流,提供 sink 方法得到一个 Sink 输出流,提供 buffer 方法得到具有缓存功能的 Source 或者 Sink 对象。它提供对 File,Socket,和(InputStream,OutputStream)三种类型的源进行操作,可见,Okio 其实是构建在(InputStream,OutputStream)之上的,得到封装之后的(Source,Sink)。

Segment

SegmentPool

管理 Segment 的池,使用单链表记录无用的 Segment,提供了 take 获取一个可用的 Segment,提供 recycle 将无用的 Segment 进行回收或维护。如果 SegmentPool 中的 Segment 的数量小于32个,recycle 时会将它加入到单链表中记录起来,同时重置 pos 和 limit以方便后期的 Segment 重用。如果超过了32个了,则 recycle 不进行任何操作,这将导致该 Segment 没有任何引用了,也就将会被回收了。

Buffer

Okio 的核心类,用于数据缓存的管理,它实现了 BufferedSource 和 BufferedSink 接口,它还支持两个 Buffer 之间数据的转移(copyTo,注意是转移,不是拷贝,转移的话就是数据指向发送改变了,速度不是拷贝能比的),这就是为啥 Buffer 这么牛逼的原因了,因为它是唯一一个既能进行读取数据管理,又能进行写入数据管理,而且相互之间还能直接数据转移操作,真是神一样的存在。

RealBufferedSource,RealBufferedSink

RealBufferedSource 是缓存 Source 接口的具体实现,继承自 BufferedSource。同时提供一系列读取字节,字符数据的接口。内部的操作基本都是有 Buffer 来参与处理的,首先会调用 request 来读取 source 里的一段数据到 Buffer 中,然后后续的读取数据都是从Buffer 中读的。
RealBufferedSink 缓存 Sink 接口的具体实现,继承自 BufferedSink。同时提供一系列写入字节,字符数据的接口。内部的操作基本都是有 Buffer 来参与处理的,首先会将数据写到Buffer 中,然后调用 emitCompleteSegments,如果 Buffer 存储缓存数据的 size 小于 Segment 大小的一半,即1024的话,不会可以继续缓存,否则会将缓存的内容全部写到输出中。


Okio 的流程


上面已经说明了大部分 Okio 相关类的信息和作用了,那么 Okio 是怎么样的一个执行流程呢?我想从一个简单的入口,来逐步分析 Okio 在此期间经历了些什么。下面是将 srcFile文件的数据全部复制到文件 destFile 中

try {
  Okio.buffer(Okio.sink(destFile)).writeAll(Okio.buffer(Okio.source(srcFile)));
} catch (FileNotFoundException e) {
  e.printStackTrace();
}catch (IOException ex){
  ex.printStackTrace();
}

看看,一句话就搞定了两个文件之间的数据复制。它其实等价于

try {
  RealBufferedSource source = Okio.buffer(Okio.sink(srcFile));
  RealBufferedSink sink = Okio.buffer(Okio.sink(destFile));
  sink.writeAll(source);
} catch (FileNotFoundException e) {
  e.printStackTrace();
}catch (IOException ex){
  ex.printStackTrace();
}

也就是进入到 RealBufferedSink.writeAll 方法了,我们看里面做了些什么

final class RealBufferedSink implements BufferedSink {
  @Override 
  public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    //循环中,不断从source读取信息到Buffer中
    for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
      //考虑将Buffer的数据输出到sink
      emitCompleteSegments();
    }
    return totalBytesRead;
  }
}

读取数据的处理

我们先看看 source.read(buffer, Segment.SIZE)是怎么完成读取工作的

final class RealBufferedSource implements BufferedSource {
  @Override 
  public long read(Buffer sink, long byteCount) throws IOException {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");

    if (buffer.size == 0) {
      //如果当前source中的buffer缓存没有数据,则先读取一部分数据到缓存中,省的每次都从source读取
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    //接着就是从buffer中读取byteCount个数据到sink中,但是byteCount大小不能超过buffer缓存的大小,因为当前只有这么多缓存数据
    long toRead = Math.min(byteCount, buffer.size);
    return buffer.read(sink, toRead);
  }
}

注意不要被上面的 sink 和 buffer 给迷惑了,sink 表示要写入到 Sink 的缓存 Buffer,而buffer 是当前 source 的缓存 Buffer。上面做的就是首先从 source 读取一段数据到自己的buffer 中,然后再从 buffer 读取数据到对方的 sink 缓存中,虽然指定了要读取 byteCount个数据,但是实际能读的大小要看 buffer的size,最大不会超过 Segment.SIZE。
这里你可能会疑惑 source.read(buffer, Segment.SIZE) 和 buffer.read(sink, toRead) 是怎么实现的。
我们先看看 source.read(buffer, Segment.SIZE)

public final class Okio {
  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override 
      public long read(Buffer sink, long byteCount) throws IOException {
        //这里就是不带缓存的Source的读取方式
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        //判断是否超时(同步方式)
        timeout.throwIfReached();
        //这里获取sink缓存中最后一个Segment,准备将数据读取到这个Segment中
        Segment tail = sink.writableSegment(1);
        //判断这个Segment还能容纳多少数据
        int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
        //这里是从原生的InputStream输入流中读取数据到Segment的数组中
        int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
        if (bytesRead == -1) return -1;
        tail.limit += bytesRead;
        sink.size += bytesRead;
        return bytesRead;
      }

      @Override 
      public void close() throws IOException {
        in.close();
      }

      @Override 
      public Timeout timeout() {
        return timeout;
      }

      @Override 
      public String toString() {
        return "source(" + in + ")";
      }
    };
  }
}

它的真实实现是在这里的,就是先找到 sink 缓存的最后一个 Segment,然后将从InputStream 输入流中读取数据到该 Segment 的数组中。所以它完成的操作就是从输入流读取一部分数据到 buffer 缓存中。 接着再看 buffer.read(sink, toRead) 是怎么实现的。

public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  @Override 
  public long read(Buffer sink, long byteCount) {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (size == 0) return -1L;
    if (byteCount > size) byteCount = size;
    //调用write方法,将自身Buffer的部分数据写到sink的Buffer中
    sink.write(this, byteCount);
    return byteCount;
  }
}

很简单就是,Buffer 的 read 操作就相当于调用对方 Buffer 的 write 写入数据,其实就是两个 Buffer 之间数据的传递过程,这是重点部分,我们在后面重点讲解。接下来我们看Buffer 有了数据之后是怎么输出到输出流的。

写入数据的处理

数据读取到 Buffer 缓存之后,我们再看 emitCompleteSegments 是干嘛的。

final class RealBufferedSink implements BufferedSink {
  @Override 
  public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    //这里是计算buffer是否有Segment的数据已经满了,如果有的话,就会将满了的Segment数据写入到sink中
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }
}
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  /**
   * Returns the number of bytes in segments that are not writable. This is the
   * number of bytes that can be flushed immediately to an underlying sink
   * without harming throughput.
   */
  public long completeSegmentByteCount() {
    // result是Buffer中所有的Segment的有效数据大小
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
      result -= tail.limit - tail.pos;
    }

    //tail.limit - tail.pos; 是最后一个Segment的有效数据大小
    //所以result如果大于0,说明Buffer中至少有两个Segment了,也就是有Segment满了。

    return result;
  }
}

在 emitCompleteSegments 中判断 Buffer 中的是否有两个以上的 Segment 了(也就是说有 Segment 满了),如果有的话,会将之前满了的 Segment 数据全部输出到 sink 中(留下最后一段未满的 Segment 数据继续作为缓存用),也就是将满的那部分缓存数据 flush到输出流中。


Buffer写数据的精华操作


Buffer 的 write(Buffer source, long byteCount)方法是 Buffer 缓存处理中的精华操作,它描述的是将一个 Buffer 的数据转移到另一个 Buffer 中时,是怎么样的一个处理过程。

if (source == null) throw new IllegalArgumentException("source == null");
    if (source == this) throw new IllegalArgumentException("source == this");

    //检查操作,读取的byteCount大小不能超过source缓存Buffer的大小。
    checkOffsetAndCount(source.size, 0, byteCount);

    while (byteCount > 0) {
      // Is a prefix of the source's head segment all that we need to move?
      //要复制的数据size小于source中Segment有效数据的size
      if (byteCount < (source.head.limit - source.head.pos)) {
        //获取写入Buffer中最后一个可写的Segment
        Segment tail = head != null ? head.prev : null;
        if (tail != null && tail.owner
            && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
          // Our existing segments are sufficient. Move bytes from source's head to our tail.

          //如果最后一个Segment存在,并且它是独立的(不是共享其他Segment的),要写的byteCount+有效数据大小<Segment的大小
          //也就是说整个Segment的可以容易现存有效数据和要写入的byteCount个数据。
          //则将数据byteCount个数据复制过去。
          //这里首先会将sink的数组数据整体前移offset,然后在复制byteCount个数据到sink中,意思就是丢弃sink前面offset的数据,腾出空间来放更多的数据,所以是验证byteCount+有效数据大小<Segment的大小。

          source.head.writeTo(tail, (int) byteCount);
          source.size -= byteCount;
          size += byteCount;
          return;
        } else {
          // We're going to need another segment. Split the source's head
          // segment in two, then move the first of those two to this buffer.

          //如果要写的byteCount数据不能全部写到最后一个Segment中,那就要考虑将source中的Segment进行拆分了
          //拆分之后,就可以将这个byteCount个数组组成的Segment移动到新的Buffer中,不用复制数据
          source.head = source.head.split((int) byteCount);
        }
      }

      // Remove the source's head segment and append it to our tail.
      //将这个source的Segment从头部移除,添加到自己Buffer的尾部
      Segment segmentToMove = source.head;
      long movedByteCount = segmentToMove.limit - segmentToMove.pos;
      //将这个source的Segment从头部移除
      source.head = segmentToMove.pop();
      if (head == null) {
        head = segmentToMove;
        head.next = head.prev = head;
      } else {
        Segment tail = head.prev;
        tail = tail.push(segmentToMove);

        //添加了新的Segment之后,看看这个Segment能不能和前一个Segment进行数据合并,节省出一个Segment空间
        tail.compact();
      }
      source.size -= movedByteCount;
      size += movedByteCount;
      byteCount -= movedByteCount;
    }
  }
}

这里根据不同情况进行处理。

   1. 首先判断操作的 byteCount 是否是在 source 的当前 Segment 范围内。如果是在范围内,判断操作的 byteCount 数据在要写入的 Segment 中是否容纳的下,如果容纳的下,则将数据复制过去(copy),返回完成操作。如果容纳不下,就考虑进行 split 拆分,拆分之后,就不考虑复制数据了,而是后面的直接将整个 Segment 移动到目标Buffer 中(move)。

   2. 接下来就是移动 Segment 了,直接从 soucrce 中移除头部的 Segment,然后添加到目标 Buffer 的 Segment 尾部,添加之后,判断是否需要和前一个 Segment 进行数据合并,以腾出一个 Segment 空间。

所以我们可以发现,这个操作非常精彩,对于目标 Segment 容纳的下的小段数据,采用直接复制的方法,而大段的 Segment 数据,则是直接移动,而不是复制,只是一个引用指向的变化,那效率超级高啊,这个设计很绝妙,所以说 Okio 为什么要设计成一小段的 Segment,因为段小好操作啊,你要复制多少个数据,我可以根据情况来考虑大段的整个移动,小段的采用复制,而如果像原生 IO 那一大段的数组,就只能乖乖的采用复制的方法了。
接下来我们分析小段数据的复制,slit 分割,compact 合并的实现。
先看小段数据的复制

final class Segment {
  /** Moves {@code byteCount} bytes from this segment to {@code sink}. */
  public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      //其实就是将sink数组的数据整体前移pos个位置,丢弃pos之前的数据
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    //将数据复制到sink数组中
    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}

很简单,就是处理了sink数组的整体前移,然后将数据复制到 sink中。接下来看 split 分割的实现

final class Segment {
  /**
   * Splits this head of a circularly-linked list into two segments. The first
   * segment contains the data in {@code [pos..pos+byteCount)}. The second
   * segment contains the data in {@code [pos+byteCount..limit)}. This can be
   * useful when moving partial segments from one buffer to another.
   *
   * <p>Returns the new head of the circularly-linked list.
   */
  public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    //创建一个新的Segment,并且声明它是共享的,即共享另外一个Segment的数据
    Segment prefix = new Segment(this);
    //这里声明了新Segment的有效数据范围[pos,pos+byteCount],它作为前置的Segment
    prefix.limit = prefix.pos + byteCount;
    //这里声明了原来Segment的有效数据范围[pos+byteCount,limit],它作为后置的Segment
    pos += byteCount;
    //将新Segment添加到原来Segment的前面,作为前置Segment
    prev.push(prefix);
    return prefix;
  }

  Segment(Segment shareFrom) {
    this(shareFrom.data, shareFrom.pos, shareFrom.limit);
    shareFrom.shared = true;
  }
}

以上将一个 Segment 拆分为相连的两个 Segment,新 Segment 共享原来 Segment 的数据,新 Segment 作为前置 Segment,有效范围[pos,pos+byteCount],原来 Segment 作为后置,有效范围[pos+byteCount,limit]。为什么要 split 分割?就是为了能将单个 Segment 逻辑分为两个 Segment,以便完成 byteCount 的独立操作,比如整体移动,而不用进行耗时的复制操作。因为是共享一个数组数据的,所以没有多占用什么内存空间,只是逻辑上分离,有了各自独立的有效区域标识而已,数据还是公用的,这个想法很秒。接下来再看 compact 合并的实现。

final class Segment {
  /**
   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
   */
  public void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    //当前Segment的有效数据size
    int byteCount = limit - pos;
    //前一个Segment的可用空间,包括pos之前部分和limit之后的部分
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    //若果前一个Segment的可用空间能容纳当前Segment的数据,则复制数据过去,然后移除当前Segment,交给SegmentPool回收
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }
}

以上判断前一个 Segment 的可用空间是否能容纳当前 Segment 的数据,如果能容纳,则复制数据到前一个中,移除当前的 Segment,交给 SegmentPool 回收,这样可用腾出一个Segment 空间,这个合并数据操作同样也是个优化操作。


Buffer缓存的总结


讲到这里,如果你都明白了的话,我想你已经能体会到 Okio 优化的精妙之处,构建在原生输入输出流的基础上,舍弃原生IO的缓存功能,自己实现一套流读取和写入的缓存机制,大大提高了缓存使用的效率。我们对比原生 IO 和 Okio 缓存处理时,数据从输入流到输出流的工程。


TimeOut 超时机制


TimeOut 超时机制有同步实现的 TimeOut 和异步实现的 AsynTimeOut。我们来分析这两个

TimeOut 同步超时

同步超时可以作用在 sourc 和 sink 中,这里我们以 souce 为例,sink 原理相同

public final class Okio {
  /** Returns a source that reads from {@code in}. */
  public static Source source(final InputStream in) {
    //传入一个默认的Timeout,默认是没有超时效果的
    return source(in, new Timeout());
  }

  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override 
      public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        //这里就是同步判断是否会超时的,它是阻塞的,如果这个read方法阻塞了,那么它无法进行正常判断了。
        timeout.throwIfReached();
        Segment tail = sink.writableSegment(1);
        int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
        int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
        if (bytesRead == -1) return -1;
        tail.limit += bytesRead;
        sink.size += bytesRead;
        return bytesRead;
      }

      @Override 
      public void close() throws IOException {
        in.close();
      }

      @Override 
      public Timeout timeout() {
        return timeout;
      }

      @Override 
      public String toString() {
        return "source(" + in + ")";
      }
    };
  }
}

我们进去看 timeout.throwIfReached() 的实现

public class Timeout {
  /**
   * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
   * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
   * asynchronously abort an in-progress operation.
   */
  public void throwIfReached() throws IOException {
    //如果线程标记为中断了,抛出线程中断异常
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    //如果设置了有超时限制,并且当前时间超过了超时时间,则抛出超时异常
    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }
}

同步超时异常还是很简单的,就是在 read 或者 write 方法中进行时间判断是否超时。它有缺陷就是如果 read 或者 write 发送阻塞了,就不能及时判断是否超时了。所以有了 AsynTimeOut 异步超时的机制。

AsyncTimeOut 异步超时

AsyncTimeOut 在 Okio 中只用作了处理 Socket 中,当然它也可以用到其他地方,由你实现。我们从这里分析

public final class Okio {
  /**
   * Returns a source that reads from {@code socket}. Prefer this over {@link
   * #source(InputStream)} because this method honors timeouts. When the socket
   * read times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Source source(final Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    //获取一个针对Socket的AsyncTimeout
    AsyncTimeout timeout = timeout(socket);
    //根据socket获取source
    Source source = source(socket.getInputStream(), timeout);
    //这里是重点,包装一个新的Source来处理TimeOut
    return timeout.source(source);
  }

  private static AsyncTimeout timeout(final Socket socket) {
    return new AsyncTimeout() {
      @Override 
      protected IOException newTimeoutException(IOException cause) {
        InterruptedIOException ioe = new SocketTimeoutException("timeout");
        if (cause != null) {
          ioe.initCause(cause);
        }
        return ioe;
      }

      @Override 
      protected void timedOut() {
        //发生超时,关闭Socket
        try {
          socket.close();
        } catch (Exception e) {
          logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
        } catch (AssertionError e) {
          if (e.getCause() != null && e.getMessage() != null
              && e.getMessage().contains("getsockname failed")) {
            // Catch this exception due to a Firmware issue up to android 4.2.2
            // https://code.google.com/p/android/issues/detail?id=54072
            logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
          } else {
            throw e;
          }
        }
      }
    };
  }
}

上面创建了一个 AsycTimeOut 对象,用于处理发生超时时关闭 Socket。根据 Socket 得到一个输入流的 Source,然后交给 AsycTimeOut.source 处理得到一个新的 Source 对象,那么新的 Source 有什么特别的呢,我们往下看,创建一个新的 Source,在 Source 每次在 read 的之前,调用 enter,其实就是scheduleTimeout 来调度 AsyncTimeout,而完成 read 时,调用 exit,其实就是cancelScheduledTimeout 来取消 AsyncTimeout 超时的调度,我们来分析。将 AsyncTimeout 异步超时加入到超时队列中,同时可能需要开启看门狗进行监听(如果看门狗没有启动的话),我们在看看看门狗 Watchdog 的实现。

public class AsyncTimeout extends Timeout {
  private static final class Watchdog extends Thread {
    public Watchdog() {
      super("Okio Watchdog");
      //设置为守护线程
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          //这里是查找一个超时的AsyncTimeout
          AsyncTimeout timedOut = awaitTimeout();

          // Didn't find a node to interrupt. Try again.
          //没找到超时AsyncTimeout,继续
          if (timedOut == null) continue;

          // Close the timed out node.
          //找到超时AsyncTimeout,就通知超时了
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

  /**
   * Removes and returns the node at the head of the list, waiting for it to
   * time out if necessary. Returns null if the situation changes while waiting:
   * either a newer node is inserted at the head, or the node being waited on
   * has been removed.
   */
  private static synchronized AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // The queue is empty. Wait for something to be enqueued.
    //如果超时队列空的话,wait
    if (node == null) {
      AsyncTimeout.class.wait();
      return null;
    }

    //当前AsyncTimeout离超时还剩多少时间
    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    //等待第一个AsyncTimeout离超时的时间
    if (waitNanos > 0) {
      // Waiting is made complicated by the fact that we work in nanoseconds,
      // but the API wants (millis, nanos) in two arguments.
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }

    // The head of the queue has timed out. Remove it.
    //到超时了,移除这个超时的AsyncTimeout,返回
    head.next = node.next;
    node.next = null;
    return node;
  }
}

以上就是在看门狗这个线程中,不断遍历超时队列,超时队列是根据超时时间排序的,第一个是里超时时间最近的,所以每次从第一个进行判断离超时还剩多少时间,然后 wait 等待,以让出 CPU,当等待时间到了之后,继续判断,如果队列中还存在超时,则从超时队列移除,并通知超时。所以 enter 负责添加超时,exit 负责移除超时,如果 read,write 方法发生超时了,那么 exit 不能正常移除超时,看门狗监听该超时的时间到了,就能抛出超时通知了。

public class AsyncTimeout extends Timeout {
  /**
   * Invoked by the watchdog thread when the time between calls to {@link
   * #enter()} and {@link #exit()} has exceeded the timeout.
   */
  protected void timedOut() {
  }
}

默认的timeOut没有实现,针对 socket 的超时实现是关闭 Socket,关闭了 Socket 之后,IO 阻塞就会抛出 IO 异常了,阻塞也就中断了。你可以实现你自己的超时发生的操作。
总体来说,默认情况下,超时机制没有开启的,但是有实现超时功能,开启的话需要指定超时的时间,需要用户自己去实现。使用同步超时还是异步超时,可以自己根据情况去使用。


Gzip压缩简要分析


Okio 同时也是支持 Gzip 压缩的,当然 Gzip 压缩并不是自己实现的,而是使用 Java zip 包中的 CRC32 来进行数据的压缩。这里我们简单分析 GzipSink 来看看 Gzip 压缩是怎么实现的。

GzipSink 实现了 Sink 接口,是带有压缩功能的 Sink,会将要写入的数据压缩之后再写入,内部有 CRC32 对象负责将原生 sink 的数据进行 Gzip 压缩,然后由 DeflaterSink 对象负责将压缩后的数据进行整理并执行写入。
GzipSource 实现了 Source 接口,是带有解压缩功能的 Source,实现原理和 GzipSink 相反,由 InflaterSource 读取压缩的数据并整理,然后 CRC32 解压缩数据,得到原始的数据。


总结


Okio 还有还有诸如 ByteString 等,包括之前讲的缓存机制,超时机制,都是为了使 IO 流的操作更加简单高效,弥补原始 IO 操作效率不高的问题。Okio 重点优化的就是缓存处理方面,目的就是优化原生 IO 不足的地方,所以说它不是 Okhttp 的附属库,是可以很方便我们日常进行 IO 操作时使用的,它有更简洁API使用方式和更高效的 IO 处理方式,有了Okio,我感觉再也不想用原生 IO 中复杂的使用方式啦。


本篇来自  Ihesong 的投稿,分享了精简高效的 IO 库 Okio,一起来看看!希望大家喜欢。

 Ihesong 的博客地址:

https://www.jianshu.com/u/75d212bdd107