Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Thread] Add thread local variable ThreadContext #7234

Merged
merged 2 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ set(RUNTIME_FILES
runtime_state.cpp
runtime_filter_mgr.cpp
string_value.cpp
thread_context.cpp
thread_resource_mgr.cpp
threadlocal.cc
decimalv2_value.cpp
large_int_value.cpp
collection_value.cpp
Expand Down
32 changes: 32 additions & 0 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/thread_context.h"

namespace doris {

DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, thread_local_ctx);

ThreadContextPtr::ThreadContextPtr() {
INIT_STATIC_THREAD_LOCAL(ThreadContext, thread_local_ctx);
}

ThreadContext* ThreadContextPtr::get() {
return thread_local_ctx;
}

} // namespace doris
143 changes: 143 additions & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>
#include <thread>

#include "common/logging.h"
#include "gen_cpp/Types_types.h"
#include "runtime/threadlocal.h"

#define SCOPED_ATTACH_TASK_THREAD(type, ...) \
auto VARNAME_LINENUM(attach_task_thread) = AttachTaskThread(type, ## __VA_ARGS__)

namespace doris {

class TUniqueId;

// The thread context saves some info about a working thread.
// 2 requried info:
// 1. thread_id: Current thread id, Auto generated.
// 2. type: The type is a enum value indicating which type of task current thread is running.
// For example: QUERY, LOAD, COMPACTION, ...
// 3. task id: A unique id to identify this task. maybe query id, load job id, etc.
//
// There may be other optional info to be added later.
class ThreadContext {
public:
enum TaskType {
UNKNOWN = 0,
QUERY = 1,
LOAD = 2,
COMPACTION = 3
// to be added ...
};

public:
ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) {}

void attach(const TaskType& type, const std::string& task_id,
const TUniqueId& fragment_instance_id) {
DCHECK(_type == TaskType::UNKNOWN && _task_id == "");
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
}

void detach() {
_type = TaskType::UNKNOWN;
_task_id = "";
_fragment_instance_id = TUniqueId();
}

const std::string type() const;
const std::string& task_id() const { return _task_id; }
const std::thread::id& thread_id() const { return _thread_id; }
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }

private:
std::thread::id _thread_id;
TaskType _type;
std::string _task_id;
TUniqueId _fragment_instance_id;
};

// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error,
// see https://github.com/apache/incubator-doris/pull/7911
//
// If we want to avoid this error,
// 1. For non-trivial variables in thread_local, such as std::string, you need to store them as pointers to
// ensure that thread_local is trivial, these non-trivial pointers will uniformly call destructors elsewhere.
// 2. The default destructor of the thread_local variable cannot be overridden.
//
// This is difficult to implement. Because the destructor is not overwritten, it means that the outside cannot
// be notified when the thread terminates, and the non-trivial pointers in thread_local cannot be released in time.
// The func provided by pthread and std::thread doesn't help either.
//
// So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by
// Thread-scopedthread local + Class-scoped thread local.
//
// This may look very trick, but it's the best way I can find.
//
// refer to:
// https://gcc.gnu.org/onlinedocs/gcc-3.3.1/gcc/Thread-Local.html
// https://stackoverflow.com/questions/12049684/
// https://sourceware.org/glibc/wiki/Destructor%20support%20for%20thread_local%20variables
// https://www.jianshu.com/p/756240e837dd
// https://man7.org/linux/man-pages/man3/pthread_tryjoin_np.3.html
class ThreadContextPtr {
public:
ThreadContextPtr();

ThreadContext* get();

private:
DECLARE_STATIC_THREAD_LOCAL(ThreadContext, thread_local_ctx);
};

inline thread_local ThreadContextPtr thread_local_ctx;

inline const std::string task_type_string(ThreadContext::TaskType type) {
switch (type) {
case ThreadContext::TaskType::QUERY:
return "QUERY";
case ThreadContext::TaskType::LOAD:
return "LOAD";
case ThreadContext::TaskType::COMPACTION:
return "COMPACTION";
default:
return "UNKNOWN";
}
}

inline const std::string ThreadContext::type() const {
return task_type_string(_type);
}

class AttachTaskThread {
public:
explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id = "",
const TUniqueId& fragment_instance_id = TUniqueId()) {
thread_local_ctx.get()->attach(type, task_id, fragment_instance_id);
}

~AttachTaskThread() { thread_local_ctx.get()->detach(); }
};

} // namespace doris
84 changes: 84 additions & 0 deletions be/src/runtime/threadlocal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this file is copied from other project, better add a comment.
Same as other file in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, in threadlocal.h

#include "runtime/threadlocal.h"

#include <pthread.h>

#include <memory>
#include <ostream>
#include <string>

#include "common/logging.h"
#include "gutil/once.h"
#include "util/errno.h"

