• 欢迎访问VPS岛网站,国外VPS,国内VPS,国外服务器,国内服务器,服务器主机,测评及优惠码,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站 QQ群

Apache Thrift系列详解(二) – 网络服务模型

Apache技术 mb5ff97fc6948e0 31次浏览 已收录 0个评论

前言

Thrift提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。

  • 阻塞服务模型: TSimpleServer、 TThreadPoolServer。
  • 非阻塞服务模型: TNonblockingServer、 THsHaServer和 TThreadedSelectorServer。

TServer类的层次关系:
Apache Thrift系列详解(二) - 网络服务模型

正文

TServer

TServer定义了静态内部类 Args, Args继承自抽象类 AbstractServerArgs。 AbstractServerArgs采用了建造者模式,向 TServer提供各种工厂:
Apache Thrift系列详解(二) - 网络服务模型
下面是 TServer的部分核心代码:

public abstract class TServer {

    public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {

        public Args(TServerTransport transport) {

            super(transport);

        }

    }

    public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {

        final TServerTransport serverTransport;

        TProcessorFactory processorFactory;

        TTransportFactory inputTransportFactory = new TTransportFactory();

        TTransportFactory outputTransportFactory = new TTransportFactory();

        TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();

        TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();

        public AbstractServerArgs(TServerTransport transport) {

            serverTransport = transport;

        }

    }

    protected TProcessorFactory processorFactory_;

    protected TServerTransport serverTransport_;

    protected TTransportFactory inputTransportFactory_;

    protected TTransportFactory outputTransportFactory_;

    protected TProtocolFactory inputProtocolFactory_;

    protected TProtocolFactory outputProtocolFactory_;

    private boolean isServing;

    protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {

        processorFactory_ = args.processorFactory;

        serverTransport_ = args.serverTransport;

        inputTransportFactory_ = args.inputTransportFactory;

        outputTransportFactory_ = args.outputTransportFactory;

        inputProtocolFactory_ = args.inputProtocolFactory;

        outputProtocolFactory_ = args.outputProtocolFactory;

    }

    public abstract void serve();

    public void stop() {}

    public boolean isServing() {

        return isServing;

    }

    protected void setServing(boolean serving) {

        isServing = serving;

    }

}

TServer的三个方法: serve()、 stop()和 isServing()。 serve()用于启动服务, stop()用于关闭服务, isServing()用于检测服务的起停状态。

TServer的不同实现类的启动方式不一样,因此 serve()定义为抽象方法。不是所有的服务都需要优雅的退出, 因此 stop()方法没有被定义为抽象。

TSimpleServer

TSimpleServer的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个 socket连接,效率比较低。它主要用于演示 Thrift的工作过程,在实际开发过程中很少用到它。

(一) 工作流程

Apache Thrift系列详解(二) - 网络服务模型

(二) 使用入门

服务端:

    ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);

    TServerSocket serverTransport = new TServerSocket(serverSocket);

    HelloWorldService.Processor processor =

            new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());

    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);

    tArgs.processor(processor);

    tArgs.protocolFactory(protocolFactory);

    // 简单的单线程服务模型 一般用于测试

    TServer tServer = new TSimpleServer(tArgs);

    System.out.println("Running Simple Server");

    tServer.serve();

客户端:

    TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);

    TProtocol protocol = new TBinaryProtocol(transport);

    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();

    String result = client.say("Leo");

    System.out.println("Result =: " + result);

    transport.close();

(三) 源码分析

查看上述流程的源代码,即 TSimpleServer.java中的 serve()方法如下:
Apache Thrift系列详解(二) - 网络服务模型

serve()方法的操作:

  1. 设置 TServerSocket的 listen()方法启动连接监听。

  2. 以阻塞的方式接受客户端地连接请求,每进入一个连接即为其创建一个通道 TTransport对象。

  3. 为客户端创建处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。

  4. 通过 TServerEventHandler对象处理具体的业务请求。

ThreadPoolServer

TThreadPoolServer模式采用阻塞 socket方式工作,主线程负责阻塞式监听是否有新 socket到来,具体的业务处理交由一个线程池来处理。

(一) 工作流程

Apache Thrift系列详解(二) - 网络服务模型

(二) 使用入门

