Skip to content

Commit

Permalink
Some job and control cleanup and fixing
Browse files Browse the repository at this point in the history
1. For code readability use 'control' instead of 's'
2. Simplify dt_control_running()
3. Removed not needed checks on errors
4. Two functions are strictly internal so no exposing via .h files and be static
   - _control_get_threadid()
   - _control_job_set_synchronous()
5. Two function were not used at all (so were untested) so the were removed
   - dt_control_job_set_state_callback()
   - dt_control_job_wait()
6. gphoto joining after cleanup test
  • Loading branch information
jenshannoschwalm committed Dec 14, 2024
1 parent 1161ef3 commit 2a6d4f4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 126 deletions.
81 changes: 36 additions & 45 deletions src/control/control.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,29 +288,27 @@ void dt_control_change_cursor(dt_cursor_t curs)

gboolean dt_control_running()
{
dt_control_t *dc = darktable.control;
const int status = dc ? dt_atomic_get_int(&dc->running) : DT_CONTROL_STATE_DISABLED;
return status == DT_CONTROL_STATE_RUNNING;
dt_control_t *control = darktable.control;
return control ? dt_atomic_get_int(&control->running) == DT_CONTROL_STATE_RUNNING : FALSE;
}

void dt_control_quit()
{
if(dt_control_running())
{
dt_control_t *dc = darktable.control;

dt_control_t *control = darktable.control;
#ifdef HAVE_PRINT
dt_printers_abort_discovery();
// Cups timeout could be pretty long, at least 30seconds
// but don't rely on cups returning correctly so a timeout
for(int i = 0; i < 40000 && !dc->cups_started; i++)
for(int i = 0; i < 40000 && !control->cups_started; i++)
g_usleep(1000);
#endif

dt_pthread_mutex_lock(&dc->cond_mutex);
dt_pthread_mutex_lock(&control->cond_mutex);
// set the "pending cleanup work" flag to be handled in dt_control_shutdown()
dt_atomic_set_int(&dc->running, DT_CONTROL_STATE_CLEANUP);
dt_pthread_mutex_unlock(&dc->cond_mutex);
dt_atomic_set_int(&control->running, DT_CONTROL_STATE_CLEANUP);
dt_pthread_mutex_unlock(&control->cond_mutex);
}

if(g_atomic_int_get(&darktable.gui_running))
Expand All @@ -320,61 +318,54 @@ void dt_control_quit()
}
}

void dt_control_shutdown(dt_control_t *s)
void dt_control_shutdown(dt_control_t *control)
{
if(!s)
if(!control)
return;

dt_pthread_mutex_lock(&s->cond_mutex);
const gboolean cleanup = dt_atomic_exch_int(&s->running, DT_CONTROL_STATE_DISABLED) == DT_CONTROL_STATE_CLEANUP;
pthread_cond_broadcast(&s->cond);
dt_pthread_mutex_unlock(&s->cond_mutex);
dt_pthread_mutex_lock(&control->cond_mutex);
const gboolean cleanup = dt_atomic_exch_int(&control->running, DT_CONTROL_STATE_DISABLED) == DT_CONTROL_STATE_CLEANUP;
pthread_cond_broadcast(&control->cond);
dt_pthread_mutex_unlock(&control->cond_mutex);

int err = 0; // collect all joining errors
/* first wait for gphoto device updater */
#ifdef HAVE_GPHOTO2
err = pthread_join(s->update_gphoto_thread, NULL);
#endif
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] closing control threads%s",
cleanup ? " in cleanup mode" : "");

if(!cleanup)
return; // if not running there are no threads to join

dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] closing control threads");
#ifdef HAVE_GPHOTO2
/* first and always wait for gphoto device updater */
pthread_join(control->update_gphoto_thread, NULL);
#endif

/* then wait for kick_on_workers_thread */
err = pthread_join(s->kick_on_workers_thread, NULL);
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] joined kicker%s", err ? ", error" : "");
/* wait for kick_on_workers_thread */
pthread_join(control->kick_on_workers_thread, NULL);

for(int k = 0; k < s->num_threads-1; k++)
{
err = pthread_join(s->thread[k], NULL);
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] joined num_thread %i%s", k, err ? ", error" : "");
}
for(int k = 0; k < control->num_threads-1; k++)
pthread_join(control->thread[k], NULL);

for(int k = 0; k < DT_CTL_WORKER_RESERVED; k++)
{
err = pthread_join(s->thread_res[k], NULL);
dt_print(DT_DEBUG_CONTROL, "[dt_control_shutdown] joined worker %i%s", k, err ? ", error" : "");
}
pthread_join(control->thread_res[k], NULL);
}

