Skip to content

Commit

Permalink
[DE85] Rebuild thread need to be exited when sender thread exits (#96)
Browse files Browse the repository at this point in the history
Signed-off-by: Vishnu Itta <[email protected]>
  • Loading branch information
vishnuitta authored Aug 16, 2018
1 parent fd3407e commit 81aa89e
Show file tree
Hide file tree
Showing 11 changed files with 805 additions and 552 deletions.
464 changes: 10 additions & 454 deletions cmd/zrepl/zrepl.c

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions include/data_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ thread_func_t rebuild_scanner;
extern void (*io_receiver)(void *arg);
extern void (*rebuild_scanner)(void *arg);

extern void uzfs_zvol_io_receiver(void *);

extern uint16_t io_server_port;
extern uint16_t rebuild_io_server_port;
extern uint64_t zvol_rebuild_step_size;
Expand All @@ -70,6 +72,8 @@ void uzfs_zvol_rebuild_scanner(void *arg);
void uzfs_update_ionum_interval(zvol_info_t *zinfo, uint32_t timeout);
void uzfs_zvol_timer_thread(void);

void signal_fds_related_to_zinfo(zvol_info_t *zinfo);

#ifdef __cplusplus
}
#endif
Expand Down
29 changes: 25 additions & 4 deletions include/zrepl_mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ extern kmutex_t zvol_list_mutex;
extern struct zvol_list zvol_list;
struct zvol_io_cmd_s;

#if DEBUG
typedef struct inject_delay_s {
int helping_replica_rebuild_step;
} inject_delay_t;

typedef struct inject_error_s {
inject_delay_t delay;
} inject_error_t;

extern inject_error_t inject_error;
#endif