服务端:

    ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);

    TServerSocket serverTransport = new TServerSocket(serverSocket);

    HelloWorldService.Processor<HelloWorldService.Iface> processor =

            new HelloWorldService.Processor<>(new HelloWorldServiceImpl());

    TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);

    ttpsArgs.processor(processor);

    ttpsArgs.protocolFactory(protocolFactory);

    // 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求

    TServer ttpsServer = new TThreadPoolServer(ttpsArgs);

    System.out.println("Running ThreadPool Server");

    ttpsServer.serve();

客户端:

    TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);

    TProtocol protocol = new TBinaryProtocol(transport);

    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();

    String result = client.say("ThreadPoolClient");

    System.out.println("Result =: " + result);

    transport.close();

(三) 源码分析

ThreadPoolServer解决了 TSimpleServer不支持并发和多连接的问题,引入了线程池。实现的模型是 OneThreadPerConnection。查看上述流程的源代码,先查看线程池的代码片段:
Apache Thrift系列详解(二) - 网络服务模型
TThreadPoolServer.java中的 serve()方法如下:
Apache Thrift系列详解(二) - 网络服务模型
serve()方法的操作:

  1. 设置 TServerSocket的 listen()方法启动连接监听。

  2. 以阻塞的方式接受客户端的连接请求,每进入一个连接,将通道对象封装成一个 WorkerProcess对象( WorkerProcess实现了 Runnabel接口),并提交到线程池。

  3. WorkerProcess的 run()方法负责业务处理,为客户端创建了处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。

  4. 通过 TServerEventHandler对象处理具体的业务请求。

WorkerProcess的 run()方法:
Apache Thrift系列详解(二) - 网络服务模型

(四) 优缺点

TThreadPoolServer模式的优点

拆分了监听线程( AcceptThread)和处理客户端连接的工作线程( WorkerThread),数据读取和业务处理都交给线程池处理。因此在并发量较大时新连接也能够被及时接受。

线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。

TThreadPoolServer模式的缺点

线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。

TNonblockingServer

TNonblockingServer模式也是单线程工作,但是采用 NIO的模式,借助 Channel/Selector机制, 采用 IO事件模型来处理。

所有的 socket都被注册到 selector中,在一个线程中通过 seletor循环监控所有的 socket。

每次 selector循环结束时,处理所有的处于就绪状态的 socket,对于有数据到来的 socket进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听 socket则产生一个新业务 socket并将其注册到 selector上。

注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。

(一) 工作流程

Apache Thrift系列详解(二) - 网络服务模型

(二) 使用入门

服务端:

    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());

    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);

    tnbArgs.processor(tprocessor);

    tnbArgs.transportFactory(new TFramedTransport.Factory());

    tnbArgs.protocolFactory(new TCompactProtocol.Factory());

    // 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式

    TServer server = new TNonblockingServer(tnbArgs);

    System.out.println("Running Non-blocking Server");

    server.serve();

客户端:

    TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));

    // 协议要和服务端一致

    TProtocol protocol = new TCompactProtocol(transport);

    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();

    String result = client.say("NonBlockingClient");

    System.out.println("Result =: " + result);

    transport.close();

(三) 源码分析

TNonblockingServer继承于 AbstractNonblockingServer,这里我们更关心基于 NIO的 selector部分的关键代码。
Apache Thrift系列详解(二) - 网络服务模型

(四) 优缺点

TNonblockingServer模式优点

相比于 TSimpleServer效率提升主要体现在 IO多路复用上, TNonblockingServer采用非阻塞 IO,对 accept/read/write等 IO事件进行监控和处理,同时监控多个 socket的状态变化。

TNonblockingServer模式缺点

TNonblockingServer模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。

THsHaServer

鉴于 TNonblockingServer的缺点, THsHaServer继承于 TNonblockingServer,引入了线程池提高了任务处理的并发能力。 THsHaServer是半同步半异步( Half-Sync/Half-Async)的处理模式, Half-Aysnc用于 IO事件处理( Accept/Read/Write), Half-Sync用于业务 handler对 rpc的同步处理上。

注意:THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。

(一) 工作流程

Apache Thrift系列详解(二) - 网络服务模型

(二) 使用入门

服务端:

    TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());

    // 半同步半异步

    THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);

    thhsArgs.processor(tprocessor);

    thhsArgs.transportFactory(new TFramedTransport.Factory());

    thhsArgs.protocolFactory(new TBinaryProtocol.Factory());

    TServer server = new THsHaServer(thhsArgs);

    System.out.println("Running HsHa Server");

    server.serve();

客户端:

    TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));

    // 协议要和服务端一致

    TProtocol protocol = new TBinaryProtocol(transport);

    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

    transport.open();

    String result = client.say("HsHaClient");

    System.out.println("Result =: " + result);

    transport.close();

