Skip to content

Commit

Permalink
fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
arvidn committed Jun 26, 2024
1 parent f5ce235 commit e7d2939
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 42 deletions.
24 changes: 12 additions & 12 deletions include/libtorrent/aux_/debug_disk_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,60 +42,60 @@ inline std::string print_job(aux::disk_job const& j)
// the visitor appends a human-readable description of each job to the
// stringstream it is constructed with (held by reference)
explicit print_visitor(std::stringstream& s) : m_ss(s) {}

// format a read job. Both the pre- and post-change statement were
// present (diff artifact); keep only the updated one so the line is
// printed once
void operator()(job::read const& j) const {
	m_ss << "read ( size: " << j.buffer_size << " piece: " << j.piece << " offset: " << j.offset << " )";
}

// format a write job. Drop the stale duplicate statement left over
// from the diff; only the spaced version is kept
void operator()(job::write const& j) const {
	m_ss << "write( size: " << j.buffer_size << " piece: " << j.piece << " offset: " << j.offset << " )";
}

// format a (v1) piece hash job. Drop the stale duplicate statement
void operator()(job::hash const& j) const {
	m_ss << "hash( piece: " << j.piece << " )";
}

// format a v2 block-hash job. Drop the stale duplicate statement, and
// label it "hash2" — it previously printed "hash", making it
// indistinguishable from job::hash in the log
void operator()(job::hash2 const& j) const {
	m_ss << "hash2( piece: " << j.piece << " offset: " << j.offset << " )";
}

// format a move-storage job. Drop the stale duplicate statement
void operator()(job::move_storage const& j) const {
	m_ss << "move-storage( path: " << j.path << " flags: " << int(j.move_flags) << " )";
}

// format a release-files job. It previously printed "move-storage( )"
// (copy-paste from the overload above), which mislabels the job in logs
void operator()(job::release_files const&) const {
	m_ss << "release-files( )";
}

// format a delete-files job. Drop the stale duplicate statement
void operator()(job::delete_files const& j) const {
	m_ss << "delete-files ( flags: " << j.flags << " )";
}

// check_fastresume carries no fields worth printing; emit the job
// name only
void operator()(job::check_fastresume const&) const
{
	m_ss << "check-fastresume( )";
}

// format a rename-file job. Drop the stale duplicate statement
void operator()(job::rename_file const& j) const {
	m_ss << "rename-file( file: " << j.file_index << " name: " << j.name << " )";
}

// stop_torrent carries no fields worth printing; emit the job name only
void operator()(job::stop_torrent const&) const
{
	m_ss << "stop-torrent( )";
}

// format a file-priority job. Drop the stale duplicate statement
void operator()(job::file_priority const& j) const {
	m_ss << "file-priority( num-files: " << j.prio.size() << " )";
}

// format a clear-piece job. Drop the stale duplicate statement
void operator()(job::clear_piece const& j) const {
	m_ss << "clear-piece( piece: " << j.piece << " )";
}

// format a partial-read job. Drop the stale duplicate statement, and
// add the missing space after "buf-offset:" for consistency with every
// other field label in this visitor
void operator()(job::partial_read const& j) const {
	m_ss << "partial-read( piece: " << j.piece << " offset: " << j.offset
		<< " buf-offset: " << j.buffer_offset << " size: " << j.buffer_size << " )";
}

// format a kick-hasher job. Drop the stale duplicate statement
void operator()(job::kick_hasher const& j) const {
	m_ss << "kick-hasher( piece: " << j.piece << " )";
}

private:
Expand Down
23 changes: 19 additions & 4 deletions include/libtorrent/aux_/disk_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ inline size_t hash_value(piece_location const& l)
return ret;
}

// incremental SHA-1 hasher for a piece. The variant member holds a live
// hash context (hasher) while blocks are still being hashed; once the
// digest is computed it replaces the context in the variant, so
// final_hash() can be called more than once and return the cached value
struct piece_hasher
{
// start out with a live hash context
piece_hasher() : ph(hasher{}) {}

// returns the SHA-1 digest. The first call finalizes the context and
// caches the result in the variant; subsequent calls return the cache
sha1_hash final_hash();
// feed more bytes into the hash context. Only valid while the variant
// still holds a hasher, i.e. before final_hash() has been called
void update(span<char const> const buf);
// direct access to the live hash context. Only valid before
// final_hash() has been called
lt::hasher& ctx();

private:
std::variant<hasher, sha1_hash> ph;
};

