diff --git a/src/stream/ngx_stream_mruby_async.c b/src/stream/ngx_stream_mruby_async.c index ae018772..3f318c1b 100644 --- a/src/stream/ngx_stream_mruby_async.c +++ b/src/stream/ngx_stream_mruby_async.c @@ -16,27 +16,23 @@ typedef struct { mrb_state *mrb; mrb_value *fiber; - ngx_stream_mruby_internal_ctx_t *ictx; + ngx_stream_session_t *s; } ngx_stream_mrb_reentrant_t; -static mrb_value ngx_stream_mrb_run_fiber(ngx_stream_mruby_internal_ctx_t *ictx, mrb_state *mrb, mrb_value *fiber_proc, - mrb_value *result) +static mrb_value ngx_stream_mrb_run_fiber(mrb_state *mrb, mrb_value *fiber_proc, mrb_value *result) { mrb_value resume_result = mrb_nil_value(); mrb_value aliving = mrb_false_value(); mrb_value handler_result = mrb_nil_value(); ngx_stream_mruby_ctx_t *ctx; - ctx = ngx_stream_mrb_get_module_ctx(mrb, ictx->s); + ngx_stream_session_t *s = ngx_mrb_get_session(); + ctx = ngx_stream_mrb_get_module_ctx(mrb, s); ctx->fiber_proc = fiber_proc; - mrb->ud = ictx; - // Nginx::Stream::Async.sleep to neeed session - ngx_mrb_push_session(ictx->s); - resume_result = mrb_funcall(mrb, *fiber_proc, "call", 0, NULL); if (mrb->exc) { - ngx_log_error(NGX_LOG_NOTICE, ictx->s->connection->log, 0, "%s NOTICE %s:%d: fiber got the raise, leave the fiber", + ngx_log_error(NGX_LOG_NOTICE, s->connection->log, 0, "%s NOTICE %s:%d: fiber got the raise, leave the fiber", MODULE_NAME, __func__, __LINE__); return mrb_false_value(); } @@ -57,36 +53,38 @@ static mrb_value ngx_stream_mrb_run_fiber(ngx_stream_mruby_internal_ctx_t *ictx, return aliving; } -mrb_value ngx_stream_mrb_start_fiber(ngx_stream_mruby_internal_ctx_t *ictx, mrb_state *mrb, struct RProc *rproc, - mrb_value *result) +mrb_value ngx_stream_mrb_start_fiber(ngx_stream_session_t *s, mrb_state *mrb, struct RProc *rproc, mrb_value *result) { struct RProc *handler_proc; mrb_value *fiber_proc; ngx_stream_mruby_ctx_t *ctx; - ctx = ngx_stream_mrb_get_module_ctx(mrb, ictx->s); + ctx = ngx_stream_mrb_get_module_ctx(mrb, s); ctx->async_handler_result = result; handler_proc = mrb_closure_new(mrb, rproc->body.irep); - fiber_proc = (mrb_value *)ngx_palloc(ictx->s->connection->pool, sizeof(mrb_value)); + fiber_proc = (mrb_value *)ngx_palloc(s->connection->pool, sizeof(mrb_value)); *fiber_proc = mrb_funcall(mrb, mrb_obj_value(mrb->kernel_module), "_ngx_mrb_prepare_fiber", 1, mrb_obj_value(handler_proc)); if (mrb->exc) { - ngx_log_error(NGX_LOG_NOTICE, ictx->s->connection->log, 0, + ngx_log_error(NGX_LOG_NOTICE, s->connection->log, 0, "%s NOTICE %s:%d: preparing fiber got the raise, leave the fiber", MODULE_NAME, __func__, __LINE__); return mrb_false_value(); } - return ngx_stream_mrb_run_fiber(ictx, mrb, fiber_proc, result); + return ngx_stream_mrb_run_fiber(mrb, fiber_proc, result); } static ngx_int_t ngx_stream_mrb_post_fiber(ngx_stream_mrb_reentrant_t *re, ngx_stream_mruby_ctx_t *ctx) { int ai; ai = mrb_gc_arena_save(re->mrb); + ngx_stream_mruby_internal_ctx_t *ictx = re->mrb->ud; if (re->fiber != NULL) { - if (mrb_test(ngx_stream_mrb_run_fiber(re->ictx, re->mrb, re->fiber, ctx->async_handler_result))) { + ngx_mrb_push_session(re->s); + + if (mrb_test(ngx_stream_mrb_run_fiber(re->mrb, re->fiber, ctx->async_handler_result))) { mrb_gc_arena_restore(re->mrb, ai); return NGX_DONE; } else { @@ -95,23 +93,23 @@ static ngx_int_t ngx_stream_mrb_post_fiber(ngx_stream_mrb_reentrant_t *re, ngx_s } if (re->mrb->exc) { - ngx_stream_mruby_raise_error(re->mrb, mrb_obj_value(re->mrb->exc), re->ictx->s); + ngx_stream_mruby_raise_error(re->mrb, mrb_obj_value(re->mrb->exc), re->s); return NGX_ABORT; } } else { - ngx_log_error(NGX_LOG_NOTICE, re->ictx->s->connection->log, 0, "%s NOTICE %s:%d: unexpected error, fiber missing", + ngx_log_error(NGX_LOG_NOTICE, re->s->connection->log, 0, "%s NOTICE %s:%d: unexpected error, fiber missing", MODULE_NAME, __func__, __LINE__); return NGX_ABORT; } mrb_gc_arena_restore(re->mrb, ai); - if (re->ictx->stream_status == NGX_DECLINED) { - re->ictx->s->phase_handler++; - ngx_stream_core_run_phases(re->ictx->s); + if (ictx->stream_status == NGX_DECLINED) { + re->s->phase_handler++; + ngx_stream_core_run_phases(re->s); } - return re->ictx->stream_status; + return ictx->stream_status; } static void ngx_stream_mrb_timer_handler(ngx_event_t *ev) @@ -120,7 +118,7 @@ static void ngx_stream_mrb_timer_handler(ngx_event_t *ev) ngx_stream_mruby_ctx_t *ctx; re = ev->data; - ctx = ngx_stream_mrb_get_module_ctx(NULL, re->ictx->s); + ctx = ngx_stream_mrb_get_module_ctx(NULL, re->s); ngx_stream_mrb_post_fiber(re, ctx); } @@ -160,9 +158,9 @@ static mrb_value ngx_stream_mrb_async_sleep(mrb_state *mrb, mrb_value self) p = ngx_palloc(s->connection->pool, sizeof(ngx_event_t) + sizeof(ngx_stream_mrb_reentrant_t)); re = (ngx_stream_mrb_reentrant_t *)(p + sizeof(ngx_event_t)); re->mrb = mrb; - re->ictx = mrb->ud; + re->s = s; - ctx = ngx_stream_mrb_get_module_ctx(mrb, re->ictx->s); + ctx = ngx_stream_mrb_get_module_ctx(mrb, s); re->fiber = ctx->fiber_proc; // keeps the object from GC when can resume the fiber diff --git a/src/stream/ngx_stream_mruby_async.h b/src/stream/ngx_stream_mruby_async.h index eaae5476..64753cff 100644 --- a/src/stream/ngx_stream_mruby_async.h +++ b/src/stream/ngx_stream_mruby_async.h @@ -9,11 +9,10 @@ #include #include + #include -#include "ngx_stream_mruby_module.h" -mrb_value ngx_stream_mrb_start_fiber(ngx_stream_mruby_internal_ctx_t *ictx, mrb_state *mrb, struct RProc *proc, - mrb_value *result); +mrb_value ngx_stream_mrb_start_fiber(ngx_stream_session_t *s, mrb_state *mrb, struct RProc *proc, mrb_value *result); void ngx_stream_mrb_async_class_init(mrb_state *mrb, struct RClass *class); #endif // NGX_STREAM_MRUBY_ASYNC_H diff --git a/src/stream/ngx_stream_mruby_module.c b/src/stream/ngx_stream_mruby_module.c index 3d27837a..6d455404 100644 --- a/src/stream/ngx_stream_mruby_module.c +++ b/src/stream/ngx_stream_mruby_module.c @@ -704,15 +704,16 @@ static ngx_int_t ngx_stream_mruby_handler(ngx_stream_session_t *s) mrb = ngx_stream_mrb_state(s); ai = mrb_gc_arena_save(mrb); - ictx = mrb->ud; + ictx->s = s; ictx->stream_status = NGX_DECLINED; mrb_value *mrb_result = (mrb_value *)ngx_palloc(s->connection->pool, sizeof(mrb_value)); *mrb_result = mrb_nil_value(); - if (mrb_test(ngx_stream_mrb_start_fiber(ictx, mrb, mscf->code->proc, mrb_result))) { + ngx_mrb_push_session(s); + if (mrb_test(ngx_stream_mrb_start_fiber(s, mrb, mscf->code->proc, mrb_result))) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "%s INFO %s:%d: already can resume this fiber", MODULE_NAME, __func__, __LINE__);