(三) 源码分析

THsHaServer继承于 TNonblockingServer,新增了线程池并发处理工作任务的功能,查看线程池的相关代码:
Apache Thrift系列详解(二) - 网络服务模型
任务线程池的创建过程:
Apache Thrift系列详解(二) - 网络服务模型

下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。

(四) 优缺点

THsHaServer的优点

THsHaServer与 TNonblockingServer模式相比, THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。

THsHaServer的缺点

主线程仍然需要完成所有 socket的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听 socket上新连接请求不能被及时接受。

TThreadedSelectorServer

TThreadedSelectorServer是对 THsHaServer的一种扩充,它将 selector中的读写 IO事件( read/write)从主线程中分离出来。同时引入 worker工作线程池,它也是种 Half-Sync/Half-Async的服务模型。

TThreadedSelectorServer模式是目前 Thrift提供的最高级的线程服务模型,它内部有如果几个部分构成:

  1. 一个 AcceptThread线程对象,专门用于处理监听 socket上的新连接。

  2. 若干个 SelectorThread对象专门用于处理业务 socket的网络 I/O读写操作,所有网络数据的读写均是有这些线程来完成。

  3. 一个负载均衡器 SelectorThreadLoadBalancer对象,主要用于 AcceptThread线程接收到一个新 socket连接请求时,决定将这个新连接请求分配给哪个 SelectorThread线程。

  4. 一个 ExecutorService类型的工作线程池,在 SelectorThread线程中,监听到有业务 socket中有调用请求过来,则将请求数据读取之后,交给 ExecutorService线程池中的线程完成此次调用的具体执行。主要用于处理每个 rpc请求的 handler回调处理(这部分是同步的)。

(一) 工作流程

Apache Thrift系列详解(二) - 网络服务模型

(二) 使用入门

服务端:

    TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);

    TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());

    // 多线程半同步半异步

    TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);

    ttssArgs.processor(processor);

    ttssArgs.protocolFactory(new TBinaryProtocol.Factory());

    // 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport

    ttssArgs.transportFactory(new TFramedTransport.Factory());

    // 多线程半同步半异步的服务模型

    TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);

    System.out.println("Running ThreadedSelector Server");

    server.serve();

客户端:

for (int i = 0; i < 10; i++) {

    new Thread("Thread " + i) {

        @Override

        public void run() {

            // 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)

            for (int j = 0; j < 10; j++) {

                TTransport transport = null;

                try {

                    transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));

                    TProtocol protocol = new TBinaryProtocol(transport);

                    HelloWorldService.Client client = new HelloWorldService.Client(protocol);

                    transport.open();

                    String result = client.say("ThreadedSelector Client");

                    System.out.println("Result =: " + result);

                    transport.close();

                } catch (Exception e) {

                    e.printStackTrace();

                } finally {

                    // 关闭传输通道

                    transport.close();

                }

            }

        }

    }.start();

}

(三) 核心代码

以上工作流程的三个组件 AcceptThread、 SelectorThread和 ExecutorService在源码中的定义如下:

TThreadedSelectorServer模式中有一个专门的线程 AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个 SelectorThread线程中来完成,因此能够快速对网络 I/O进行读写操作,能够很好地应对网络 I/O较多的情况。
Apache Thrift系列详解(二) - 网络服务模型

TThreadedSelectorServer默认参数定义如下:
Apache Thrift系列详解(二) - 网络服务模型

  • 负责网络IO读写的selector默认线程数(selectorThreads):2

  • 负责业务处理的默认工作线程数(workerThreads):5

  • 工作线程池单个线程的任务队列大小(acceptQueueSizePerThread):4

创建、初始化并启动 AcceptThread和 SelectorThreads,同时启动 selector线程的负载均衡器( selectorThreads)。
Apache Thrift系列详解(二) - 网络服务模型

AcceptThread源码

AcceptThread继承于 Thread,可以看出包含三个重要的属性:非阻塞式传输通道( TNonblockingServerTransport)、 NIO选择器( acceptSelector)和选择器线程负载均衡器( threadChooser)。
Apache Thrift系列详解(二) - 网络服务模型

查看 AcceptThread的 run()方法,可以看出 accept线程一旦启动,就会不停地调用 select()方法:
Apache Thrift系列详解(二) - 网络服务模型

