diff --git a/cmd/Makefile.am b/cmd/Makefile.am index ec1b0dea9f6d..68fe0b6879c1 100644 --- a/cmd/Makefile.am +++ b/cmd/Makefile.am @@ -1,3 +1,4 @@ -SUBDIRS = zfs zpool zdb zhack zinject zstreamdump ztest uzfs_test zpios +SUBDIRS = zfs zpool zdb zhack zinject zstreamdump ztest zrepl uzfs_test zpios SUBDIRS += mount_zfs fsck_zfs zvol_id vdev_id arcstat dbufstat zed SUBDIRS += arc_summary raidz_test zgenhostid tgt dmu_io_test + diff --git a/cmd/uzfs_test/uzfs_test.c b/cmd/uzfs_test/uzfs_test.c index 61a898bae2e2..bc30d84f43ed 100644 --- a/cmd/uzfs_test/uzfs_test.c +++ b/cmd/uzfs_test/uzfs_test.c @@ -23,6 +23,7 @@ #include #include #include +#include int total_time_in_sec = 60; int log_device = 0; @@ -261,7 +262,9 @@ void unit_test_create_pool_ds(void) { void *spa1, *spa2, *spa3, *spa4, *spa; - void *zv1, *zv2, *zv3, *zv4, *zv5, *zv; + void *zv1 = NULL; void *zv3 = NULL; + void *zv2 = NULL; void *zv4 = NULL; + void *zv5 = NULL; void *zv = NULL; int err, err1, err2, err3, err4, err5; err1 = uzfs_create_pool(pool, "/tmp/uztest.xyz", &spa1); @@ -498,8 +501,9 @@ static void process_options(int argc, char **argv) printf("total run time in seconds: %d\n", total_time_in_sec); } } + void -open_pool_ds(void **spa, void **zv) +open_pool(void **spa) { int err; err = uzfs_open_pool(pool, spa); @@ -507,7 +511,13 @@ open_pool_ds(void **spa, void **zv) printf("pool open errored.. %d\n", err); exit(1); } - err = uzfs_open_dataset(*spa, ds, zv); +} + +void +open_ds(void *spa, void **zv) +{ + int err; + err = uzfs_open_dataset(spa, ds, zv); if (err != 0) { printf("ds open errored.. 
%d\n", err); exit(1); @@ -520,11 +530,13 @@ unit_test_fn(void *arg) void *spa, *zv; kthread_t *reader1; kthread_t *writer[3]; + char name[MAXNAMELEN]; int i; kmutex_t mtx; kcondvar_t cv; int threads_done = 0; int num_threads = 0; + zvol_info_t *zinfo = NULL; worker_args_t reader1_args, writer_args[3]; mutex_init(&mtx, NULL, MUTEX_DEFAULT, NULL); @@ -535,7 +547,13 @@ unit_test_fn(void *arg) unit_test_create_pool_ds(); } - open_pool_ds(&spa, &zv); + open_pool(&spa); + if (create == 1) { + open_ds(spa, &zv); + } else { + zinfo = uzfs_zinfo_lookup(ds); + zv = zinfo->zv; + } reader1_args.zv = zv; reader1_args.threads_done = &threads_done; @@ -570,8 +588,15 @@ unit_test_fn(void *arg) cv_destroy(&cv); mutex_destroy(&mtx); - uzfs_close_dataset(zv); - uzfs_close_pool(spa); + if (create == 1) { + uzfs_close_dataset(zv); + uzfs_close_pool(spa); + } else { + strlcpy(name, zinfo->name, MAXNAMELEN); + uzfs_zinfo_drop_refcnt(zinfo, 0); + uzfs_zinfo_destroy(name); + uzfs_close_pool(spa); + } } int diff --git a/cmd/uzfs_test/uzfs_test_sync.c b/cmd/uzfs_test/uzfs_test_sync.c index f00c10c4e0ac..9c81b6465519 100644 --- a/cmd/uzfs_test/uzfs_test_sync.c +++ b/cmd/uzfs_test/uzfs_test_sync.c @@ -23,6 +23,7 @@ #include #include #include +#include int verify_fn(void *zv, char *buf, int block_size) @@ -140,6 +141,8 @@ void replay_fn(void *arg) { void *spa, *zv; + char name[MAXNAMELEN]; + zvol_info_t *zinfo = NULL; zfs_txg_timeout = 30; @@ -148,9 +151,22 @@ replay_fn(void *arg) setup_unit_test(); unit_test_create_pool_ds(); } - open_pool_ds(&spa, &zv); + + open_pool(&spa); + if (create == 1) { + open_ds(spa, &zv); + } else { + zinfo = uzfs_zinfo_lookup(ds); + zv = zinfo->zv; + } } else if (verify != 0) { - open_pool_ds(&spa, &zv); + open_pool(&spa); + if (create == 1) { + open_ds(spa, &zv); + } else { + zinfo = uzfs_zinfo_lookup(ds); + zv = zinfo->zv; + } } else { printf("exiting program..\n"); uzfs_fini(); @@ -162,8 +178,15 @@ replay_fn(void *arg) if (verify != 0) if (silent == 0) 
printf("verify error: %d\n", verify_err); - uzfs_close_dataset(zv); - uzfs_close_pool(spa); + if (create == 1) { + uzfs_close_dataset(zv); + uzfs_close_pool(spa); + } else { + strlcpy(name, zinfo->name, MAXNAMELEN); + uzfs_zinfo_drop_refcnt(zinfo, 0); + uzfs_zinfo_destroy(name); + uzfs_close_pool(spa); + } if (verify_err) exit(verify_err); diff --git a/cmd/uzfs_test/uzfs_txg_diff.c b/cmd/uzfs_test/uzfs_txg_diff.c index a5a209036897..160cb8e0e796 100644 --- a/cmd/uzfs_test/uzfs_txg_diff.c +++ b/cmd/uzfs_test/uzfs_txg_diff.c @@ -168,7 +168,8 @@ uzfs_txg_diff_verifcation_test(void *arg) setup_unit_test(); unit_test_create_pool_ds(); - open_pool_ds(&spa, &zvol); + open_pool(&spa); + open_ds(spa, &zvol); vol_blocks = active_size / block_size; buf = umem_alloc(block_size, UMEM_NOFAIL); diff --git a/cmd/uzfs_test/uzfs_zvol_zap.c b/cmd/uzfs_test/uzfs_zvol_zap.c index 2bddb4beda38..6cd4411ee408 100644 --- a/cmd/uzfs_test/uzfs_zvol_zap.c +++ b/cmd/uzfs_test/uzfs_zvol_zap.c @@ -124,7 +124,8 @@ uzfs_zvol_zap_operation(void *arg) setup_unit_test(); unit_test_create_pool_ds(); - open_pool_ds(&spa, &zvol); + open_pool(&spa); + open_ds(spa, &zvol); while (i++ < test_iterations) { zap_count = uzfs_random(16) + 1; diff --git a/cmd/zrepl/Makefile.am b/cmd/zrepl/Makefile.am new file mode 100644 index 000000000000..4e07c9c1a49d --- /dev/null +++ b/cmd/zrepl/Makefile.am @@ -0,0 +1,22 @@ +include $(top_srcdir)/config/Rules.am + +# -Wnoformat-truncation to get rid of compiler warning for unchecked +# truncating snprintfs on gcc 7.1.1. 
+AM_CFLAGS += $(DEBUG_STACKFLAGS) $(FRAME_LARGER_THAN) $(NO_FORMAT_TRUNCATION) +AM_CPPFLAGS += -DDEBUG + +DEFAULT_INCLUDES += \ + -I$(top_srcdir)/include \ + -I$(top_srcdir)/lib/libspl/include + +sbin_PROGRAMS = zrepl + +zrepl_SOURCES = \ + zrepl.c + +zrepl_LDADD = \ + $(top_builddir)/lib/libnvpair/libnvpair.la \ + $(top_builddir)/lib/libuutil/libuutil.la \ + $(top_builddir)/lib/libzpool/libzpool.la \ + $(top_builddir)/lib/libzfs/libzfs.la \ + $(top_builddir)/lib/libzfs_core/libzfs_core.la diff --git a/cmd/zrepl/zrepl.c b/cmd/zrepl/zrepl.c new file mode 100644 index 000000000000..f95d6845a437 --- /dev/null +++ b/cmd/zrepl/zrepl.c @@ -0,0 +1,1115 @@ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +

#define true 1
#define false 0
#define MAXEVENTS 64

char *accpt_port = "3232";
char *mgmt_port = "12000";

extern unsigned long zfs_arc_max;
extern unsigned long zfs_arc_min;

__thread char tinfo[20] = {0};

static void uzfs_zvol_io_ack_sender(void *arg);

/*
 * Create an IPv4 TCP socket for `port` and, when bind_needed is non-zero,
 * bind it to the wildcard address (AI_PASSIVE with a NULL node).
 *
 * Returns the socket descriptor, or -1 if address resolution failed or no
 * candidate address could be used. The caller owns the descriptor.
 */
static int
create_and_bind(const char *port, int bind_needed)
{
	int s;
	int err = 0;
	int sfd = -1;
	struct addrinfo hints = {0, };
	struct addrinfo *rp, *result = NULL;

	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_PASSIVE;

	s = getaddrinfo(NULL, port, &hints, &result);
	if (s != 0) {
		/*
		 * getaddrinfo() reports failure through its return value,
		 * not through errno, so log that value.
		 */
		ZREPL_ERRLOG("getaddrinfo failed with error: %d (%s)\n",
		    s, gai_strerror(s));
		return (-1);
	}

	for (rp = result; rp != NULL; rp = rp->ai_next) {
		sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
		if (sfd == -1) {
			err = errno;
			continue;
		} else if (bind_needed == 0) {
			break;
		}

		s = bind(sfd, rp->ai_addr, rp->ai_addrlen);
		if (s == 0) {
			/* We managed to bind successfully! */
			ZREPL_LOG("bind is successful\n");
			break;
		}

		/* Remember why bind() failed; close() may change errno. */
		err = errno;
		close(sfd);
		sfd = -1;
	}

	freeaddrinfo(result);
	if (rp == NULL) {
		ZREPL_ERRLOG("bind failed with err:%d\n", err);
		return (-1);
	}

	return (sfd);
}

/*
 * Add O_NONBLOCK to the file status flags of sfd.
 * Returns 0 on success and -1 if either fcntl() call fails.
 */
static int
make_socket_non_blocking(int sfd)
{
	int flags;

	flags = fcntl(sfd, F_GETFL, 0);
	if (flags == -1) {
		ZREPL_ERRLOG("fcntl() failed errno:%d\n", errno);
		return (-1);
	}

	if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
		ZREPL_ERRLOG("fcntl() failed errno:%d\n", errno);
		return (-1);
	}
	return (0);
}

