[FEA]: Create Dynamic Batcher node type #480

Open
2 tasks done
mdemoret-nv opened this issue Jun 4, 2024 · 0 comments
Assignees
Labels
feature request New feature or request

Comments

@mdemoret-nv
Contributor

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request

High

Please provide a clear description of problem this feature solves

It is often more efficient for the pipeline to work on multiple messages at a time. A simple node could group individual messages into batches for processing.

Describe your ideal solution

Create a new node type (similar to the Broadcast node) that buffers upstream messages of type T and emits them downstream as batches of type std::vector<T>. A batch should be emitted when either:

  1. The maximum timeout duration has been exceeded
  2. The maximum buffer size has been reached

Until one of these conditions is met, the node should hold the incoming messages in memory.

For example, if the upstream emitted 3 messages in a row with the max size set to 2 and the max duration set to 100 ms, the output should be:

Input:
Emit 1
Emit 2
Emit 3

Output:
Emit [1, 2] at 0 ms
Emit [3] at 100 ms
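
To make the flush rules concrete, here is a small standalone sketch of just the decision logic (plain C++, no MRC types), using the same thresholds as the example above (max size 2, 100 ms window). The forced flush at the end simulates the timeout expiring for the last message:

#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    const std::size_t max_count = 2;
    const auto max_duration     = std::chrono::milliseconds(100);

    std::vector<int> batch;
    auto deadline = std::chrono::steady_clock::now() + max_duration;

    auto flush = [&]() {
        std::cout << "Emit batch of " << batch.size() << " message(s)\n";
        batch.clear();
        deadline = std::chrono::steady_clock::now() + max_duration;
    };

    // Three messages arrive back-to-back: [1, 2] flushes as soon as the max
    // size is reached; [3] stays buffered until the 100 ms window expires
    for (int msg : {1, 2, 3})
    {
        batch.push_back(msg);
        if (batch.size() >= max_count || std::chrono::steady_clock::now() >= deadline)
        {
            flush();
        }
    }

    // Simulate the timeout expiring for the remaining message
    std::this_thread::sleep_for(max_duration);
    if (!batch.empty() && std::chrono::steady_clock::now() >= deadline)
    {
        flush();
    }
}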

Describe any alternatives you have considered

No response

Additional context

Scaffolding class:

template <typename T>
class DynamicBatcher : public mrc::node::WritableProvider<T>,
                       public mrc::node::ReadableAcceptor<T>,
                       public mrc::node::SinkChannelOwner<T>,
                       public mrc::node::WritableAcceptor<std::vector<T>>,
                       public mrc::node::ReadableProvider<std::vector<T>>,
                       public mrc::node::SourceChannelOwner<std::vector<T>>,
                       public mrc::runnable::RunnableWithContext<>
{
    using state_t  = mrc::runnable::Runnable::State;
    using input_t  = T;
    using output_t = std::vector<T>;

  public:
    DynamicBatcher(size_t max_count)
    {
        // TODO: store max_count (and a max timeout duration) for the batching logic

        // Set the default channels for the sink (input) and source (output) sides
        mrc::node::SinkChannelOwner<input_t>::set_channel(std::make_unique<mrc::channel::BufferedChannel<input_t>>());
        mrc::node::SourceChannelOwner<output_t>::set_channel(
            std::make_unique<mrc::channel::BufferedChannel<output_t>>());
    }
    ~DynamicBatcher() override = default;

  private:
    /**
     * @brief Runnable's entrypoint.
     */
    void run(mrc::runnable::Context& ctx) override
    {
        T input_data;
        auto status = this->get_readable_edge()->await_read(input_data);

        // TODO: accumulate incoming messages into a std::vector<T> and emit the
        // batch downstream once the max count is reached or the timeout expires
        // (see the sketch below this class)

        // Only drop the output edges if we are rank 0
        if (ctx.rank() == 0)
        {
            // Need to drop the output edges
            mrc::node::SourceProperties<output_t>::release_edge_connection();
            mrc::node::SinkProperties<T>::release_edge_connection();
        }
    }

    /**
     * @brief Runnable's state control, for stopping from MRC.
     */
    void on_state_update(const state_t& state) final;

    std::stop_source m_stop_source;
};
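
A rough sketch of what the run() body could look like, under a few assumptions: the batcher stores hypothetical m_max_count and m_max_duration members (set by the constructor; not part of the scaffolding above), the writable side mirrors the readable one (get_writable_edge() / await_write), and await_read returns mrc::channel::Status::success while the upstream is open. The timeout is only checked between reads here, so a deadline-aware read or a separate timer would likely be needed to match the example output above:

    void run(mrc::runnable::Context& ctx) override
    {
        // NOTE: m_max_count and m_max_duration are assumed members, not part of
        // the scaffolding above
        std::vector<T> batch;
        auto deadline = std::chrono::steady_clock::now() + m_max_duration;

        T input_data;
        while (this->get_readable_edge()->await_read(input_data) == mrc::channel::Status::success)
        {
            if (batch.empty())
            {
                // Start the timeout window with the first message of a new batch
                deadline = std::chrono::steady_clock::now() + m_max_duration;
            }

            batch.emplace_back(std::move(input_data));

            // Flush when the batch is full or the window has expired
            if (batch.size() >= m_max_count || std::chrono::steady_clock::now() >= deadline)
            {
                this->get_writable_edge()->await_write(std::move(batch));
                batch = {};
            }
        }

        // Flush any partially filled batch when the upstream closes
        if (!batch.empty())
        {
            this->get_writable_edge()->await_write(std::move(batch));
        }

        // Only drop the output edges if we are rank 0 (as in the scaffolding above)
        if (ctx.rank() == 0)
        {
            mrc::node::SourceProperties<output_t>::release_edge_connection();
            mrc::node::SinkProperties<T>::release_edge_connection();
        }
    }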

Code of Conduct

  • I agree to follow MRC's Code of Conduct
  • I have searched the open feature requests and have found no duplicates for this feature request