查看 select()方法, acceptSelector选择器等待 IO事件的到来,拿到 SelectionKey即检查是不是 accept事件。如果是,通过 handleAccept()方法接收一个新来的连接;否则,如果是 IO读写事件, AcceptThread不作任何处理,交由 SelectorThread完成。
Apache Thrift系列详解(二) - 网络服务模型

在 handleAccept()方法中,先通过 doAccept()去拿连接通道,然后 Selector线程负载均衡器选择一个 Selector线程,完成接下来的 IO读写事件。
Apache Thrift系列详解(二) - 网络服务模型

接下来继续查看 doAddAccept()方法的实现,毫无悬念,它进一步调用了 SelectorThread的 addAcceptedConnection()方法,把非阻塞传输通道对象传递给选择器线程做进一步的 IO读写操作。
Apache Thrift系列详解(二) - 网络服务模型

SelectorThreadLoadBalancer源码

SelectorThreadLoadBalancer如何创建?
Apache Thrift系列详解(二) - 网络服务模型
SelectorThreadLoadBalancer是一个基于轮询算法的 Selector线程选择器,通过线程迭代器为新进来的连接顺序分配 SelectorThread。
Apache Thrift系列详解(二) - 网络服务模型

SelectorThread源码

SelectorThread和 AcceptThread一样,是 TThreadedSelectorServer的一个成员内部类,每个 SelectorThread线程对象内部都有一个阻塞式的队列,用于存放该线程被接收的连接通道。
Apache Thrift系列详解(二) - 网络服务模型

阻塞队列的大小可由构造函数指定:
Apache Thrift系列详解(二) - 网络服务模型

上面看到,在 AcceptThread的 doAddAccept()方法中调用了 SelectorThread的 addAcceptedConnection()方法。

这个方法做了两件事:

  1. 将被此 SelectorThread线程接收的连接通道放入阻塞队列中。

  2. 通过 wakeup()方法唤醒 SelectorThread中的 NIO选择器 selector。

Apache Thrift系列详解(二) - 网络服务模型

既然 SelectorThread也是继承于 Thread,查看其 run()方法的实现:
Apache Thrift系列详解(二) - 网络服务模型

SelectorThread方法的 select()监听 IO事件,仅仅用于处理数据读取和数据写入。如果连接有数据可读,读取并以 frame的方式缓存;如果需要向连接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,需要向 NIO的 selector清空和注销自身的 SelectionKey。
Apache Thrift系列详解(二) - 网络服务模型

  • 数据写操作完成以后,整个 rpc调用过程也就结束了, handleWrite()方法如下:
    Apache Thrift系列详解(二) - 网络服务模型

  • 数据读操作完成以后, Thrift会利用已读数据执行目标方法, handleRead()方法如下:

Apache Thrift系列详解(二) - 网络服务模型

handleRead方法在执行 read()方法,将数据读取完成后,会调用 requestInvoke()方法调用目标方法完成具体业务处理。 requestInvoke()方法将请求数据封装为一个 Runnable对象,提交到工作任务线程池( ExecutorService)进行处理。
Apache Thrift系列详解(二) - 网络服务模型

select()方法完成后,线程继续运行 processAcceptedConnections()方法处理下一个连接的 IO事件。
Apache Thrift系列详解(二) - 网络服务模型

这里比较核心的几个操作:

  1. 尝试从 SelectorThread的阻塞队列 acceptedQueue中获取一个连接的传输通道。如果获取成功,调用 registerAccepted()方法;否则,进入下一次循环。

  2. registerAccepted()方法将传输通道底层的连接注册到 NIO的选择器 selector上面,获取到一个 SelectionKey。

  3. 创建一个 FrameBuffer对象,并绑定到获取的 SelectionKey上面,用于数据传输时的中间读写缓存。

总结

本文对 Thrift的各种线程服务模型进行了介绍,包括2种阻塞式服务模型: TSimpleServer、 TThreadPoolServer,3种非阻塞式服务模型: TNonblockingServer、 THsHaServer和 TThreadedSelectorServer。对各种服务模型的具体用法、工作流程、原理和源码实现进行了一定程度的分析。

鉴于篇幅较长,请各位看官请慢慢批阅!

欢迎关注技术公众号: 零壹技术栈

Apache Thrift系列详解(二) - 网络服务模型

本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。


VPS岛 的文章和资源来自互联网,仅作为参考资料,如果有侵犯版权的资源请尽快联系站长,我们会在24h内删除有争议的资源。丨 转载请注明Apache Thrift系列详解(二) – 网络服务模型
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址