-- Example usage: send a request on the 'rpc' subject, print the reply payload,
-- wait for one message, then hand the connection to the keepalive pool.
-- NOTE(review): `n` is not defined in this snippet; presumably it is a client
-- object that was created and connected beforehand -- confirm.
local data = ""
-- Declared local so the results below do not leak into the global table.
local ok, err
--ok, err = n:publish('l_heart', nil, data)

-- Prints the payload of the reply message to the response body.
local function request_callback(msg)
    ngx.say(msg.payload)
end

ok, err = n:request('rpc', data, request_callback)
-- Process incoming protocol lines until one MSG has been handled.
n:wait(1)
ok, err = n:set_keepalive(0, 1024)
-- /usr/local/share/lua/5.1/lua_resty_nats.lua
local ngx_re = require "ngx.re"
local uuid = require "uuid"
-- Module table holding the client methods, and the metatable that lets
-- objects created by _M.new() resolve those methods through __index.
local _M = {}
local mt = { __index = _M }
-- Subscription registry: maps a subscription id (sid) to its callback.
-- It is a module-level table, so it is shared by every client object
-- created from this module.
local r = {}
-- Scratch table passed to ngx_re.split() as its result table so that each
-- parsed protocol line reuses it.
local line_t = {}
--- Build a reply subject for a request.
-- @return '_INBOX.' concatenated with the value returned by uuid()
local function create_inbox()
    local token = uuid()
    return '_INBOX.' .. token
end
--- Split one protocol line into a message table.
-- The line is split on whitespace; the first field, upper-cased, becomes
-- `type`. Only "MSG" lines are decoded further, into `subject`, `sid`,
-- optional `reply_to`, and `len` (converted with tonumber).
-- @param line string one protocol line without its line terminator
-- @return table with at least the `type` field
local function parse_message(line)
    local fields = ngx_re.split(line, [[\s+]], "oj", nil, nil, line_t)
    local kind = string.upper(fields[1])

    if kind ~= "MSG" then
        return { type = kind }
    end

    -- Four fields: MSG <subject> <sid> <len>
    -- Five fields: MSG <subject> <sid> <reply-to> <len>
    -- NOTE(review): `fields` is the reused line_t table, so this test
    -- presumably relies on ngx_re.split writing a nil after its last result
    -- rather than leaving a stale fifth entry -- confirm against the
    -- lua-resty-core documentation.
    local reply_to, len_field = fields[4], fields[5]
    if len_field == nil then
        reply_to, len_field = nil, fields[4]
    end

    return {
        type = kind,
        subject = fields[2],
        sid = fields[3],
        reply_to = reply_to,
        len = tonumber(len_field),
    }
end
--- Register a callback for a subject and send the SUB command.
-- @param self client object
-- @param subject string subject to subscribe to
-- @param callback function stored in the registry under the sid
-- @param sid optional string subscription id; uuid() is used when omitted
-- @return the sid on success, or false plus an error message
function _M.subscribe(self, subject, callback, sid)
    -- Argument and state validation, in the order callers see the errors.
    if not self.connected then
        return false, "not connected"
    elseif type(subject) ~= "string" then
        return false, "subject is not a string"
    elseif type(callback) ~= "function" then
        return false, "callback is not a function"
    elseif sid and type(sid) ~= "string" then
        return false, "sid is not a string"
    end

    sid = sid or uuid()

    if r[sid] then
        return false, "already subscribed with sid " .. sid
    end

    local frame = string.format("SUB %s %s\r\n", subject, sid)
    local _, send_err = self.sock:send(frame)
    if send_err then
        return false, send_err
    end

    -- Only record the callback once the command has been written.
    r[sid] = callback
    return sid
