//
// NOTE(review): this source has been mangled by a filter that stripped every
// "<...>" sequence that looked like a markup tag. The header names after the
// bare #include directives and most template argument lists (static_cast<...>,
// std::make_shared<...>, boost::shared_ptr<...>, container_buffer<...>) are
// missing and MUST be restored before this file can compile; each such site is
// flagged inline below. Template lists written with spaces (e.g.
// "asynchronous_sink< ... >") survived the stripping.
//
#include
#include
#include
#include
#include
#include
#include "was/storage_account.h"
#include "was/queue.h"
#include "was/blob.h"
#include "was/table.h"
#include "was/common.h"
#include "cpprest/filestream.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
//#include
#include // text file approach, all support but abort in ~locale

// Boost.Log sink aliases: a rotating text-file backend wrapped in an
// asynchronous frontend (instantiated in main() below).
typedef boost::log::sinks::text_file_backend file_backend;
typedef boost::log::sinks::asynchronous_sink< boost::log::sinks::text_file_backend > file_sink;

// HTTP status codes used when interpreting azure::storage::storage_exception
// results (only HTTP_NOT_FOUND is referenced in the visible code).
#define HTTP_BAD_REQUEST 400
#define HTTP_OK 200
#define HTTP_MIN_SANE_STATUS 100
#define HTTP_MAX_SANE_STATUS 600
#define HTTP_BOGUS_STATUS 9999
#define HTTP_NOT_FOUND 404
#define HTTP_REQUEST_TIMEOUT 408
#define HTTP_TOO_MANY_REQUESTS 429
#define HTTP_PARTIAL_CONTENT 206
#define HTTP_SERVICE_UNAVAILABLE 503

using namespace azure::storage;

// Offset between the Windows FILETIME epoch (1601-01-01) and the Unix epoch
// (1970-01-01), in MILLISECONDS. Used in download_blob_operation() to convert
// last_modified() 100-ns tick counts (divided by 10000 -> ms) to Unix time.
const static uint64_t epoch_offset = 11644473600000LL;

namespace azure { namespace storage {

// Retry policy with exponential backoff and +/-20% jitter around delta_backoff,
// modeled on the SDK's own exponential_retry_policy.
class td_exponential_retry_policy : public basic_common_retry_policy
{
public:
    /// Initializes a new instance of the policy.
    /// delta_backoff - base backoff; the per-retry backoff is drawn uniformly
    ///                 from [0.8, 1.2] * delta_backoff seconds.
    /// max_attempts  - the maximum number of retries to attempt.
    // NOTE(review): both static_cast calls lost their template argument
    // (presumably static_cast<double>) -- restore before compiling.
    td_exponential_retry_policy(std::chrono::seconds delta_backoff, int max_attempts)
        : basic_common_retry_policy(max_attempts),
          m_rand_distribution(static_cast(delta_backoff.count()) * 0.8, static_cast(delta_backoff.count()) * 1.2),
          m_delta_backoff(delta_backoff)
    {
    }

    /// Decides whether and when to retry: delegates to the base policy, then,
    /// if a retry is allowed, sets the interval to (2^retry_count - 1) times a
    /// jittered backoff, clamped to max_exponential_retry_interval.
    WASTORAGE_API retry_info evaluate(const retry_context& retry_context, operation_context context) override
    // retry_info td_exponential_retry_policy::evaluate(const retry_context& retry_context, operation_context context)
    {
        auto result = basic_common_retry_policy::evaluate(retry_context, context);
        if (result.should_retry())
        {
            auto random_backoff = m_rand_distribution(m_rand_engine);
            // NOTE(review): static_cast template argument stripped (likely the
            // milliseconds rep type, e.g. <long long>) -- restore.
            std::chrono::milliseconds increment(static_cast((std::pow(2, retry_context.current_retry_count()) - 1) * random_backoff * 1000));
            // A negative increment means the arithmetic overflowed; fall back
            // to the maximum interval.
            auto interval = increment < std::chrono::milliseconds::zero() ? max_exponential_retry_interval : min_exponential_retry_interval + increment;
            result.set_retry_interval(std::min(interval, max_exponential_retry_interval));
            align_retry_interval(result);
        }
        return result;
    }

