你的代码这里有bug:
if not producer_type then
local producer_type = "async"
end
并没有实际修改 producer_type 的值。
在 2017年12月7日 下午3:45, <ch...@sina.com> 写道:
> 如果需要额外打开定时器,封装和不封装的两种模式都不可以用才对;
>
> 或者,已经打开了定时器,应该两种模式都可以用;
>
> 我只是将函数简单封装了一下,方便下次直接调用,并没有太多的操作在里面;
>
> 看起来只是多了一层函数调用关系;
>
> 不是很理解为什么就不能使用了;
>
> 在 2017年12月7日星期四 UTC+8下午3:28:58,tokers写道:
>>
>> Cosocket 没法在 log 阶段使用,lua-resty-kafka 应该是额外开定时器,在定时器里使用 cosocket 的。
>>
>> On Thursday, December 7, 2017 at 3:26:53 PM UTC+8, che...@sina.com wrote:
>>>
>>> daemon 1:
>>>
>>> log_by_lua_file 使用下面文件内容,可以往kafka里面写数据
>>>
>>> local cjson = require "cjson"
>>> local client = require "resty.kafka.client"
>>> local producer = require "resty.kafka.producer"
>>>
>>> local broker_list = {
>>> { host = "192.168.80.212",port = 9092},
>>> }
>>>
>>> local args = ngx.var.args or 'parameter null'
>>> local msg = args
>>> local topic = 'hehe'
>>>
>>> local bp = producer:new(broker_list, { producer_type = "async"})
>>>
>>> local ok, err = bp:send(topic, nil, msg)
>>>
>>> if not ok then
>>> error_msg = '<Msg: send [',msg,'] to kafka faild! Reason: [',err,']>'
>>> ngx.log(ngx.ERR,error_msg)
>>> return nil, error_msg
>>> end
>>>
>>>
>>> ------------------------------------------------------------------------------------------------------------------------------------------------------------------
>>>
>>> 以下为自己简单封装了一下的文件
>>>
>>> local cjson = require "cjson"
>>> local client = require "resty.kafka.client"
>>> local producer = require "resty.kafka.producer"
>>>
>>> local _M = {
>>> _VERSION = '1'
>>> }
>>>
>>> function _M.SendMsgToKafka(opts)
>>> -- check producer_type
>>> local producer_type = opts.producer_type
>>> if not producer_type then
>>> local producer_type = "async"
>>> end
>>>
>>> -- check topic
>>> local topic = opts.topic
>>> if not topic then
>>> return nil, "\"topic\" option required"
>>> end
>>>
>>> -- check msg
>>> local msg = opts.msg
>>> if not msg then
>>> return nil, "\"msg\" option required"
>>> end
>>>
>>> -- check broker_list
>>> local broker_list = opts.broker_list
>>> if not broker_list then
>>> return nil, "\"broker_list\" option required"
>>> end
>>>
>>> local bp = producer:new(broker_list, { producer_type = producer_type})
>>>
>>> local ok, err = bp:send(topic, nil, msg)
>>>
>>> if not ok then
>>> error_msg = '<Msg: send [',msg,'] to kafka faild! Reason: [',err,']>'
>>> ngx.log(ngx.ERR,error_msg)
>>> return nil, error_msg
>>> end
>>>
>>> end
>>>
>>> return _M
>>>
>>>
>>>
>>>
>>> -----
>>>
>>> 以下文件引用上诉自己封装的文件就会抛出异常, API disabled in the context of log_by_lua
>>>
>>> local kafka_producer = require "Lua_Kafka_Producer.lua_kafka_producer"
>>>
>>> local args = ngx.var.args or 'parameter null'
>>> local msg = args
>>>
>>> local ok, err = kafka_producer.SendMsgToKafka{
>>> topic = "hehe",
>>> msg = msg,
>>> broker_list = {
>>> {host = "192.168.80.212",port=9092},
>>> },
>>> }
>>>
>>>
>>>
>>>
>>> 附上异常信息:
>>> 2017/12/07 14:08:54 [error] 61209#0: *529003 failed to run log_by_lua*:
>>> /usr/local/openresty/lualib/resty/kafka/broker.lua:26: API disabled in the
>>> context of log_by_lua*
>>> stack traceback:
>>> [C]: in function 'tcp'
>>> /usr/local/openresty/lualib/resty/kafka/broker.lua:26: in function
>>> 'send_receive'
>>> /usr/local/openresty/lualib/resty/kafka/client.lua:139: in function
>>> '_fetch_metadata'
>>> /usr/local/openresty/lualib/resty/kafka/client.lua:200: in function
>>> 'fetch_metadata'
>>> /usr/local/openresty/lualib/resty/kafka/producer.lua:108: in function
>>> 'choose_partition'
>>> /usr/local/openresty/lualib/resty/kafka/producer.lua:356: in function
>>> 'send'
>>> ...ualib/StartOps/Lua_Kafka_Producer/lua_kafka_producer.lua:40: in
>>> function 'SendMsgToKafka'
>>> ...b/StartOps/Lua_Kafka_Producer/lua_kafka_qos_Producer.lua:5: in
>>> function <...b/StartOps/Lua_Kafka_Producer/lua_kafka_qos_Producer.lua:1>
>>> while logging request, client: 127.0.0.1
>>>
>>>
>>> 同样是调用kafka模块,为什么会体现不一样,kafka模块都调用了tcp函数信息
>>>
>>> 求解
>>>
>>>
> --
>