Skip to content

Commit

Permalink
Synchronize console writes (#227)
Browse files Browse the repository at this point in the history
Provide an internal mechanism for console writes. This will prevent messages from being interleaved.

Signed-off-by: Michael Carroll <[email protected]>
Co-authored-by: Addisu Z. Taddese <[email protected]>
Co-authored-by: Alejandro Hernández Cordero <[email protected]>
Co-authored-by: Nate Koenig <[email protected]>
  • Loading branch information
4 people committed Jun 24, 2022
1 parent dbc35c0 commit 7be12d1
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 12 deletions.
15 changes: 14 additions & 1 deletion include/ignition/common/Console.hh
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,17 @@ namespace ignition
/// \brief Destructor.
public: virtual ~Buffer();

/// \brief Writes _count characters to the string buffer
/// \param[in] _char Input rharacter array.
/// \param[in] _count Number of characters in array.
/// \return The number of characters successfully written.
public: std::streamsize xsputn(
const char *_char, std::streamsize _count) override;

/// \brief Sync the stream (output the string buffer
/// contents).
/// \return Return 0 on success.
public: virtual int sync();
public: int sync() override;

/// \brief Destination type for the messages.
public: LogType type;
Expand All @@ -198,6 +205,12 @@ namespace ignition

/// \brief Level of verbosity
public: int verbosity;

/// \brief Mutex to synchronize writes to the string buffer
/// and the output stream.
/// \todo(nkoenig) Put this back in for ign-common5, and
/// remove the corresponding static version.
// public: std::mutex syncMutex;
};

IGN_COMMON_WARN_IGNORE__DLL_INTERFACE_MISSING
Expand Down
61 changes: 52 additions & 9 deletions src/Console.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#include <string>
#include <sstream>
#include <unordered_map>

#include <ignition/common/Console.hh>
#include <ignition/common/config.hh>
Expand All @@ -27,6 +28,7 @@
using namespace ignition;
using namespace common;


FileLogger ignition::common::Console::log("");

// On UNIX, these are ANSI-based color codes. On Windows, these are colors from
Expand Down Expand Up @@ -107,6 +109,23 @@ Logger &Logger::operator()(const std::string &_file, int _line)
return (*this);
}


/////////////////////////////////////////////////
/// \brief Provide a lock_guard for a given buffer
/// This is workaround to keep us from breaking ABI, and can be removed
/// in future versions
/// \param[in] _bufferId a unique id of the requesting buffer
/// Acquired via reinterpret_cast<uintptr_t>(this)
/// \returns A RAII lock guard
std::lock_guard<std::mutex> BufferLock(std::uintptr_t _bufferId)
{
static std::unordered_map<uintptr_t, std::mutex> *kSyncMutex{nullptr};
if(nullptr == kSyncMutex)
kSyncMutex = new std::unordered_map<uintptr_t, std::mutex>();

return std::lock_guard<std::mutex>(kSyncMutex->operator[](_bufferId));
}

/////////////////////////////////////////////////
Logger::Buffer::Buffer(LogType _type, const int _color, const int _verbosity)
: type(_type), color(_color), verbosity(_verbosity)
Expand All @@ -119,14 +138,29 @@ Logger::Buffer::~Buffer()
this->pubsync();
}

/////////////////////////////////////////////////
std::streamsize Logger::Buffer::xsputn(const char *_char,
std::streamsize _count)
{
auto lk = BufferLock(reinterpret_cast<std::uintptr_t>(this));
return std::stringbuf::xsputn(_char, _count);
}