    /// Clones the retry policy (required by the retry_policy envelope).
    // NOTE(review): make_shared lost its template argument
    // (<td_exponential_retry_policy>) -- restore.
    retry_policy clone() const override
    {
        return retry_policy(std::make_shared(m_delta_backoff, m_max_attempts));
    }

private:
    std::uniform_real_distribution<> m_rand_distribution; // jitter factor source
    // NOTE(review): default-seeded engine, so the jitter sequence is identical
    // on every run -- confirm whether that is intended for a stress test.
    std::default_random_engine m_rand_engine;
    std::chrono::seconds m_delta_backoff;
};

// Thin retry_policy wrapper that installs td_exponential_retry_policy as the
// implementation.
class td_retry_policy : public retry_policy
{
public:
    /// Default construction: SDK default retry interval, default_attempts retries.
    // NOTE(review): make_shared lost its template argument
    // (<td_exponential_retry_policy>) -- restore here and in the ctor below.
    td_retry_policy() : retry_policy(std::make_shared(protocol::default_retry_interval, default_attempts))
    {
    }

    /// delta_backoff - the delta backoff.
    /// max_attempts  - the maximum number of retries to attempt.
    td_retry_policy(std::chrono::seconds delta_backoff, int max_attempts)
        : retry_policy(std::make_shared(delta_backoff, max_attempts))
    {
    }
};

}} // namespace azure::storage

namespace azure { namespace storage {
}} // namespace azure::storage

/// Parses the connection string and builds a cloud_blob_client configured with
/// the custom td_retry_policy (1s backoff, 4 attempts), a 5s server timeout and
/// parallelism factor 1. Returns true on success; on any failure it logs to
/// stderr and RETHROWS, so the "res = false; return res;" paths are effectively
/// dead code.
/// NOTE(review): "throw e;" rethrows a copy of the caught object (and would
/// slice a derived exception); plain "throw;" preserves the original.
bool initialize_blob_client(const utility::string_t &connection_string, cloud_blob_client &blob_client)
{
    bool res = false;
    try
    {
        // NOTE(review): this logs the full connection string -- including the
        // account key -- to stderr.
        fprintf(stderr,"creating azure client handle with connection string=<%s>\n", connection_string.c_str());
        cloud_storage_account storage_account = cloud_storage_account::parse(connection_string.c_str());
        azure::storage::blob_request_options options;
        azure::storage::td_retry_policy policy(std::chrono::seconds(1), 4);
        options.set_retry_policy(policy);
        options.set_server_timeout(std::chrono::seconds(5));
        options.set_parallelism_factor(1);
        //options.set_maximum_execution_time(std::chrono::milliseconds(10000));
        blob_client = storage_account.create_cloud_blob_client(options);
        // EXTFS-1379 verify the connection first to trigger an exception if connection string is invalid
        // that can be only done through a list call
        // download properties is not an expensive call
        //auto containerList = blob_client.download_service_properties();
        res = true;
    }
    catch (const azure::storage::storage_exception& e)
    {
        //std::cout << U("Error:") << e.what() << std::endl << U("The blob client can not be created.") << std::endl;
        //fprintf(stderr,"Error: the blob client can not be created <%s> with connection string: <%s> authentication error", e.what(), connection_string.c_str());
        int http_code = e.result().http_status_code();
        fprintf(stderr,"Error: storage_exception the blob client can not be created <%s> with http code=<%d>", e.what(), http_code);
        res = false;
        throw e;
    }
    catch (const std::runtime_error& e)
    {
        //std::cout << U("Error:") << e.what() << std::endl << U("The blob client can not be created.") << std::endl;
        //fprintf(stderr,"Error: the blob client can not be created <%s> with connection string: %s", e.what(), connection_string.c_str());
        fprintf(stderr,"Error: runtime_exception the blob client can not be created <%s>", e.what());
        res = false;
        throw e;
    }
    catch (const std::exception &e)
    {
        //std::cout << U("Error:") << e.what() << std::endl << U("The blob client can not be created.") << std::endl;
        //fprintf(stderr,"Error: the blob client can not be created <%s> with connection string: %s", e.what(), connection_string.c_str());
        fprintf(stderr,"Error: generic exception the blob client can not be created <%s>", e.what());
        res = false;
        throw e;
    }
    return res;
}

