Cookie server 性能优化实践


  1. 背景
  2. 异步网络调用
  3. 轻重分离
  4. 异步磁盘 I/O

背景

上图描述的是一般的服务器的模型,即:由主线程作为 server 端,负责接收所有到来的请求,并将请求封装为新的对象放入阻塞队列中。由线程池中的一系列工作线程对请求的业务进行处理(可能包括磁盘读写的操作),最后返回处理结果。Memcached 服务端也是采用类似的架构。

看起来上面的架构相当合理,但是仔细看来,其实里面大有可优化之处。下面是阻塞队列:

线程池中的线程数目毕竟有限,假设只有四条线程轮询上面的阻塞队列,然后分别处理四条请求 R1R4。当 R1R4 都需要进行磁盘操作时此时问题就出现了。假设 R5 请求不需要进行磁盘操作,甚至可以很快处理完成。但是即使线程池里的线程都在等待 I/O 完成而“空闲”下来,后面的请求依然得不到处理。由此可见,消除磁盘 I/O 请求阻塞非磁盘请求可以优化请求的平均处理时间,提高服务器的性能。

后面将陆续更新三种优化方法,当然三种方法全部依托于 twisted 框架。

异步网络调用

一个真正达到性能优化服务器,还是需要我们在程序中真正良好的去应用 Twisted 的各种工具的。  最常见的情况就是我们将一个非阻塞的应用中,加入了长时间的处理过程,从而达到了阻塞的效果,从而让大家都因为一位同志的长时工作而等待。

先来看看下面的这段代码:

from twisted.internet import protocol, reactor
from twisted.protocols import basic
class FingerProtocol(basic.LineReceiver):
    def lineReceived(self, user):
        self.transport.write(self.factory.getUser(user)+"\r\n")
        self.transport.loseConnection()
class FingerFactory(protocol.ServerFactory):
    protocol = FingerProtocol
    def __init__(self, **kwargs): self.users = kwargs
    def getUser(self, user):
        return self.users.get(user, "No such user")
reactor.listenTCP(1079, FingerFactory(hd='Hello my python world'))
reactor.run()

它可能是我们所写的第一个 Twisted 服务器。可能所有人都会认为这样的操作已经不会有什么问题了。但是显然,在这里我们的 getUser 更多的情况下可能会是从数据库中或是 LDAP 服务器上获取相关的信息。哪么最好的处理就是将 get 操作以非即时方式返回,以避免发生处理的阻塞。哪么就需要使用 Deferreds 了:

from twisted.internet import protocol, reactor, defer
from twisted.protocols import basic
class FingerProtocol(basic.LineReceiver):
    def lineReceived(self, user):
        self.factory.getUser(user
        ).addErrback(lambda _: "Internal error in server"
        ).addCallback(lambda m:
                      (self.transport.write(m+"\r\n"),
                       self.transport.loseConnection()))
class FingerFactory(protocol.ServerFactory):
    protocol = FingerProtocol
    def __init__(self, **kwargs): self.users = kwargs
    def getUser(self, user):
        return defer.succeed(self.users.get(user, "No such user"))
reactor.listenTCP(1079, FingerFactory(hd='Hello my python world'))
reactor.run()

这里 getUser 返回的是 defer 处理过的一个事务,而 addCallback 方法注册了 defer 中处理完成后的返回事件。这样,事务的处理就可以在另一个事件可调度的情况下进行了。从而避免了因一个用户的处理阻塞的情况下,让所有的用户都停止了响应。

轻重分离

第二种优化的方法,可以只用下图来解释。即,使用轻量级的线程池(PreProcess)对所有请求进行预处理,所有不需要 I/O 执行时间很短的请求直接执行,如果是需要磁盘 I/O 的则放入下一级阻塞队列,有单独的线程池来处理这些请求。详见下图:

第一级请求使用自己已有的线程池,不再多说。I/O 请求+二级线程池可以使用 twisted 提供的 ThreadPool 机制来实现。而我所说的优化正是使用此方法,代码很简单,如下:

deferred = threads.deferToThread(data_loader.get, sn)
deferred.addCallback(self.loader_callback, (req, other_data))

解释一下:

threads.deferToThread 将会将 data_loader.get 放入 reactor 线程池的队列,并返回一个 defer 对象。data_loader.get 由 reactor 的线程池进行执行,执行完成后放入 reactor 的队列,然后由 reactor 主线程来调用 deferred.addCallback 中注册的回调函数。所以回调函数是不会跨线程调用的,如果在回调函数中调用一些不可跨线程的应用(如,memcached 客户端)也可放心使用,这也正是选择 reactor 的线程池作为二级线程池的原因之一。