/////////////////////////////////////////////////
int Logger::Buffer::sync()
{
std::string outstr = this->str();
std::string outstr;
{
auto lk = BufferLock(reinterpret_cast<std::uintptr_t>(this));
outstr = this->str();
}

// Log messages to disk
Console::log << outstr;
Console::log.flush();
{
auto lk = BufferLock(reinterpret_cast<std::uintptr_t>(this));
Console::log << outstr;
Console::log.flush();
}

// Output to terminal
if (Console::Verbosity() >= this->verbosity && !outstr.empty())
Expand All @@ -143,7 +177,10 @@ int Logger::Buffer::sync()
if (lastNewLine)
ss << std::endl;

fprintf(outstream, "%s", ss.str().c_str());
{
auto lk = BufferLock(reinterpret_cast<std::uintptr_t>(this));
fprintf(outstream, "%s", ss.str().c_str());
}
#else
HANDLE hConsole = CreateFileW(
L"CONOUT$", GENERIC_WRITE|GENERIC_READ, 0, nullptr, OPEN_EXISTING,
Expand All @@ -168,14 +205,20 @@ int Logger::Buffer::sync()
std::ostream &outStream =
this->type == Logger::STDOUT ? std::cout : std::cerr;

if (vtProcessing)
outStream << "\x1b[" << this->color << "m" << outstr << "\x1b[m";
else
outStream << outstr;
{
auto lk = BufferLock(reinterpret_cast<std::uintptr_t>(this));
if (vtProcessing)
outStream << "\x1b[" << this->color << "m" << outstr << "\x1b[m";
else
outStream << outstr;
}
#endif
}

this->str("");
{
auto lk = BufferLock(reinterpret_cast<std::uintptr_t>(this));
this->str("");
}
return 0;
}

Expand Down
21 changes: 19 additions & 2 deletions src/Util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,19 +249,36 @@ std::string ignition::common::systemTimeIso()
return timeToIso(IGN_SYSTEM_TIME());
}

// Taken from gtest.cc
static bool PortableLocaltime(time_t seconds, struct tm* out) {
#if defined(_MSC_VER)
return localtime_s(out, &seconds) == 0;
#elif defined(__MINGW32__) || defined(__MINGW64__)
// MINGW <time.h> provides neither localtime_r nor localtime_s, but uses
// Windows' localtime(), which has a thread-local tm buffer.
struct tm* tm_ptr = localtime(&seconds); // NOLINT
if (tm_ptr == nullptr) return false;
*out = *tm_ptr;
return true;
#else
return localtime_r(&seconds, out) != nullptr;
#endif
}

/////////////////////////////////////////////////
std::string ignition::common::timeToIso(
const std::chrono::time_point<std::chrono::system_clock> &_time)
{
char isoStr[25];

auto epoch = _time.time_since_epoch();
auto sec = std::chrono::duration_cast<std::chrono::seconds>(epoch).count();
auto nano = std::chrono::duration_cast<std::chrono::nanoseconds>(
epoch).count() - sec * IGN_SEC_TO_NANO;

time_t tmSec = static_cast<time_t>(sec);
std::strftime(isoStr, sizeof(isoStr), "%FT%T", std::localtime(&tmSec));
struct tm localTime;
PortableLocaltime(tmSec, &localTime);
std::strftime(isoStr, sizeof(isoStr), "%FT%T", &localTime);

return std::string(isoStr) + "." + std::to_string(nano);
}
Expand Down
207 changes: 207 additions & 0 deletions test/performance/logging.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright (C) 2021 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <gtest/gtest.h>

#include <atomic>
#include <map>
#include <thread>

#include <ignition/common/Console.hh>