/// Looks up a container by name and returns a reference to it. (The original
/// header comment said "creates a container", but nothing is created here --
/// only get_container_reference() + exists().)
/// NOTE(review): the 'exception' out-parameter is never written; callers pass a
/// flag in but it is not updated. Storage exceptions are logged and swallowed,
/// and the (possibly unusable) container reference is returned either way --
/// callers must re-check exists().
cloud_blob_container find_container(cloud_blob_client &blob_client, std::string &container_name, bool &exception)
{
    // Get a reference to the container (it is NOT created if missing).
    cloud_blob_container container;
    fprintf(stderr,"Finding container: %s\n", container_name.c_str());
    try
    {
        container = blob_client.get_container_reference(container_name);
        if ( !container.exists() )
        {
            fprintf(stderr,"Error: container is not valid\n");
        }
        return container;
    }
    catch (const azure::storage::storage_exception& e)
    {
        int http_code = e.result().http_status_code();
        fprintf(stderr,"<%s> failed due to <%s> with code <%d>", container_name.c_str(), e.what(), http_code);
    }
    return container;
}

/// Uploads the contents of blob_str as a block blob named obj_name into the
/// hard-coded "stresstest" container, with transactional MD5 and stored content
/// MD5 enabled. Returns true on success, false on any exception.
bool upload_blob_operation(cloud_blob_client &blob_client, std::string &obj_name, std::string &blob_str)
{
    // Generate unique container name
    bool result = true;
    std::string container_name("stresstest");
    fprintf(stderr,"creating azure container <%s>\n", container_name.c_str());
    try
    {
        // NOTE: it will not create one if it already exists so it is safe
        cloud_blob_container container = find_container(blob_client, container_name, result);
        if ( !container.exists() )
        {
            fprintf(stderr,"Error:container not found <%s>", container_name.c_str());
            return false;
        }
        fprintf(stderr,"uploading blob to <%s> ", container_name.c_str());
        //Push a file from memory buffer to the cloud block blob
        auto ss = Concurrency::streams::stringstream::open_istream(blob_str);
        //Concurrency::streams::istream input_stream(ss.streambuf());
        // NOTE(review): shared_ptr/make_shared lost their template arguments
        // (presumably <Concurrency::streams::istream>) -- restore.
        boost::shared_ptr m_istream = boost::make_shared(ss.streambuf());
        //block_blob.upload_from_stream(input_stream);
        azure::storage::cloud_block_blob block_blob = container.get_block_blob_reference(obj_name);
        azure::storage::blob_request_options reqOptions;
        azure::storage::td_retry_policy policy(std::chrono::seconds(1), 4);
        reqOptions.set_retry_policy(policy);
        reqOptions.set_parallelism_factor(1);
        reqOptions.set_server_timeout(std::chrono::seconds(5));
        /* reqOptions.set_maximum_execution_time(std::chrono::milliseconds(10000)); */
        //reqOptions.set_retry_policy(azure::storage::linear_retry_policy(std::chrono::seconds(1), 1));
        //boost::shared_ptr m_istream;
        reqOptions.set_use_transactional_md5(true);
        //reqOptions.set_use_transactional_md5(false);
        reqOptions.set_store_blob_content_md5(true);
        reqOptions.set_disable_content_md5_validation(false);
        block_blob.upload_from_stream((*m_istream),azure::storage::access_condition(), reqOptions, azure::storage::operation_context());
        // change it to a per object based shared ptr of input stream to avoid race condition
        m_istream->close().wait();
    }
    catch (const azure::storage::storage_exception& e)
    {
        int http_code = e.result().http_status_code();
        fprintf(stderr,"uploading blob to <%s> failed due to <%s> with code <%d>", container_name.c_str(), e.what(), http_code);
        result = false;
    }
    catch (const std::exception &e) // put in a general catch all just in case
    {
        fprintf(stderr,"uploading blob to <%s> failed due to <%s>", container_name.c_str(), e.what() );
        result = false;
    }
    return result;
}

