local server = require("commons.websocket.server")
local cjson = require("cjson.safe")
local config = require("lapis.config").get()
local util = require("commons.util")
local main_service = require("model.service.main_service")
local class = require("commons.middleclass")
local memory_monitor = require("model.memory_monitor")
local thread_factory = require("model.thread_factory")
local monitor = memory_monitor:new()
local get_msg = main_service.get_msg
local post_msg = main_service.post_msg
local user_offline_handler = main_service.user_offline_handler
local user_online_handler = main_service.user_online_handler
-- Create the server-side WebSocket object for the current request.
local wb, err = server:new({
    timeout = 300 * 1000, -- in milliseconds
    max_payload_len = 65535,
})
if not wb then
    ngx.log(ngx.ERR, "failed to new websocket: ", err)
    return ngx.exit(ngx.HTTP_CLOSE)
end

-- Thread helper built on the shared memory monitor; it is used further down
-- to spawn and kill the message-polling threads.
local thread = thread_factory:new(monitor)
monitor:add_table_to_monitor("wb", wb)
monitor:add_table_to_monitor("thread class ", thread)

-- WebSocket close status codes used by this handler.
local CLIENT_CLSOE_CODE = 1000
local PROTOCOL_ERR_CODE = 1002
local SERVER_CLOSE_CODE = 1008

-- Connection identity, read from the query string of the request.
local args = ngx.req.get_uri_args()
local identity_id = args.identity_id
local login_id = args.login_id
local login_time = args.login_time

ngx.ctx.HOST_NAME = ngx.var.hostname
--- Close the WebSocket and release the worker threads.
-- Only the first call does any work: `wb` is cleared afterwards, so later
-- calls return immediately.
-- @param code close status code to send; falls back to CLIENT_CLSOE_CODE when
--   it is nil or cannot be converted to a number
-- @param msg  close reason sent along with the code
local function ws_close(code, msg)
    if wb == nil then
        return
    end
    -- Honour the caller's code. The previous `CLIENT_CLSOE_CODE or code`
    -- always evaluated to CLIENT_CLSOE_CODE, so any other close code passed
    -- in was silently discarded.
    code = tonumber(code) or CLIENT_CLSOE_CODE
    ngx.log(ngx.INFO, "close websocket")
    local bytes, err = wb:send_close(code, msg)
    if not bytes then
        -- Best effort: the peer may already be gone, so only record it.
        ngx.log(ngx.WARN, "failed to send close frame: ", err)
    end
    thread:kill_threads()
    wb = nil
end
--- Send one binary frame to the client.
-- @param bin        payload to send
-- @param success_fn unused by this function
-- @param error_fn   unused by this function
-- @return true when the frame was sent; false when sending failed (the
--   socket is closed in that case); nothing when the socket is already closed
local function ws_send_binary(bin, success_fn, error_fn)
    if wb ~= nil then
        local sent, send_err = wb:send_binary(bin)
        if sent ~= nil then
            return true
        end
        -- Sending failed: close the connection first, then log the reason.
        ws_close(CLIENT_CLSOE_CODE, "client send close !")
        ngx.log(ngx.ERR, send_err)
        return false
    end
end
--- Process a JSON reply from the logic layer.
-- Decodes the reply, forwards every base64 entry of its `data` array to the
-- client as a binary frame, and closes the WebSocket when the reply's
-- `status` is SERVER_CLOSE_CODE or PROTOCOL_ERR_CODE.
-- @param data JSON string, or nil when there is nothing to process
-- @return true when the caller may keep going; false when sending failed,
--   the socket was closed, or an error was raised while processing
local function data_handler(data)
    if data == nil then
        return true
    end

    local ok, result = pcall(function()
        local tab = cjson.decode(data)
        monitor:add_table_to_monitor("tab", tab)
        if util.is_empty(tab) or util.is_empty(tab["status"]) then
            return true
        end

        -- A reply may carry only a status (e.g. a close notification). Skip
        -- the payload loop when `data` is absent or not a table, so that
        -- ipairs() does not raise and the status below is still evaluated.
        local items = tab["data"]
        if type(items) == "table" then
            for _, item in ipairs(items) do
                -- Python base64 output contains "\n", which
                -- ngx.decode_base64 does not accept; strip it first.
                local payload = string.gsub(item, "\n", "")
                if not ws_send_binary(ngx.decode_base64(payload)) then
                    return false
                end
            end
        end

        -- Decide whether the reply asks for the WebSocket to be closed.
        local code = tonumber(tab["status"])
        if code == SERVER_CLOSE_CODE or code == PROTOCOL_ERR_CODE then
            local reason = tab["error"]
            if util.is_empty(reason) then
                reason = "server closed"
            end
            ws_close(code, reason)
            return false
        end
        return true
    end)

    if not ok then
        -- `result` holds the error raised inside the protected call.
        ngx.log(ngx.ERR, result)
        return false
    end
    return result
