Hello, all
I am building an application with Redis and using the Pub/Sub feature.
I publish events on several channels every 5 seconds. I also
subscribe to these channels from multiple clients (loops) in Lua.
For a period of time after starting Redis, the events arrive at the
clients in a timely manner. But after a while, the messages (events) no
longer arrive one by one at the clients; they arrive in bulk and delayed.
They tend to arrive 5-6 events at once, every 30 seconds or so.
Is this normal behavior? Do you have any idea why this happens?
During the execution of this flow, some other Redis operations take
place.
Here is the code that handles the websocket feature, which uses the
pub/sub connection in a light thread.
Please do comment on the quality of this code (these are some
excerpts from a module):
--- Open a connection to the Redis server at 127.0.0.1:6380.
-- @param timeout socket timeout in seconds (converted to milliseconds here)
-- @return the resty.redis object on success
-- @return nil and an error message when the connection fails
local function redis_connect(timeout)
    local redis = require "resty.redis"
    local red = redis:new()

    -- The timeout ends up in the cosocket's settimeout(), which takes
    -- milliseconds. Clamp it to a signed 32-bit value so that very large
    -- inputs (e.g. "one year" in seconds) cannot overflow.
    -- NOTE(review): recent ngx_lua releases reject timeouts >= 2^31 ms with
    -- "bad timeout value" -- confirm against the deployed version.
    local MAX_TIMEOUT_MS = 2147483647
    local timeout_ms = timeout * 1000
    if timeout_ms > MAX_TIMEOUT_MS then
        timeout_ms = MAX_TIMEOUT_MS
    end
    red:set_timeout(timeout_ms)

    local ok, err = red:connect("127.0.0.1", 6380)
    if not ok then
        return nil, "Failed to connect to Redis: " .. tostring(err)
    end

    return red
end
--- Subscribe to the "stats:<host id>:*" pattern of every host and process
-- the published events in an endless loop.
-- Runs until subscribing or reading a reply fails; each failure is logged
-- and the Redis connection is closed before returning.
-- @param ws      websocket object the events are written to
-- @param hosts   array of host tables; each must have an `id` field
-- @param user_id passed through for the (elided) event processing
-- @param cid     passed through for the (elided) event processing
-- @param charts  passed through for the (elided) event processing
local function subscribe(ws, hosts, user_id, cid, charts)
    -- timeout Redis connection in a year
    local red, err = redis_connect(31556926)
    if not red then
        -- Do not fail silently: without this the websocket stays open but
        -- never receives any event, with nothing in the error log.
        ngx.log(ngx.ERR, err)
        return
    end

    for _, host in ipairs(hosts) do
        local channel = "stats:" .. host['id'] .. ":*"
        local res, err = red:psubscribe(channel)
        if not res then
            ngx.log(ngx.ERR,
                    "Could not subscribe to channel(s) \"" .. channel
                    .. "\"; err: ", err)
            -- Release the socket instead of leaving it to request teardown.
            red:close()
            return
        end
    end

    while true do
        local reply, err = red:read_reply()
        if not reply then
            ngx.log(ngx.ERR, "Error reading reply: ", err)
            red:close()
            return
        end

        -- here we do event processing (Redis-related too, but on a
        -- separate connection with lower timeout)
        -- and printing something on the web socket
    end
end
--- Serve one websocket connection: spawn a light thread that forwards
-- Redis pub/sub events, then handle incoming frames until the peer closes
-- the connection or an error occurs.
-- Every exit path after the spawn kills the subscriber thread first.
-- @param subscription table; its `user_id` field is passed to the subscriber
-- @param cid          passed through to the subscriber thread
function _M.run_websocket(subscription, cid)
    local server = require "resty.websocket.server"
    local wb, err = server:new{
        timeout = 5000, -- in milliseconds
        max_payload_len = 65535,
    }
    if not wb then
        ngx.log(ngx.ERR, "Failed to create new websocket: ", err)
        return ngx.exit(444)
    end

    -- NOTE: the next two assignments were elided by the author in the
    -- original post and are not valid Lua as written.
    local hosts = .... prepare some data....
    local charts = .... prepare some data....

    local thread = ngx.thread.spawn(subscribe, wb, hosts,
                                    subscription['user_id'], cid, charts)

    -- Shared failure path: log, stop the subscriber thread, drop the
    -- connection without sending a response.
    local function fail(msg, err)
        ngx.log(ngx.ERR, msg, err)
        ngx.thread.kill(thread)
        return ngx.exit(444)
    end

    while true do
        local data, typ, err = wb:recv_frame()
        if not data then
            -- NOTE(review): this branch is also taken when recv_frame()
            -- merely times out (5 s without a client frame) -- confirm that
            -- closing the connection in that case is intended.
            return fail("Failed to receive a frame: ", err)
        end

        if typ == "close" then
            -- send a close frame back:
            ngx.thread.kill(thread)
            local bytes, err = wb:send_close(1000, "connection will close")
            if not bytes then
                ngx.log(ngx.ERR, "Failed to send the close frame: ", err)
                return ngx.exit(444)
            end
            return

        elseif typ == "ping" then
            -- send a pong frame back:
            local bytes, err = wb:send_pong(data)
            if not bytes then
                return fail("Failed to send the pong frame: ", err)
            end

        elseif typ == "pong"
               or (typ == "text" and string.lower(data) == 'ping') then
            -- just discard the incoming pong frame

        elseif typ == "text" then
            -- just "echo" back what I received
            -- `local` is required here: without it `bytes` becomes a global
            -- shared by every request handled in this worker.
            local bytes, err = wb:send_text("RECEIVED:\"" .. data .. "\"")
            if not bytes then
                return fail("Failed to send a text frame: ", err)
            end
        end
    end
end
Thank you
Bogdan
|