namespace doris {

// One key used by the entire process to attach destructors on thread exit.
static pthread_key_t destructors_key;

// The above key must only be initialized once per process.
static GoogleOnceType once = GOOGLE_ONCE_INIT;

namespace {

// List of destructors for all thread locals instantiated on a given thread.
struct PerThreadDestructorList {
void (*destructor)(void*);
void* arg;
PerThreadDestructorList* next;
};

} // anonymous namespace

// Call all the destructors associated with all THREAD_LOCAL instances in this
// thread.
static void invoke_destructors(void* t) {
PerThreadDestructorList* d = reinterpret_cast<PerThreadDestructorList*>(t);
while (d != nullptr) {
d->destructor(d->arg);
PerThreadDestructorList* next = d->next;
delete d;
d = next;
}
}

// This key must be initialized only once.
static void create_key() {
int ret = pthread_key_create(&destructors_key, &invoke_destructors);
// Linux supports up to 1024 keys, we will use only one for all thread locals.
CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: "
<< "error " << ret << ": " << errno_to_string(ret);
}

// Adds a destructor to the list.
void add_destructor(void (*destructor)(void*), void* arg) {
GoogleOnceInit(&once, &create_key);

// Returns NULL if nothing is set yet.
std::unique_ptr<PerThreadDestructorList> p(new PerThreadDestructorList());
p->destructor = destructor;
p->arg = arg;
p->next = reinterpret_cast<PerThreadDestructorList*>(pthread_getspecific(destructors_key));
int ret = pthread_setspecific(destructors_key, p.release());
// The only time this check should fail is if we are out of memory, or if
// somehow key creation failed, which should be caught by the above CHECK.
CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: "
<< "error " << ret << ": " << errno_to_string(ret);
}

} // namespace doris
127 changes: 127 additions & 0 deletions be/src/runtime/threadlocal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Reference from kudu, Solve the problem of gcc11 compiling
// non-trivial thread_local variables on lower versions of GLIBC.
// see https://github.com/apache/incubator-doris/pull/7911
//
// Block-scoped static thread local implementation.
//
// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro
// defines a thread-local pointer to the specified type, which is lazily
// instantiated by any thread entering the block for the first time. The
// constructor for the type T is invoked at macro execution time, as expected,
// and its destructor is invoked when the corresponding thread's Runnable
// returns, or when the thread exits.
//
// Inspired by Poco <http://pocoproject.org/docs/Poco.ThreadLocal.html>,
// Andrew Tomazos <http://stackoverflow.com/questions/12049684/>, and
// the C++11 thread_local API.
//
// Example usage:
//
// // Invokes a 3-arg constructor on SomeClass:
// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3);
// instance->DoSomething();
//

#pragma once

#include "gutil/port.h"

#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...) \
static __thread T* t; \
do { \
if (PREDICT_FALSE(t == NULL)) { \
t = new T(__VA_ARGS__); \
add_destructor(destroy<T>, t); \
} \
} while (false)

// Class-scoped static thread local implementation.
//
// Very similar in implementation to the above block-scoped version, but
// requires a bit more syntax and vigilance to use properly.
//
// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the
// class header, as usual for variable declarations.
//
// Because these variables are static, they must also be defined in the impl
// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_),
// which is very much like defining any static member, i.e. int Foo::member_.
//
// Finally, each thread must initialize the instance before using it by calling
// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap
// call, and may be invoked at the top of any method which may reference a
// thread-local variable.
//
// Due to all of these requirements, you should probably declare TLS members
// as private.
//
// Example usage:
//
// // foo.h
// #include "kudu/utils/file.h"
// class Foo {
// public:
// void DoSomething(std::string s);
// private:
// DECLARE_STATIC_THREAD_LOCAL(utils::File, file_);
// };
//
// // foo.cc
// #include "kudu/foo.h"
// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_);
// void Foo::WriteToFile(std::string s) {
// // Call constructor if necessary.
// INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt");
// file_->Write(s);
// }

// Goes in the class declaration (usually in a header file).
// dtor must be destructed _after_ t, so it gets defined first.
// Uses a mangled variable name for dtor since it must also be a member of the
// class.
#define DECLARE_STATIC_THREAD_LOCAL(T, t) static __thread T* t

// You must also define the instance in the .cc file.
#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t) __thread T* Class::t

// Must be invoked at least once by each thread that will access t.
#define INIT_STATIC_THREAD_LOCAL(T, t, ...) \
do { \
if (PREDICT_FALSE(t == NULL)) { \
t = new T(__VA_ARGS__); \
add_destructor(destroy<T>, t); \
} \
} while (false)

// Internal implementation below.

namespace doris {

// Add a destructor to the list.
void add_destructor(void (*destructor)(void*), void* arg);

// Destroy the passed object of type T.
template <class T>
static void destroy(void* t) {
// With tcmalloc, this should be pretty cheap (same thread as new).
delete reinterpret_cast<T*>(t);
}

} // namespace doris