--- Forward packets from the IM server to the websocket client.
-- Keeps reading via chatServer:recvFromIMServer and relaying each packet
-- with chatServer:sendToClient until either call reports failure, then
-- closes the session object.
-- @param userid     user identifier; passed to recvFromIMServer and used in log lines
-- @param chatServer session object providing recvFromIMServer, sendToClient and close
local function im_recv_loop(userid,chatServer)
    ngx.log(ngx.CRIT,'create thread ,userid: ',userid)

    local forwarding = true
    while forwarding do
        local packet, recv_err = chatServer:recvFromIMServer( userid )
        if packet then
            local sent, send_err = chatServer:sendToClient(packet)
            if not sent then
                -- Client side is unreachable; stop relaying.
                ngx.log(ngx.ERR, "failed to send text: ", send_err)
                forwarding = false
            end
        else
            -- No packet could be read from the IM server; stop relaying.
            ngx.log(ngx.ERR,userid,',recv bad pack skip : ',recv_err)
            forwarding = false
        end
    end

    ngx.log(ngx.ERR,userid,',wb closing sock')
    -- NOTE(review): per the original author's note, chatServer:close() is
    -- expected to also close the upstream TCP connection internally --
    -- confirm against the chatServer implementation.
    chatServer:close()
end
--- Serve one websocket client connection.
-- Creates a chat session, logs in, spawns the IM-server-to-client pump
-- (im_recv_loop) as a light thread, then reads client frames until the
-- client closes, the socket turns fatal, or a pong cannot be written.
-- On exit the session is closed and the pump thread is awaited.
-- @param wb      websocket object providing recv_frame, send_pong and a `fatal` field
-- @param userid  user identifier used for the session, login and log lines
-- @param token   login token handed to chatServer:startLogin
local function ws_recv_loop(wb,userid,token)
    -- The right-hand side still resolves to the outer `chatServer`; the new
    -- local only comes into scope after this statement completes.
    local chatServer = chatServer:new(userid,wb)
    local ret = chatServer:startLogin(token,userid)
    if not ret then
        chatServer:close()
        return
    end

    local push_thread = ngx_thread_spawn(im_recv_loop,userid,chatServer)

    -- Last frame type seen (or 'fatal'); only used by the logout log line.
    local _type = ''
    while true do
        local data, typ, err = wb:recv_frame()
        if wb.fatal then
            _type = 'fatal'
            ngx.log(ngx.ERR, userid," failed to receive frame: ", err)
            break
        end

        -- A receive timeout is not an error here: keep waiting for frames.
        local timeout = false
        if err and sub(err,-7) == 'timeout' then
            timeout = true
        end
        if not timeout then
            ngx.log(ngx.CRIT,typ,',userid: ',userid, ",data: ", data,',err: ',err)
        end
        _type = typ

        if typ == "close" then
            ngx.log(ngx.ERR,userid,',',typ, ",data:", data,',err:',err)
            break
        elseif typ == "text" then
            chatServer:recvFromClient(data)
        elseif typ == 'ping' then
            -- RFC 6455 section 5.5.3: a pong must carry the same application
            -- data as the ping it answers, so echo the payload back.
            local bytes, send_err = wb:send_pong(data)
            if not bytes then
                -- The connection can no longer be written to; stop serving it.
                ngx.log(ngx.ERR, userid, ",failed to send pong: ", send_err)
                break
            end
        end
        -- 'pong', 'continuation' and 'binary' frames are intentionally ignored.
    end

    ngx.log(ngx.CRIT,userid,',typ:',_type,',logout')
    -- NOTE(review): per the original author's note, close() marks the session
    -- as closed internally so that recvFromIMServer returns nil and the
    -- im_recv_loop thread terminates -- confirm against chatServer.
    chatServer:close()
    ngx.thread.wait(push_thread)
end