问个pull change的问题。我试着使用semaphore,可是如果我做了 local ngx_sem = require "resty.core.semaphore",他会抱怨没有这个semaphore.so (),
2015/12/20 18:06:00 [error] 15881#0: *2 lua entry thread aborted: runtime error: ....3.2/bundle/lua-bid-proxy/lib/resty/bid_proxy/common.lua:1: module 'resty.core.semaphore' not found:
no field package.preload['resty.core.semaphore']
no file '/home/user/ansible/nginx/ngx_openresty-1.9.3.2/bundle/lua-bid-proxy/lib/resty/core/semaphore.lua'
no file '/opt/openresty/lualib/resty/core/semaphore.lua'
no file '/opt/openresty/lualib/resty/core/semaphore/init.lua'
no file './resty/core/semaphore.lua'
no file '/opt/openresty/luajit/share/luajit-2.1.0-beta1/resty/core/semaphore.lua'
no file '/usr/local/share/lua/5.1/resty/core/semaphore.lua'
no file '/usr/local/share/lua/5.1/resty/core/semaphore/init.lua'
no file '/opt/openresty/luajit/share/lua/5.1/resty/core/semaphore.lua'
no file '/opt/openresty/luajit/share/lua/5.1/resty/core/semaphore/init.lua'
no file '/opt/openresty/lualib/resty/core/semaphore.so'
no file './resty/core/semaphore.so'
no file '/usr/local/lib/lua/5.1/resty/core/semaphore.so'
no file '/opt/openresty/luajit/lib/lua/5.1/resty/core/semaphore.so'
no file '/usr/local/lib/lua/5.1/loadall.so'
no file '/opt/openresty/lualib/resty.so'
no file './resty.so'
no file '/usr/local/lib/lua/5.1/resty.so'
no file '/opt/openresty/luajit/lib/lua/5.1/resty.so'
no file '/usr/local/lib/lua/5.1/loadall.so'
但如果去掉这一行当invode ngx.semaphore.new(),ngx.semaphore是nil (....3.2/bundle/lua-bid-proxy/lib/resty/bid_proxy/common.lua:25: attempt to index field 'semaphore' (a nil value))
下面是我pull change的步骤,请问应该如何从github上pick这个pull request?
- 用openresty image from https://openresty.org/download/ngx_openresty-1.9.3.2.tar.gz
- 把bundle/ngx_lua-0.9.19换成ngx_lua-0.9.20 (https://github.com/openresty/lua-nginx-module/archive/v0.9.20.tar.gz)
- 然后我把下面link里面的5个文件(config, src/ngx_http_lua_common.h, src/ngx_http_lua_module.c, src/ngx_http_lua_semaphore.c, src/ngx_http_lua_semaphore.h)复制过去
https://github.com/cuiweixie/lua-nginx-module
On Saturday, December 19, 2015 at 6:42:49 PM UTC-8, xie cui wrote:
1.最好一个worker一个长连接,ngx.
semaphore不能跨worker唤醒协程(如果需要也有些方法,可以问@doujang), 最好不要要worker之间的交互,通过ngx.semaphore可以发现新的请求,也可以通过ngx.semapore唤醒等待的请求,如果使用ngx.semapore不需要用户(你)调用yield.2.upstream timeout ,就唤醒等待的请求并通知它超时了(建议)
4.啥时都可以,看怎么方便
在 2015年12月20日星期日 UTC+8上午9:34:04,黄川写道:
我用按照春哥建议用Lua写了一下,但还是对有些问题。
1. 对一个upstream, 所有worker共用一个长链接(还是最好一个worker 对upstream有一个长链接),如果使用init_worker_by_lua建立的thread来做dispatch,dispatching worker和request如何交互比较合适?
举个例子,worker w1, w2分别有request r1, r2进来, thread t1,t2分别是每worker launch的处理tcp交互的线程
- 他们最好如何发现新的请求r1 & r1(like polling 每worker queue)?
- 当t1,t2 用sock:recvuntil收到回复,如何notify w1,w2 finalize request?
- 还有w1, w2 在content_by_lua_block要如何yield等待回复?
2. 还有就是如何处理timeout,假设dispatching thread 100ms没有收到回复,timeout如何处理?
3. 另外是不是只能有一个message on-the-fly? 因为感觉ngx_http_upstream里面就算有keepalive, 一次也只有一个message用一个connection,ngx-lua底层是不是也是如此?
4. 在哪一步做tcp的初始化和sock:connect()比较合适?还是动态初始化比较合适?
对ngx_lua还有很多不熟,十分感谢宝贵建议!
On Thursday, December 17, 2015 at 12:05:47 AM UTC-8, 黄川 wrote:
我先试试,多谢指导!
On Wednesday, December 16, 2015 at 7:42:39 PM UTC-8, doujiang wrote:
Hello,
在 2015年12月17日 上午9:26,黄川
<chua...@gmail.com>写道:
agentzh, 感谢你的回复!
是,我们就是想实现multiplexing,
新的请求不create新的connection.
nginx proxy_pass是收到request,对于每一个request都用ngx_http_upstream_connect()建立一个新链接,然后有connection有回应后worker来处理回应。我可不可以参考他的框架,但是只初始化一次connection,所有并发的请求都
- lock() (btw, 是不是如果只有一个worker就可以不lock?)
也需要 lock,同一个 socket 不能并发写(在 OpenResty 里并发写是指在两个轻量级线程里写)
正如春哥推荐的,在你的场景里,只有一个轻线程负责读请求发送和上游响应的接收与分发,所以,并不需要锁
- send request
- add request in queue with
- unlock()
当connection 有read event,因为回复是in order,从pipe 读出后由前向后在queue里面寻找对应的request,target request之前的request都是过期或丢失的,返回错误消息。
我看了你给的链接,有几个基本问题,因为我不懂lua,那lua module可以直接被C使用吗,还是说整个程序要用lua写?
C 可以调用 Lua 提供的 C api 来与 Lua 交互,但是应该没有你想的那种直接调用的方式
如果你想用 C 来完成,估计要复杂一个数量级
commit 47f599a0a6bf2836567d233ebbc7e2f2c60b678e
Author: User <user@nhproxy1.fractionalmedia.com>
Date: Sun Dec 20 18:17:03 2015 +0000
Cherry-pick semaphore change
diff --git a/bundle/ngx_lua-0.9.20/config b/bundle/ngx_lua-0.9.20/config
index 4b770d7..101fd19 100644
--- a/bundle/ngx_lua-0.9.20/config
+++ b/bundle/ngx_lua-0.9.20/config
@@ -339,6 +339,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/src/ngx_http_lua_api.c \
$ngx_addon_dir/src/ngx_http_lua_logby.c \
$ngx_addon_dir/src/ngx_http_lua_sleep.c \
+ $ngx_addon_dir/src/ngx_http_lua_semaphore.c\
$ngx_addon_dir/src/ngx_http_lua_coroutine.c \
$ngx_addon_dir/src/ngx_http_lua_bodyfilterby.c \
$ngx_addon_dir/src/ngx_http_lua_initby.c \
@@ -392,6 +393,7 @@ NGX_ADDON_DEPS="$NGX_ADDON_DEPS \
$ngx_addon_dir/src/api/ngx_http_lua_api.h \
$ngx_addon_dir/src/ngx_http_lua_logby.h \
$ngx_addon_dir/src/ngx_http_lua_sleep.h \
+ $ngx_addon_dir/src/ngx_http_lua_semaphore.h\
$ngx_addon_dir/src/ngx_http_lua_coroutine.h \
$ngx_addon_dir/src/ngx_http_lua_bodyfilterby.h \
$ngx_addon_dir/src/ngx_http_lua_initby.h \
@@ -460,3 +462,4 @@ ngx_feature_test='setsockopt(1, SOL_SOCKET, SO_PASSCRED, NULL, 0);'
#CFLAGS=$"$CFLAGS -DLUA_DEFAULT_PATH='\"/usr/local/openresty/lualib/?.lua\"'"
#CFLAGS=$"$CFLAGS -DLUA_DEFAULT_CPATH='\"/usr/local/openresty/lualib/?.so\"'"
+
diff --git a/bundle/ngx_lua-0.9.20/src/ngx_http_lua_common.h b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_common.h
index 6f11bf2..917d181 100644
--- a/bundle/ngx_lua-0.9.20/src/ngx_http_lua_common.h
+++ b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_common.h
@@ -105,6 +105,9 @@ typedef struct {
typedef struct ngx_http_lua_main_conf_s ngx_http_lua_main_conf_t;
+typedef struct ngx_http_lua_semaphore_mm_s ngx_http_lua_semaphore_mm_t;
+
+
typedef ngx_int_t (*ngx_http_lua_conf_handler_pt)(ngx_log_t *log,
ngx_http_lua_main_conf_t *lmcf, lua_State *L);
@@ -153,6 +156,8 @@ struct ngx_http_lua_main_conf_s {
ngx_uint_t shm_zones_inited;
+ ngx_http_lua_semaphore_mm_t *semaphore_mm;
+
unsigned requires_header_filter:1;
unsigned requires_body_filter:1;
unsigned requires_capture_filter:1;
@@ -303,6 +308,8 @@ struct ngx_http_lua_co_ctx_s {
ngx_event_t sleep; /* used for ngx.sleep */
+ ngx_queue_t sem_wait_queue;
+
#ifdef NGX_LUA_USE_ASSERT
int co_top; /* stack top after yielding/creation,
only for sanity checks */
@@ -330,6 +337,7 @@ struct ngx_http_lua_co_ctx_s {
unsigned thread_spawn_yielded:1; /* yielded from
the ngx.thread.spawn()
call */
+ unsigned sem_resume_status:1;
};
diff --git a/bundle/ngx_lua-0.9.20/src/ngx_http_lua_module.c b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_module.c
index 5bb5d5e..fea7e93 100644
--- a/bundle/ngx_lua-0.9.20/src/ngx_http_lua_module.c
+++ b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_module.c
@@ -23,6 +23,7 @@
#include "ngx_http_lua_initby.h"
#include "ngx_http_lua_initworkerby.h"
#include "ngx_http_lua_probe.h"
+#include "ngx_http_lua_semaphore.h"
#if !defined(nginx_version) || nginx_version < 8054
@@ -554,6 +555,7 @@ ngx_http_lua_init(ngx_conf_t *cf)
ngx_http_handler_pt *h;
ngx_http_core_main_conf_t *cmcf;
ngx_http_lua_main_conf_t *lmcf;
+ ngx_pool_cleanup_t *cln;
lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_lua_module);
@@ -632,6 +634,15 @@ ngx_http_lua_init(ngx_conf_t *cf)
}
}
+ /* add the cleanup of semaphores after the lua_close */
+ cln = ngx_pool_cleanup_add(cf->pool, 0);
+ if (cln == NULL) {
+ return NGX_ERROR;
+ }
+
+ cln->data = lmcf;
+ cln->handler = ngx_http_lua_cleanup_semaphore_mm;
+
if (lmcf->lua == NULL) {
dd("initializing lua vm");
@@ -694,6 +705,7 @@ static void *
ngx_http_lua_create_main_conf(ngx_conf_t *cf)
{
ngx_http_lua_main_conf_t *lmcf;
+ ngx_http_lua_semaphore_mm_t *mm;
lmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_lua_main_conf_t));
if (lmcf == NULL) {
@@ -725,6 +737,7 @@ ngx_http_lua_create_main_conf(ngx_conf_t *cf)
lmcf->pool = cf->pool;
lmcf->max_pending_timers = NGX_CONF_UNSET;
lmcf->max_running_timers = NGX_CONF_UNSET;
+
#if (NGX_PCRE)
lmcf->regex_cache_max_entries = NGX_CONF_UNSET;
lmcf->regex_match_limit = NGX_CONF_UNSET;
@@ -732,6 +745,21 @@ ngx_http_lua_create_main_conf(ngx_conf_t *cf)
lmcf->postponed_to_rewrite_phase_end = NGX_CONF_UNSET;
lmcf->postponed_to_access_phase_end = NGX_CONF_UNSET;
+
+ mm = ngx_palloc(cf->pool, sizeof(ngx_http_lua_semaphore_mm_t));
+ if (mm == NULL) {
+ return NULL;
+ }
+
+ lmcf->semaphore_mm = mm;
+
+ ngx_queue_init(&mm->free_queue);
+ mm->cur_epoch = 0;
+ mm->total = 0;
+ mm->used = 0;
+ mm->num_per_block = NGX_CONF_UNSET_UINT;
+ mm->lmcf = lmcf;
+
dd("nginx Lua module main config structure initialized!");
return lmcf;
@@ -761,6 +789,13 @@ ngx_http_lua_init_main_conf(ngx_conf_t *cf, void *conf)
lmcf->max_running_timers = 256;
}
+ if (lmcf->semaphore_mm->num_per_block == NGX_CONF_UNSET_UINT) {
+ /* the origin is set to 4096, but it needs some space for
+ * ngx_http_lua_semaphore_mm_block_t, one is enough, so
+ * it is 4095 */
+ lmcf->semaphore_mm->num_per_block = 4095;
+ }
+
lmcf->cycle = cf->cycle;
return NGX_CONF_OK;
diff --git a/bundle/ngx_lua-0.9.20/src/ngx_http_lua_semaphore.c b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_semaphore.c
new file mode 100644
index 0000000..a7ddac7
--- /dev/null
+++ b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_semaphore.c
@@ -0,0 +1,522 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ * Copyright (C) cuiweixie
+ * I hereby assign copyright in this code to the lua-nginx-module project,
+ * to be licensed under the same terms as the rest of the code
+ */
+
+#ifndef NGX_LUA_NO_FFI_API
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_http_lua_util.h"
+#include "ngx_http_lua_semaphore.h"
+#include "ngx_http_lua_contentby.h"
+
+
+ngx_int_t ngx_http_lua_semaphore_init_mm(ngx_http_lua_semaphore_mm_t *mm);
+static ngx_http_lua_semaphore_t *ngx_http_lua_alloc_semaphore(void);
+void ngx_http_lua_cleanup_semaphore_mm(void *data);
+static void ngx_http_lua_free_semaphore(ngx_http_lua_semaphore_t *sem);
+static ngx_int_t ngx_http_lua_semaphore_resume(ngx_http_request_t *r);
+int ngx_http_lua_ffi_semaphore_new(ngx_http_lua_semaphore_t **psem,
+ int n, char **errmsg);
+int ngx_http_lua_ffi_semaphore_post(ngx_http_lua_semaphore_t *sem,
+ int n);
+int ngx_http_lua_ffi_semaphore_wait(ngx_http_request_t *r,
+ ngx_http_lua_semaphore_t *sem, int wait_ms, u_char *err, size_t *errlen);
+static void ngx_http_lua_semaphore_cleanup(void *data);
+static void ngx_http_lua_semaphore_handler(ngx_event_t *ev);
+static void ngx_http_lua_semaphore_timeout_handler(ngx_event_t *ev);
+void ngx_http_lua_ffi_semaphore_gc(ngx_http_lua_semaphore_t *sem);
+
+
+enum {
+ SEMAPHORE_WAIT_SUCC = 0,
+ SEMAPHORE_WAIT_TIMEOUT = 1
+};
+
+
+static ngx_http_lua_semaphore_t *
+ngx_http_lua_alloc_semaphore(void)
+{
+ ngx_http_lua_semaphore_t *sem, *iter;
+ ngx_http_lua_main_conf_t *lmcf;
+ ngx_queue_t *q;
+ ngx_uint_t i, n;
+ ngx_http_lua_semaphore_mm_block_t *block;
+ ngx_http_lua_semaphore_mm_t *mm;
+
+ ngx_http_lua_assert(ngx_cycle && ngx_cycle->conf_ctx);
+
+ lmcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
+ ngx_http_lua_module);
+
+ mm = lmcf->semaphore_mm;
+
+ if (!ngx_queue_empty(&mm->free_queue)) {
+ q = ngx_queue_head(&mm->free_queue);
+ ngx_queue_remove(q);
+
+ sem = ngx_queue_data(q, ngx_http_lua_semaphore_t, chain);
+ sem->block->used++;
+
+ mm->used++;
+ sem->sem_event.posted = 0;
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "from head of free queue, alloc semaphore: %p", sem);
+
+ return sem;
+ }
+
+ /* free_queue is empty */
+
+ n = sizeof(ngx_http_lua_semaphore_mm_block_t)
+ + mm->num_per_block * sizeof(ngx_http_lua_semaphore_t);
+
+ dd("block size: %d, item size: %d",
+ (int) sizeof(ngx_http_lua_semaphore_mm_block_t),
+ (int) sizeof(ngx_http_lua_semaphore_t));
+
+ block = ngx_alloc(n, ngx_cycle->log);
+
+ if (block == NULL) {
+ return NULL;
+ }
+
+ mm->cur_epoch++;
+ mm->total += mm->num_per_block;
+ mm->used++;
+
+ block->mm = mm;
+ block->epoch = mm->cur_epoch;
+
+ sem = (ngx_http_lua_semaphore_t *) (block + 1);
+ sem->block = block;
+ sem->block->used = 1;
+ sem->sem_event.posted = 0;
+
+ for (iter = sem + 1, i = 1; i < mm->num_per_block; i++, iter++) {
+ iter->block = block;
+ ngx_queue_insert_tail(&mm->free_queue, &iter->chain);
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "new block, alloc semaphore: %p block: %p", sem, block);
+
+ return sem;
+}
+
+
+void
+ngx_http_lua_cleanup_semaphore_mm(void *data)
+{
+ ngx_http_lua_semaphore_t *sem, *iter;
+ ngx_uint_t i;
+ ngx_http_lua_main_conf_t *lmcf;
+ ngx_queue_t *q;
+ ngx_http_lua_semaphore_mm_block_t *block;
+ ngx_http_lua_semaphore_mm_t *mm;
+
+ lmcf = (ngx_http_lua_main_conf_t *) data;
+ mm = lmcf->semaphore_mm;
+
+ while (!ngx_queue_empty(&mm->free_queue)) {
+ q = ngx_queue_head(&mm->free_queue);
+
+ sem = ngx_queue_data(q, ngx_http_lua_semaphore_t, chain);
+ block = sem->block;
+
+ if (block->used == 0) {
+ iter = (ngx_http_lua_semaphore_t *) (block + 1);
+
+ for (i = 0; i < block->mm->num_per_block; i++, iter++) {
+ ngx_queue_remove(&iter->chain);
+ }
+
+ dd("free semaphore block: %p at final", block);
+
+ ngx_free(block);
+
+ } else {
+ /* just return directly when some thing goes wrong */
+
+ ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
+ "ngx_http_lua_cleanup_semaphore_mm when cleanup"
+ " block %p is still used by someone", block);
+
+ return;
+ }
+ }
+
+ dd("ngx_http_lua_cleanup_semaphore_mm");
+}
+
+
+static void
+ngx_http_lua_free_semaphore(ngx_http_lua_semaphore_t *sem)
+{
+ ngx_http_lua_semaphore_t *iter;
+ ngx_uint_t i, mid_epoch;
+ ngx_http_lua_semaphore_mm_block_t *block;
+ ngx_http_lua_semaphore_mm_t *mm;
+
+ block = sem->block;
+ block->used--;
+
+ mm = block->mm;
+ mm->used--;
+
+ mid_epoch = mm->cur_epoch - ((mm->total / mm->num_per_block) >> 1);
+
+ if (block->epoch < mid_epoch) {
+ ngx_queue_insert_tail(&mm->free_queue, &sem->chain);
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "add to free queue tail semaphore: %p epoch: %d"
+ "mid_epoch: %d cur_epoch: %d", sem, (int) block->epoch,
+ (int) mid_epoch, (int) mm->cur_epoch);
+
+ } else {
+ ngx_queue_insert_head(&mm->free_queue, &sem->chain);
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "add to free queue head semaphore: %p epoch: %d"
+ "mid_epoch: %d cur_epoch: %d", sem, (int) block->epoch,
+ (int) mid_epoch, (int) mm->cur_epoch);
+ }
+
+ dd("used: %d", (int) block->used);
+
+ if (block->used == 0 && mm->used <= (mm->total >> 1)
+ && block->epoch < mid_epoch)
+ {
+ /* load <= 50% and it's on the older side */
+ iter = (ngx_http_lua_semaphore_t *) (block + 1);
+
+ for (i = 0; i < mm->num_per_block; i++, iter++) {
+ ngx_queue_remove(&iter->chain);
+ }
+
+ mm->total -= mm->num_per_block;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "free semaphore block: %p", block);
+
+ ngx_free(block);
+ }
+}
+
+
+static ngx_int_t
+ngx_http_lua_semaphore_resume(ngx_http_request_t *r)
+{
+ lua_State *vm;
+ ngx_connection_t *c;
+ ngx_int_t rc;
+ ngx_http_lua_ctx_t *ctx;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ ctx->resume_handler = ngx_http_lua_wev_handler;
+
+ c = r->connection;
+ vm = ngx_http_lua_get_lua_vm(r, ctx);
+
+ if (ctx->cur_co_ctx->sem_resume_status == SEMAPHORE_WAIT_SUCC) {
+ lua_pushboolean(ctx->cur_co_ctx->co, 1);
+ lua_pushnil(ctx->cur_co_ctx->co);
+
+ } else {
+ lua_pushboolean(ctx->cur_co_ctx->co, 0);
+ lua_pushstring(ctx->cur_co_ctx->co, "timeout");
+ }
+
+ rc = ngx_http_lua_run_thread(vm, r, ctx, 2);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "lua run thread returned %d", rc);
+
+ if (rc == NGX_AGAIN) {
+ return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
+ }
+
+ if (rc == NGX_DONE) {
+ ngx_http_lua_finalize_request(r, NGX_DONE);
+ return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
+ }
+
+ /* rc == NGX_ERROR || rc >= NGX_OK */
+
+ if (ctx->entered_content_phase) {
+ ngx_http_lua_finalize_request(r, rc);
+ return NGX_DONE;
+ }
+
+ return rc;
+}
+
+
+int
+ngx_http_lua_ffi_semaphore_new(ngx_http_lua_semaphore_t **psem,
+ int n, char **errmsg)
+{
+ ngx_http_lua_semaphore_t *sem;
+
+ sem = ngx_http_lua_alloc_semaphore();
+ if (sem == NULL) {
+ *errmsg = "no memory";
+ return NGX_ERROR;
+ }
+
+ ngx_queue_init(&sem->wait_queue);
+
+ sem->resource_count = n;
+ sem->wait_count = 0;
+ sem->event_posted = 0;
+ sem->log = ngx_cycle->log;
+ *psem = sem;
+
+ dd("ngx_http_lua_ffi_semaphore_new semaphore: %p, resource_count: %d",
+ sem, sem->resource_count);
+
+ return NGX_OK;
+}
+
+
+int
+ngx_http_lua_ffi_semaphore_post(ngx_http_lua_semaphore_t *sem, int n)
+{
+ sem->resource_count += n;
+
+ dd("calling semp:post(%d) semaphore: %p, resource_count: %d",
+ n, sem, sem->resource_count);
+
+ if (!sem->event_posted && !ngx_queue_empty(&sem->wait_queue)) {
+
+ sem->sem_event.handler = ngx_http_lua_semaphore_handler;
+ sem->sem_event.data = sem;
+ sem->sem_event.log = sem->log;
+
+ ngx_post_event(&sem->sem_event, &ngx_posted_events);
+ }
+
+ return NGX_OK;
+}
+
+
+int
+ngx_http_lua_ffi_semaphore_wait(ngx_http_request_t *r,
+ ngx_http_lua_semaphore_t *sem, int wait_ms, u_char *err, size_t *errlen)
+{
+ ngx_http_lua_ctx_t *ctx;
+ ngx_http_lua_co_ctx_t *wait_co_ctx;
+ ngx_int_t rc;
+
+ dd("ngx_http_lua_ffi_semaphore_wait semaphore: %p"
+ "value: %d, event_posted: %d",
+ sem, sem->resource_count, sem->event_posted);
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ if (ctx == NULL) {
+ *errlen = ngx_snprintf(err, *errlen, "no request ctx found") - err;
+ return NGX_ERROR;
+ }
+
+ rc = ngx_http_lua_ffi_check_context(ctx, NGX_HTTP_LUA_CONTEXT_REWRITE
+ | NGX_HTTP_LUA_CONTEXT_ACCESS
+ | NGX_HTTP_LUA_CONTEXT_CONTENT
+ | NGX_HTTP_LUA_CONTEXT_TIMER,
+ err, errlen);
+
+ if (rc != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ if (!sem->event_posted && sem->resource_count > 0) {
+ sem->resource_count--;
+ return NGX_OK;
+ }
+
+ if (wait_ms == 0) {
+ return NGX_DECLINED;
+ }
+
+ sem->wait_count++;
+ wait_co_ctx = ctx->cur_co_ctx;
+
+ wait_co_ctx->sleep.handler = ngx_http_lua_semaphore_timeout_handler;
+ wait_co_ctx->sleep.data = ctx->cur_co_ctx;
+ wait_co_ctx->sleep.log = r->connection->log;
+
+ ngx_add_timer(&wait_co_ctx->sleep, (ngx_msec_t) wait_ms);
+
+ dd("ngx_http_lua_ffi_semaphore_wait add timer coctx:%p wait: %d(ms)",
+ wait_co_ctx, wait_ms);
+
+ ngx_queue_insert_tail(&sem->wait_queue, &wait_co_ctx->sem_wait_queue);
+
+ wait_co_ctx->data = sem;
+ wait_co_ctx->cleanup = ngx_http_lua_semaphore_cleanup;
+
+ return NGX_AGAIN;
+}
+
+
+int
+ngx_http_lua_ffi_semaphore_count(ngx_http_lua_semaphore_t *sem)
+{
+ return sem->resource_count - sem->wait_count;
+}
+
+
+static void
+ngx_http_lua_semaphore_cleanup(void *data)
+{
+ ngx_http_lua_co_ctx_t *coctx = data;
+ ngx_queue_t *q;
+ ngx_http_lua_semaphore_t *sem;
+
+ sem = coctx->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, sem->log, 0,
+ "http lua semaphore cleanup");
+
+ if (coctx->sleep.timer_set) {
+ ngx_del_timer(&coctx->sleep);
+ }
+
+ q = &coctx->sem_wait_queue;
+
+ ngx_queue_remove(q);
+ sem->wait_count--;
+ coctx->cleanup = NULL;
+}
+
+
+static void
+ngx_http_lua_semaphore_handler(ngx_event_t *ev)
+{
+ ngx_http_lua_semaphore_t *sem;
+ ngx_http_request_t *r;
+ ngx_http_lua_ctx_t *ctx;
+ ngx_http_lua_co_ctx_t *wait_co_ctx;
+ ngx_connection_t *c;
+ ngx_queue_t *q;
+
+ sem = ev->data;
+
+ while (!ngx_queue_empty(&sem->wait_queue) && sem->resource_count > 0) {
+
+ q = ngx_queue_head(&sem->wait_queue);
+ ngx_queue_remove(q);
+
+ sem->wait_count--;
+
+ wait_co_ctx = ngx_queue_data(q, ngx_http_lua_co_ctx_t, sem_wait_queue);
+ wait_co_ctx->cleanup = NULL;
+
+ if (wait_co_ctx->sleep.timer_set) {
+ ngx_del_timer(&wait_co_ctx->sleep);
+ }
+
+ r = ngx_http_lua_get_req(wait_co_ctx->co);
+ c = r->connection;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ ngx_http_lua_assert(ctx != NULL);
+
+ sem->resource_count--;
+
+ ctx->cur_co_ctx = wait_co_ctx;
+
+ wait_co_ctx->sem_resume_status = SEMAPHORE_WAIT_SUCC;
+
+ if (ctx->entered_content_phase) {
+ (void) ngx_http_lua_semaphore_resume(r);
+
+ } else {
+ ctx->resume_handler = ngx_http_lua_semaphore_resume;
+ ngx_http_core_run_phases(r);
+ }
+
+ ngx_http_run_posted_requests(c);
+ }
+
+ /* after dealing with semaphore post event */
+ sem->event_posted = 0;
+}
+
+
+static void
+ngx_http_lua_semaphore_timeout_handler(ngx_event_t *ev)
+{
+ ngx_http_lua_co_ctx_t *wait_co_ctx;
+ ngx_http_request_t *r;
+ ngx_http_lua_ctx_t *ctx;
+ ngx_connection_t *c;
+ ngx_http_lua_semaphore_t *sem;
+
+ wait_co_ctx = ev->data;
+ wait_co_ctx->cleanup = NULL;
+
+ dd("ngx_http_lua_semaphore_timeout_handler timeout coctx:%p", wait_co_ctx);
+
+ sem = wait_co_ctx->data;
+
+ ngx_queue_remove(&wait_co_ctx->sem_wait_queue);
+ sem->wait_count--;
+
+ r = ngx_http_lua_get_req(wait_co_ctx->co);
+ c = r->connection;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ ngx_http_lua_assert(ctx != NULL);
+
+ ctx->cur_co_ctx = wait_co_ctx;
+
+ wait_co_ctx->sem_resume_status = SEMAPHORE_WAIT_TIMEOUT;
+
+ if (ctx->entered_content_phase) {
+ (void) ngx_http_lua_semaphore_resume(r);
+
+ } else {
+ ctx->resume_handler = ngx_http_lua_semaphore_resume;
+ ngx_http_core_run_phases(r);
+ }
+
+ ngx_http_run_posted_requests(c);
+}
+
+
+void
+ngx_http_lua_ffi_semaphore_gc(ngx_http_lua_semaphore_t *sem)
+{
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "in lua gc, semaphore %p", sem);
+
+ if (sem == NULL) {
+ return;
+ }
+
+ if (!ngx_queue_empty(&sem->wait_queue)) {
+ ngx_log_error(NGX_LOG_CRIT, ngx_cycle->log, 0,
+ "in lua semaphore gc wait queue is"
+ " not empty while the semaphore %p is being "
+ "destroyed", sem);
+ }
+
+ ngx_http_lua_free_semaphore(sem);
+}
+
+
+#endif /* NGX_LUA_NO_FFI_API */
+
+/* vi:set ft=c ts=4 sw=4 et fdm=marker: */
diff --git a/bundle/ngx_lua-0.9.20/src/ngx_http_lua_semaphore.h b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_semaphore.h
new file mode 100644
index 0000000..6dcf6e3
--- /dev/null
+++ b/bundle/ngx_lua-0.9.20/src/ngx_http_lua_semaphore.h
@@ -0,0 +1,51 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ * Copyright (C) cuiweixie
+ * I hereby assign copyright in this code to the lua-nginx-module project,
+ * to be licensed under the same terms as the rest of the code
+ */
+
+
+#ifndef _NGX_HTTP_LUA_SEMAPHORE_H_INCLUDED_
+#define _NGX_HTTP_LUA_SEMAPHORE_H_INCLUDED_
+
+
+#include "ngx_http_lua_common.h"
+
+
+typedef struct ngx_http_lua_semaphore_mm_block_s {
+ ngx_uint_t used;
+ ngx_http_lua_semaphore_mm_t *mm;
+ ngx_uint_t epoch;
+} ngx_http_lua_semaphore_mm_block_t;
+
+
+struct ngx_http_lua_semaphore_mm_s {
+ ngx_queue_t free_queue;
+ ngx_uint_t total;
+ ngx_uint_t used;
+ ngx_uint_t num_per_block;
+ ngx_uint_t cur_epoch;
+ ngx_http_lua_main_conf_t *lmcf;
+};
+
+
+typedef struct ngx_http_lua_semaphore_s {
+ ngx_queue_t wait_queue;
+ ngx_queue_t chain;
+ ngx_event_t sem_event;
+ ngx_log_t *log;
+ ngx_http_lua_semaphore_mm_block_t *block;
+ int resource_count;
+ unsigned wait_count:31;
+ unsigned event_posted:1;
+} ngx_http_lua_semaphore_t;
+
+
+void ngx_http_lua_cleanup_semaphore_mm(void *data);
+
+
+#endif /* _NGX_HTTP_LUA_SEMAPHORE_H_INCLUDED_ */
+
+/* vi:set ft=c ts=4 sw=4 et fdm=marker: */