选择 reactor 的线程池作为二级线程池的原因二:回调函数。因为 Read Thread 将自己负责恢复请求,所以回调函数必不可少。

接下来深入 twisted 源码探究此方法的原理,以下代码均是节选自 twisted2.0.0 源码,其他版本大致相同:

# threads.py
def deferToThread(f, *args, **kwargs):
     d = defer.Deferred()
     from twisted.internet import reactor
     reactor.callInThread(_putResultInDeferred, d, f, args, kwargs)
     return d
def _putResultInDeferred(deferred, f, args, kwargs):
     from twisted.internet import reactor
     try:
         result = f(*args, **kwargs)
     except:
         f = failure.Failure()
         reactor.callFromThread(deferred.errback, f)
     else:
         reactor.callFromThread(deferred.callback, result)
# base.py
def callInThread(self, _callable, *args, **kwargs):
    if not self.threadpool:
        self._initThreadPool()
    self.threadpool.callInThread(_callable, *args, **kwargs)  //由线程池执行具体的读取操作
def callFromThread(self, f, *args, **kw):
    ...
    self.threadCallQueue.append((f, args, kw))  //放入主线程队列,由主线程执行回调函数
self.wakeUp()
    ...

注:callInThread / allFromThread,前者是放入线程池执行,后者是 reactor 的队列里,由 reactor 的主线程来执行。

至于 threadpool 的代码在 twisted / python / threadpool 是一个线程池

异步磁盘 I/O

第三种方法是使用经典的服务器模型的 select(epoll)异步 I/O。使用 twisted 框架中的 reactor(epoll/select)+reader,将磁盘 I/O 封装为 reader,交给 reactor 来管理,磁盘 I/O 完成后调用回调函数将数据返回发送改请求的客户端。这样既不会因为 I/O 阻塞请求处理线程也不会如方法二一样因为 I/O 阻塞读取线程,详见下图:

reactor(epoll / select)+ reader 的方法需要继承 abstract.FileDescriptor 并且实现其几个方法,而 twisted 框架中的网络(TCP / UDP)、标准 I/O、进程都有类似的实现。使用时传入文件描述符,如下:

fileReader = FileReader(fd, loader_callback, other_data)
reactor.addReader(fileReader)

FileReader 类的实现如下:

class FileReader(abstract.FileDescriptor):

  def __init__(self, fd, result_callback, args):
    ...
    self.fd = fd
self.setNonBlocking(self.fd)
self.dataRecieved=result_callback
self.args=args
self.all_data=""
  def setNonBlocking(self, fd):
   ...
  def fileno(self):
return self.fd
  def connectionLost(self, reason):
sys.close(self.fd)
  def doRead(self)://fdesc.readFromFD(self.fd, self.dataReceived)
    data = os.read(self.fd, 10240)     //每次读取1M
    self.all_data += data
    if not data:
       self.dataRecieved(self.all_data , self.args)
      return CONNECTION_LOST

自己实现的 reader 并没有使用类似其他标准实现中的 fdesc.readFromFD(self.fd, self.dataReceived)来读取数据,因为该函数中提供的回调函数不允许传参,所以自己将 fdesc 实现在了 FileReader 内。

下面是此方法的理论依据:

# pollreactor.py
def addReader(self, reader):
    fd = reader.fileno()
    if not reads.has_key(fd):
        selectables[fd] = reader
        reads[fd] =  1
        self._updateRegistration(fd)
def _updateRegistration(self, fd):
    ...
    mask = 0
    if reads.has_key(fd): mask = mask | select.POLLIN
    poller.register(fd, mask)
def _doReadOrWrite(self, selectable, fd, event, POLLIN, POLLOUT, log,
    faildict={
        error.ConnectionDone: failure.Failure(error.ConnectionDone()),
        error.ConnectionLost: failure.Failure(error.ConnectionLost())
    }):
    ...
    if event & POLLIN:
      why = selectable.doRead()
      inRead = True
       ...
    if why:
        self._disconnectSelectable(selectable, why, inRead)
# posixbase.py
def _disconnectSelectable(self, selectable, why, isRead, faildict={
    error.ConnectionDone: failure.Failure(error.ConnectionDone()),
    error.ConnectionLost: failure.Failure(error.ConnectionLost())
}):
             ...
             selectable.connectionLost(f)

本文作者 : cyningsun
本文地址https://www.cyningsun.com/08-27-2012/cookie-server-performance-optimization.html
版权声明 :本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!