实现协程的另一种思路——关于协程的思考

发表于2017-03-14
评论1 3.9k浏览

  偶然看到protothread,用另一种方式实现了协程的效果。说是协程库,其实它并不是以任何库的形式存在的。而仅仅是头文件,在使用时仅仅需要包含头文件即可。基于大神实现了协程效果的思路,我也简单尝试了一下。
  本文会首先介绍协程库代码和它的测试例程,然后分析其实现原理。如果之前并未接触过这一思想,相信你会和我一样惊呼,cool,天才是这样创造世界的!在此之后,会对进程、线程、协程谈谈自己的理解,并且聊聊各种实现协程的方式。

一、如何用头文件实现协程库?
  在给出库代码之前,我们先来看看测试例程:

  1. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include
    #include"xx_thread.h"
             
    static int flag=1;
         
    XX_THREAD_BEGIN(thread1) //flag=0才运行
        while(1){
            xx_wait_while(flag);
            flag=1;
            printf("thread1:1n");
        }  
    XX_THREAD_END(thread1)
             
    XX_THREAD_BEGIN(thread2)
        while(1){
            printf("thread2:1n");
            flag = 0;
            sleep(1); //为了防止输出太快,它当然会阻塞所有协程
            xx_yield();
            printf("thread2:2n");
            flag = 1;
            xx_yield();
        }
    XX_THREAD_END(thread2)
     
    XX_THREAD_BEGIN(thread4)
        static int listen_fd, work_fd;
        char buf[1024];
        int flags, addr_len, recv_len;
        struct sockaddr_in svr_addr;
         
        if((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
            exit(1);
        svr_addr.sin_family = AF_INET;
        svr_addr.sin_port = htons(9999);
        svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr_len = sizeof(struct sockaddr_in);
        flags = fcntl(listen_fd, F_GETFL, 0);
        fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK);
        if(bind(listen_fd, (struct sockaddr *)&svr_addr, addr_len)<0)
            exit(1);
        if(listen(listen_fd, 4) < 0)
            exit(1);
        xx_accept(listen_fd, NULL, 0, work_fd);
        if(work_fd < 0)
            exit(1);
        while(1){
            xx_recv(work_fd, buf, 1024, recv_len);
            if(recv_len < 0)
                exit(1);
            else if(recv_len == 0)
                break;
            buf[recv_len] = '';
            printf("我收到了%d字节:%sn", recv_len, buf);
        }
    XX_THREAD_END(thread4)
     
    int main()
    {
        xx_add_threads(thread1, thread2, thread4);
        xx_schdule();
        return 0;


   例子中共有三个协程,thread2使用yield的方式放弃cpu,thread1在flag上做等待,也就是简单的协程间同步功能,thread4则是监听一个tcp端口,当一个连接到来时,打印传来的内容(测试时,可以使用nc localhost port的方式)。三个协程都是while(1)循环,所以如果是单进程单线程单协程方式下,是无法做到三个循环“宏观同时”运行的。协程库实现了这一点。

  核心思想(重点,敲黑板):在单进程单线程下,要想做到用户态的协程切换,本质是需要对现场保存&恢复。往简单了想,协程要有入口函数,我把这个函数当成这个协程,那么问题就是怎么保存现场,怎么恢复现场。有经验的人当然想ucontext就是做这个事的,不过我们再往简单想,这个“现场”主要是什么,主要是函数运行到哪一条指令。如果我让一个函数中途能退出,下次进入时在退出的地方继续运行,是不是就是保存并恢复了简单现场呢。我在外层用一个while(1)循环作为“操作系统的进程调度器”,简单点使用轮询调度,那不是多个函数依次运行,又中断,又在中断的地方接着运行。就是这么简单,那么核心问题就转化为,如何让一个函数能中途退出,下次进入时在退出处接着执行。

  答案是goto,每次中途退出前把当前位置做一个lable,并保存起来,为了保存这个lable以便下次进入时可以使用,局部static吧,下次进入判断这个变量直接跳到lable,不就是可以了。你简直是天才,不过有更天才也更有意思的做法:switch case

  好了,我们按照这个思路试一下,它看起来应该是这样子:

1
2
3
4
5
6
7
8
9
10
11
int function(void) {
    static int i, state = 0;
    switch (state) {
        case 0: /* start of function */
        for (i = 0; i < 10; i++) {
            state = 1; /* so we will come back to "case 1" */
            return i;
            case 1:; /* resume control straight after the return */
        }
    }
}

  为了下次调用的时候从这次返回处开始,我们使用一个static变量保存lable,并在return前设置lable,在return后一行case lable,并把整个函数用switch包起来。你可能会有疑问,我们不可避免的把case落在不同的块中,这是合法的么?幸运的是,答案是合法。C语言中著名的"达夫设备"就是利用case语句在其匹配switch语句子块中也是合法的这一事实。

1
2
3
4
5
6
7
8
9
10
11
switch (count % 8) {
        case 0:        do {  *to = *from++;
        case 7:              *to = *from++;
        case 6:              *to = *from++;
        case 5:              *to = *from++;
        case 4:              *to = *from++;
        case 3:              *to = *from++;
        case 2:              *to = *from++;
        case 1:              *to = *from++;
                       } while ((count -= 8) > 0);
    }


  我们现在需要做的只是构造一些精确宏,并且可以把细节隐藏到这些似是而非的定义里。你可能觉得作为库,无法预期使用者在函数中可能的yield次数,那么我们可以使用__LINE__宏解决,通过使用__LINE__行号作为lable,我们可以允许使用者使用任意多lable

  好了,是时候给出代码了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#ifndef _XX_THREAD_H_
#define _XX_THREAD_H_
     
#define XX_THREAD_BEGIN(name)
    void name(){
    static int _xx_s; switch(_xx_s){case 0:{
#define XX_THREAD_END(name) xx_yield();}}}
     
#define xx_add_threads(...)
    void (*_xx_thread_array[])()={__VA_ARGS__}
#define xx_schdule()
    for(int _xx_i=0; ; ++_xx_i){
        if(_xx_i >= sizeof(_xx_thread_array)/sizeof(_xx_thread_array[0])) _xx_i=0;
        _xx_thread_array[_xx_i]();
    }
     
#define xx_yield() do {_xx_s=__LINE__;
                         return;
                   case __LINE__:; } while(0)
#define xx_wait_while(condition) do {_xx_s=__LINE__;
                                       case __LINE__:
                                if(condition) return; } while(0)
#define xx_wait_until(condition) xx_wait_while(!(condition))
#define xx_accept(fd, addr, alen, new_fd)
    xx_wait_while((new_fd=accept(fd, addr, alen))<0
            && errno==EAGAIN)
#define xx_recv(fd, buf, buflen, recv_len)
    xx_wait_while((recv_len=recv(fd, buf, buflen, MSG_DONTWAIT))<0
            && errno==EAGAIN)
 
#endif // _XX_THREAD_H_

  这就是让开头的测试代码正常运转的协程库的所有代码,是不是比你预期的简短呢?

  这只是一个简单版本,它还有诸多限制,其中一些可以进行改进,另外一些则是先天不足
  * 协程函数指针由数组改为指针等动态结构,可以做到协程的动态增删
  * 将lable和函数指针struct在一起,可以增加一个函数作为两个协程实体的可能性,是的,只是增加可能性,因为还有局部变量
  * 如果想要在协程yield前后保留局部变量,就必须使用static,我们也可以像lable一样将这些值返回并struct在协程指针那里,不过这样就会使这个库“变重”,违背了轻量级的原则。这一点要辩证的看
  * 例子中的网络调用使用的是非阻塞io,这会导致整个进程即使在空闲时也会跑满cpu,应该可以使用epoll解决这一点。我之所以这样做只是为了尽量简单的做出demo
  * 协程中当然不能调用阻塞函数,其中网络IO给出了例子,另外还有定时器常用到。 如果使用epoll,那么timer的问题也解决了(利用epoll 的timeout参数),而如果对库的改造不是用于网络io而没有使用epoll,也可以使用其它方式封装timer。啰嗦两句,做法就是:①由库(使用可以快速选出最早超时的数据结构)维护所有timer,②并选出最早超时的交给操作系统做定时。其中①可以使用红黑树、小顶堆、时间轮等数据结构;第②步可以使用传统unix定时器setitimer()POSIX定时器timer_create()linux特有的timerfd timerfd_create()epoll类超时机制、甚至如果进程等待超时时不需要做其它事情的话直接用sleep()。关于linux环境下定时器的实现方式可以参考拙作http://km.oa.com/group/29048/articles/show/279519(这个广告好硬啊)
  * 因为这一库使用switch case,所以恐怕使用者要被要求小心使用switch语法,最好彻底放弃使用switch,当然可以在库中使用goto来避免冲突
  * 你应该已经注意到,这一协程库要求所有yield必须在顶层线程函数。这是一个很强的限制。至于如何解决嘛,你当然可以想到使用longjump,不过以它的复杂度来说,已经非常不值得了,不如直接用ucontext。至于这些实现协程的方式,我会在下文中讨论。

  尽管有如此诸多限制,但这一技巧仍然足够有吸引力。另外它的宏定义中括号不匹配等问题是相当不符合规范的,因此请小心在正式场合下使用。事实上,原作protothread的针对场景是单片机等嵌入式环境,在那里甚至可能连一个操作系统都没有,程序只能串行执行,在这样的环境下能用如此简单的代码做出一个多线程的效果,想必是一个巨大的进步。

二、关于进程、线程、协程的思考
  好了,关于protothread以及switch case的奇技淫巧的介绍到此结束。你可能在怀疑,这一小伎俩怎么能算“协程”?这一技巧说白了只是轮询执行几个函数而已,那么,怎么去理解协程呢,什么又是进程和线程。在这里我谈一谈我的理解,有不对的地方请批评指正。
  我们为什么需要多进程/多线程/多协程呢?因为我们希望能在同一时间做多件事情,而cpu的处理速度又足够快,处理一件事绰绰有余,甚至需要cpu停下来等一些事。如《UNIX网络编程》中,前几章描述了使用系统API(不包括select/epoll)单进程单线程能做到的事情是多么有限。因为单进程单线程情况下,只能顺序的执行一条执行流(即使if else等跳转,也是顺序的执行同一个执行流),除了一个例外:信号,信号处理程序是独立于主执行流的,因此信号中要和多线程情况一样小心处理不可重入函数。那么如何实现多条执行流呢?
  执行流是有状态的,要想切换它,就要保存它的“状态”,或者叫“上下文”,或者叫“现场”。上下文主要指的是栈和寄存器,而切换时机主要有被动抢占&主动放弃。内核级进程线程当然很容易实现这些,有主动放弃的API,如sleep(),也有被动抢占的时机,如定时tick,又如中断或系统调用造成的陷入内核,在从内核返回用户态时也会顺便检查是否可以抢占进程,因为进入内核时已经做了现场保存,这里即将恢复,如果这个进程不久将被强制切走,不如现在直接去恢复别的进程运行。更详细和权威的内容可以参考《深入理解Linux内核》。操作系统内核之所以能做到这些,依赖于一些条件:操作系统可以提供API、操作系统以下的硬件定时器中断让操作系统可以定时检查一些事。那么要在用户态实现协程,思路无非是主动放弃&被动抢占了。主动放弃可以通过提供yield API实现,而理论上使用内核提供的定时器API可以实现被动抢占。然而被动抢占的复杂度要远远高于主动放弃了,并且在使用上也更复杂。举个例子:STL的map是线程不安全的,单线程多协程下,如果协程只会主动放弃运行,则我们清楚的知道一个协程在操作map的时候不会被中断,它只有显式放弃cpu时才会中断,这时多个协程共用一个map是不用加锁的,而如果协程切换有主动抢占功能,这个主动抢占基本必然依赖于操作系统定时器,这个定时器随时可能到来,完全有可能在map操作一半的时候被中断,这种情况下多协程共享map就要像多线程一样加锁。我想这就是目前的协程实现均未引入抢占式的原因,失远远大于得。
  上面一段说的有些混乱了,简单总结一下。裸机编程依赖于硬件,多核CPU可以执行多个执行流,硬件中断可以中断执行流。操作系统对裸机进行了抽象,利用硬件中断等手段做到应用层执行流切换与恢复,抽象出应用层可以并行多个执行流,并且有操作系统定时器可以中断执行流。在应用层,可以再做一层抽象,利用操作系统定时器等手段再做一层“子操作系统”。子操作系统的价值在于:陷入内核的进程/线程切换代价太大,并且多线程的线程不安全很不方便。因此“子操作系统”的定位应该是,切换快速轻量,并发安全。因为后者,所以协程库放弃了利用定时抢占协程。

三、协程的实现方式
  最后聊聊协程的多种实现方式,我能想到的三个,在前文都有提到过
  第一就是最普遍最强大的,利用汇编代码保存&恢复栈和寄存器,这种做法最靠谱,spplibcoucontext都采用这种做法。(ucontext我猜是这样,如果不对请指正)
  第二种做法就是本文提到的,使用c语言的奇技淫巧,实现无栈(stack-less)的协程,典型实现是protothread。优点是轻量,缺点是诸多限制,前文都有提到
  第三种做法是委托setjump&longjump来做到现场保存&恢复,感觉实用性不高。
参考资料
http://blog.csdn.net/qq910894904/article/details/41911175
http://www.oschina.net/translate/coroutines-in-c


如社区发表内容存在侵权行为,您可以点击这里查看侵权投诉指引