namespace {
// Lower value than spdlog to keep CI from flaking
const uint64_t g_iterations{10000};

std::atomic<size_t> g_counter = {0};

void WriteToFile(std::string result_filename, std::string content)
{
std::ofstream out;
std::ios_base::openmode mode = std::ios_base::out | std::ios_base::app;
out.open(result_filename.c_str(), mode);
if (!out.is_open())
{
std::cerr << "Error writing to " << result_filename << std::endl;
}
out << content << std::flush;
std::cout << content;
}

void MeasurePeakDuringLogWrites(const size_t id, std::vector<uint64_t> &result)
{
while (true)
{
const size_t value_now = ++g_counter;
if (value_now > g_iterations)
{
return;
}
std::stringstream ss;
ss << "Some text to log for thread: " << id << "\n";
auto start_time = std::chrono::high_resolution_clock::now();
ignmsg << ss.str();
auto stop_time = std::chrono::high_resolution_clock::now();
uint64_t time_us = std::chrono::duration_cast<std::chrono::microseconds>(
stop_time - start_time)
.count();
result.push_back(time_us);
}
}

void PrintStats(const std::string &filename,
const std::map<size_t, std::vector<uint64_t>> &threads_result,
const uint64_t total_time_in_us)
{
size_t idx = 0;
std::ostringstream oss;
for (auto t_result : threads_result)
{
uint64_t worstUs =
(*std::max_element(t_result.second.begin(), t_result.second.end()));
oss << idx++ << " the worst thread latency was:" << worstUs / uint64_t(1000)
<< " ms (" << worstUs << " us)] " << std::endl;
}

oss << "Total time :" << total_time_in_us / uint64_t(1000) << " ms ("
<< total_time_in_us << " us)" << std::endl;
oss << "Average time: " << double(total_time_in_us) / double(g_iterations)
<< " us" << std::endl;
WriteToFile(filename, oss.str());
}

void SaveResultToBucketFile(
std::string result_filename,
const std::map<size_t, std::vector<uint64_t>> &threads_result)
{
// now split the result in buckets of 1ms each so that it's obvious how the
// peaks go
std::vector<uint64_t> all_measurements;
all_measurements.reserve(g_iterations);
for (auto &t_result : threads_result)
{
all_measurements.insert(all_measurements.end(), t_result.second.begin(),
t_result.second.end());
}

std::map<uint64_t, uint64_t> bucketsWithEmpty;
std::map<uint64_t, uint64_t> buckets;
// force empty buckets to appear
auto maxValueIterator =
std::max_element(all_measurements.begin(), all_measurements.end());
const auto kMaxValue = *maxValueIterator;

for (uint64_t idx = 0; idx <= kMaxValue; ++idx)
{
bucketsWithEmpty[idx] = 0;
}

for (auto value : all_measurements)
{
++bucketsWithEmpty[value];
++buckets[value];
}

/*
std::cout << "\n\n Microsecond bucket measurement" << std::endl;
for (const auto ms_bucket : buckets) {
std::cout << ms_bucket.first << "\t, " << ms_bucket.second << std::endl;
}
std::cout << "\n\n";
*/

std::ostringstream oss;
// Save to xcel and google doc parsable format. with empty buckets
oss << "\n\n Microsecond bucket measurement with zero buckets till max"
<< std::endl;
for (const auto ms_bucket : bucketsWithEmpty)
{
oss << ms_bucket.first << "\t, " << ms_bucket.second << std::endl;
}
WriteToFile(result_filename, oss.str());
std::cout << "Worst Case Latency, max value: " << kMaxValue << std::endl;
std::cout << "microsecond bucket result is in file: " << result_filename
<< std::endl;
}
} // namespace

void run(size_t number_of_threads)
{
g_counter = 0;
ignition::common::Console::SetVerbosity(4);
std::vector<std::thread> threads(number_of_threads);
std::map<size_t, std::vector<uint64_t>> threads_result;

for (size_t idx = 0; idx < number_of_threads; ++idx)
{
// reserve to 1 million for all the result
// it's a test so let's not care about the wasted space
threads_result[idx].reserve(g_iterations);
}

// int queue_size = 1048576; // 2 ^ 20
// int queue_size = 524288; // 2 ^ 19
// spdlog::set_async_mode(queue_size); // default size is 1048576
auto filename_result = std::to_string(number_of_threads) + ".result.csv";
std::ostringstream oss;
oss << "Using " << number_of_threads;
oss << " to log in total " << g_iterations << " log entries to "
<< filename_result << std::endl;
WriteToFile(filename_result, oss.str());

auto start_time_application_total =
std::chrono::high_resolution_clock::now();
for (uint64_t idx = 0; idx < number_of_threads; ++idx)
{
threads[idx] = std::thread(MeasurePeakDuringLogWrites, idx,
std::ref(threads_result[idx]));
}
for (size_t idx = 0; idx < number_of_threads; ++idx)
{
threads[idx].join();
}
auto stop_time_application_total = std::chrono::high_resolution_clock::now();

uint64_t total_time_in_us =
std::chrono::duration_cast<std::chrono::microseconds>(
stop_time_application_total - start_time_application_total)
.count();

PrintStats(filename_result, threads_result, total_time_in_us);
SaveResultToBucketFile(filename_result, threads_result);
}

class LoggingTest:
public ::testing::TestWithParam<std::size_t>
{
};

TEST_P(LoggingTest, RunThreads)
{
run(GetParam());
SUCCEED();
}

INSTANTIATE_TEST_SUITE_P(LoggingTest, LoggingTest,
::testing::Values(1, 2, 4, 8, 16, 32));

/////////////////////////////////////////////////
// This test is valid (passes) if it runs without segfaults.
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

0 comments on commit 7be12d1

Please sign in to comment.