struct cached_block_entry
{
// returns the buffer associated with this block. It either picks it from
Expand Down Expand Up @@ -105,7 +117,9 @@ struct cached_piece_entry
{
cached_piece_entry(piece_location const& loc
, int const num_blocks
, int const piece_size_v2);
, int const piece_size_v2
, bool v1
, bool v2);

span<cached_block_entry> get_blocks() const;

Expand All @@ -117,7 +131,8 @@ struct cached_piece_entry
bool ready_to_flush = false;

// when this is true, there is a thread currently hashing blocks and
// updating the hash context in "ph".
// updating the hash context in "ph". Other threads may not touch "ph"
// or "hasher_cursor", and may only read "hashing".
bool hashing = false;

// when a thread is writing this piece to disk, this is true. Only one
Expand Down Expand Up @@ -163,7 +178,7 @@ struct cached_piece_entry

unique_ptr<cached_block_entry[]> blocks;

hasher ph;
piece_hasher ph;

// if there is a hash_job set on this piece, whenever we complete hashing
// the last block, we should post this
Expand Down Expand Up @@ -281,7 +296,7 @@ struct disk_cache
e.hashing = false;
});
});
f(const_cast<hasher&>(piece_iter->ph), hasher_cursor, blocks, v2_hashes);
f(const_cast<piece_hasher&>(piece_iter->ph), hasher_cursor, blocks, v2_hashes);
return true;
}

Expand Down
82 changes: 69 additions & 13 deletions src/disk_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ see LICENSE file.
*/

#include "libtorrent/aux_/disk_cache.hpp"
#include "libtorrent/aux_/debug_disk_thread.hpp"

namespace libtorrent::aux {

Expand Down Expand Up @@ -91,8 +92,41 @@ span<char const> cached_block_entry::write_buf() const
return {nullptr, 0};
}

cached_piece_entry::cached_piece_entry(piece_location const& loc, int const num_blocks, int const piece_size_v2)
// combine several callables (typically lambdas) into a single visitor
// for std::visit, by inheriting every operator() passed in
template <typename... Fs>
struct overload : Fs...
{
	using Fs::operator()...;
};
// deduction guide: lets "overload{l1, l2}" deduce the callable types
template <typename... Fs> overload(Fs...) -> overload<Fs...>;

// return the SHA-1 digest of everything fed into this hasher. The
// first call finalizes the context and stores the digest back into the
// variant; later calls just return the stored digest
sha1_hash piece_hasher::final_hash()
{
	sha1_hash ret;
	if (hasher* h = std::get_if<hasher>(&ph))
	{
		// still holding a live context: finalize it and cache the result
		ret = h->final();
		ph = ret;
	}
	else
	{
		// already finalized: return the cached digest
		ret = std::get<sha1_hash>(ph);
	}
	TORRENT_ASSERT(!ret.is_all_zeros());
	return ret;
}

// feed another buffer into the hash context. May only be called while
// the variant still holds a live hasher, i.e. before final_hash()
void piece_hasher::update(span<char const> const buf)
{
	auto* h = std::get_if<hasher>(&ph);
	TORRENT_ASSERT(h != nullptr);
	h->update(buf);
}

// expose the underlying hash context. May only be called before
// final_hash(), while the variant still holds a live hasher
lt::hasher& piece_hasher::ctx()
{
	// renamed from "ctx" to avoid shadowing the function's own name
	auto* h = std::get_if<hasher>(&ph);
	TORRENT_ASSERT(h != nullptr);
	return *h;
}

cached_piece_entry::cached_piece_entry(piece_location const& loc, int const num_blocks, int const piece_size_v2, bool const v1, bool const v2)
: piece(loc)
, v1_hashes(v1)
, v2_hashes(v2)
, piece_size2(piece_size_v2)
, blocks_in_piece(num_blocks)
, blocks(aux::make_unique<cached_block_entry[], std::ptrdiff_t>(num_blocks))
Expand Down Expand Up @@ -158,13 +192,17 @@ bool disk_cache::insert(piece_location const loc
file_storage const& fs = storage->files();
int const blocks_in_piece = (storage->files().piece_size(loc.piece) + default_block_size - 1) / default_block_size;
int const piece_size2 = fs.piece_size2(loc.piece);
cached_piece_entry pe(loc, blocks_in_piece, piece_size2);
pe.v1_hashes = storage->v1();
pe.v2_hashes = storage->v2();
i = m_pieces.insert(std::move(pe)).first;
i = m_pieces.emplace(loc, blocks_in_piece, piece_size2, storage->v1(), storage->v2()).first;
}