void dt_control_cleanup(dt_control_t *s)
void dt_control_cleanup(dt_control_t *control)
{
if(!s)
if(!control)
return;
// vacuum TODO: optional?
// DT_DEBUG_SQLITE3_EXEC(dt_database_get(darktable.db), "PRAGMA incremental_vacuum(0)", NULL, NULL, NULL);
// DT_DEBUG_SQLITE3_EXEC(dt_database_get(darktable.db), "vacuum", NULL, NULL, NULL);
dt_control_jobs_cleanup(s);
dt_pthread_mutex_destroy(&s->queue_mutex);
dt_pthread_mutex_destroy(&s->cond_mutex);
dt_pthread_mutex_destroy(&s->log_mutex);
dt_pthread_mutex_destroy(&s->toast_mutex);
dt_pthread_mutex_destroy(&s->res_mutex);
dt_pthread_mutex_destroy(&s->progress_system.mutex);
if(s->widgets) g_hash_table_destroy(s->widgets);
if(s->shortcuts) g_sequence_free(s->shortcuts);
if(s->input_drivers) g_slist_free_full(s->input_drivers, g_free);
dt_control_jobs_cleanup(control);
dt_pthread_mutex_destroy(&control->queue_mutex);
dt_pthread_mutex_destroy(&control->cond_mutex);
dt_pthread_mutex_destroy(&control->log_mutex);
dt_pthread_mutex_destroy(&control->toast_mutex);
dt_pthread_mutex_destroy(&control->res_mutex);
dt_pthread_mutex_destroy(&control->progress_system.mutex);
if(control->widgets) g_hash_table_destroy(control->widgets);
if(control->shortcuts) g_sequence_free(control->shortcuts);
if(control->input_drivers) g_slist_free_full(control->input_drivers, g_free);
}


Expand Down
106 changes: 32 additions & 74 deletions src/control/jobs.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ static inline gboolean _control_job_equal(_dt_job_t *j1, _dt_job_t *j2)
&& (g_strcmp0(j1->description, j2->description) == 0));
}

static __thread int threadid = -1;

static inline int32_t _control_get_threadid()
{
if(threadid > -1) return threadid;
return darktable.control->num_threads;
}

static inline int32_t _control_get_threadid_res()
{
if(threadid > -1) return threadid;
return DT_CTL_WORKER_RESERVED;
}

static void _control_job_set_state(_dt_job_t *job,
dt_job_state_t state)
{
Expand Down Expand Up @@ -159,7 +173,7 @@ gboolean dt_control_job_is_synchronous(const dt_job_t *job)
return job->is_synchronous;
}

void dt_control_job_set_synchronous(dt_job_t *job, gboolean sync)
static void _control_job_set_synchronous(dt_job_t *job, gboolean sync)
{
job->is_synchronous = sync;
}
Expand All @@ -178,14 +192,6 @@ void dt_control_job_dispose(_dt_job_t *job)
free(job);
}

void dt_control_job_set_state_callback(_dt_job_t *job, dt_job_state_change_callback cb)
{
// once the job got added to the queue it may not be changed from the outside
if(dt_control_job_get_state(job) != DT_JOB_STATE_INITIALIZED)
return; // get_state returns DISPOSED when job == NULL
job->state_changed_cb = cb;
}

// We don't want to log dt_get_wtime() as we already show the stamp
static void _control_job_print(_dt_job_t *job, const char *info, const char *err, int32_t res)
{
Expand All @@ -199,34 +205,6 @@ void dt_control_job_cancel(_dt_job_t *job)
_control_job_set_state(job, DT_JOB_STATE_CANCELLED);
}

void dt_control_job_wait(_dt_job_t *job)
{
if(!job) return;
dt_job_state_t state = dt_control_job_get_state(job);

// NOTE: could also use signals.

// if the job is merely queued and hasn't started yet, we
// need to wait until it is actually started before attempting
// to grab the mutex, or it will always succeed immediately
while(state == DT_JOB_STATE_QUEUED)
{
g_usleep(100000); // wait 0.1 seconds
state = dt_control_job_get_state(job);
}

/* if job execution is not finished let's wait for it */
if(state == DT_JOB_STATE_RUNNING || state == DT_JOB_STATE_CANCELLED)
{
// once the job finishes, it unlocks the mutex
// so by locking the mutex here, we will only get the lock once the job
// has finished and unlocked it.
dt_pthread_mutex_lock(&job->wait_mutex);
// yay, the job finished, we got the lock. nothing more to do.
dt_pthread_mutex_unlock(&job->wait_mutex);
}
}

