hi 春哥&各位,在用openresty做redis pub/sub时部是报 :app.lua:37: attempt to call method 'subcribe' (a nil value)
不知道啥原因
麻烦大神帮看看,thanks ,整体代码如下:
--
-- Created by IntelliJ IDEA.
-- User: Administrator
-- Date: 2016/11/3
-- Time: 16:06
-- To change this template use File | Settings | File Templates.
--
local semaphore = require('ngx.semaphore')
local redis = require('resty.redis')
local server = require('resty.websocket.server')
local strings = require('stringutil')
return function()
local M = {
client_id = nil,
closed = false
}
--[[
开启一个协程subscribe
]]
local function listen()
local red, err = redis:new()
if not red then
ngx.log(ngx.ERR, 'failed to new sub redis: ', err)
return
end
red:set_timeout(1000)
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
ngx.log(ngx.ERR, 'failed to connect to sub redis: ', err)
return
end
ngx.log(ngx.ERR, " success to connect to sub redis 1:")
M.sub_redis = red
--red:subcribe("lua_test_channel");
--开新线程监听订阅,主循环结束时要等待其结束
return ngx.thread.spawn(function()
ngx.log(ngx.ERR, " success to connect to sub redis 2:"..type(M.sub_redis.subscribe))
M.sub_redis:subcribe("lua_test_channel");--这里会报异常
M.sema:post(1)
ngx.log(ngx.ERR, " success to connect to sub redis 3:")
while not M.closed do
ngx.log(ngx.ERR, " success to connect to sub redis 4:")
local ret, err = M.sub_redis:read_reply()
if not ret and err ~= 'timeout' then
ngx.log(ngx.ERR, 'failed to read reply: ', err)
-- M.close()
end
if ret and ret[1] == 'message' then
-- local msg = ret[3]
local bs, err = M.sock:send_text(ret[3])
if not bs then
ngx.log(ngx.ERR, 'failed to send text: ', err)
--M.close()
end
end
end
end)
end
M.start = function()
-- ngx.say("hello lua")
local sema, err = semaphore.new()
if not sema then
ngx.log(ngx.ERR, 'failed to create semaphore: ', err)
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end
M.sema = sema
--[[
1. 主协程监听websocket,收到消息后publish到redis
2. 另开一个协程subscribe redis channel
]]
local sock, err = server:new{
timeout = 3000, -- in milliseconds
max_payload_len = 8192,
}
--当客户端意外关闭连接时的回调
--[[ local ok, err = ngx.on_abort(function()
ngx.log(ngx.ERR, "closed", err)
end)
if not ok then
ngx.log(ngx.ERR, 'failed to register the on_abort callback: ', err)
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end]]
--创建 socket对象
if not sock then
ngx.log(ngx.ERR, "create websocket server error", err)
ngx.exit(ngx.HTTP_CLOSE) --异常退出。这里只是退出当前请求,不是nginx整体退出
end
--赋值到M对象
M.sock = sock
--创建用来puhblis的redis连接
local pub_redis, err = redis:new()
if not pub_redis then
ngx.log(ngx.ERR, 'failed to new redis: ', err)
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end
--redis连接
pub_redis:set_timeout(1000)
local ok, err = pub_redis:connect("127.0.0.1", 6379)
if not ok then
ngx.log(ngx.ERR, 'failed to connect to redis: ', err)
ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end
M.pub_redis = pub_redis
--主协程
while not M.closed do
if M.sub_redis == nil then
listen() --订阅
M.sema:wait(3)
end
local data, typ, err = sock:recv_frame()
if sock.fatal then
ngx.log(ngx.ERR, "failed to receive frame: ", err)
return ngx.exit(444)
end
ngx.log(ngx.ERR, " do type:")
--没有data 发送ping
if not data then
local bytes, err = sock:send_ping()
if not bytes then
ngx.log(ngx.ERR, "failed to send ping:", err)
return ngx.exit(444)
end
-- close
elseif typ == "close" then break
--pong命令
elseif typ == "ping" then
local bytes, err = sock:send_pong()
if not bytes then
ngx.log(ngx.ERR, "failed to send pong:", err)
return ngx.exit(444)
end
--client pong
elseif typ == "pong" then
ngx.log(ngx.INFO, "client ponged")
--握手完成,接收到业务数据
elseif typ == "text" then
local result = strings().split(data, "#")
M.client_id = result[1]
local bytes, err = sock:send_text(data)
if not bytes then
ngx.log(ngx.ERR, "failed to send text:", err)
return ngx.exit(444)
end
pub_redis:publish("lua_test_channel", data)
end
end
end
return M
end