cached_block_entry& blk = i->blocks[block_idx];
DLOG("disk_cache.insert: piece: %d blk: %d flushed: %d write_job: %p flushed_cursor: %d hashed_cursor: %d\n"
, static_cast<int>(i->piece.piece)
, block_idx
, blk.flushed_to_disk
, blk.write_job
, i->flushed_cursor
, i->hasher_cursor);
TORRENT_ASSERT(!blk.buf_holder);
TORRENT_ASSERT(blk.write_job == nullptr);
TORRENT_ASSERT(blk.flushed_to_disk == false);
Expand Down Expand Up @@ -211,7 +249,7 @@ disk_cache::hash_result disk_cache::try_hash_piece(piece_location const loc, pre
e.piece_hash_returned = true;

auto& job = std::get<aux::job::hash>(hash_job->action);
job.piece_hash = e.ph.final();
job.piece_hash = e.ph.final_hash();
if (!job.block_hashes.empty())
{
TORRENT_ASSERT(i->v2_hashes);
Expand Down Expand Up @@ -269,13 +307,16 @@ void disk_cache::kick_hasher(piece_location const& loc, jobqueue_t& completed_jo
}
auto const blocks = blocks_storage.first(block_idx);

auto& ctx = const_cast<hasher&>(piece_iter->ph);

view.modify(piece_iter, [](cached_piece_entry& e) { e.hashing = true; });

bool const need_v1 = piece_iter->v1_hashes;
bool const need_v2 = piece_iter->v2_hashes;

DLOG("kick_hasher: piece: %d hashed_cursor: [%d, %d] v1: %d v2: %d ctx: %p\n"
, static_cast<int>(piece_iter->piece.piece)
, cursor, end
, need_v1, need_v2
, &piece_iter->ph);
l.unlock();

int bytes_left = piece_iter->piece_size2 - (cursor * default_block_size);
Expand All @@ -284,7 +325,10 @@ void disk_cache::kick_hasher(piece_location const& loc, jobqueue_t& completed_jo
cached_block_entry& cbe = piece_iter->blocks[cursor];

if (need_v1)
{
aux::piece_hasher& ctx = const_cast<aux::piece_hasher&>(piece_iter->ph);
ctx.update(buf);
}

if (need_v2 && bytes_left > 0)
{
Expand All @@ -297,6 +341,7 @@ void disk_cache::kick_hasher(piece_location const& loc, jobqueue_t& completed_jo
}

l.lock();

for (auto& cbe : piece_iter->get_blocks().subspan(piece_iter->hasher_cursor, block_idx))
{
// TODO: free these in bulk, acquiring the mutex just once
Expand All @@ -315,20 +360,26 @@ void disk_cache::kick_hasher(piece_location const& loc, jobqueue_t& completed_jo
// if some other thread added the next block, keep going
if (piece_iter->blocks[cursor].buf().data())
goto keep_going;
DLOG("kick_hasher: no attached hash job\n");
return;
}

if (!piece_iter->hash_job) return;

// there's a hash job hung on this piece, post it now
pread_disk_job* j = nullptr;
span<cached_block_entry> const cached_blocks = piece_iter->get_blocks();
view.modify(piece_iter, [&cached_blocks, &j](cached_piece_entry& e) {

sha1_hash piece_hash;
TORRENT_ASSERT(!piece_iter->piece_hash_returned);
view.modify(piece_iter, [&cached_blocks, &j, &piece_hash](cached_piece_entry& e) {
j = std::exchange(e.hash_job, nullptr);
e.ready_to_flush = compute_ready_to_flush(cached_blocks);
e.piece_hash_returned = true;
// we've hashed all blocks, and there's a hash job associated with
// this piece, post it.
piece_hash = e.ph.final_hash();
});
// we've hashed all blocks, and there's a hash job associated with
// this piece, post it.
sha1_hash const piece_hash = ctx.final();

auto& job = std::get<job::hash>(j->action);
job.piece_hash = piece_hash;
Expand All @@ -341,6 +392,8 @@ void disk_cache::kick_hasher(piece_location const& loc, jobqueue_t& completed_jo
for (int i = 0; i < to_copy; ++i)
job.block_hashes[i] = piece_iter->blocks[i].block_hash;
}
DLOG("kick_hasher: posting attached job piece: %d\n"
, static_cast<int>(piece_iter->piece.piece));
completed_jobs.push_back(j);
}

Expand Down Expand Up @@ -408,6 +461,8 @@ Iter disk_cache::flush_piece_impl(View& view
TORRENT_ASSERT(e.num_jobs >= jobs);
e.num_jobs -= jobs;
});
DLOG("flush_piece_impl: piece: %d flushed_cursor: %d ready_to_flush: %d\n"
, static_cast<int>(piece_iter->piece.piece), piece_iter->flushed_cursor, piece_iter->ready_to_flush);
TORRENT_ASSERT(count <= blocks.size());
TORRENT_ASSERT(m_blocks >= count);
m_blocks -= count;
Expand Down Expand Up @@ -674,7 +729,8 @@ void disk_cache::clear_piece_impl(cached_piece_entry& cpe, jobqueue_t& aborted)
cpe.flushed_cursor = 0;
TORRENT_ASSERT(cpe.num_jobs >= jobs);
cpe.num_jobs -= jobs;
cpe.ph = hasher{};
cpe.ph = piece_hasher{};
DLOG("clear_piece: piece: %d\n", static_cast<int>(cpe.piece.piece));
}

}
4 changes: 2 additions & 2 deletions src/disk_completed_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void disk_completed_queue::abort_jobs(io_context& ioc, jobqueue_t jobs)
std::lock_guard<std::mutex> l(m_completed_jobs_mutex);
m_completed_jobs.append(std::move(jobs));

if (!m_job_completions_in_flight)
if (!m_job_completions_in_flight && !m_completed_jobs.empty())
{
DLOG("posting job handlers (%d)\n", m_completed_jobs.size());

Expand All @@ -74,7 +74,7 @@ void disk_completed_queue::append(io_context& ioc, jobqueue_t jobs)
std::lock_guard<std::mutex> l(m_completed_jobs_mutex);
m_completed_jobs.append(std::move(jobs));

if (!m_job_completions_in_flight)
if (!m_job_completions_in_flight && !m_completed_jobs.empty())
{
DLOG("posting job handlers (%d)\n", m_completed_jobs.size());

Expand Down
16 changes: 12 additions & 4 deletions src/pread_disk_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ status_t pread_disk_io::do_job(aux::job::hash& a, aux::pread_disk_job* j)

// this creates a function object, ready to be passed to
// m_cache.hash_piece()
auto hash_partial_piece = [&] (lt::hasher& ph
auto hash_partial_piece = [&] (lt::aux::piece_hasher& ph
, int const hasher_cursor
, span<char const*> const blocks
, span<sha256_hash> const v2_hashes)
Expand Down Expand Up @@ -930,7 +930,7 @@ status_t pread_disk_io::do_job(aux::job::hash& a, aux::pread_disk_job* j)
? (j->flags & ~disk_interface::flush_piece)
: j->flags;

j->storage->hash(m_settings, ph, len, a.piece
j->storage->hash(m_settings, ph.ctx(), len, a.piece
, offset, file_mode, flags, j->error);
}
if (v2_block)
Expand All @@ -955,7 +955,7 @@ status_t pread_disk_io::do_job(aux::job::hash& a, aux::pread_disk_job* j)
}

if (v1)
a.piece_hash = ph.final();
a.piece_hash = ph.final_hash();

if (!j->error.ec)
{
Expand All @@ -976,7 +976,7 @@ status_t pread_disk_io::do_job(aux::job::hash& a, aux::pread_disk_job* j)
TORRENT_ALLOCA(blocks, char const*, blocks_to_read);
TORRENT_ALLOCA(v2_hashes, sha256_hash, blocks_in_piece2);
for (char const*& b : blocks) b = nullptr;
hasher ph;
lt::aux::piece_hasher ph;
hash_partial_piece(ph, 0, blocks, v2_hashes);
}
return j->error ? disk_status::fatal_disk_error : status_t{};
Expand Down Expand Up @@ -1471,6 +1471,14 @@ void pread_disk_io::thread_fun(aux::disk_io_thread_pool& pool
}

auto* j = static_cast<aux::pread_disk_job*>(pool.pop_front());

if (&pool == &m_generic_threads)
{
// This will attempt to flush any pieces that have been completely
// downloaded
try_flush_cache(m_cache.size(), l);
}

l.unlock();

TORRENT_ASSERT((j->flags & aux::disk_job::in_progress) || !j->storage);
Expand Down
Loading

0 comments on commit e7d2939

Please sign in to comment.