/// Downloads the byte range [offset, offset+olen) of blob obj_name from the
/// "stresstest" container into a local in-memory buffer; the range is clamped
/// to the blob's actual size first. Returns the (possibly adjusted) byte count,
/// or -1 on error. The copy-out to a caller-supplied pointer is disabled -- see
/// the commented 'optr' signature below and the commented std::copy at the end.
/// NOTE(review): the return type is int but download_size is a size64_t, so
/// sizes above INT_MAX would be truncated.
//int download_blob_operation(cloud_blob_client &blob_client, std::string &obj_name, long offset,char *optr, size_t olen)
int download_blob_operation(cloud_blob_client &blob_client, std::string &obj_name, long offset, size_t olen)
{
    bool result = true;
    // NOTE(review): container_buffer lost its template argument (presumably
    // <std::vector<uint8_t>>) here and for download_buffer below -- restore.
    concurrency::streams::container_buffer> buffer;
    concurrency::streams::ostream output_stream(buffer);
    utility::size64_t download_size = 0;
    std::string container_name("stresstest");
    try
    {
        // NOTE(review): %ld for olen (size_t) assumes size_t == long; %zu is portable.
        fprintf(stderr,"download_blob_operation container=%s obj=%s offset=%ld size=%ld\n", container_name.c_str(), obj_name.c_str(), offset, olen);
        azure::storage::cloud_blob_container container = find_container(blob_client, container_name, result);
        if (!container.exists())
        {
            int http_code = HTTP_NOT_FOUND;
            // there is no lib code in azure sdk so set to 0 for now
            // but we are hard coding it to BAD_REQUEST as a generic error code
            // NOTE(review): http_code/lib_code/libcode/msgtxt are all unused.
            int lib_code = 0;
            int libcode = 0;
            const char *msgtxt = NULL;
            fprintf(stderr,"conatiner not found <%s>", container_name.c_str());
            return -1;
        }
        azure::storage::cloud_block_blob block_blob = container.get_block_blob_reference(obj_name);
        azure::storage::blob_request_options reqOptions;
        //azure::storage::td_retry_policy policy(std::chrono::seconds(1), 4, iface);
        //reqOptions.set_retry_policy(policy);
        azure::storage::td_retry_policy policy(std::chrono::seconds(1), 4);
        reqOptions.set_retry_policy(policy);
        reqOptions.set_parallelism_factor(2);
        reqOptions.set_server_timeout(std::chrono::seconds(5));
        // Yang Adding this line would cause the mem leak
        // without this line no leak
        reqOptions.set_maximum_execution_time(std::chrono::milliseconds(10000));
        // increase read buffer to 16M in order to give a boost to performance
        const size_t new_stream_read_size = 16 * 1024 * 1024;
        reqOptions.set_stream_read_size_in_bytes(new_stream_read_size);
        // increase read buffer to 16M in order to give a boost to performance
        // const size_t new_stream_read_size = 16 * 1024 * 1024;
        // reqOptions.set_stream_read_size_in_bytes(new_stream_read_size);
        //need to obtain last modified time
        block_blob.download_attributes();
        fprintf(stderr,"modified on=%llu %s\n", block_blob.properties().last_modified().to_interval(), block_blob.properties().last_modified().to_string().c_str());
        long unsigned int input = block_blob.properties().last_modified().to_interval();
        // Convert 100-ns FILETIME ticks to Unix milliseconds (epoch_offset is in
        // ms; the original comment said "seconds", which is wrong).
        // NOTE(review): linuxtime_milisecs is computed but never used.
        long unsigned int linuxtime_milisecs = input/10000 - epoch_offset;// diff between windows and unix epochs (milliseconds)
        // need to modify olen to m_size
        utility::size64_t actual_size = block_blob.properties().size();
        // Clamp the requested range to the blob's real size.
        download_size = olen + offset;
        if (download_size > actual_size)
        {
            fprintf(stderr,"Download offset: %ld + olen: %ld beyond object range %ld readjusted\n", offset, olen, actual_size);
            download_size = actual_size - offset;
        }
        else
        {
            download_size = olen;
        }
        azure::storage::operation_context context;
        concurrency::streams::container_buffer> download_buffer;
        // NOTE(review): shadows the outer output_stream; neither stream is used
        // by the active download path below.
        concurrency::streams::ostream output_stream(buffer);
        /*
        // block_blob.download_range_to_stream(output_stream, offset, download_size, azure::storage::access_condition(), azure::storage::blob_request_options(), azure::storage::operation_context());
        //block_blob.download_range_to_stream(output_stream, offset, download_size);
        {
            block_blob.download_range_to_stream(download_buffer.create_ostream(), offset, download_size, azure::storage::access_condition(), reqOptions, azure::storage::operation_context());
            // block_blob.download_to_stream(output_stream);
        }
        */
        reqOptions.set_use_transactional_md5(true);
        //reqOptions.set_use_transactional_md5(false);
        reqOptions.set_store_blob_content_md5(true);
        reqOptions.set_disable_content_md5_validation(false);
        {
            block_blob.download_range_to_stream(download_buffer.create_ostream(), offset, download_size, azure::storage::access_condition(), reqOptions, azure::storage::operation_context());
        }
    }
    catch (const azure::storage::storage_exception& e)
    {
        //std::cout << U("Error:") << e.what() << std::endl << U("The object can not be downloaded.") << std::endl;
        // azure does not return REST CPI http response code so we are setting this to be a generic BAD_REQUEST
        int http_code = e.result().http_status_code();
        int libcode = 0;
        const char *msgtxt = NULL;
        // NOTE(review): this string literal was line-wrapped in the extracted
        // source and has been rejoined here with a single space -- confirm
        // against the original. %d for size() (size_t) and download_size
        // (size64_t) in the next call is a format mismatch.
        fprintf(stderr,"Storage exception The object %s can not be downloaded <%s> failed with http code=<%d> ", obj_name.c_str(), e.what(), http_code);
        fprintf(stderr,"partial buffer size=%d request length=%d ",buffer.collection().size(), download_size);
        return -1;
    }
    catch (const std::exception &e)
    {
        fprintf(stderr,"General exception The object can not be downloaded <%s> failed ", e.what());
        return -1;
    }
    // utility::string_t output = utility::string_t(buffer.collection().cbegin(), buffer.collection().cend());
    // We are copying all bytes from the downloaded buffer to the output stream including null bytes
    /*
    if ( buffer.collection().size() != download_size)
    {
        fprintf(stderr,"Only obtained partial buffer size=%d request length=%d ",buffer.collection().size(), download_size);
        return -1;
    }
    else
    {
        fprintf(stderr,"download_blob_operation container=%s obj=%s completed download size=%d\n", container_name.c_str(), obj_name.c_str(), download_size);
        std::copy(buffer.collection().cbegin(), buffer.collection().cend(), optr);
        buffer.close();
    }
    */
    return download_size;
}

