接上回的问题, 尝试用tcp发送请求和应答, 试了多次都是 只能发和收, 同时做不行.
原因应是: Starting from the 0.9.9 release,
the cosocket object here is full-duplex, that is, a reader "light
thread" and a writer "light thread" can operate on a single
cosocket object simultaneously (both "light threads" must belong to the same Lua handler though, see reasons above).
报错信息:
lua entry thread
aborted: runtime error: init_worker_by_lua:31: bad request
stack traceback:
coroutine 0:
[C]: in function 'send'
我怎么才能做到同时收发呢? 求教! 目前的代码如下
lua_shared_dict my_locks 100k;
init_worker_by_lua '
transaction = {};
local tcpsock;
local create_socket = function()
if not tcpsock then
tcpsock =
ngx.socket.tcp();
ngx.log(ngx.WARN,"try
connect...");
local
ok, err = tcpsock:connect("10.10.50.115",9013);
if
not ok then
ngx.log(ngx.ERR,"fail
to connect ",err);
tcpsock
= nil;
end;
tcpsock:settimeout(10000);
end;
end;
local lock = require "resty.lock";
local function save(key,value)
local
trans_lock = lock:new("my_locks");
trans_lock:lock(key);
transaction[key]=
{trans_lock};
return;
end;
function send_timer(premature,key,value)
create_socket();
local bytes, err =
tcpsock:send(key.."="..value);
ngx.log(ngx.WARN, bytes," bytes are
sent");
end;
function transaction.send(key,value)
ngx.log(ngx.WARN,"in transaction.send");
save(key,value);
ngx.timer.at(0,send_timer,key,value);
local lock = require "resty.lock"
local trans_lock =
lock:new("my_locks");
local elapsed, err = trans_lock:lock(key);
ngx.say(elapsed, ", ", err);
trans_lock:unlock(key);
local res = transaction[key][2];
transaction[key] = nil;
return res;
end;
local timer = function(premature)
while true do
if not premature then
create_socket();
-- ngx.log(ngx.WARN,"in timer");
local line, err, partial = tcpsock:receive()
if not line then
ngx.log(ngx.ERR, "failed to read a line: ", err);
-- res = string.find(err,".*timed out");
-- if not res then
-- tcpsock = nil;
-- end;
else
local _,_,k,v = string.find(line, "(%a+)%s*=%s*(%a+)")
ngx.log(ngx.WARN, "KEY IS "..k," VALUE IS "..v);
transaction[k][2]=v.." is set ok";
local ok, err = transaction[k][1]:unlock(k);
if not ok then
ngx.say("failed to unlock: ", err)
end;
end;
end;
end;
end;
ngx.timer.at(0,timer);
';
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location
/linsu {
content_by_lua
'
if
not transaction then
ngx.say "transaction is nil";
return;
end;
local
rsp = transaction.send(ngx.var.arg_key, ngx.var.arg_value);
ngx.say(rsp);
';
}
发件人: 林谡
发送时间: 2015年4月27日 16:20
收件人: 'openresty@googlegroups.com';
'age...@gmail.com'
抄送: 吴炳锡; 刘峰; 刘九星; 吕颖; 段牧; 高磊; 武志国
主题: 使用openresty的一个问题, 求教!
前段时间给agentzh
发了一个邮件描述了我们的一个需求如下:
“我们使用nginx做客户端web访问的接入, 后端各种service提供服务,service对外的接口是公司内部实现的异步rpc协议(所有请求可以共用一个tcp连接请求,每个请求有唯一的sequence标记, 应答异步, 访问者通过sequence来匹配 request,response); 希望的流程是Nginx收到http
request,处理模块 把请求转成内部rpc格式
通过一个tcp连接(所有处理共用的)向后端异步发,同时保持http
request上下文,当后端应答回来后, 要匹配到请求的上下文,然后给http response,
需求特别说明的是这个过程与redis,
mysql访问不同,它们还是一种同步的机制,一个连接上只能并发一个请求。请问,我们需要的这个机制ngx_lua能提供吗?
Agentzh:回复如下:
可以。你可以在 init_worker_by_lua 中通过 ngx.timer.at()
函数创建一个背景工作线程(还是“轻线程),然后所有的后端请求都委托给这个轻线程来分发。这个背景轻线程保持一个到后端的唯一连接。各个下游请求的处理程序(比如
content_by_lua)可以通过 lua-resty-lock [1] 来和工作轻线程同步。你可以使用每 nginx worker
进程一个后端连接的模式,也可以使用整个 nginx 实例一个后端连接的模式。对于后者,你需要通过共享内存字典来跨 worker
进程分发到后端的请求和应答数据,同时你需要确保只有一个 worker 能创建 timer(即背景工作线程)。
建议加入
openresty 中文邮件列表讨论这样的问题,谢谢合作!请见 http://openresty.org/#Community
按照这个思路我写的一个demo:
http{
lua_shared_dict my_locks 100k;
init_worker_by_lua '
transation = {};
local lock = require
"resty.lock";
local check_transation = function(premature,key)
if not premature then
ngx.log(ngx.WARN, "key is
"..key);
ngx.log(ngx.WARN,
"transation[key][2] is "..transation[key][2]);
local ok, err = transation[key][1]:unlock(key);
if not ok then
ngx.say("failed to unlock:
", err)
end
ngx.log(ngx.WARN,"in timer");
transation[key][3]=transation[key][2].." is set ok";
end
end
local function save(key,value)
local trans_lock = lock:new("my_locks");
trans_lock:lock(key);
transation[key]= {trans_lock,value};
ngx.timer.at(3,check_transation,key);
return;
end;
function transation.send(key,value)
save(key,value);
local lock
= require "resty.lock"
local
trans_lock = lock:new("my_locks");
local elapsed, err =
trans_lock:lock(key);
ngx.say(elapsed, ",
", err);
trans_lock:unlock(key);
local res = transation[key][3];
transation[key] = nil;
return res;
end;
';
server {
listen 80;
location /linsu
{
content_by_lua '
local rsp = transation.send(ngx.var.arg_key, ngx.var.arg_value);
ngx.say(rsp);
';
}
}
}
测试结果如下:
[root@Master-01 ~]# curl
'127.0.0.1/linsu?key=color&value=red'
3.011, nil
red is set ok
思路总结一下: 就是client要保存一对key&value, nginx异步处理告诉客户端处理成功了。
为了这个需求,我觉得需要解决2个方面的问题
1.
把所有请求从一个socket发出去和从同一个socket等应答回来;这个我在例子里还没有写,只是简单用一个timer来做异步回调处理,但我通过一些小实验觉得在init_worker_by_lua中 的send 方法创建一个socket, 只创建一次, send方法把请求发出去,然后一个ngx.timer.at 创建一个死循环方法,不停地socket.receive,接收rsp,这些应该没有问题,我会继续验证。
2.
content_by_lua所在的entrance thread怎么和异步应答同步的问题。
例子中我采用了agentzh的建议,使用“resty.lock”, 这个的确是成功的,如果socket的问题也解决了,那么我觉得最初的需求被满足了。
但是看resty.lock介绍,似乎里面用的是ngx.sleep, 通过step wise的思路来等锁被unlock, 这个感觉效率上有些低,
不是很满意。
另外我觉得entrance thread 也是一个coroutine, 但是通过实验我发现 entrance thread和ngx.thread.spawn出来的轻线程中如果yield, 其实只是停一下,很快被ngx_lua scheduler重新调起了, 无法停下来等什么事件发生。
我的需求是:entrance thread 把自己 (self= corouting.running())保存在一个全局table里,比如说替代我例子中的那个lock, 下一步yield,然后就真的停下来,ngx_lua scheduler不会重新调起这个thread,然后异步应答回来后,会找到这个self,self.resume(),entrance
thread继续运行把结果给client.
这个办法是我在java中最常用的办法,java中虽然没有coroutine这个东西,但是道理是相通的。
这个需求能有办法做到吗?
Thx all.