This repository has been archived by the owner on Jun 23, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
mutation_duplicator.h
87 lines (72 loc) · 3.08 KB
/
mutation_duplicator.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <set>
#include <tuple>

#include <dsn/utility/errors.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/replica_base.h>
#include <dsn/cpp/pipeline.h>
namespace dsn {
namespace replication {
/// \brief A single mutation, represented as the tuple
/// <timestamp, task_code, dsn::blob>.
/// The dsn::blob element holds the content of the mutation.
using mutation_tuple = std::tuple<uint64_t, task_code, blob>;
/// \brief Ordering predicate for mutation_tuple_set.
/// Mutations are sorted primarily by timestamp (tuple element 0); mutations
/// with equal timestamps are ordered by the address of their data buffer
/// (tuple element 2), so that distinct mutations sharing a timestamp are
/// not treated as equivalent by the set.
struct mutation_tuple_cmp
{
    /// \return true if `lhs` must be ordered before `rhs`.
    inline bool operator()(const mutation_tuple &lhs, const mutation_tuple &rhs) const
    {
        // Different mutations are likely to be batched together and therefore
        // share the same timestamp, so ties are broken by the data pointer.
        if (std::get<0>(lhs) == std::get<0>(rhs)) {
            // The two buffers are unrelated objects. The built-in `<` on
            // pointers into different objects yields an unspecified result,
            // whereas std::less on pointers guarantees a strict total order,
            // which is what an ordered container requires.
            return std::less<const void *>()(std::get<2>(lhs).data(), std::get<2>(rhs).data());
        }
        return std::get<0>(lhs) < std::get<0>(rhs);
    }
};
/// Ordered collection of mutations, sorted according to mutation_tuple_cmp.
using mutation_tuple_set = std::set<mutation_tuple, mutation_tuple_cmp>;
/// \brief Abstract interface for shipping mutation logs that are meant to be
/// duplicated to a remote cluster.
/// Concrete implementations are obtained through the static `creator` factory.
/// \see dsn::replication::replica_duplicator
class mutation_duplicator : public replica_base
{
public:
    /// Completion callback type; its argument is the total shipped size.
    using callback = std::function<void(size_t /*total_shipped_size*/)>;

    /// Constructs the duplicator on top of the given replica_base.
    explicit mutation_duplicator(replica_base *r) : replica_base(r) {}

    /// Duplicate the provided mutations to the remote cluster.
    /// The implementation must be non-blocking.
    ///
    /// \param mutations: the set of mutations to ship, taken by value
    /// \param cb: Call it when all the given mutations were sent successfully
    virtual void duplicate(mutation_tuple_set mutations, callback cb) = 0;

    virtual ~mutation_duplicator() = default;

    /// Singleton creator of mutation_duplicator.
    /// A factory functor taking (replica_base, remote cluster, app name) and
    /// returning a newly created duplicator. It is only declared here; its
    /// definition and assignment live outside this header.
    static std::function<std::unique_ptr<mutation_duplicator>(
        replica_base *, string_view /*remote cluster*/, string_view /*app name*/)>
        creator;

    /// Stores a copy of `*env` as this duplicator's task environment.
    /// `env` is dereferenced unconditionally, so it must not be null.
    void set_task_environment(pipeline::environment *env) { _env = *env; }

protected:
    // Grants the unit test access to the protected state.
    friend class replica_duplicator_test;

    // Task environment copied in by set_task_environment().
    pipeline::environment _env;
};
/// \brief Creates a mutation_duplicator by forwarding the arguments to the
/// registered mutation_duplicator::creator factory.
///
/// \param r: replica_base handed to the factory
/// \param remote_cluster_address: passed as the factory's "remote cluster" argument
/// \param app: passed as the factory's "app name" argument
/// \return whatever the factory returns
inline std::unique_ptr<mutation_duplicator>
new_mutation_duplicator(replica_base *r, string_view remote_cluster_address, string_view app)
{
    auto &factory = mutation_duplicator::creator;
    return factory(r, remote_cluster_address, app);
}
} // namespace replication
} // namespace dsn