/*
 * Write the numeric form of the first non-loopback IPv4 address configured
 * on this host into `host`, which must have room for INET_ADDRSTRLEN bytes.
 *
 * Returns 0 when such an address was found. Returns -1 when the interface
 * list cannot be read, an address cannot be formatted, or every IPv4
 * address is the loopback address; `host` is not meaningful in that case.
 */
static int
uzfs_zvol_get_ip(char *host)
{
	struct ifaddrs *ifaddr, *ifa;
	int s;
	int rc = -1;

	if (getifaddrs(&ifaddr) == -1) {
		ZREPL_ERRLOG("getifaddrs() failed errno:%d\n", errno);
		return (-1);
	}

	/*
	 * Walk through linked list, maintaining head
	 * pointer so we can free list later
	 */
	for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
		/* Only IPv4 addresses are ever reported to the caller. */
		if ((ifa->ifa_addr == NULL) ||
		    (ifa->ifa_addr->sa_family != AF_INET)) {
			continue;
		}

		/*
		 * Bound the output by the longest dotted-quad string rather
		 * than NI_MAXHOST: the caller's buffer is much smaller.
		 */
		s = getnameinfo(ifa->ifa_addr, sizeof (struct sockaddr_in),
		    host, INET_ADDRSTRLEN, NULL, 0, NI_NUMERICHOST);
		if (s != 0) {
			/* The failure reason is the return value, not errno. */
			ZREPL_ERRLOG("getnameinfo() failed: %d\n", s);
			break;
		}

		if (strcmp(host, "127.0.0.1") == 0) {
			continue;
		}

		printf("IP address: %s\n", host);
		rc = 0;
		break;
	}

	freeifaddrs(ifaddr);
	return (rc);
}
/*
 * Allocate zio command along with
 * buffer needed for IO completion.
+ */ +static zvol_io_cmd_t * +zio_cmd_alloc(zvol_io_hdr_t *hdr, int fd) +{ + zvol_io_cmd_t *zio_cmd = kmem_zalloc( + sizeof (zvol_io_cmd_t), KM_SLEEP); + + bcopy(hdr, &zio_cmd->hdr, sizeof (zio_cmd->hdr)); + if ((hdr->opcode == ZVOL_OPCODE_READ) || + (hdr->opcode == ZVOL_OPCODE_WRITE) || + (hdr->opcode == ZVOL_OPCODE_HANDSHAKE)) { + zio_cmd->buf = kmem_zalloc(sizeof (char) * hdr->len, KM_SLEEP); + } + + zio_cmd->conn = fd; + return (zio_cmd); +} + +/* + * Free zio command along with buffer. + */ +static void +zio_cmd_free(zvol_io_cmd_t **cmd) +{ + zvol_io_cmd_t *zio_cmd = *cmd; + zvol_op_code_t opcode = zio_cmd->hdr.opcode; + switch (opcode) { + case ZVOL_OPCODE_READ: + case ZVOL_OPCODE_WRITE: + case ZVOL_OPCODE_HANDSHAKE: + if (zio_cmd->buf != NULL) { + free(zio_cmd->buf); + } + break; + default: + ASSERT(!"Wrong Op code"); + break; + } + + free(zio_cmd); + *cmd = NULL; +} + + +static int +uzfs_zvol_socket_read(int fd, char *buf, uint64_t nbytes) +{ + uint64_t count = 0; + char *p = buf; + ZREPL_ERRLOG("Trying to read nbytes: %lu\n", nbytes); + while (nbytes) { + count = read(fd, (void *)p, nbytes); + if ((count <= 0) && (errno == EAGAIN)) { + continue; + } else if (count <= 0) { + printf("Read error\n"); + return (-1); + } + + ZREPL_ERRLOG("In read count:%lu nbytes: %lu\n", count, nbytes); + p += count; + nbytes -= count; + } + ZREPL_LOG("Successful read count:%lu nbytes: %lu\n", count, nbytes); + return (1); +} + + +static inline int +uzfs_zvol_socket_write(int fd, char *buf, int nbytes) +{ + int count = 0; + char *p = buf; + while (nbytes) { + count = write(fd, (void *)p, nbytes); + if (count <= 0) { + printf("Write error\n"); + return (-1); + } + p += count; + nbytes -= count; + } + return (0); +} + +/* + * zvol worker is responsible for actual work. + * It execute read/write/sync command to uzfs. + * It enqueue command to completion queue and + * send signal to ack-sender thread. 
+ */ +static void +uzfs_zvol_worker(void *arg) +{ + zvol_io_cmd_t *zio_cmd; + zvol_info_t *zinfo; + zvol_io_hdr_t *hdr; + int rc = 0; + int write = 0; + + + zio_cmd = (zvol_io_cmd_t *)arg; + hdr = &zio_cmd->hdr; + zinfo = zio_cmd->zv; + ASSERT(zinfo); + switch (hdr->opcode) { + case ZVOL_OPCODE_READ: + rc = uzfs_read_data(zinfo->zv, + (char *)zio_cmd->buf, + hdr->offset, hdr->len, NULL, NULL); + break; + + case ZVOL_OPCODE_WRITE: + write = 1; + rc = uzfs_write_data(zinfo->zv, + (char *)zio_cmd->buf, + hdr->offset, hdr->len, NULL); + break; + + case ZVOL_OPCODE_SYNC: + break; + + default: + VERIFY(!"Should be a valid opcode"); + break; + } + + if (rc < 0) { + ZREPL_ERRLOG("Zvol op_code :%d failed with " + "error: %d\n", hdr->opcode, errno); + hdr->status = ZVOL_OP_STATUS_FAILED; + } else { + ZREPL_LOG("Zvol io_seq:%ld op_code :%d passed\n", + hdr->io_seq, hdr->opcode); + hdr->status = ZVOL_OP_STATUS_OK; + } + + (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); + STAILQ_INSERT_TAIL(&zinfo->complete_queue, zio_cmd, cmd_link); + if (write) { + zinfo->write_req_received_cnt++; + } else { + zinfo->read_req_received_cnt++; + } + + if (zinfo->io_ack_waiting) { + rc = pthread_cond_signal(&zinfo->io_ack_cond); + } + + (void) pthread_mutex_unlock(&zinfo->complete_queue_mutex); + uzfs_zinfo_drop_refcnt(zinfo, false); +} + +/* + * IO-Receiver would be per ZVOL, it would be + * responsible for receiving IOs on given socket. 
+ */ +static void +uzfs_zvol_io_receiver(void *arg) +{ + int rc, fd; + zvol_info_t *zinfo = NULL; + zvol_io_hdr_t hdr; + thread_args_t *thrd_arg; + zvol_io_cmd_t *zio_cmd; + int count = 0; + kthread_t *thrd_info; + fd = *(int *)arg; + free(arg); + + while (1) { + count = uzfs_zvol_socket_read(fd, (char *)&hdr, sizeof (hdr)); + if (count <= 0) { + printf("error has come on socket" + " with error %d\n", errno); + goto exit; + } + + printf("op_code=%d io_seq=%ld offset=%ld len=%ld\n", hdr.opcode, + hdr.io_seq, hdr.offset, hdr.len); + + ASSERT((hdr.opcode == ZVOL_OPCODE_WRITE) || + (hdr.opcode == ZVOL_OPCODE_READ) || + (hdr.opcode == ZVOL_OPCODE_HANDSHAKE) || + (hdr.opcode == ZVOL_OPCODE_SYNC)); + if ((hdr.opcode != ZVOL_OPCODE_HANDSHAKE) && + (zinfo == NULL)) { + /* + * TODO: Stats need to be maintained for any + * such IO which came before handshake ? + */ + ZREPL_ERRLOG("Handshake yet to happen\n"); + continue; + } + + zio_cmd = zio_cmd_alloc(&hdr, fd); + if ((hdr.opcode == ZVOL_OPCODE_WRITE) || + (hdr.opcode == ZVOL_OPCODE_HANDSHAKE)) { + count = uzfs_zvol_socket_read(fd, zio_cmd->buf, + (sizeof (char) * hdr.len)); + if (count <= 0) { + zio_cmd_free(&zio_cmd); + ZREPL_ERRLOG("Socket read failed with " + "error: %d\n", errno); + goto exit; + } + } + + ZREPL_LOG("Count:%d Size: %ld\n", count, hdr.len); + if (hdr.opcode == ZVOL_OPCODE_HANDSHAKE) { + zinfo = uzfs_zinfo_lookup(zio_cmd->buf); + zio_cmd_free(&zio_cmd); + if (zinfo == NULL) { + ZREPL_ERRLOG("Volume/LUN: %s not found", + zinfo->name); + printf("Error in getting zinfo\n"); + goto exit; + } + + ASSERT(!zinfo->is_io_ack_sender_created); + if (zinfo->is_io_ack_sender_created) { + ZREPL_ERRLOG("Multiple handshake on IO port " + "for volume: %s\n", zinfo->name); + uzfs_zinfo_drop_refcnt(zinfo, false); + continue; + } + + (void) pthread_mutex_lock(&zinfo->zinfo_mutex); + if (!zinfo->is_io_ack_sender_created) { + thrd_arg = kmem_alloc( + sizeof (thread_args_t), KM_SLEEP); + thrd_arg->fd = fd; + 
strlcpy(thrd_arg->zvol_name, zinfo->name, + MAXNAMELEN); + thrd_info = zk_thread_create(NULL, 0, + (thread_func_t)uzfs_zvol_io_ack_sender, + (void *)thrd_arg, 0, NULL, TS_RUN, 0, + PTHREAD_CREATE_DETACHED); + VERIFY3P(thrd_info, !=, NULL); + zinfo->is_io_ack_sender_created = 1; + zinfo->conn_closed = false; + } + (void) pthread_mutex_unlock(&zinfo->zinfo_mutex); + continue; + } + // printf("Enqueuing op_code=%d io_seq=%ld offset=%ld\n", + // hdr.opcode, hdr.io_seq, hdr.offset); + + /* Take refcount for uzfs_zvol_worker to work on it */ + uzfs_zinfo_take_refcnt(zinfo, false); + zio_cmd->zv = zinfo; + taskq_dispatch(zinfo->uzfs_zvol_taskq, uzfs_zvol_worker, + zio_cmd, TQ_SLEEP); + } +exit: + if (zinfo != NULL) { + + (void) pthread_mutex_lock(&zinfo->zinfo_mutex); + zinfo->conn_closed = true; + zinfo->is_io_ack_sender_created = 0; + (void) pthread_mutex_unlock(&zinfo->zinfo_mutex); + /* + * Send signal to ack sender so that it can free + * zio_cmd, close fd and exit. + */ + (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); + if (zinfo->io_ack_waiting) { + rc = pthread_cond_signal(&zinfo->io_ack_cond); + } + (void) pthread_mutex_unlock(&zinfo->complete_queue_mutex); + uzfs_zinfo_drop_refcnt(zinfo, false); + } + + printf("uzfs_zvol_io_receiver thread exiting\n"); + zk_thread_exit(); +} + +/* + * This function suppose to lookup into zvol list + * to find if LUN presented for identification is + * available/online or not. This function also need + * to return IP address of replica along with port + * so that ISTGT controller can open a connection + * for IOs. 
+ */ +static int +uzfs_zvol_mgmt_do_handshake(zvol_io_hdr_t *hdr, int sfd, char *name) +{ + int count, rc = 0; + zvol_info_t *zinfo; + mgmt_ack_t mgmt_ack; + char *packet = NULL; + char *p = NULL; + + printf("Volume: %s sent for enq\n", name); + hdr->len = sizeof (mgmt_ack_t); + zinfo = uzfs_zinfo_lookup(name); + if (zinfo != NULL) { + hdr->status = ZVOL_OP_STATUS_OK; + } else { + hdr->status = ZVOL_OP_STATUS_FAILED; + } + + bzero(&mgmt_ack, sizeof (mgmt_ack)); + strncpy(mgmt_ack.volname, name, strlen(name)); + mgmt_ack.port = atoi(accpt_port); + rc = uzfs_zvol_get_ip(mgmt_ack.ip); + printf("IP address: %s\n", mgmt_ack.ip); + if (rc == -1) { + ZREPL_ERRLOG("Unable to get IP" + " with err:%d\n", errno); + goto exit; + } + + packet = kmem_alloc((sizeof (mgmt_ack_t) + sizeof (*hdr)) * + sizeof (char), KM_SLEEP); + bcopy(hdr, packet, sizeof (*hdr)); + p = packet + sizeof (*hdr); + bcopy(&mgmt_ack, p, sizeof (mgmt_ack)); + count = write(sfd, packet, (sizeof (*hdr) + sizeof (mgmt_ack_t))); + if (count == -1) { + ZREPL_ERRLOG("Write to socket failed" + " with err:%d\n", errno); + rc = -1; + } +exit: + if (packet != NULL) { + free(packet); + } + uzfs_zinfo_drop_refcnt(zinfo, false); + return (rc); +} + +/* + * TODO: IMHO, this should be a through away API. + * Side Car has to find a more smart way to pass + * ISCSI Controller IP address. + */ +static char * +get_controller_ip_address(void) +{ + size_t nbytes; + char *buf = NULL; + + FILE *fp = fopen("/var/openebs/controllers.conf", "r"); + if (fp == NULL) { + printf("Error opening file\n"); + return (buf); + } + + buf = kmem_alloc(1024, KM_SLEEP); + nbytes = fread(buf, sizeof (char), 1024, fp); + + if (nbytes <= 0) { + printf("Read error\n"); + return (buf); + } + printf("buffer value is %s", buf); + return (buf); +} + +/* + * One thread per replica, which will be + * responsible for initial handshake and + * exchanging info like IP add, port etc. 
+ */ +static void +uzfs_zvol_mgmt_thread(void *arg) +{ + + int sfd, rc, count; + struct sockaddr_in istgt_addr; + zvol_io_hdr_t hdr = {0, }; + char *name = NULL; + char *buf = NULL; + + + sfd = create_and_bind(mgmt_port, false); + if (sfd == -1) { + goto exit; + } + + buf = get_controller_ip_address(); + if (buf == NULL) { + printf("parsing IP address did not work\n"); + goto exit; + } + + printf("Controller IP address is: %s", buf); + bzero((char *)&istgt_addr, sizeof (istgt_addr)); + istgt_addr.sin_family = AF_INET; + istgt_addr.sin_addr.s_addr = inet_addr(buf); + istgt_addr.sin_port = htons(6060); + free(buf); +retry: + rc = connect(sfd, (struct sockaddr *)&istgt_addr, sizeof (istgt_addr)); + if ((rc == -1) && ((errno == EINTR) || (errno == ECONNREFUSED) || + (errno == ETIMEDOUT) || (errno == EINPROGRESS))) { + ZREPL_ERRLOG("Failed to connect to istgt_controller" + " with err:%d\n", errno); + sleep(2); + printf("Retrying ....\n"); + goto retry; + } else { + printf("Connection to TGT controller successful\n"); + ZREPL_LOG("Connection to TGT controller iss successful\n"); + } + + while (1) { + bzero(&hdr, sizeof (hdr)); + count = read(sfd, (char *)&hdr, sizeof (hdr)); + if (count <= 0) { + ZREPL_ERRLOG("Replica-iSCSI Tgt connection got " + "disconnected with err:%d\n", errno); + /* + * Error has occurred on this socket + * close it and open a new socket after + * 5 sec of sleep. 
+ */ + close(sfd); + printf("Retrying ....\n"); + sleep(5); + sfd = create_and_bind(mgmt_port, false); + if (sfd == -1) { + goto exit; + } + +retry1: + rc = connect(sfd, (struct sockaddr *)&istgt_addr, + sizeof (istgt_addr)); + if ((rc == -1) && ((errno == EINTR) || + (errno == ECONNREFUSED) || (errno == ETIMEDOUT))) { + ZREPL_ERRLOG("Failed to connect to" + " istgt_controller with err:%d\n", + errno); + sleep(2); + goto retry1; + } else { + printf("Connection to TGT controller " + "successful\n"); + ZREPL_LOG("Connection to TGT controller" + "is successful\n"); + } + continue; + } + + if (hdr.opcode == ZVOL_OPCODE_HANDSHAKE) { + name = kmem_alloc( + hdr.len * sizeof (char), KM_SLEEP); + count = read(sfd, name, sizeof (char) * hdr.len); + if (count == -1) { + ZREPL_ERRLOG("Read from socket failed" + " with err:%d\n", errno); + goto exit; + } + + rc = uzfs_zvol_mgmt_do_handshake(&hdr, sfd, name); + free(name); + if (rc == -1) { + ZREPL_ERRLOG("handshake failed with" + " errno:%d\n", errno); + goto exit; + } + } + } +exit: + printf("uzfs_zvol_mgmt_thread thread exiting\n"); + zk_thread_exit(); +} +#if 0 +/* + * One thread per replica, which will be + * responsible for initial handshake and + * exchanging info like IP add, port etc. 
+ */ +static void +uzfs_zvol_mgmt_thread(void *arg) +{ + + int sfd, efd, rc, count; + struct epoll_event event; + struct sockaddr_in istgt_addr; + zvol_io_hdr_t hdr = {0, }; + struct epoll_event *events = NULL; + char *name = NULL; + char *buf = NULL; + + + sfd = create_and_bind(mgmt_port, false); + if (sfd == -1) { + goto exit; + } + + rc = make_socket_non_blocking(sfd); + if (rc == -1) { + goto exit; + } + + buf = get_controller_ip_address(); + if (buf == NULL) { + printf("parsing IP address did not work\n"); + goto exit; + } + printf("Controller IP address is: %s", buf); + bzero((char *)&istgt_addr, sizeof (istgt_addr)); + istgt_addr.sin_family = AF_INET; + istgt_addr.sin_addr.s_addr = inet_addr(buf); + istgt_addr.sin_port = htons(6060); + free(buf); +retry: + rc = connect(sfd, (struct sockaddr *)&istgt_addr, sizeof (istgt_addr)); + if ((rc == -1) && (errno == EINTR)) { + ZREPL_ERRLOG("Failed to connect to istgt_controller" + " with err:%d\n", errno); + sleep(10); + goto retry; + } else { + printf("Connection to TGT controller successful\n"); + ZREPL_LOG("Connection to TGT controller iss successful\n"); + } + + efd = epoll_create1(0); + if (efd == -1) { + ZREPL_ERRLOG("epoll_create() failed with errno:%d\n", errno); + goto exit; + } + + event.data.fd = sfd; + event.events = EPOLLIN | EPOLLET | EPOLLERR | + EPOLLHUP | EPOLLRDHUP; + rc = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event); + if (rc == -1) { + ZREPL_ERRLOG("epoll_ctl() failed with errno:%d\n", errno); + goto exit; + } + + /* Buffer where events are returned */ + events = calloc(MAXEVENTS, sizeof (event)); + + /* The event loop */ + while (1) { + int n, i; + n = epoll_wait(efd, events, MAXEVENTS, -1); + for (i = 0; i < n; i++) { + if ((events[i].events & EPOLLERR) || + (events[i].events & EPOLLHUP) || + (events[i].events & EPOLLRDHUP)) { + /* + * Error has occurred on this socket + * close it and open a new socket after + * 5 sec of sleep. 
+ */ + ZREPL_ERRLOG("epoll err() :%d\n", errno); + close(events[i].data.fd); + printf("Retrying ....\n"); + sleep(5); + sfd = create_and_bind(mgmt_port, false); + if (sfd == -1) { + goto exit; + } + + rc = make_socket_non_blocking(sfd); + if (rc == -1) { + goto exit; + } +retry1: + rc = connect(sfd, + (struct sockaddr *)&istgt_addr, + sizeof (istgt_addr)); + if ((rc == -1) && (errno == EINTR)) { + ZREPL_ERRLOG("Failed to connect to" + " istgt_controller with err:%d\n", + errno); + sleep(2); + goto retry1; + } + + event.data.fd = sfd; + event.events = EPOLLIN | EPOLLET | + EPOLLERR | EPOLLHUP | EPOLLRDHUP; + rc = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event); + if (rc == -1) { + ZREPL_ERRLOG("epoll_ctl() failed with" + " errno:%d\n", errno); + goto exit; + } + continue; + } + + bzero(&hdr, sizeof (hdr)); + count = read(events[i].data.fd, (char *)&hdr, + sizeof (hdr)); + if (count == -1) { + ZREPL_ERRLOG("Read from socket failed" + " with err:%d\n", errno); + goto exit; + } + + if (hdr.opcode == ZVOL_OPCODE_HANDSHAKE) { + name = kmem_alloc( + hdr.len * sizeof (char), KM_SLEEP); + count = read(events[i].data.fd, name, + sizeof (char) * hdr.len); + if (count == -1) { + ZREPL_ERRLOG("Read from socket failed" + " with err:%d\n", errno); + goto exit; + } + + rc = uzfs_zvol_mgmt_do_handshake(&hdr, + events[i].data.fd, name); + free(name); + if (rc == -1) { + ZREPL_ERRLOG("handshake failed with" + " errno:%d\n", errno); + goto exit; + } + } + } + } +exit: + if (events != NULL) { + free(events); + } + printf("uzfs_zvol_mgmt_thread thread exiting\n"); + zk_thread_exit(); +} +#endif +/* + * One thread per replica. Responsible for accepting + * IO connections. This thread will accept a connection + * and spawn a new thread for each new connection req. 
+ */ +static void +uzfs_zvol_io_conn_acceptor(void) +{ + int rc, sfd, efd; +#ifdef DEBUG + char *hbuf; + char *sbuf; +#endif + int new_fd; + socklen_t in_len; + struct sockaddr in_addr; + struct epoll_event event; + struct epoll_event *events = NULL; + + sfd = efd = -1; + sfd = create_and_bind(accpt_port, true); + if (sfd == -1) { + goto exit; + } + + rc = make_socket_non_blocking(sfd); + if (rc == -1) { + goto exit; + } + + rc = listen(sfd, SOMAXCONN); + if (rc == -1) { + ZREPL_ERRLOG("listen() failed with errno:%d\n", errno); + goto exit; + } + + efd = epoll_create1(0); + if (efd == -1) { + ZREPL_ERRLOG("epoll_create() failed with errno:%d\n", errno); + goto exit; + } + + event.data.fd = sfd; + event.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + rc = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event); + if (rc == -1) { + ZREPL_ERRLOG("epoll_ctl() failed with errno:%d\n", errno); + goto exit; + } + + /* Buffer where events are returned */ + events = calloc(MAXEVENTS, sizeof (event)); + + /* The event loop */ + while (1) { + int i, n = 0; + kthread_t *thrd_info; + n = epoll_wait(efd, events, MAXEVENTS, -1); + /* + * EINTR err can come when signal handler + * interrupt epoll_wait system call. It + * should be okay to continue in that case. + */ + if ((n < 0) && (errno == EINTR)) { + continue; + } else if (n < 0) { + goto exit; + } + + for (i = 0; i < n; i++) { + /* + * An error has occured on this fd, or + * the socket is not ready for reading + * (why were we notified then?) + */ + if (!(events[i].events & EPOLLIN)) { + ZREPL_ERRLOG("epoll err() :%d\n", errno); + close(events[i].data.fd); + /* + * TODO:We have choosen to exit + * instead of continuing here. + */ + goto exit; + } + /* + * We have a notification on the listening + * socket, which means one or more incoming + * connections. 
+ */ +#if 0 + while (1) { +#endif + in_len = sizeof (in_addr); + new_fd = accept(events[i].data.fd, + &in_addr, &in_len); +#if 0 + if ((errno == EAGAIN) || + (errno == EWOULDBLOCK)) { + break; + } +#endif + if (new_fd == -1) { + ZREPL_ERRLOG("accept err() :%d\n", + errno); + goto exit; + } +#ifdef DEBUG + hbuf = kmem_alloc( + sizeof (NI_MAXHOST), KM_SLEEP); + sbuf = kmem_alloc( + sizeof (NI_MAXSERV), KM_SLEEP); + rc = getnameinfo(&in_addr, in_len, hbuf, + sizeof (hbuf), sbuf, sizeof (sbuf), + NI_NUMERICHOST | NI_NUMERICSERV); + if (rc == 0) { + ZREPL_LOG("Accepted connection on " + "descriptor %d " + "(host=%s, port=%s)\n", + new_fd, hbuf, sbuf); + printf("Accepted IO conn on " + "descriptor %d " + "(host=%s, port=%s)\n", + new_fd, hbuf, sbuf); + } + + free(hbuf); + free(sbuf); +#endif + int *thread_fd = kmem_alloc( + sizeof (int), KM_SLEEP); + *thread_fd = new_fd; + thrd_info = zk_thread_create(NULL, 0, + (thread_func_t)uzfs_zvol_io_receiver, + (void *)thread_fd, 0, NULL, TS_RUN, 0, + PTHREAD_CREATE_DETACHED); + VERIFY3P(thrd_info, !=, NULL); +#if 0 + } +#endif + } + } +exit: + if (events != NULL) { + free(events); + } + + if (sfd != -1) { + close(sfd); + } + + if (efd != -1) { + close(efd); + } + + printf("uzfs_zvol_io_conn_acceptor thread exiting\n"); + ZREPL_ERRLOG("uzfs_zvol_io_conn_acceptor thread exiting\n"); + zk_thread_exit(); +} + +/* + * One thread per LUN/vol. This thread works + * on queue and it sends ack back to client on + * a given fd. 
+ */ +static void +uzfs_zvol_io_ack_sender(void *arg) +{ + int fd; + zvol_info_t *zinfo; + thread_args_t *thrd_arg; + zvol_io_cmd_t *zio_cmd = NULL; + + thrd_arg = (thread_args_t *)arg; + fd = thrd_arg->fd; + zinfo = uzfs_zinfo_lookup(thrd_arg->zvol_name); + free(arg); + while (1) { + int rc = 0; + (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); + do { + if (STAILQ_EMPTY(&zinfo->complete_queue)) { + zinfo->io_ack_waiting = 1; + pthread_cond_wait(&zinfo->io_ack_cond, + &zinfo->complete_queue_mutex); + + zinfo->io_ack_waiting = 0; + if ((zinfo->state == ZVOL_INFO_STATE_OFFLINE) || + (zinfo->conn_closed == true)) { + (void) pthread_mutex_unlock( + &zinfo->complete_queue_mutex); + goto exit; + } + } + } while (STAILQ_EMPTY(&zinfo->complete_queue)); + + zio_cmd = STAILQ_FIRST(&zinfo->complete_queue); + STAILQ_REMOVE_HEAD(&zinfo->complete_queue, cmd_link); + (void) pthread_mutex_unlock(&zinfo->complete_queue_mutex); + + ASSERT(zio_cmd->conn == fd); + ZREPL_LOG("ACK for op:%d with seq-id %ld\n", + zio_cmd->hdr.opcode, zio_cmd->hdr.io_seq); + + rc = uzfs_zvol_socket_write(zio_cmd->conn, + (char *)&zio_cmd->hdr, sizeof (zio_cmd->hdr)); + if (rc == -1) { + ZREPL_ERRLOG("socket write err :%d\n", errno); + zio_cmd_free(&zio_cmd); + goto exit; + } + + switch (zio_cmd->hdr.opcode) { + case ZVOL_OPCODE_HANDSHAKE: + case ZVOL_OPCODE_WRITE: + case ZVOL_OPCODE_SYNC: + zinfo->write_req_ack_cnt++; + /* Send handsake ack */ + break; + case ZVOL_OPCODE_READ: + printf("ACK for op:%d with seq-id %ld\n", + zio_cmd->hdr.opcode, zio_cmd->hdr.io_seq); + /* Send data read from disk */ + rc = uzfs_zvol_socket_write(zio_cmd->conn, + zio_cmd->buf, + (sizeof (char) * zio_cmd->hdr.len)); + if (rc == -1) { + ZREPL_ERRLOG("socket write err :%d\n", + errno); + ASSERT(0); + goto exit; + } + zinfo->read_req_ack_cnt++; + break; + + default: + VERIFY(!"Should be a valid opcode"); + break; + } + zio_cmd_free(&zio_cmd); + } +exit: + close(fd); + while (!STAILQ_EMPTY(&zinfo->complete_queue)) { + 
zio_cmd = STAILQ_FIRST(&zinfo->complete_queue); + STAILQ_REMOVE_HEAD(&zinfo->complete_queue, cmd_link); + zio_cmd_free(&zio_cmd); + } + uzfs_zinfo_drop_refcnt(zinfo, false); + + printf("uzfs_zvol_io_ack_sender thread exiting\n"); + zk_thread_exit(); +} + +static void +uzfs_zrepl_open_log(void) +{ + openlog("zrepl", LOG_PID, LOG_LOCAL7); +} + +static void +uzfs_zrepl_close_log(void) +{ + closelog(); +} + +static void +uzfs_zrepl_walk_pool_directory(void) +{ + spa_t *spa = NULL; + spa_t *sp = NULL; + int rc = 0; + + mutex_enter(&spa_namespace_lock); + while ((spa = spa_next(spa)) != NULL) { + (void) printf(">>>>>>>\t%s\n", spa_name(spa)); + rc = spa_open(spa_name(spa), &sp, spa); + if (rc == 0) { + spa_close(spa, spa); + } + } + mutex_exit(&spa_namespace_lock); +} + +/* + * Main function for replica. + */ +int +main(void) +{ + int rc; + kthread_t *conn_accpt_thrd; + kthread_t *uzfs_mgmt_thread; + + + pthread_t slf = pthread_self(); + snprintf(tinfo, sizeof (tinfo), "m#%d.%d", + (int)(((uint64_t *)slf)[0]), getpid()); + + rc = uzfs_init(); + uzfs_zrepl_open_log(); + if (rc != 0) { + printf("initialization errored.. 
%d\n", rc); + return (-1); + } + + rc = pthread_mutex_init(&zvol_list_mutex, NULL); + if (rc != 0) { + ZREPL_ERRLOG("zvol_global mutex_init() failed\n"); + return (-1); + } + + uzfs_zrepl_walk_pool_directory(); + sleep(5); + + /* Ignore SIGPIPE signal */ + signal(SIGPIPE, SIG_IGN); + if (libuzfs_ioctl_init() < 0) { + ZREPL_ERRLOG("Failed to initialize libuzfs ioctl\n"); + (void) fprintf(stderr, "%s", + "failed to initialize libuzfs ioctl\n"); + goto initialize_error; + } + + conn_accpt_thrd = zk_thread_create(NULL, 0, + (thread_func_t)uzfs_zvol_io_conn_acceptor, + NULL, 0, NULL, TS_RUN, 0, + PTHREAD_CREATE_DETACHED); + VERIFY3P(conn_accpt_thrd, !=, NULL); + + uzfs_mgmt_thread = zk_thread_create(NULL, 0, + (thread_func_t)uzfs_zvol_mgmt_thread, + NULL, 0, NULL, TS_RUN, 0, + PTHREAD_CREATE_DETACHED); + VERIFY3P(uzfs_mgmt_thread, !=, NULL); + + while (1) { + sleep(5); + } + +initialize_error: + uzfs_zrepl_close_log(); + uzfs_fini(); + return (-1); +} diff --git a/configure.ac b/configure.ac index b400255400e4..3bb1d2721ee8 100644 --- a/configure.ac +++ b/configure.ac @@ -108,6 +108,7 @@ AC_CONFIG_FILES([ cmd/zpool/Makefile cmd/zstreamdump/Makefile cmd/ztest/Makefile + cmd/zrepl/Makefile cmd/zpios/Makefile cmd/mount_zfs/Makefile cmd/fsck_zfs/Makefile diff --git a/include/Makefile.am b/include/Makefile.am index afad0ebd89dd..627b1a0cab2b 100644 --- a/include/Makefile.am +++ b/include/Makefile.am @@ -23,6 +23,7 @@ USER_H = \ $(top_srcdir)/include/libzfs_impl.h \ $(top_srcdir)/include/uzfs_io.h \ $(top_srcdir)/include/uzfs_mgmt.h \ + $(top_srcdir)/include/zrepl_mgmt.h \ $(top_srcdir)/include/uzfs_test.h \ $(top_srcdir)/include/rte_ring.h \ $(top_srcdir)/include/rte_atomic_64.h \ diff --git a/include/sys/zfs_context.h b/include/sys/zfs_context.h index 62e0c45c4663..29257ef41c7a 100644 --- a/include/sys/zfs_context.h +++ b/include/sys/zfs_context.h @@ -133,7 +133,7 @@ extern "C" { #define noinline __attribute__((noinline)) #define likely(x) __builtin_expect((x), 1) -#define 
unlikely(x) __builtin_expect((x), 0) +#define unlikely(x) __builtin_expect(!!(x), 0) /* * Debugging diff --git a/include/uzfs_mgmt.h b/include/uzfs_mgmt.h index 1048b50afc2f..13ba501dc0d7 100644 --- a/include/uzfs_mgmt.h +++ b/include/uzfs_mgmt.h @@ -30,6 +30,8 @@ extern int uzfs_vdev_add(void *spa, char *path, int ashift, int log); extern int uzfs_create_dataset(void *spa, char *ds, uint64_t vol_size, uint64_t block_size, void **zv); extern int uzfs_open_dataset(void *spa, char *ds, void **zv); +extern int uzfs_zvol_create_cb(const char *ds_name, void *n); +extern int uzfs_zvol_destroy_cb(const char *ds_name, void *n); extern uint64_t uzfs_synced_txg(void *zv); extern void uzfs_close_dataset(void *zv); extern void uzfs_close_pool(void *spa); diff --git a/include/uzfs_test.h b/include/uzfs_test.h index 43f6ba57a7be..389736ee1d4c 100644 --- a/include/uzfs_test.h +++ b/include/uzfs_test.h @@ -44,7 +44,8 @@ extern unsigned long zfs_arc_min; extern void replay_fn(void *arg); extern void setup_unit_test(void); extern void unit_test_create_pool_ds(void); -extern void open_pool_ds(void **, void **); +extern void open_pool(void **); +extern void open_ds(void *, void **); typedef struct worker_args { void *zv; diff --git a/include/zrepl_mgmt.h b/include/zrepl_mgmt.h new file mode 100644 index 000000000000..67d4cb80e313 --- /dev/null +++ b/include/zrepl_mgmt.h @@ -0,0 +1,131 @@ + +#ifndef ZREPL_MGMT_H +#define ZREPL_MGMT_H +#include +#include + +#define uZFS_ZVOL_WORKERS_MAX 128 +#define uZFS_ZVOL_WORKERS_DEFAULT 6 +#define MAX_IP_LEN 56 + +extern pthread_mutex_t zvol_list_mutex; +struct zvol_io_cmd_s; + +typedef enum zvol_info_state_e { + ZVOL_INFO_STATE_ONLINE, + ZVOL_INFO_STATE_OFFLINE, +} zvol_info_state_t; + +typedef struct thread_args_s { + char zvol_name[MAXNAMELEN]; + int fd; +} thread_args_t; + +typedef struct zvol_info_s { + + SLIST_ENTRY(zvol_info_s) zinfo_next; + + /* Logical Unit related fields */ + zvol_info_state_t state; + char name[MAXPATHLEN]; + void *zv; + 
int refcnt; + int is_io_ack_sender_created; + taskq_t *uzfs_zvol_taskq; /* Taskq for minor management */ + + /* Thread sync related */ + + /* For protection of complete_queue */ + pthread_mutex_t zinfo_mutex; + pthread_mutex_t complete_queue_mutex; + pthread_cond_t io_ack_cond; + + pthread_t io_receiver_thread; + pthread_t io_ack_sender_thread; + + /* All cmds after execution will go here for ack */ + STAILQ_HEAD(, zvol_io_cmd_s) complete_queue; + + uint8_t io_ack_waiting; + uint8_t error_count; + + /* Will be used to signal ack-sender to exit */ + uint8_t conn_closed; + + /* Performance counter */ + + /* Debug counters */ + int read_req_received_cnt; + int write_req_received_cnt; + int read_req_ack_cnt; + int write_req_ack_cnt; +} zvol_info_t; + +typedef enum zvol_op_code_e { + ZVOL_OPCODE_HANDSHAKE = 1, + ZVOL_OPCODE_READ, + ZVOL_OPCODE_WRITE, + ZVOL_OPCODE_UNMAP, + ZVOL_OPCODE_SYNC, + ZVOL_OPCODE_SNAP_CREATE, + ZVOL_OPCODE_SNAP_ROLLBACK, +} zvol_op_code_t; + +typedef enum zvol_op_status_e { + ZVOL_OP_STATUS_OK = 1, + ZVOL_OP_STATUS_FAILED, +} zvol_op_status_t; + +typedef struct zvol_io_hdr_s { + zvol_op_code_t opcode; + uint64_t io_seq; + uint64_t offset; + uint64_t len; + void *q_ptr; + zvol_op_status_t status; +} zvol_io_hdr_t; + +typedef struct zvol_io_cmd_s { + STAILQ_ENTRY(zvol_io_cmd_s) cmd_link; + zvol_io_hdr_t hdr; + void *zv; + void *buf; + int conn; +} zvol_io_cmd_t; + +typedef struct mgmt_ack_s { + char volname[MAXNAMELEN]; + char ip[MAX_IP_LEN]; + int port; +} mgmt_ack_t; + +extern int uzfs_zinfo_init(void *zv, const char *ds_name); +extern zvol_info_t *uzfs_zinfo_lookup(const char *name); +extern void uzfs_zinfo_drop_refcnt(zvol_info_t *zinfo, int locked); +extern void uzfs_zinfo_take_refcnt(zvol_info_t *zinfo, int locked); +extern void uzfs_zinfo_replay_zil_all(void); +extern int uzfs_zinfo_destroy(const char *ds_name); + +#define ZREPL_LOG(fmt, ...)
syslog(LOG_NOTICE, \ + "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ + tinfo, ##__VA_ARGS__) + +#define ZREPL_NOTICELOG(fmt, ...) syslog(LOG_NOTICE, \ + "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ + tinfo, ##__VA_ARGS__) + +#define ZREPL_ERRLOG(fmt, ...) syslog(LOG_ERR, \ + "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ + tinfo, ##__VA_ARGS__) + +#define ZREPL_WARNLOG(fmt, ...) syslog(LOG_ERR, \ + "%-18.18s:%4d: %-20.20s: " fmt, __func__, __LINE__, \ + tinfo, ##__VA_ARGS__) + +#define ZREPL_TRACELOG(FLAG, fmt, ...) \ + do { \ + syslog(LOG_NOTICE, "%-18.18s:%4d: %-20.20s: " \ + fmt, __func__, __LINE__, tinfo, ##__VA_ARGS__); \ + } while (0) + +#endif /* ZREPL_MGMT_H */ diff --git a/lib/libzpool/Makefile.am b/lib/libzpool/Makefile.am index 33ed5096f6c3..048ddc63b719 100644 --- a/lib/libzpool/Makefile.am +++ b/lib/libzpool/Makefile.am @@ -24,7 +24,8 @@ USER_C = \ uzfs_mtree.c \ uzfs_test_mgmt.c \ uzfs_zap.c \ - vdev_disk_aio.c + vdev_disk_aio.c \ + zrepl_mgmt.c KERNEL_C = \ zfs_comutil.c \ diff --git a/lib/libzpool/uzfs_mgmt.c b/lib/libzpool/uzfs_mgmt.c index 9ff03ace0e27..b8b75ffe730a 100644 --- a/lib/libzpool/uzfs_mgmt.c +++ b/lib/libzpool/uzfs_mgmt.c @@ -24,6 +24,7 @@ #include #include #include +#include static int uzfs_fd_rand = -1; @@ -259,11 +260,11 @@ uzfs_objset_create_cb(objset_t *new_os, void *arg, cred_t *cr, dmu_tx_t *tx) VERIFY(error == 0); } + /* owns objset with name 'ds_name' in pool 'spa'. 
Sets 'sync' property */ int -uzfs_open_dataset(spa_t *spa, const char *ds_name, zvol_state_t **z) +uzfs_open_dataset_init(spa_t *spa, const char *ds_name, zvol_state_t **z) { - char name[ZFS_MAX_DATASET_NAME_LEN]; zvol_state_t *zv = NULL; int error = -1; objset_t *os; @@ -274,7 +275,6 @@ uzfs_open_dataset(spa_t *spa, const char *ds_name, zvol_state_t **z) if (spa == NULL) goto ret; - (void) snprintf(name, sizeof (name), "%s/%s", spa_name(spa), ds_name); zv = kmem_zalloc(sizeof (zvol_state_t), KM_SLEEP); if (zv == NULL) @@ -284,9 +284,9 @@ uzfs_open_dataset(spa_t *spa, const char *ds_name, zvol_state_t **z) zfs_rlock_init(&zv->zv_range_lock); zfs_rlock_init(&zv->zv_mrange_lock); - strlcpy(zv->zv_name, name, MAXNAMELEN); + strlcpy(zv->zv_name, ds_name, MAXNAMELEN); - error = dmu_objset_own(name, DMU_OST_ZVOL, 1, zv, &os); + error = dmu_objset_own(ds_name, DMU_OST_ZVOL, 1, zv, &os); if (error) goto free_ret; zv->zv_objset = os; @@ -346,6 +346,21 @@ uzfs_open_dataset(spa_t *spa, const char *ds_name, zvol_state_t **z) return (error); } +/* owns objset with name 'ds_name' in pool 'spa'. 
Sets 'sync' property */ +int +uzfs_open_dataset(spa_t *spa, const char *ds_name, zvol_state_t **z) +{ + char name[ZFS_MAX_DATASET_NAME_LEN]; + int error = -1; + + if (spa == NULL) + return (error); + (void) snprintf(name, sizeof (name), "%s/%s", spa_name(spa), ds_name); + + error = uzfs_open_dataset_init(spa, name, z); + return (error); +} + /* * Creates dataset 'ds_name' in pool 'spa' with volume size 'vol_size', * block size as 'block_size' @@ -384,6 +399,47 @@ uzfs_create_dataset(spa_t *spa, char *ds_name, uint64_t vol_size, return (error); } +/* uZFS Zvol create call back function */ +int +uzfs_zvol_create_cb(const char *ds_name, void *arg) +{ + + zvol_state_t *zv = NULL; + spa_t *spa; + int error = -1; + + printf("ds_name %s\n", ds_name); + error = spa_open(ds_name, &spa, "UZINFO"); + if (error != 0) { + (void) spa_destroy((char *)ds_name); + return (error); + } + + error = uzfs_open_dataset_init(spa, ds_name, &zv); + if (error) { + spa_close(spa, "UZINFO"); + printf("Failed to open dataset: %s\n", ds_name); + return (error); + } + + if (uzfs_zinfo_init(zv, ds_name) != 0) { + printf("Failed in uzfs_zinfo_init\n"); + return (error); + } + return (0); +} + +/* uZFS Zvol destroy call back function */ +int +uzfs_zvol_destroy_cb(const char *ds_name, void *arg) +{ + + printf("deleting ds_name %s\n", ds_name); + + uzfs_zinfo_destroy(ds_name); + return (0); +} + /* disowns, closes dataset */ void uzfs_close_dataset(zvol_state_t *zv) diff --git a/lib/libzpool/zrepl_mgmt.c b/lib/libzpool/zrepl_mgmt.c new file mode 100644 index 000000000000..f5a0754ea731 --- /dev/null +++ b/lib/libzpool/zrepl_mgmt.c @@ -0,0 +1,207 @@ +#include +#include +#include +#include +#include +#include + +#define true 1 +#define false 0 + +#define ZVOL_THREAD_STACKSIZE (2 * 1024 * 1024) + +__thread char tinfo[20] = {0}; +clockid_t clockid; + +pthread_mutex_t zvol_list_mutex; +SLIST_HEAD(, zvol_info_s) zvol_list; +SLIST_HEAD(, zvol_info_s) stale_zv_list; + +static int uzfs_zinfo_free(zvol_info_t 
*zinfo); + +/* + * API to drop refcnt on zinfo. If refcnt + * dropped to zero then free zinfo. + */ +void +uzfs_zinfo_drop_refcnt(zvol_info_t *zinfo, int locked) +{ + if (!locked) { + (void) pthread_mutex_lock(&zvol_list_mutex); + } + + zinfo->refcnt--; + if (zinfo->refcnt == 0) { + (void) uzfs_zinfo_free(zinfo); + } + + if (!locked) { + (void) pthread_mutex_unlock(&zvol_list_mutex); + } +} + +/* + * API to take refcount on zinfo. + */ +void +uzfs_zinfo_take_refcnt(zvol_info_t *zinfo, int locked) +{ + if (!locked) { + (void) pthread_mutex_lock(&zvol_list_mutex); + } + zinfo->refcnt++; + if (!locked) { + (void) pthread_mutex_unlock(&zvol_list_mutex); + } +} + +static void +uzfs_insert_zinfo_list(zvol_info_t *zinfo) +{ + + /* Base refcount is taken here */ + (void) pthread_mutex_lock(&zvol_list_mutex); + uzfs_zinfo_take_refcnt(zinfo, true); + SLIST_INSERT_HEAD(&zvol_list, zinfo, zinfo_next); + (void) pthread_mutex_unlock(&zvol_list_mutex); +} + +static void +uzfs_remove_zinfo_list(zvol_info_t *zinfo) +{ + + SLIST_REMOVE(&zvol_list, zinfo, zvol_info_s, zinfo_next); + zinfo->state = ZVOL_INFO_STATE_OFFLINE; + /* Send signal to ack_sender thread about offline */ + (void) pthread_mutex_lock(&zinfo->complete_queue_mutex); + if (zinfo->io_ack_waiting) { + (void) pthread_cond_signal(&zinfo->io_ack_cond); + } + (void) pthread_mutex_unlock(&zinfo->complete_queue_mutex); + /* Base refcount is dropped here */ + uzfs_zinfo_drop_refcnt(zinfo, true); +} + +zvol_info_t * +uzfs_zinfo_lookup(const char *name) +{ + int pathlen; + char *p; + zvol_info_t *zv = NULL; + int namelen = ((name) ? strlen(name) : 0); + + (void) pthread_mutex_lock(&zvol_list_mutex); + SLIST_FOREACH(zv, &zvol_list, zinfo_next) { + /* + * TODO: Come up with better approach. + * Since iSCSI tgt can send volname in desired format, + * we have added this hack where we do calculate length + * of name passed as arg, look for those many bytes in + * zv->name from tail/end.
+ */ + pathlen = strlen(zv->name); + p = zv->name + (pathlen - namelen); + + /* + * Name can be in any of these formats + * "vol1" or "zpool/vol1" + */ + if (name == NULL || (strcmp(zv->name, name) == 0) || + ((strcmp(p, name) == 0) && (*(--p) == '/'))) { + break; + } + } + + /* Take refcount */ + uzfs_zinfo_take_refcnt(zv, true); + (void) pthread_mutex_unlock(&zvol_list_mutex); + return (zv); +} + +static void +uzfs_zinfo_init_mutex(zvol_info_t *zinfo) +{ + + (void) pthread_mutex_init(&zinfo->complete_queue_mutex, NULL); + (void) pthread_mutex_init(&zinfo->zinfo_mutex, NULL); + (void) pthread_cond_init(&zinfo->io_ack_cond, NULL); +} + +static void +uzfs_zinfo_destroy_mutex(zvol_info_t *zinfo) +{ + + (void) pthread_mutex_destroy(&zinfo->complete_queue_mutex); + (void) pthread_mutex_destroy(&zinfo->zinfo_mutex); + (void) pthread_cond_destroy(&zinfo->io_ack_cond); +} + +int +uzfs_zinfo_destroy(const char *name) +{ + + zvol_info_t *zinfo = NULL; + int namelen = ((name) ? strlen(name) : 0); + zvol_state_t *zv; + + (void) pthread_mutex_lock(&zvol_list_mutex); + SLIST_FOREACH(zinfo, &zvol_list, zinfo_next) { + if (name == NULL || strcmp(zinfo->name, name) == 0 || + (strncmp(zinfo->name, name, namelen) == 0 && + (zinfo->name[namelen] == '/' || + zinfo->name[namelen] == '@'))) { + zv = zinfo->zv; + uzfs_remove_zinfo_list(zinfo); + zil_close(zv->zv_zilog); + zfs_rlock_destroy(&zv->zv_range_lock); + zfs_rlock_destroy(&zv->zv_mrange_lock); + dnode_rele(zv->zv_dn, zv); + dmu_objset_disown(zv->zv_objset, zv); + spa_close(zv->zv_spa, "UZINFO"); + kmem_free(zv, sizeof (zvol_state_t)); + break; + } + } + (void) pthread_mutex_unlock(&zvol_list_mutex); + + printf("uzfs_zinfo_destroy path\n"); + return (0); +} + +int +uzfs_zinfo_init(void *zv, const char *ds_name) +{ + + zvol_info_t *zinfo; + + zinfo = kmem_zalloc(sizeof (zvol_info_t), KM_SLEEP); + bzero(zinfo, sizeof (zvol_info_t)); + ASSERT(zinfo != NULL); + + zinfo->uzfs_zvol_taskq = taskq_create("replica", boot_ncpus, + 
defclsyspri, boot_ncpus, INT_MAX, + TASKQ_PREPOPULATE | TASKQ_DYNAMIC); + + STAILQ_INIT(&zinfo->complete_queue); + uzfs_zinfo_init_mutex(zinfo); + + strlcpy(zinfo->name, ds_name, MAXNAMELEN); + zinfo->zv = zv; + /* Update zvol list */ + uzfs_insert_zinfo_list(zinfo); + + printf("uzfs_zinfo_init in success path\n"); + return (0); +} + +static int +uzfs_zinfo_free(zvol_info_t *zinfo) +{ + taskq_destroy(zinfo->uzfs_zvol_taskq); + (void) uzfs_zinfo_destroy_mutex(zinfo); + ASSERT(STAILQ_EMPTY(&zinfo->complete_queue)); + printf("Freeing volume =%s\n", zinfo->name); + + free(zinfo); + return (0); +} diff --git a/module/zfs/dsl_destroy.c b/module/zfs/dsl_destroy.c index d980f7d1fd78..9e1764592a33 100644 --- a/module/zfs/dsl_destroy.c +++ b/module/zfs/dsl_destroy.c @@ -42,6 +42,7 @@ #include #include #include +#include typedef struct dmu_snapshots_destroy_arg { nvlist_t *dsda_snaps; @@ -929,6 +930,9 @@ dsl_destroy_head(const char *name) zfs_destroy_unmount_origin(name); #endif +#ifndef _KERNEL + uzfs_zvol_destroy_cb(name, NULL); +#endif error = spa_open(name, &spa, FTAG); if (error != 0) return (error); diff --git a/module/zfs/spa.c b/module/zfs/spa.c index 771f4c8d18ae..fdebce272661 100644 --- a/module/zfs/spa.c +++ b/module/zfs/spa.c @@ -77,7 +77,7 @@ #include #include #include - +#include #ifdef _KERNEL #include #include @@ -3505,9 +3505,14 @@ spa_open_common(const char *pool, spa_t **spapp, void *tag, nvlist_t *nvpolicy, mutex_exit(&spa_namespace_lock); } - if (firstopen) + if (firstopen) { +#ifdef _KERNEL zvol_create_minors(spa, spa_name(spa), B_TRUE); - +#else + dmu_objset_find(spa_name(spa), uzfs_zvol_create_cb, NULL, + DS_FIND_CHILDREN); +#endif + } *spapp = spa; return (0); @@ -4560,6 +4565,13 @@ spa_export_common(char *pool, int new_state, nvlist_t **oldconfig, zvol_remove_minors(spa, spa_name(spa), B_TRUE); taskq_wait(spa->spa_zvol_taskq); } + +#ifndef _KERNEL + if ((new_state == POOL_STATE_DESTROYED) || + (new_state == POOL_STATE_EXPORTED)) { + 
uzfs_zvol_destroy_cb(spa_name(spa), NULL); + } +#endif mutex_enter(&spa_namespace_lock); spa_close(spa, FTAG); diff --git a/module/zfs/zfs_ioctl.c b/module/zfs/zfs_ioctl.c index e44bed324e15..323318157c54 100644 --- a/module/zfs/zfs_ioctl.c +++ b/module/zfs/zfs_ioctl.c @@ -282,6 +282,7 @@ static int get_nvlist(uint64_t nvl, uint64_t size, int iflag, nvlist_t **nvp); #include #include #include +#include #include "zfs_fletcher.h" #include "zfs_namecheck.h" @@ -3397,6 +3398,9 @@ zfs_ioc_create(const char *fsname, nvlist_t *innvl, nvlist_t *outnvl) is_insensitive ? DS_FLAG_CI_DATASET : 0, cbfunc, &zct); nvlist_free(zct.zct_zplprops); +#if !defined(_KERNEL) + (void) uzfs_zvol_create_cb((char *)fsname, NULL); +#endif /* * It would be nice to do this atomically. */ diff --git a/tests/cbtest/script/test_uzfs.sh b/tests/cbtest/script/test_uzfs.sh index 4b322641d371..b9229019bc34 100755 --- a/tests/cbtest/script/test_uzfs.sh +++ b/tests/cbtest/script/test_uzfs.sh @@ -23,7 +23,7 @@ fi ZPOOL="$SRC_PATH/cmd/zpool/zpool" ZFS="$SRC_PATH/cmd/zfs/zfs" ZDB="$SRC_PATH/cmd/zdb/zdb" -TGT="$SRC_PATH/cmd/tgt/tgt" +TGT="$SRC_PATH/cmd/zrepl/zrepl" GTEST="$SRC_PATH/tests/cbtest/gtest/test_uzfs" ZTEST="$SRC_PATH/cmd/ztest/ztest" UZFS_TEST="$SRC_PATH/cmd/uzfs_test/uzfs_test" @@ -159,7 +159,7 @@ run_zvol_tests() log_must check_prop "$SRCPOOL/$SRCVOL" sync always # dump some data - log_must dump_data + #log_must dump_data # test snapshot creation log_must create_snapshot "$SRCPOOL/$SRCVOL" "snap" @@ -271,7 +271,6 @@ create_snapshot() { fs_vol=$1 snap=$2 - test -z $fs_vol && log_fail "Filesystem or volume's name is undefined." test -z $snap && log_fail "Snapshot's name is undefined." @@ -559,7 +558,7 @@ test_raidz_pool() setup_uzfs_test() { $TGT & - sleep 1 + sleep 10 TGT_PID2=$! export_pool $UZFS_TEST_POOL @@ -648,6 +647,7 @@ run_dmu_test() } init_test +sleep 10 log_must test_stripe_pool log_must test_mirror_pool