/// Stress-test driver. argv[1] = starting object number for this thread,
/// argv[2] = number of objects. Enables verbose SDK logging, registers an
/// asynchronous rotating Boost.Log file sink, then builds the blob client.
/// The work loop is TRUNCATED in this copy of the source -- it ends at the
/// dangling "// for (int i=0; i" comment below; do not assume its contents.
/// NOTE(review): argv[1]/argv[2] are read without checking argc.
int main(int argc, char *argv[])
{
    // NOTE(review): both shared_ptr declarations lost their template arguments
    // (presumably <file_backend> and <file_sink>) -- restore, together with the
    // corresponding make_shared calls below.
    boost::shared_ptr m_backend;
    boost::shared_ptr m_sink;
    int start_num = atoi(argv[1]);
    int num_of_objs = atoi(argv[2]);
    azure::storage::operation_context::set_default_log_level(azure::storage::client_log_level::log_level_verbose);
    // td-1921 only initialize boost log sink when trace flag is enabled
    // set up the backend
    // as per td-1921, we no longer use date time stamp in creating file name, it will be a sequence number only
    // otherwise the locale destructor on date_and_time will cause a sigabort
    /*
    boost::log::add_file_log
    (
        boost::log::keywords::file_name = "test_log.log",
        boost::log::keywords::format =
        (
            boost::log::expressions::stream << " " << boost::log::expressions::smessage
        )
    );
    */
    m_backend = boost::make_shared(
        boost::log::keywords::file_name = "/var/opt/tdtemp/azure_crash_%4N.log",
        boost::log::keywords::rotation_size = 10 * 1024 * 1024, // rotation size set at 10 MB
        boost::log::keywords::min_free_space = 30 * 1024 * 1024,
        boost::log::keywords::open_mode = std::ios_base::app|std::ios_base::out,
        boost::log::keywords::scan_method = boost::log::sinks::file::scan_matching,
        boost::log::keywords::format = "%UTCTimeStamp% (%TimeStamp%) [%ThreadID%]: %Message%",
        boost::log::keywords::auto_flush = true
    );
    // Wrap it into the frontend and register in the core.
    m_sink = boost::make_shared(m_backend);
    boost::log::core::get()->add_sink(m_sink);
    fprintf(stderr, "starting number for this thread is: %d\n", start_num);
    cloud_blob_client block_blob;
    // NOTE(review): connection string (placeholder account/key) is hard-coded.
    const utility::string_t connection_string = "DefaultEndpointsProtocol=https;AccountName=XXXXXX;AccountKey=YYYYYY;BlobEndpoint=https://XXXXXX.blob.core.windows.net";
    initialize_blob_client( connection_string, block_blob);
    // azure::storage::blob_request_options options;
    // options.set_location_mode(azure::storage::location_mode::secondary_only);
    //auto stats = block_blob.download_service_stats(options, azure::storage::operation_context());
    //auto isInitialized = stats.geo_replication().last_sync_time().is_initialized();
    //auto status = stats.geo_replication().status();
    //utility::string_t string_data_value = stats.geo_replication().last_sync_time().to_string(utility::datetime::ISO_8601);
    // for (int i=0; i