前段时间给agentzh
发了一个邮件描述了我们的一个需求如下:
“我们使用nginx做客户端web访问的接入, 后端各种service提供服务,service对外的接口是公司内部实现的异步rpc协议(所有请求可以共用一个tcp连接请求,每个请求有唯一的sequence标记, 应答异步, 访问者通过sequence来匹配 request,response); 希望的流程是Nginx收到http
request,处理模块 把请求转成内部rpc格式
通过一个tcp连接(所有处理共用的)向后端异步发,同时保持http
request上下文,当后端应答回来后, 要匹配到请求的上下文,然后给http response,
需求特别说明的是这个过程与redis,
mysql访问不同,它们还是一种同步的机制,一个连接上只能并发一个请求。请问,我们需要的这个机制ngx_lua能提供吗?
Agentzh:回复如下:
可以。你可以在 init_worker_by_lua 中通过 ngx.timer.at()
函数创建一个背景工作线程(还是“轻线程),然后所有的后端请求都委托给这个轻线程来分发。这个背景轻线程保持一个到后端的唯一连接。各个下游请求的处理程序(比如
content_by_lua)可以通过 lua-resty-lock [1] 来和工作轻线程同步。你可以使用每 nginx worker
进程一个后端连接的模式,也可以使用整个 nginx 实例一个后端连接的模式。对于后者,你需要通过共享内存字典来跨 worker
进程分发到后端的请求和应答数据,同时你需要确保只有一个 worker 能创建 timer(即背景工作线程)。
建议加入
openresty 中文邮件列表讨论这样的问题,谢谢合作!请见 http://openresty.org/#Community
按照这个思路我写的一个demo:
http{
lua_shared_dict my_locks 100k;
init_worker_by_lua '
transation = {};
local lock = require "resty.lock";
local check_transation = function(premature,key)
if not premature then
ngx.log(ngx.WARN, "key is
"..key);
ngx.log(ngx.WARN, "transation[key][2]
is "..transation[key][2]);
local
ok, err = transation[key][1]:unlock(key);
if not ok then
ngx.say("failed
to unlock: ", err)
end
ngx.log(ngx.WARN,"in
timer");
transation[key][3]=transation[key][2].."
is set ok";
end
end
local function save(key,value)
local
trans_lock = lock:new("my_locks");
trans_lock:lock(key);
transation[key]=
{trans_lock,value};
ngx.timer.at(3,check_transation,key);
return;
end;
function transation.send(key,value)
save(key,value);
local lock = require
"resty.lock"
local trans_lock =
lock:new("my_locks");
local elapsed, err = trans_lock:lock(key);
ngx.say(elapsed, ", ", err);
trans_lock:unlock(key);
local res = transation[key][3];
transation[key] = nil;
return res;
end;
';
server {
listen 80;
location /linsu {
content_by_lua '
local
rsp = transation.send(ngx.var.arg_key, ngx.var.arg_value);
ngx.say(rsp);
';
}
}
}
测试结果如下:
[root@Master-01 ~]# curl
'127.0.0.1/linsu?key=color&value=red'
3.011, nil
red is set ok
思路总结一下: 就是client要保存一对key&value, nginx异步处理告诉客户端处理成功了。
为了这个需求,我觉得需要解决2个方面的问题
1.
把所有请求从一个socket发出去和从同一个socket等应答回来;这个我在例子里还没有写,只是简单用一个timer来做异步回调处理,但我通过一些小实验觉得在init_worker_by_lua中 的send 方法创建一个socket, 只创建一次, send方法把请求发出去,然后一个ngx.timer.at 创建一个死循环方法,不停地socket.receive,接收rsp,这些应该没有问题,我会继续验证。
2.
content_by_lua所在的entrance thread怎么和异步应答同步的问题。
例子中我采用了agentzh的建议,使用“resty.lock”, 这个的确是成功的,如果socket的问题也解决了,那么我觉得最初的需求被满足了。
但是看resty.lock介绍,似乎里面用的是ngx.sleep, 通过step wise的思路来等锁被unlock, 这个感觉效率上有些低,
不是很满意。
另外我觉得entrance thread 也是一个coroutine, 但是通过实验我发现 entrance thread和ngx.thread.spawn出来的轻线程中如果yield, 其实只是停一下,很快被ngx_lua scheduler重新调起了, 无法停下来等什么事件发生。
我的需求是:entrance thread 把自己 (self= corouting.running())保存在一个全局table里,比如说替代我例子中的那个lock, 下一步yield,然后就真的停下来,ngx_lua scheduler不会重新调起这个thread,然后异步应答回来后,会找到这个self,self.resume(),entrance
thread继续运行把结果给client.
这个办法是我在java中最常用的办法,java中虽然没有coroutine这个东西,但是道理是相通的。
这个需求能有办法做到吗?
Thx all.