end
--- Polling loop run by each message-fetching thread.
-- Keeps asking the logic layer for messages and hands each reply to
-- `data_handler`, until the WebSocket is closed or `data_handler` reports
-- that processing should stop.
-- @param ... the first argument is the lambda name to poll with
--   (the spawn calls pass "get_msg_from_sqs" / "get_msg_from_redis")
local function push_msg(...)
    local argv = { ... }
    local lambda_name = argv[1]
    monitor:add_table_to_monitor("argv", argv)
    while wb ~= nil do
        -- NOTE(review): this call was garbled in the source (only the tail
        -- `id,login_id,login_time,monitor)` survived). It is reconstructed
        -- from the otherwise unused `get_msg` / `lambda_name` locals and the
        -- parallel post_msg / user_online_handler calls -- confirm the
        -- argument order against main_service.get_msg.
        local data = get_msg(lambda_name, identity_id, login_id, login_time, monitor)
        if not data_handler(data) then
            break
        end
        -- Force a full garbage-collection cycle after every poll.
        collectgarbage("collect")
    end
end
---------------------- Create get msg threads, start ----------------------
-- One polling thread per message source; the second thread is only attempted
-- when the first one was created successfully.
local threads_ok = thread:spawn_thread(push_msg, "get_msg_from_sqs")
    and thread:spawn_thread(push_msg, "get_msg_from_redis")

if not threads_ok then
    -- Creating a polling thread failed: release the threads, close the
    -- connection and log the error.
    thread:kill_threads()
    ws_close(CLIENT_CLSOE_CODE, "client send close !")
    ngx.log(ngx.ERR, "create get msg thread fail ")
    return ngx.exit(ngx.HTTP_CLOSE)
end
---------------------- Create get msg threads, end ------------------------

-- Call the online handler with the "report_conn_status" action for this
-- connection.
user_online_handler("report_conn_status", identity_id, login_id, login_time, monitor)
--- Receive one frame from the client and process it.
-- Ping frames are answered with a pong; any other non-close frame is posted
-- to the logic layer and the reply is passed to `data_handler`.
-- @return true to keep reading; false when the connection should end
--   (receive error, close frame, or `data_handler` asked to stop); nil when
--   an error was raised while processing the frame
local function wb_recv()
    local keep_going = true
    local ok, call_err = pcall(function()
        local data, typ, err = wb:recv_frame()
        if err or typ == "close" or typ == nil then
            keep_going = false
            return
        end

        if typ == "ping" then
            -- Send a pong frame back.
            local bytes, pong_err = wb:send_pong(data)
            if not bytes then
                ngx.log(ngx.WARN, "failed to send pong: ", pong_err)
            end
            return
        end

        local ret_data = post_msg("receive_msg", identity_id, login_id, login_time, data, monitor)
        -- Write the result to the outer flag. The previous code stored it in
        -- a new `local ret` that shadowed the outer one, so a failed
        -- data_handler call never stopped the receive loop.
        if not data_handler(ret_data) then
            keep_going = false
        end
    end)
    if not ok then
        ngx.log(ngx.ERR, call_err)
        return
    end
    return keep_going
end
-- Main receive loop: keep reading client frames until the socket has been
-- closed or wb_recv reports that reading should stop.
while wb ~= nil do
    if not wb_recv() then
        break
    end
end

-- The connection is over: call the offline handler, make sure the socket is
-- closed, drop the thread helper and run the memory-leak check.
user_offline_handler("report_conn_status", identity_id, login_id, login_time, monitor)
ws_close(CLIENT_CLSOE_CODE, "client send close !")
thread = nil
monitor:mem_leaks_monitoring()