local uid = ngx.var.uid
local concat = table.concat
local dis = require "dispatcher"
local sessionId = dis:getSessionId()
local sema = dis:getSemaphore(sessionId)
local user = ngx.shared.user
user:set(concat({"user_",uid}),sessionId)
ngx.log(ngx.ERR,"new websocket ",sessionId," workerId ",workerId)
dispatcher 模块代码是
local semaphore = require "ngx.semaphore"
local concat = table.concat
local user = ngx.shared.user
local workerId = ngx.worker.id()
local ok, new_tab = pcall(require, "table.new")
if not ok or type(new_tab) ~= "function" then
new_tab = function (narr, nrec) return {} end
end
local _M = new_tab(0, 155)
_M._VERSION = '0.1'
local mt = { __index = _M }
local semaMap = {}
local messageList = {}
local incrId = 0
local msgId = 0
local prefix = "msg_"
local smdList = {
ngx.shared.smd_1,
ngx.shared.smd_2
}
local smd = smdList[workerId+1]
smd:set('id',0)
--给新建立的连接生成唯一id
function _M.getSessionId(self)
incrId = incrId + 1
return (workerId + 1) * 100000 + incrId
end
--根据sessionId返回对应worker Id
function _M.getWorkerId(self,sessionId)
local workerId = (sessionId - (sessionId % 100000)) / 100000
return workerId - 1
end
--根据sessionId返回 semaphore 对象
function _M.getSemaphore(self,sessionId)
if not semaMap[sessionId] then
semaMap[sessionId] = semaphore.new(0)
end
ngx.log(ngx.ERR," semaphore ",sessionId)
return semaMap[sessionId]
end
function _M.dispatch(self,sessionId,message)
local msg = ""
if messageList[sessionId] then
insert(messageList[sessionId],message)
semaMap[sessionId]:post(1)
msg = concat({"wake up ",workerId," ",sessionId})
else
local wId = self:getWorkerId(sessionId)
local _smd = smdList[wId+1]
local id = _smd:incr('id',1,0)
local key = concat({prefix,id})
_smd:set(key,message,120,sessionId)
end
ngx.log(ngx.ERR,msg)
end
--分发消息
function _M.dispatchMessage(self)
local _id = tonumber(smd:get("id")) or 0
if _id > msgId and _id > 0 then
for i=msgId,_id do
local key = concat({prefix,i})
local message,sessionId = smd:get(key)
ngx.log(ngx.ERR,sessionId," ",message)
if message and sessionId then
if semaMap[sessionId] ~= nil then
ngx.log(ngx.ERR," wake up ",sessionId)
semaMap[sessionId]:post(1)
else
ngx.log(ngx.ERR," no semaphore ",sessionId)
end
end
smd:delete(key)
end
msgId = _id
end
end
return _M
init_worker_by_lua_file file/push_worker_init.lua; #代码为
local d = require "dispatcher"
local function dispatchLooper()
d:dispatchMessage()
ngx.timer.at(3,dispatchLooper)
end
dispatchLooper()