static gboolean _control_run_job_res(dt_control_t *control, int32_t res)
{
if(((unsigned int)res) >= DT_CTL_WORKER_RESERVED)
Expand Down Expand Up @@ -311,7 +289,7 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)
if(winner_queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = TRUE;

// and place it in scheduled job array (for job deduping)
control->job[dt_control_get_threadid()] = job;
control->job[_control_get_threadid()] = job;

// increment the priorities of the others
for(int i = 0; i < DT_JOB_QUEUE_MAX; i++)
Expand All @@ -327,15 +305,15 @@ static _dt_job_t *_control_schedule_job(dt_control_t *control)

static void _control_job_execute(_dt_job_t *job)
{
_control_job_print(job, "run_job+", "", DT_CTL_WORKER_RESERVED + dt_control_get_threadid());
_control_job_print(job, "run_job+", "", DT_CTL_WORKER_RESERVED + _control_get_threadid());

_control_job_set_state(job, DT_JOB_STATE_RUNNING);

/* execute job */
job->result = job->execute(job);

_control_job_set_state(job, DT_JOB_STATE_FINISHED);
_control_job_print(job, "run_job-", "", DT_CTL_WORKER_RESERVED + dt_control_get_threadid());
_control_job_print(job, "run_job-", "", DT_CTL_WORKER_RESERVED + _control_get_threadid());
}

static gboolean _control_run_job(dt_control_t *control)
Expand All @@ -353,7 +331,7 @@ static gboolean _control_run_job(dt_control_t *control)

// remove the job from scheduled job array (for job deduping)
dt_pthread_mutex_lock(&control->queue_mutex);
control->job[dt_control_get_threadid()] = NULL;
control->job[_control_get_threadid()] = NULL;
if(job->queue == DT_JOB_QUEUE_USER_EXPORT) control->export_scheduled = FALSE;
dt_pthread_mutex_unlock(&control->queue_mutex);

Expand Down Expand Up @@ -413,7 +391,7 @@ gboolean dt_control_add_job(dt_control_t *control,
{
// whatever we are adding here won't be scheduled as the system isn't running. execute it synchronous instead.
dt_pthread_mutex_lock(&job->wait_mutex); // is that even needed?
dt_control_job_set_synchronous(job, TRUE);
_control_job_set_synchronous(job, TRUE);
_control_job_execute(job);
dt_pthread_mutex_unlock(&job->wait_mutex);

Expand Down Expand Up @@ -457,7 +435,7 @@ gboolean dt_control_add_job(dt_control_t *control,
// if the job is already in the queue -> move it to the top
for(GList *iter = *queue; iter; iter = g_list_next(iter))
{
_dt_job_t *other_job = (_dt_job_t *)iter->data;
_dt_job_t *other_job = iter->data;
if(_control_job_equal(job, other_job))
{
_control_job_print(other_job, "add_job", "found job already in queue", -1);
Expand All @@ -480,8 +458,8 @@ gboolean dt_control_add_job(dt_control_t *control,
if(length > DT_CONTROL_MAX_JOBS)
{
GList *last = g_list_last(*queue);
_control_job_set_state((_dt_job_t *)last->data, DT_JOB_STATE_DISCARDED);
dt_control_job_dispose((_dt_job_t *)last->data);
_control_job_set_state(last->data, DT_JOB_STATE_DISCARDED);
dt_control_job_dispose(last->data);
*queue = g_list_delete_link(*queue, last);
length--;
}
Expand Down Expand Up @@ -515,27 +493,13 @@ gboolean dt_control_add_job(dt_control_t *control,
return FALSE;
}

static __thread int threadid = -1;

int32_t dt_control_get_threadid()
{
if(threadid > -1) return threadid;
return darktable.control->num_threads;
}

static inline int32_t _control_get_threadid_res()
{
if(threadid > -1) return threadid;
return DT_CTL_WORKER_RESERVED;
}

static void *_control_work_res(void *ptr)
{
#ifdef _OPENMP // need to do this in every thread
omp_set_num_threads(dt_get_num_threads());
#endif
worker_thread_parameters_t *params = (worker_thread_parameters_t *)ptr;
dt_control_t *s = params->self;
dt_control_t *control = params->self;
threadid = params->threadid;
char name[16] = {0};
snprintf(name, sizeof(name), "worker res %d", threadid);
Expand All @@ -544,15 +508,14 @@ static void *_control_work_res(void *ptr)
int32_t threadid_res = _control_get_threadid_res();
while(dt_control_running())
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid_res);
if(_control_run_job_res(s, threadid_res))
if(_control_run_job_res(control, threadid_res))
{
// wait for a new job.
int old;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old);
dt_pthread_mutex_lock(&s->cond_mutex);
dt_pthread_cond_wait(&s->cond, &s->cond_mutex);
dt_pthread_mutex_unlock(&s->cond_mutex);
dt_pthread_mutex_lock(&control->cond_mutex);
dt_pthread_cond_wait(&control->cond, &control->cond_mutex);
dt_pthread_mutex_unlock(&control->cond_mutex);
int tmp;
pthread_setcancelstate(old, &tmp);
}
Expand Down Expand Up @@ -589,7 +552,6 @@ static void *_control_work(void *ptr)
// int32_t threadid = dt_control_get_threadid();
while(dt_control_running())
{
// dt_print(DT_DEBUG_CONTROL, "[control_work] %d", threadid);
if(_control_run_job(control))
{
// wait for a new job.
Expand Down Expand Up @@ -640,18 +602,16 @@ void dt_control_jobs_init(dt_control_t *control)

g_atomic_int_set(&control->running, DT_CONTROL_STATE_RUNNING);

int err = 0; // collected errors while creating all threads

for(int k = 0; k < control->num_threads; k++)
{
worker_thread_parameters_t *params = calloc(1, sizeof(worker_thread_parameters_t));
params->self = control;
params->threadid = k;
err |= dt_pthread_create(&control->thread[k], _control_work, params);
dt_pthread_create(&control->thread[k], _control_work, params);
}

/* create queue kicker thread */
err |= dt_pthread_create(&control->kick_on_workers_thread, _control_worker_kicker, control);
dt_pthread_create(&control->kick_on_workers_thread, _control_worker_kicker, control);

for(int k = 0; k < DT_CTL_WORKER_RESERVED; k++)
{
Expand All @@ -660,14 +620,12 @@ void dt_control_jobs_init(dt_control_t *control)
worker_thread_parameters_t *params = calloc(1, sizeof(worker_thread_parameters_t));
params->self = control;
params->threadid = k;
err |= dt_pthread_create(&control->thread_res[k], _control_work_res, params);
dt_pthread_create(&control->thread_res[k], _control_work_res, params);
}
/* create thread taking care of connecting gphoto2 devices */
#ifdef HAVE_GPHOTO2
err |= dt_pthread_create(&control->update_gphoto_thread, dt_update_cameras_thread, control);
dt_pthread_create(&control->update_gphoto_thread, dt_update_cameras_thread, control);
#endif
if(err != 0)
dt_print(DT_DEBUG_ALWAYS, "[dt_control_jobs_init] couldn't create all threads, problems ahead");
}

void dt_control_jobs_cleanup(dt_control_t *control)
Expand Down
7 changes: 0 additions & 7 deletions src/control/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,9 @@ typedef void (*dt_job_destroy_callback)(void *data);
dt_job_t *dt_control_job_create(dt_job_execute_callback execute, const char *msg, ...) __attribute__((format(printf, 2, 3)));
/** destroy a job object and free its memory. this does NOT remove it from any job queues! */
void dt_control_job_dispose(dt_job_t *job);
/** setup a state callback for job. */
void dt_control_job_set_state_callback(dt_job_t *job, dt_job_state_change_callback cb);
/** cancel a job, running or in queue. */
void dt_control_job_cancel(dt_job_t *job);
dt_job_state_t dt_control_job_get_state(dt_job_t *job);
/** wait for a job to finish execution. */
void dt_control_job_wait(dt_job_t *job);
/** set job params and a callback to destroy those params */
void dt_control_job_set_params(dt_job_t *job, void *params, dt_job_destroy_callback callback);
/** set job params (with size params_size) and a callback to destroy those params.
Expand All @@ -92,9 +88,6 @@ gboolean dt_control_add_job_res(struct dt_control_t *s, dt_job_t *job, int32_t r

dt_view_type_flags_t dt_control_job_get_view_creator(const dt_job_t *job);
gboolean dt_control_job_is_synchronous(const dt_job_t *job);
void dt_control_job_set_synchronous(dt_job_t *job, gboolean sync);

int32_t dt_control_get_threadid();

#ifdef HAVE_GPHOTO2
#include "control/jobs/camera_jobs.h"
Expand Down

0 comments on commit 2a6d4f4

Please sign in to comment.