typedef enum zvol_info_state_e {
ZVOL_INFO_STATE_ONLINE,
ZVOL_INFO_STATE_OFFLINE,
Expand Down Expand Up @@ -102,14 +114,20 @@ typedef struct zvol_info_s {
/* All cmds after execution will go here for ack */
STAILQ_HEAD(, zvol_io_cmd_s) complete_queue;

/* fds related to this zinfo on which threads are waiting */
STAILQ_HEAD(, zinfo_fd_s) fd_list;

uint8_t io_ack_waiting;

/* Will be used to singal ack-sender to exit */
uint8_t conn_closed;
/* Pointer to mgmt connection for this zinfo */
void *mgmt_conn;

/* Perfromance counter */
/* ongoing command that is being worked on to ack to its sender */
void *zio_cmd_in_ack;

/* Performance counter */

/* Debug counters */
uint64_t read_req_received_cnt;
Expand All @@ -118,9 +136,6 @@ typedef struct zvol_info_s {
uint64_t read_req_ack_cnt;
uint64_t write_req_ack_cnt;
uint64_t sync_req_ack_cnt;

/* ongoing command that is being worked on to ack to its sender */
void *zio_cmd_in_ack;
} zvol_info_t;

typedef struct thread_args_s {
Expand All @@ -132,6 +147,11 @@ typedef struct thread_args_s {
extern void (*zinfo_create_hook)(zvol_info_t *, nvlist_t *);
extern void (*zinfo_destroy_hook)(zvol_info_t *);

typedef struct zinfo_fd_s {
STAILQ_ENTRY(zinfo_fd_s) fd_link;
int fd;
} zinfo_fd_t;

typedef struct zvol_io_cmd_s {
STAILQ_ENTRY(zvol_io_cmd_s) cmd_link;
zvol_io_hdr_t hdr;
Expand Down Expand Up @@ -159,6 +179,7 @@ extern int set_socket_keepalive(int sfd);
extern int create_and_bind(const char *port, int bind_needed,
boolean_t nonblocking);
int uzfs_zvol_name_compare(zvol_info_t *zv, const char *name);
void shutdown_fds_related_to_zinfo(zvol_info_t *zinfo);

/*
* API to drop refcnt on zinfo. If refcnt
Expand Down
9 changes: 7 additions & 2 deletions lib/fio/replica.c
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ static int fio_repl_open_file(struct thread_data *td, struct fio_file *f)
if (get_data_endpoint(td, f->file_name, &port, host) != 0)
return (1);
}

again:
f->fd = socket(AF_INET, SOCK_STREAM, 0);
if (f->fd < 0) {
td_verror(td, errno, "socket");
Expand All @@ -498,10 +498,12 @@ static int fio_repl_open_file(struct thread_data *td, struct fio_file *f)

if (set_window_size(td, f->fd)) {
close(f->fd);
f->fd = -1;
return (1);
}
if (set_mss(td, f->fd)) {
close(f->fd);
f->fd = -1;
return (1);
}

Expand All @@ -512,20 +514,23 @@ static int fio_repl_open_file(struct thread_data *td, struct fio_file *f)
if (inet_pton(AF_INET, host, &addr.sin_addr) <= 0) {
td_verror(td, errno, "inet_pton");
close(f->fd);
f->fd = -1;
return (1);
}
log_info("repl: opening zvol %s on data connection\n",
f->file_name);
if (connect(f->fd, (struct sockaddr *)&addr, sizeof (addr)) < 0) {
td_verror(td, errno, "connect");
close(f->fd);
f->fd = -1;
return (1);
}

// send volume name we want to open to replica
if (open_zvol(td, f->fd, f->file_name) != 0) {
close(f->fd);
return (1);
sleep(2);
goto again;
}

return (0);
Expand Down
2 changes: 1 addition & 1 deletion lib/libzpool/uzfs_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ void
uzfs_zvol_set_rebuild_status(zvol_state_t *zv, zvol_rebuild_status_t status)
{
LOG_INFO("zvol %s rebuild status change: %s -> %s", zv->zv_name,
rebuild_status_to_str(zv->zv_status),
rebuild_status_to_str(zv->rebuild_info.zv_rebuild_status),
rebuild_status_to_str(status));
zv->rebuild_info.zv_rebuild_status = status;
}
Expand Down
23 changes: 23 additions & 0 deletions lib/libzpool/zrepl_mgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,31 @@ uzfs_insert_zinfo_list(zvol_info_t *zinfo)
(void) mutex_exit(&zvol_list_mutex);
}

void
shutdown_fds_related_to_zinfo(zvol_info_t *zinfo)
{
zinfo_fd_t *zinfo_fd = NULL;

(void) pthread_mutex_lock(&zinfo->zinfo_mutex);
while (1) {
STAILQ_FOREACH(zinfo_fd, &zinfo->fd_list, fd_link) {
LOG_INFO("shutting down %d on %s", zinfo_fd->fd,
zinfo->name);
shutdown(zinfo_fd->fd, SHUT_RDWR);
}
(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);
sleep(1);
(void) pthread_mutex_lock(&zinfo->zinfo_mutex);
if (STAILQ_EMPTY(&zinfo->fd_list))
break;
}
(void) pthread_mutex_unlock(&zinfo->zinfo_mutex);
}

static void
uzfs_mark_offline_and_free_zinfo(zvol_info_t *zinfo)
{
shutdown_fds_related_to_zinfo(zinfo);
(void) pthread_mutex_lock(&zinfo->zinfo_mutex);
zinfo->state = ZVOL_INFO_STATE_OFFLINE;
/* Send signal to ack_sender thread about offline */
Expand Down Expand Up @@ -338,6 +360,7 @@ uzfs_zinfo_init(void *zv, const char *ds_name, nvlist_t *create_props)
TASKQ_PREPOPULATE | TASKQ_DYNAMIC);

STAILQ_INIT(&zinfo->complete_queue);
STAILQ_INIT(&zinfo->fd_list);
uzfs_zinfo_init_mutex(zinfo);

strlcpy(zinfo->name, ds_name, MAXNAMELEN);
Expand Down
Loading

0 comments on commit 81aa89e

Please sign in to comment.