协程-无栈协程(下)
无栈协程库——protothread
ProtoThread源码如下所示:
#define LC_INIT(s) s = 0; #define LC_RESUME(s) switch(s) { case 0: #define LC_SET(s) s = __LINE__; case __LINE__: #define LC_END(s) } typedef unsigned short lc_t; //用于定义一个描述protothread实例的结构体,每一个无栈协程用这个结构体进行描述 struct pt { lc_t lc; }; /** 初始化一个protothread实例,无栈协程实例,核心就是将指令标签设置为0 */ #define PT_INIT(pt) LC_INIT((pt)->lc) /** * 这里用于定义一个protothread实例的接口,name_args是一个包含函数名和形参列表的字符串 * 且这个接口的返回值得是char型 */ #define PT_THREAD(name_args) char name_args /** * 用于定义一个protothread的起始执行位置,其实就是在prototype前面套了一个switch */ #define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc) /** * 用于界定protothread的终止位置,就是在后面加了一个},并对结构体进行初始化 */ #define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \ PT_INIT(pt); return PT_ENDED; } /** *阻塞直到条件为true,实际应用中返回PT_WAITING表示当前进程阻塞让出执行权,其他表示未被阻塞继续执行 */ #define PT_WAIT_UNTIL(pt, condition) \ do { \ LC_SET((pt)->lc); \ if(!(condition)) { \ return PT_WAITING; \ } \ } while(0) /** * 调度一个prototype协程,当返回PT_WAITING,表示调度器阻塞了,让出执行权限给里面的进程 */ #define PT_SCHEDULE(f) ((f) == PT_WAITING) /** * 让出执行权限,本质上就是在让出位置打一个标签,并直接return,把执行权限交给主调接口 */ #define PT_YIELD(pt) \ do { \ PT_YIELD_FLAG = 0; \ LC_SET((pt)->lc); \ if(PT_YIELD_FLAG == 0) { \ return PT_YIELDED; \ } \ } while(0) /** */ #define PT_YIELD_UNTIL(pt, cond) \ do { \ PT_YIELD_FLAG = 0; \ LC_SET((pt)->lc); \ if((PT_YIELD_FLAG == 0) || !(cond)) { \ return PT_YIELDED; \ } \ } while(0)
如上述代码段所示:
·protothread使用结构体struct pt描述一个协程,协程里面含有lc_t类型成员变量,本质上是一个unsigned short类型
·整个PT协程,在创建之前需要调用PT_INIT进行初始化,初始化之后调用PT_BEGIN拉起协程,协程运行完毕之后调用PT_END关闭协程
·ProtoThread通过PT_THREAD封装协程执行接口
·ProtoThread调用PT_WAIT_UNTIL阻塞,直到condition为true。在这里若是condition为false,表示不满足条件,直接通过return交出执行权限;在交出执行权限之前,调用LC_SET,查看LC_SET的代码,看到这里我们看PT是通过记录行号给源码打标签
·ProtoThread通过宏PT_SCHEDULE来实现协程的调度,通常调用PT_SCHEDULE的是主控协程,主控协程决策调度哪个协程之后通过PT_SCHEDULE进行调度
我们尝试用ProtoThread写一个多玩家登陆的代码,如下:
#include "pt.h" struct MessageBuffer { int change_flag; string content; } g_message_buffer; typedef struct RoleData { int id; int step; string name; pt thread_inst_pt; int recv_message; } tagRoleData; std::map<std::string,RoleData> g_role_set; void Timer() { printf("timer work\n"); return; } MessageBuffer recv_message() { MessageBuffer msg = g_message_buffer; reset_message(); return msg; } int receive_message(tagRoleData& data) { if(data.recv_message > 0) { data.recv_message = 0; return 1; } return 0; } int process_online_data(tagRoleData& data){ printf("process online name[%s] current step[%d]\n",data.name.c_str(),data.step); data.step += 1; return 0; } int process_profile_data(tagRoleData& data){ printf("process profile name[%s] current step[%d]\n",data.name.c_str(),data.step); data.step += 1; return 0; } static PT_THREAD(login_thread(tagRoleData& data)) { PT_BEGIN(&data.thread_inst_pt); while(data.step < 4) { PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data)); process_online_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data)); process_profile_data(data); } PT_EXIT(&data.thread_inst_pt); PT_END(&data.thread_inst_pt); } fd_set fds; struct timeval tv; static char c[100] = {0}; static PT_THREAD(network_thread(struct pt *pt)) { FD_ZERO(&fds); FD_SET(0,&fds); PT_BEGIN(pt); while(1) { PT_WAIT_UNTIL(pt,select(1,&fds,NULL,NULL,&tv) > 0); read(0,c,100); g_message_buffer.content = string(c); g_message_buffer.change_flag = 1; if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){ RoleData role_data; role_data.step = 0; role_data.name = g_message_buffer.content; PT_INIT(&role_data.thread_inst_pt); g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data)); } std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content); role_iter->second.recv_message = 1; PT_SCHEDULE(login_thread(role_iter->second)); } PT_EXIT(pt); PT_END(pt); } static struct timer codelock_timer, input_timer; static PT_THREAD(timer_thread(struct pt *pt)) { PT_BEGIN(pt); timer_set(&input_timer, 1000); PT_WAIT_UNTIL(pt, timer_expired(&input_timer)); PT_EXIT(pt); PT_END(pt); } static struct pt network_thread_pt; static struct pt timer_thread_pt; void Proc() { PT_INIT(&network_thread_pt); while(PT_SCHEDULE(network_thread(&network_thread_pt))) { PT_SCHEDULE(timer_thread(&timer_thread_pt)); sleep(1); } } int main() { Proc(); return 0; }
这其中:
·代码中定义了三个执行单元,一个是network_thread网络协程,一个是timer_thread定时协程,一个是login_thread登录协程;
·其中timer_thread协程负责定时器任务,network_thread负责消息接收并根据消息头拉起对应的登录协程login_thread,而login_thread对应不同的登录实体的登录行为;
·network_thread协程,PT_WAIT_UNTIL会“阻塞”直到文件句柄直到可读(这里我们用标准输入进行替代以便于验证);
·当读到消息之后,对于未开启流程的玩家创建一个协程,其他的则调度对应的协程(PT_SCHEDULE(login_thread(role_iter->second)))继续往后走;
·对于登录协程。需要多步通信过程,一个是需要等待取在线数据并处理(process_online_data),一个是需要取角色数据并处理(process_profile_data);
·在本例中,我们在RoleData中封装了pt类型的成员变量thread_inst_pt用于缓存协程的状态信息,而外层用name->RoleData的映射关系管理协程及其他协程中间态数据;
需要注意的是——以protothread来说:
·对于无栈协程来说,因为不存在指针等信息,所以无栈协程的所有信息是可以缓存在共享内存的,因此进程可以通过共享内存在重启的环境下,也不会导致协程中断;
·但是这种恢复也是有条件的,在protothread中是用行号进行协程恢复,若是用到协程的源文件的行号出现改变,则可能执行错乱,如下所示,假设中断前宏扩展后执行序列如下:
switch(Line){ case 0:{ state1-1; s=Line1-2; } case Line1-2:{ if(!cond){ return; } state1-3; s=Line1-3 } case Line1-3:{ if(!cond){ return; } state1-4; } }
当源码修改之后,可能宏扩展之后代码就变为:
switch(Line){ case 0:{ state2-1; s=Line2-2; } case Line2-2:{ if(!cond){ return; } state2-3; s=Line2-3 } case Line2-3:{ if(!cond){ return; } state2-4; } }
当Line1-xx和Line2-xx不相等的时候,会重新调度进来就会找不到行号了,引发执行流程错乱(所以在使用这类库的时候,应该将函数的实现和协程主流程分开,以避免因为逻辑修改导致协程不可恢复的场景);
对于无栈协程来说,执行流的恢复只是通过找到下一条指令的执行地址,但是不包括上下文,这意味着无栈协程里面不能有局部变量,需要我们手动把后面需要用到的局部变量缓存起来。
此外这里无栈协程是通过switch-case实现的,嵌套的switch-case会产生问题,限制比较多,所以也不适用于线上场景。
Label As Value
标签变量(labels as values)是GCC对C语言的扩展,是指我们可以通过操作符&&得到当前函数中定义的标签地址,这个值的类型是void*,并且是常量,我们可以在任何可以使用这个类型的常量处使用;如下:
#include "stdio.h" void* ptr = NULL; int Test() { printf("local:%d,global:%d:global2:%d\n",&&test_local,&&test_global,&&test_global2); if(NULL == ptr) { printf("here\n"); ptr = &&test_local; } goto *ptr; test_local: ptr = &&test_global; printf("local test %d\n",ptr); return 0; test_global: ptr = &&test_global2; printf("global test\n"); return 0; test_global2: ptr = &&test_local; printf("global2 test\n"); return 0; return 0; } int main() { Test(); Test(); Test(); return 0; }
执行完毕后有如下执行结果:
[ronaldwang@VM-98-253-centos ~/test/label_value_test]$ ./main local:4196026,global:4196069:global2:4196097 here local test 4196069 local:4196026,global:4196069:global2:4196097 global test local:4196026,global:4196069:global2:4196097 global2 test
受此启发,我们对protothread进行修改,可以得到如下代码:
typedef void * lc_t; #define LC_CONCAT2(s1, s2) s1##s2 #define LC_CONCAT(s1, s2) LC_CONCAT2(s1, s2) #define LC_RESUME(s) \ do { \ if(s != NULL) { \ goto *s; \ } \ } while(0) #define LC_SET(s,label) \ do { \ LC_CONCAT(label, __LINE__): \ (s) = &&LC_CONCAT(label, __LINE__); \ } while(0) //block until #define PT_WAIT_UNTIL(pt, label, condition) \ do { \ LC_SET((pt)->lc, label); \ if(!(condition)) { \ return PT_WAITING; \ } \ } while(0)
对于库文件的改造:
·阻塞命令PT_WAIT_UNTIL新增标签字段label,当阻塞时,我们不仅指明解除阻塞所需满足的条件,也指明解除阻塞后要执行的代码段
·调度的指令LC_RESUME,则是根据标签的地址直接跳转的对应代码去执行goto *s
则最终代码的使用样式如下:
static PT_THREAD(login_thread(tagRoleData& data)) { PT_BEGIN(&data.thread_inst_pt); while(data.step < 4) { PT_WAIT_UNTIL(&data.thread_inst_pt, online_label, receive_message(data)); online_label: process_online_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt,profile_label, receive_message(data)); profile_label: process_profile_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt, online_label2, receive_message(data)); online_label2: process_online_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt, online_label3, receive_message(data)); online_label3: process_online_data(data); } PT_EXIT(&data.thread_inst_pt); PT_END(&data.thread_inst_pt); }
从这段代码可以看到:
·每段接口执行完毕后,都阻塞等待对应条件满足,并指明了阻塞解除后要执行的代码通过标签的形式展示出来。
问题
上述采取标签的形式还是解决不了重启后协程恢复的问题,因为标签在内存中的位置会在重新编译的时候地址出现变化,我们遵循标签的修改方式对原先的基于行号的代码进行修改,如下:
#define LC_SET(s, evt_id) s = evt_id; case evt_id: #define PT_WAIT_UNTIL(pt, evt_id, condition) \ do { \ LC_SET((pt)->lc, evt_id); \ if(!(condition)) { \ return PT_WAITING; \ } \ } while(0)
业务方可以通过如下形式进行使用:
using namespace std; struct MessageBuffer { int change_flag; string content; } g_message_buffer; typedef struct RoleData { int id; int step; string name; pt thread_inst_pt; int recv_message; } tagRoleData; std::map<std::string,RoleData> g_role_set; int receive_message(tagRoleData& data) { if(data.recv_message > 0) { data.recv_message = 0; return 1; } return 0; } int process_online_data(tagRoleData& data){ printf("process online name[%s] current step[%d]\n",data.name.c_str(),data.step); data.step += 1; return 0; } int process_profile_data(tagRoleData& data){ printf("process profile name[%s] current step[%d]\n",data.name.c_str(),data.step); data.step += 1; return 0; } #define MSG_ONLINE_RSP 1 #define MSG_PROFILE_RSP 2 #define MSG_ONLINE_RSP_2 3 #define MSG_ONLINE_RSP_3 4 static PT_THREAD(login_thread(tagRoleData& data)) { PT_BEGIN(&data.thread_inst_pt); while(data.step < 4) { PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP, receive_message(data)); process_online_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_PROFILE_RSP, receive_message(data)); process_profile_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_2, receive_message(data)); process_online_data(data); PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_3, receive_message(data)); process_online_data(data); } PT_EXIT(&data.thread_inst_pt); PT_END(&data.thread_inst_pt); } #define NETWORK_EVTID 200 fd_set fds; struct timeval tv; static PT_THREAD(network_thread(struct pt *pt)) { FD_ZERO(&fds); FD_SET(0,&fds); PT_BEGIN(pt); tv.tv_sec=0; tv.tv_usec=0; while(1) { PT_WAIT_UNTIL(pt, NETWORK_EVTID, select(1,&fds,NULL,NULL,&tv) > 0); read(0,d,100); memcpy(c,d,strlen(d)-1); g_message_buffer.content = string(c); g_message_buffer.change_flag = 1; if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){ RoleData role_data; role_data.step = 0; role_data.name = g_message_buffer.content; PT_INIT(&role_data.thread_inst_pt); g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data)); } std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content); role_iter->second.recv_message = 1; PT_SCHEDULE(login_thread(role_iter->second)); } PT_EXIT(pt); PT_END(pt); } #define TIMER_EVTID 100 static PT_THREAD(timer_thread(struct pt *pt)) { PT_BEGIN(pt); timer_set(&input_timer, 1000); PT_WAIT_UNTIL(pt, TIMER_EVTID, timer_expired(&input_timer)); PT_EXIT(pt); PT_END(pt); } static struct pt network_thread_pt; static struct pt timer_thread_pt; void Proc() { PT_INIT(&network_thread_pt); while(PT_SCHEDULE(network_thread(&network_thread_pt))) { PT_SCHEDULE(timer_thread(&timer_thread_pt)); sleep(1); } } int main() { Proc(); return 0; }
由此可见:
·我们可以在协程让出执行权限的时候,指明要等待的事件,如PT_WAIT_UNTIL(pt, evt_id, condition)所示
·其他的如之前所示,在阻塞分支之前会按照等待的事件ID,新增一个case分支
·因为标签是我们自定义的,不会因为程序的重新编译发生变化,所以重启不会影响协程的恢复和执行