end
--- Send UNSUB for a subscription id and drop its callback from the registry.
-- @param self client object
-- @param sid string subscription id
-- @return true on success, or false plus an error message
function _M.unsubscribe(self, sid)
    if not self.connected then
        return false, "not connected"
    elseif type(sid) ~= "string" then
        return false, "sid is not a string"
    end

    local frame = string.format("UNSUB %s\r\n", sid)
    local _, send_err = self.sock:send(frame)
    if send_err then
        -- The registry entry is left in place when the command was not sent.
        return false, send_err
    end

    r[sid] = nil
    return true
end
--- Send a PUB command with an optional reply subject.
-- @param self client object
-- @param subject string subject to publish on
-- @param reply_to optional reply subject; omitted from the command when falsy
-- @param payload string message body; nil is treated as an empty body
-- @return true and "" on success, or false plus an error message
function _M.publish(self, subject, reply_to, payload)
    if not self.connected then
        return false, "not connected"
    end
    -- Same subject validation as subscribe(), so a nil subject is rejected
    -- instead of being formatted into the command.
    if type(subject) ~= "string" then
        return false, "subject is not a string"
    end
    -- A missing payload used to raise on `#payload`; publish it as empty.
    if payload == nil then
        payload = ""
    elseif type(payload) ~= "string" then
        return false, "payload is not a string"
    end

    local msg
    if reply_to then
        msg = string.format("PUB %s %s %d\r\n%s\r\n", subject, reply_to, #payload, payload)
    else
        msg = string.format("PUB %s %d\r\n%s\r\n", subject, #payload, payload)
    end

    local _, err = self.sock:send(msg)
    if err then
        return false, err
    end
    -- The empty-string second value is kept for existing callers.
    return true, ""
end
--- Connect the client's socket to self.host:self.port.
-- On a fresh connection the first line sent by the server (the INFO packet)
-- is read and discarded. A socket taken back from the keepalive pool (see
-- set_keepalive) has already consumed that line, so the read is skipped
-- there; otherwise connect would stall until the timeout.
-- @param self client object
-- @return true on success, or false plus an error message
function _M.connect(self)
    local sock = self.sock
    sock:settimeout(self.timeout)

    local ok, err = sock:connect(self.host, self.port)
    if not ok then
        return false, err
    end

    local reused, reuse_err = sock:getreusedtimes()
    if reused == nil then
        return false, reuse_err
    end

    if reused == 0 then
        -- initial INFO packet
        local info, info_err = sock:receive()
        if not info then
            return false, info_err
        end
    end

    -- TODO authorize, CONNECT
    self.connected = true
    return true
end
--- Timer callback that sends PING and re-arms itself every 5 seconds.
-- The previous `ngx.say(self.socket)` call was removed: the client stores its
-- socket in `sock`, so `self.socket` is never set, and ngx.say is documented
-- as unavailable in timer context.
-- NOTE(review): nothing in the visible code schedules this task, and the
-- socket is created by whoever called _M.new() -- confirm that it may be
-- used from a timer before wiring this up.
-- @param premature boolean passed by ngx.timer; true when the timer fires early
-- @param self client object
local function ping_task(premature, self)
    if premature then
        return
    end

    local _, err = self.sock:send("PING\r\n")
    if err then
        -- Stop re-arming once the socket can no longer be written to.
        ngx.log(ngx.WARN, "failed to send PING: ", err)
        return
    end

    local ok, timer_err = ngx.timer.at(5, ping_task, self)
    if not ok then
        ngx.log(ngx.ERR, "failed to reschedule ping timer: ", timer_err)
    end
end
--- Hand the socket to the cosocket keepalive pool.
-- All arguments are forwarded to sock:setkeepalive().
-- On success the client is marked as not connected, because the socket is no
-- longer usable by this object until connect() is called again.
-- @param self client object
-- @return the results of sock:setkeepalive(), or false plus "not connected"
function _M.set_keepalive(self, ...)
    if not self.connected then
        return false, "not connected"
    end

    local ok, err = self.sock:setkeepalive(...)
    if ok then
        self.connected = false
    end
    return ok, err
end
--- Create a client object.
-- A new TCP cosocket is created, and `timeout`, `host` and `port` are copied
-- from `opts`. No connection is opened here.
-- @param opts table with the fields timeout, host and port
-- @return client object whose methods resolve through the module metatable
function _M.new(opts)
    local client = setmetatable({}, mt)
    client.sock = ngx.socket.tcp()
    client.timeout = opts.timeout
    client.host = opts.host
    client.port = opts.port
    return client
end
--- Publish a request and route its reply to `callback`.
-- A one-shot subscription is created on a fresh inbox subject; when a reply
-- arrives the subscription is removed before `callback` is invoked. The
-- subscription id is kept in a per-call local so overlapping requests cannot
-- overwrite each other's id.
-- @param self client object
-- @param subject string subject to publish the request on
-- @param payload string request body
-- @param callback function called with the reply message
-- @return the subscription id and the inbox subject on success,
--         or false plus an error message
function _M.request(self, subject, payload, callback)
    if not self.connected then
        return false, "not connected"
    end
    -- The wrapper below is always a function, so subscribe() cannot catch a
    -- bad user callback; check it here.
    if type(callback) ~= "function" then
        return false, "callback is not a function"
    end

    local inbox = create_inbox()

    -- Declared before the closure so the closure can capture it.
    local sid, sub_err
    sid, sub_err = self:subscribe(inbox, function(message)
        self:unsubscribe(sid)
        callback(message)
    end)
    if not sid then
        return false, sub_err
    end

    local ok, pub_err = self:publish(subject, inbox, payload)
    if not ok then
        -- Best-effort cleanup: drop the subscription that will never be
        -- answered, and clear the registry entry even if UNSUB fails to send.
        self:unsubscribe(sid)
        r[sid] = nil
        return false, pub_err
    end

    return sid, inbox
end
--- Read protocol lines from the socket and dispatch them.
-- PING lines are answered with PONG. MSG lines have their payload read and
-- are passed to the callback registered for their sid. Other line types are
-- ignored. The loop ends when `quantity` messages have been handled or when
-- a socket operation fails; with `quantity` 0 or nil it only ends on failure
-- (for example a read timeout).
-- @param self client object
-- @param quantity optional number of MSG lines to handle before returning
function _M.wait(self, quantity)
    quantity = quantity or 0
    local count = 0

    repeat
        local line, err = self.sock:receive()
        if not line then
            ngx.log(ngx.WARN, err)
            break
        end

        local msg = parse_message(line)

        if msg.type == "PING" then
            local _, send_err = self.sock:send("PONG\r\n")
            if send_err then
                ngx.log(ngx.WARN, send_err)
                break
            end

        elseif msg.type == "MSG" then
            -- Without a byte count the payload cannot be framed; receive(nil)
            -- would read a line instead of the body.
            if not msg.len then
                ngx.log(ngx.WARN, "malformed MSG header: ", line)
                break
            end

            local payload, payload_err = self.sock:receive(msg.len)
            if not payload then
                -- Do not count or dispatch a message whose body was not read.
                ngx.log(ngx.WARN, payload_err)
                break
            end
            msg.payload = payload
            count = count + 1

            -- discard trailing newline
            local _, crlf_err = self.sock:receive(2)
            if crlf_err then
                ngx.log(ngx.WARN, crlf_err)
            end

            local handler = r[msg.sid]
            if handler then
                handler(msg)
            end
        end
    until quantity > 0 and count >= quantity
end
--- Close the client's socket.
-- The socket method is invoked with `:` so the socket is passed as the
-- receiver; the previous dot call passed no socket at all. The client is
-- marked as not connected before the close attempt.
-- @param self client object
-- @return the results of sock:close(), or false plus "not connected"
function _M.close(self)
    if not self.connected then
        return false, "not connected"
    end

    self.connected = false
    return self.sock:close()
end