--- Producer coroutine body.
-- On every resume: connects to beanstalkd, selects the "smallfish" tube,
-- puts the full contents of ./d.txt as a single job, closes the connection
-- and yields back to whoever resumed it.
-- Returns (which ends the coroutine) when beanstalkd cannot be initialised
-- or connected, when the tube cannot be selected, or when the file cannot be
-- opened. A failed read or a failed put is only reported.
function p ()
    while true do
        local bean, err = beanstalkd:new()
        if not bean then
            ngx.say("failed to init beanstalkd:", err)
            return
        end

        local ok
        ok, err = bean:connect()
        if not ok then
            ngx.say("failed to connect beanstalkd:", err)
            return
        end

        -- Select the tube. Stop on failure instead of silently putting the
        -- job on whatever tube the connection currently uses.
        ok, err = bean:use("smallfish")
        if not ok then
            ngx.say("failed to use tube:", err)
            bean:close()
            return
        end

        -- Read the file content. The mode must be the string "r"; a bare
        -- `r` is an undefined global (nil).
        local f, open_err = io.open("./d.txt", "r")
        if not f then
            ngx.say("failed to open ./d.txt:", open_err)
            bean:close()
            return
        end
        local t, read_err = f:read("*all")
        -- Close the file before the network call so the handle is not held
        -- open across it.
        f:close()

        if t then
            -- put job
            local id, put_err = bean:put(t)
            if not id then
                ngx.say("failed to put hello to smallfish tube, error:", put_err)
            end
        else
            ngx.say("failed to read ./d.txt:", read_err)
        end

        -- A new connection is opened on every cycle, so release this one
        -- to avoid leaking a socket per iteration.
        bean:close()
        coroutine.yield()
    end
end
--- Consumer coroutine body.
-- On every resume: connects to beanstalkd, watches the "smallfish" tube,
-- reserves one job, deletes it, forwards its data through wb:send_text,
-- closes the connection and yields back to whoever resumed it.
-- Returns (which ends the coroutine) when beanstalkd cannot be initialised
-- or connected, or when the tube cannot be watched. A failed reserve or
-- delete is only reported.
function c ()
    while true do
        local bean, err = beanstalkd:new()
        if not bean then
            ngx.say("failed to init beanstalkd:", err)
            return
        end

        local ok
        ok, err = bean:connect()
        if not ok then
            ngx.say("failed to connect beanstalkd:", err)
            return
        end

        -- watch tube
        ok, err = bean:watch("smallfish")
        if not ok then
            ngx.say("failed to watch tube smallfish, error:", err)
            bean:close()
            return
        end
        ngx.say("watch smallfish tube ok, tube size:", ok)

        -- reserve job
        -- NOTE(review): reserve() is called without a timeout; confirm that
        -- waiting for a job here is acceptable for the caller.
        local id, data = bean:reserve()
        if not id then
            -- On failure the second return value carries the error.
            ngx.say("reserve hello failed, error:", id, data)
        else
            ngx.say("reserve hello ok, id:", id, "data:", data)
            -- delete job
            local deleted, del_err = bean:delete(id)
            if deleted then
                ngx.say("delete ok, id:", id)
            else
                ngx.say("delete failed, id:", id, deleted, del_err)
            end
            -- Send the job content. Only done after a successful reserve,
            -- so an error string is never sent as if it were job data.
            wb:send_text(data)
        end

        -- A new connection is opened on every cycle, so release this one
        -- to avoid leaking a socket per iteration.
        bean:close()
        coroutine.yield()
    end
end
-- Scheduler: wrap the producer and the consumer in coroutines and give each
-- of them one turn every 5 seconds.
-- The globals p and c are rebound from the functions to their coroutines.
p = coroutine.create(p)
c = coroutine.create(c)

-- Resume `co` unless it has already finished, and log the error if the
-- coroutine raised one. coroutine.resume reports errors through its return
-- values instead of propagating them, so they must be checked explicitly.
local function run_turn(co, label)
    if coroutine.status(co) == "dead" then
        return
    end
    local ok, err = coroutine.resume(co)
    if not ok then
        ngx.log(ngx.ERR, label, " coroutine failed: ", tostring(err))
    end
end

-- Stop once both coroutines have finished; otherwise the loop would keep
-- sleeping forever with nothing left to run.
while coroutine.status(p) ~= "dead" or coroutine.status(c) ~= "dead" do
    ngx.sleep(5)
    run_turn(p, "producer")
    run_turn(c, "consumer")
end