Skip to content

Commit

Permalink
Revert "keep mrb->ud consistent before and after run fiber"
Browse files Browse the repository at this point in the history
This reverts commit 551af63.
  • Loading branch information
pyama86 committed Nov 11, 2019
1 parent fea2821 commit 8d8585c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
48 changes: 23 additions & 25 deletions src/stream/ngx_stream_mruby_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,23 @@
typedef struct {
mrb_state *mrb;
mrb_value *fiber;
ngx_stream_mruby_internal_ctx_t *ictx;
ngx_stream_session_t *s;
} ngx_stream_mrb_reentrant_t;

static mrb_value ngx_stream_mrb_run_fiber(ngx_stream_mruby_internal_ctx_t *ictx, mrb_state *mrb, mrb_value *fiber_proc,
mrb_value *result)
static mrb_value ngx_stream_mrb_run_fiber(mrb_state *mrb, mrb_value *fiber_proc, mrb_value *result)
{
mrb_value resume_result = mrb_nil_value();
mrb_value aliving = mrb_false_value();
mrb_value handler_result = mrb_nil_value();
ngx_stream_mruby_ctx_t *ctx;

ctx = ngx_stream_mrb_get_module_ctx(mrb, ictx->s);
ngx_stream_session_t *s = ngx_mrb_get_session();
ctx = ngx_stream_mrb_get_module_ctx(mrb, s);
ctx->fiber_proc = fiber_proc;

mrb->ud = ictx;
// Nginx::Stream::Async.sleep to neeed session
ngx_mrb_push_session(ictx->s);

resume_result = mrb_funcall(mrb, *fiber_proc, "call", 0, NULL);
if (mrb->exc) {
ngx_log_error(NGX_LOG_NOTICE, ictx->s->connection->log, 0, "%s NOTICE %s:%d: fiber got the raise, leave the fiber",
ngx_log_error(NGX_LOG_NOTICE, s->connection->log, 0, "%s NOTICE %s:%d: fiber got the raise, leave the fiber",
MODULE_NAME, __func__, __LINE__);
return mrb_false_value();
}
Expand All @@ -57,36 +53,38 @@ static mrb_value ngx_stream_mrb_run_fiber(ngx_stream_mruby_internal_ctx_t *ictx,
return aliving;
}

mrb_value ngx_stream_mrb_start_fiber(ngx_stream_mruby_internal_ctx_t *ictx, mrb_state *mrb, struct RProc *rproc,
mrb_value *result)
mrb_value ngx_stream_mrb_start_fiber(ngx_stream_session_t *s, mrb_state *mrb, struct RProc *rproc, mrb_value *result)
{
struct RProc *handler_proc;
mrb_value *fiber_proc;
ngx_stream_mruby_ctx_t *ctx;

ctx = ngx_stream_mrb_get_module_ctx(mrb, ictx->s);
ctx = ngx_stream_mrb_get_module_ctx(mrb, s);
ctx->async_handler_result = result;

handler_proc = mrb_closure_new(mrb, rproc->body.irep);
fiber_proc = (mrb_value *)ngx_palloc(ictx->s->connection->pool, sizeof(mrb_value));
fiber_proc = (mrb_value *)ngx_palloc(s->connection->pool, sizeof(mrb_value));
*fiber_proc =
mrb_funcall(mrb, mrb_obj_value(mrb->kernel_module), "_ngx_mrb_prepare_fiber", 1, mrb_obj_value(handler_proc));
if (mrb->exc) {
ngx_log_error(NGX_LOG_NOTICE, ictx->s->connection->log, 0,
ngx_log_error(NGX_LOG_NOTICE, s->connection->log, 0,
"%s NOTICE %s:%d: preparing fiber got the raise, leave the fiber", MODULE_NAME, __func__, __LINE__);
return mrb_false_value();
}

return ngx_stream_mrb_run_fiber(ictx, mrb, fiber_proc, result);
return ngx_stream_mrb_run_fiber(mrb, fiber_proc, result);
}

static ngx_int_t ngx_stream_mrb_post_fiber(ngx_stream_mrb_reentrant_t *re, ngx_stream_mruby_ctx_t *ctx)
{
int ai;
ai = mrb_gc_arena_save(re->mrb);
ngx_stream_mruby_internal_ctx_t *ictx = re->mrb->ud;

if (re->fiber != NULL) {
if (mrb_test(ngx_stream_mrb_run_fiber(re->ictx, re->mrb, re->fiber, ctx->async_handler_result))) {
ngx_mrb_push_session(re->s);

if (mrb_test(ngx_stream_mrb_run_fiber(re->mrb, re->fiber, ctx->async_handler_result))) {
mrb_gc_arena_restore(re->mrb, ai);
return NGX_DONE;
} else {
Expand All @@ -95,23 +93,23 @@ static ngx_int_t ngx_stream_mrb_post_fiber(ngx_stream_mrb_reentrant_t *re, ngx_s
}

if (re->mrb->exc) {
ngx_stream_mruby_raise_error(re->mrb, mrb_obj_value(re->mrb->exc), re->ictx->s);
ngx_stream_mruby_raise_error(re->mrb, mrb_obj_value(re->mrb->exc), re->s);
return NGX_ABORT;
}

} else {
ngx_log_error(NGX_LOG_NOTICE, re->ictx->s->connection->log, 0, "%s NOTICE %s:%d: unexpected error, fiber missing",
ngx_log_error(NGX_LOG_NOTICE, re->s->connection->log, 0, "%s NOTICE %s:%d: unexpected error, fiber missing",
MODULE_NAME, __func__, __LINE__);
return NGX_ABORT;
}

mrb_gc_arena_restore(re->mrb, ai);

if (re->ictx->stream_status == NGX_DECLINED) {
re->ictx->s->phase_handler++;
ngx_stream_core_run_phases(re->ictx->s);
if (ictx->stream_status == NGX_DECLINED) {
re->s->phase_handler++;
ngx_stream_core_run_phases(re->s);
}
return re->ictx->stream_status;
return ictx->stream_status;
}

static void ngx_stream_mrb_timer_handler(ngx_event_t *ev)
Expand All @@ -120,7 +118,7 @@ static void ngx_stream_mrb_timer_handler(ngx_event_t *ev)
ngx_stream_mruby_ctx_t *ctx;

re = ev->data;
ctx = ngx_stream_mrb_get_module_ctx(NULL, re->ictx->s);
ctx = ngx_stream_mrb_get_module_ctx(NULL, re->s);

ngx_stream_mrb_post_fiber(re, ctx);
}
Expand Down Expand Up @@ -160,9 +158,9 @@ static mrb_value ngx_stream_mrb_async_sleep(mrb_state *mrb, mrb_value self)
p = ngx_palloc(s->connection->pool, sizeof(ngx_event_t) + sizeof(ngx_stream_mrb_reentrant_t));
re = (ngx_stream_mrb_reentrant_t *)(p + sizeof(ngx_event_t));
re->mrb = mrb;
re->ictx = mrb->ud;
re->s = s;

ctx = ngx_stream_mrb_get_module_ctx(mrb, re->ictx->s);
ctx = ngx_stream_mrb_get_module_ctx(mrb, s);
re->fiber = ctx->fiber_proc;

// keeps the object from GC when can resume the fiber
Expand Down
5 changes: 2 additions & 3 deletions src/stream/ngx_stream_mruby_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@

#include <ngx_config.h>
#include <ngx_stream.h>

#include <mruby.h>
#include "ngx_stream_mruby_module.h"

mrb_value ngx_stream_mrb_start_fiber(ngx_stream_mruby_internal_ctx_t *ictx, mrb_state *mrb, struct RProc *proc,
mrb_value *result);
mrb_value ngx_stream_mrb_start_fiber(ngx_stream_session_t *s, mrb_state *mrb, struct RProc *proc, mrb_value *result);
void ngx_stream_mrb_async_class_init(mrb_state *mrb, struct RClass *class);

#endif // NGX_STREAM_MRUBY_ASYNC_H
5 changes: 3 additions & 2 deletions src/stream/ngx_stream_mruby_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -704,15 +704,16 @@ static ngx_int_t ngx_stream_mruby_handler(ngx_stream_session_t *s)

mrb = ngx_stream_mrb_state(s);
ai = mrb_gc_arena_save(mrb);

ictx = mrb->ud;

ictx->s = s;
ictx->stream_status = NGX_DECLINED;

mrb_value *mrb_result = (mrb_value *)ngx_palloc(s->connection->pool, sizeof(mrb_value));
*mrb_result = mrb_nil_value();

if (mrb_test(ngx_stream_mrb_start_fiber(ictx, mrb, mscf->code->proc, mrb_result))) {
ngx_mrb_push_session(s);
if (mrb_test(ngx_stream_mrb_start_fiber(s, mrb, mscf->code->proc, mrb_result))) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "%s INFO %s:%d: already can resume this fiber", MODULE_NAME,
__func__, __LINE__);

Expand Down

0 comments on commit 8d8585c

Please sign in to comment.