Posts filed under: Source Code Reading

https://github.com/pallets/werkzeug

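The code lives in werkzeug/local.py.

Reading this part of the source, I mainly wanted to answer the following questions:

  • What problem does ThreadLocal solve
  • How is ThreadLocal implemented
  • How is the ThreadLocal lifecycle managed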

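What problem does ThreadLocal solve

ThreadLocal is best understood by comparing it with a global variable.

Suppose every thread runs the same logic and refers to the same variable name or resource to do its work, but a value written by one thread through that shared global reference must not affect any other thread. ThreadLocal is the tool for this situation.

ThreadLocal does not solve the problem of sharing resources in multithreaded programming. It works at the level of program logic: it manages context data that follows a thread's lifecycle, which makes the program easier to write and maintain.

All code can share the same logic without affecting other threads (or global resources).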
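
To make the contrast with a global variable concrete, here is a minimal sketch using the standard library's threading.local rather than Werkzeug's own Local. The names shared, ctx, handle and run are made up for illustration.

```python
import threading

# A plain module-level global: every thread reads and writes the same object.
shared = {"user": None}

# threading.local(): the same name for every thread, a separate value per thread.
ctx = threading.local()


def handle(user, results):
    shared["user"] = user      # overwrites whatever another thread stored
    ctx.user = user            # visible only to the current thread
    results[user] = ctx.user   # always this thread's own value


def run():
    results = {}
    threads = [threading.Thread(target=handle, args=(name, results))
               for name in ("alice", "bob", "carol")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

After run() returns, shared["user"] holds whichever thread wrote last, while each thread only ever saw its own ctx.user.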

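How is ThreadLocal implemented

Every running thread has its own identifier, which is part of the thread's data structure.

ThreadLocal can be implemented with the following steps:

Keep one process-wide Storage that relies on a Map to store each Thread ID together with its data.
Next, use Local() to obtain the local variables. It is really a lookup function: it automatically obtains the current Thread ID and fetches that thread's set of data from the global ThreadVariableMap.
As a result, every thread can run the same code without affecting, or depending on, global resources.

In Werkzeug, the ThreadLocal machinery is structured roughly like this.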
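
The steps above can be sketched in a few lines. This is a simplified illustration, not Werkzeug's actual code: the class name SimpleLocal and its release method are hypothetical, and the thread ID comes from threading.get_ident().

```python
import threading


class SimpleLocal:
    """Hypothetical minimal Local: one process-wide map keyed by thread ID."""

    def __init__(self):
        # Use object.__setattr__ so we do not recurse into our own __setattr__.
        object.__setattr__(self, "_storage", {})
        object.__setattr__(self, "_lock", threading.Lock())

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        ident = threading.get_ident()
        try:
            return self._storage[ident][name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        ident = threading.get_ident()
        with self._lock:
            self._storage.setdefault(ident, {})[name] = value

    def release(self):
        """Drop everything stored for the current thread."""
        with self._lock:
            self._storage.pop(threading.get_ident(), None)
```

Callers write local.x = 1 and read local.x as if it were a global object, while the lookup by thread ID happens behind the attribute access.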

+---------------------------------------------------------+
|                                                         |
|         +----------------------------------+            |
|         |                                  |            |
|         |    LocalStack/LocalsRegistry     |            |
|         |                                  |            |
|         +---------^------------------------+            |
|                   |                                     |
|                   |                                     |
|                   |                                     |
|                   |                                     |
|                   |                                     |
|   +---------------+-------+                             |
|   |                       |                             |
|   |  Local Variables      |                             |
|   |  Marked by Thread-ID  |                             |
|   |  or Greenlet-ID       |                             |
|   |                       |                             |
|   +-----------------------+                             |
|                                                         |
+---------------------------^-----------------------------+
                            |
                            |
                            |
                            |
                            | Identifier (Greenlet-ID or
                            | Thread-ID)
                            |
+---------------------------+-----------------------------+
|                                                         |
|                       LocalProxy                        |
|                                                         |
+---------------------------^-----------------------------+
                            |
                            |
                            |
+---------------------------+-----------------------------+
|                                                         |
|                     Application                         |
|                                                         |
+---------------------------------------------------------+
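
The LocalProxy layer in the diagram can be pictured as an object that forwards every attribute access to "whatever the current thread's object is". The sketch below is an assumption-laden illustration built on threading.local; SimpleProxy, Request, _current_request and handle are invented names, not Werkzeug's API.

```python
import threading


class SimpleProxy:
    """Hypothetical stand-in for a LocalProxy: forwards to whatever lookup() returns."""

    def __init__(self, lookup):
        object.__setattr__(self, "_lookup", lookup)

    def __getattr__(self, name):
        return getattr(self._lookup(), name)

    def __setattr__(self, name, value):
        setattr(self._lookup(), name, value)


_state = threading.local()  # per-thread storage behind the proxy


class Request:
    def __init__(self, path):
        self.path = path


def _current_request():
    return _state.request


# The application imports this one name; each thread sees its own Request.
request = SimpleProxy(_current_request)


def handle(path, out):
    _state.request = Request(path)
    out.append(request.path)
```

Application code only touches the module-level request name, which is why it can be written as if there were a single global request.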

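ThreadLocal lifecycle management

The lifecycle of a ThreadLocal follows the lifecycle of its thread.

Generally, a ThreadLocal should be destroyed and cleaned up when the thread's lifecycle ends; otherwise it leaks memory.

In Werkzeug, ThreadLocal is used for WSGIRequest and WSGIResponse. The cleanup is inserted into the WSGI processing flow as a middleware, and the thread-local data is destroyed once the response has been returned and has finished executing.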
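
A rough picture of "cleanup as a middleware" is given below. It is a self-contained sketch, not Werkzeug's implementation: the storage helpers (set_value, get_value, release_local), the _ClosingIterator class and cleanup_middleware are all hypothetical names. It relies only on the WSGI convention that the server calls close() on the response iterable when it is done with it.

```python
import threading

_storage = {}  # thread ID -> per-thread data (stand-in for the real storage)


def set_value(name, value):
    _storage.setdefault(threading.get_ident(), {})[name] = value


def get_value(name, default=None):
    return _storage.get(threading.get_ident(), {}).get(name, default)


def release_local():
    """Drop everything stored for the current thread."""
    _storage.pop(threading.get_ident(), None)


class _ClosingIterator:
    """Wrap a WSGI response iterable and run a callback when it is closed."""

    def __init__(self, iterable, on_close):
        self._iterable = iterable
        self._iterator = iter(iterable)
        self._on_close = on_close

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._iterator)

    def close(self):
        try:
            close = getattr(self._iterable, "close", None)
            if close is not None:
                close()
        finally:
            self._on_close()


def cleanup_middleware(app):
    """Wrap a WSGI app so per-thread data is released after the response."""
    def wrapped(environ, start_response):
        return _ClosingIterator(app(environ, start_response), release_local)
    return wrapped
```

Tying the cleanup to close() rather than to the end of the thread matters when a server reuses worker threads across requests.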

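Over the last couple of days I read the source of Python's Condition (the version below is from Python 2's threading module), one of the building blocks used to implement Queue. When a timeout is given, it turns out to be a very plain sleep -> loop -> query pattern. There isn't much code, so I'll paste it as is without annotations :)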

def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.
 
        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.
 
        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notifyAll() call for the same condition
        variable in another thread, or until the optional timeout occurs. Once
        awakened or timed out, it re-acquires the lock and returns.
 
        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in seconds
        (or fractions thereof).
 
        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively. Instead, an internal interface
        of the RLock class is used, which really unlocks it even when it has
        been recursively acquired several times. Another internal interface is
        then used to restore the recursion level when the lock is reacquired.
 
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 20 times per second (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)
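
The timeout branch above boils down to a non-blocking acquire retried with a capped, doubling sleep. Pulled out into a standalone function it might look like the sketch below; acquire_with_timeout is a name invented here, and time.monotonic() is used in place of the original _time(). Newer Python versions can instead pass a timeout directly to Lock.acquire.

```python
import threading
import time


def acquire_with_timeout(lock, timeout):
    """Poll `lock` without blocking, sleeping between attempts with capped backoff."""
    endtime = time.monotonic() + timeout
    delay = 0.0005  # doubled before the first sleep, so the first sleep is 1 ms
    while True:
        if lock.acquire(False):   # non-blocking attempt
            return True
        remaining = endtime - time.monotonic()
        if remaining <= 0:
            return False
        # Sleep a little longer each round, but never more than 50 ms
        # and never past the deadline.
        delay = min(delay * 2, remaining, 0.05)
        time.sleep(delay)
```

The cap keeps the waiter responsive (it checks at least 20 times per second) without burning CPU in a pure busy loop.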

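An old hole I dug; time to fill it in :)

A while back I had a requirement to parse the parameter names and types out of Flask URL rules, in order to auto-generate the arguments of a command-line REST client.

Rule:  /disks/<int:disk_id>

Parameter needed: disk_id (type int)

Whoever tied the bell should untie it, so let's go straight to the Flask source :) and see how it manages and parses the URL patterns users add in route.

First, let's trace all the way down to the relevant function in the source.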

from flask import Flask

app = Flask(__name__)
app.route('/disks/<int:disk_id>')  # start tracing from this call

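In app.py, you can see

url_rule_class = Rule

Rule is imported from werkzeug.routing. Searching there for the keyword parse leads to the parse_rule function, so let's take a look at it :)

To decouple parse_rule from Flask, I extracted the following code.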

import re

_rule_re = re.compile(r'''
    (?P<static>[^<]*)                           # static rule data
    <
    (?:
        (?P<converter>[a-zA-Z_][a-zA-Z0-9_]*)   # converter name
        (?:\((?P<args>.*?)\))?                  # converter arguments
        \:                                      # variable delimiter
    )?
    (?P<variable>[a-zA-Z_][a-zA-Z0-9_]*)        # variable name
    >
''', re.VERBOSE)


def parse_rule(rule):
    """Parse a rule and return it as generator. Each iteration yields tuples
    in the form ``(converter, arguments, variable)``. If the converter is
    `None` it's a static url part, otherwise it's a dynamic one.

    :internal:
    """
    pos = 0
    end = len(rule)
    do_match = _rule_re.match
    used_names = set()
    while pos < end:
        m = do_match(rule, pos)
        if m is None:
            break
        data = m.groupdict()
        if data['static']:
            yield None, None, data['static']
        variable = data['variable']
        converter = data['converter'] or 'default'
        if variable in used_names:
            raise ValueError('variable name %r used twice.' % variable)
        used_names.add(variable)
        yield converter, data['args'] or None, variable
        pos = m.end()
    if pos < end:
        remaining = rule[pos:]
        if '>' in remaining or '<' in remaining:
            raise ValueError('malformed url rule: %r' % rule)
        yield None, None, remaining

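As you can see, Flask first breaks a URL rule into a sequence of units: runs of static text and <...> placeholders. The split follows the placeholders rather than each "/", so a static run such as /api/ stays in one piece.

Each unit is then parsed into a converter, arguments, and a variable.

Taking /api/<int:id>/create as an example, we get three units: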

(None, None, "/api/"),
("int", None, "id"),
(None, None, "/create"),

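In the first unit the converter is None, so this piece is a static string in the URL;

In the second unit the converter is int, meaning the parameter "id" is an integer variable;

The third unit is the same kind as the first, so it needs no further explanation.

With the analysis above, we have the names and types of the variables in a Flask URL rule, and can do the argument parsing for automatic CLI generation :)

PS: Lately I agree more and more with what GongZi said: "How much you know doesn't really matter; what matters is your ability to solve new problems." This kind of "meta learning" ability is the real core competitiveness.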
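
As a rough idea of what that CLI generation could look like, here is a self-contained sketch using argparse. It does not reuse parse_rule; it relies on a simplified regex that ignores converter arguments. The names _var_re, _types, build_parser and build_url, the "--name" option style, and the converter-to-type mapping are all assumptions made for illustration.

```python
import argparse
import re

# Simplified pattern: <converter:name> or <name>; converter arguments are not handled.
_var_re = re.compile(
    r"<(?:(?P<converter>[a-zA-Z_]\w*):)?(?P<variable>[a-zA-Z_]\w*)>"
)

# Hypothetical mapping from converter names to Python types; anything else is a string.
_types = {"int": int, "float": float}


def build_parser(rule):
    """Create one required --option per variable found in the rule."""
    parser = argparse.ArgumentParser(prog="client")
    for m in _var_re.finditer(rule):
        converter = m.group("converter") or "default"
        parser.add_argument("--" + m.group("variable"),
                            type=_types.get(converter, str),
                            required=True)
    return parser


def build_url(rule, args):
    """Substitute the parsed argument values back into the rule."""
    return _var_re.sub(lambda m: str(getattr(args, m.group("variable"))), rule)
```

For the rule /disks/<int:disk_id>, build_parser yields a parser with a required integer option --disk_id, and build_url fills the parsed value back into the path.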