function _M.get_messages(queue, receiver, start, max, retry_num)
assert(start>=0)
assert(max>=1)
local result = {}
local index = string.format('%s:%d,%d', queue, start, max)
log(INFO, index)
local db = database.connect()
if not db then
return 500, 'failed to connect to mysql'
end
for id = start,start+max-1 do
local message = cache_message:get(queue..':'..id)
if message then
table.insert(result, message)
else
-- 1. select from mysql if cache miss
log(INFO, 'message #', id, ' miss, query message ', index)
local sql = string.format('select * from %s_msg where id >= %d limit %d', queue, start, max)
log(INFO, 'sql: ', sql)
local err, errcode, sqlstate
result, err, errcode, sqlstate = db:query(sql)
assert(next(result))
-- print('res: ', inspect(result))
if not result then
log(ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return 500, 'mysql error'
end
set_messages(queue, result)
break
end
end
-- 2. insert result into mysql
local values = {}
local sql
if retry_num > 0 then
for i, message in ipairs(result) do
table.insert(values, string.format("(%d, '%s')", message['id'], receiver))
end
sql = string.format([[insert into %s_rst(m_id, receiver)
values%s]], queue, table.concat(values, ','))
else
-- handle the special case when retry_num is 0
for i, message in ipairs(result) do
table.insert(values, string.format("(%d, '%s', 'failed')", message['id'], receiver))
end
sql = string.format([[insert into %s_rst(m_id, receiver, status)
values%s]], queue, table.concat(values, ','))
end
log(INFO, 'sql: ', sql)
local res, err, errcode, sqlstate = db:query(sql)
-- print('res: ', inspect(res))
if not res then
log(ERR, "bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
return 500, 'mysql error'
end
-- 3. update queue.recv.last and queue.recv.processing
set_last_id(queue, receiver, result[#result]['id'])
if retry_num > 0 then
update_processing_num(queue, receiver, #result)
end
database.keepalive(db)
return 200, result
end