diff --git a/include/snowflake/client.h b/include/snowflake/client.h index 1dd5f53b30..d1c091937d 100644 --- a/include/snowflake/client.h +++ b/include/snowflake/client.h @@ -294,6 +294,26 @@ typedef enum SF_STMT_ATTRIBUTE { SF_STMT_USER_REALLOC_FUNC } SF_STMT_ATTRIBUTE; +/** + * The query status + */ +typedef enum SF_QUERY_STATUS { + SF_QUERY_STATUS_ABORTED, + SF_QUERY_STATUS_ABORTING, + SF_QUERY_STATUS_BLOCKED, + SF_QUERY_STATUS_DISCONNECTED, + SF_QUERY_STATUS_FAILED_WITH_ERROR, + SF_QUERY_STATUS_FAILED_WITH_INCIDENT, + SF_QUERY_STATUS_NO_DATA, + SF_QUERY_STATUS_RUNNING, + SF_QUERY_STATUS_QUEUED, + SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE, + SF_QUERY_STATUS_RESTARTED, + SF_QUERY_STATUS_RESUMING_WAREHOUSE, + SF_QUERY_STATUS_SUCCESS, + SF_QUERY_STATUS_UNKNOWN +} SF_QUERY_STATUS; + /** * Snowflake Error */ @@ -476,6 +496,8 @@ typedef struct SF_STMT { SF_STATS *stats; void *stmt_attrs; sf_bool is_dml; + sf_bool is_async; + sf_bool is_async_initialized; /** * User realloc function used in snowflake_fetch @@ -613,6 +635,25 @@ SF_STATUS STDCALL snowflake_get_attribute( */ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf); +/** + * Creates sf SNOWFLAKE_STMT context for async queries. + * + * @param sf The SF_CONNECT context. + * @param query_id the query id of the async query. + * + * @return sfstmt SNOWFLAKE_STMT context for async queries. + */ +SF_STMT* STDCALL snowflake_create_async_query_result(SF_CONNECT *sf, const char *query_id); + +/** + * Get the status of a query + * + * @param sfstmt The SF_STMT context. + * + * @return The query status. + */ +SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt); + /** * Frees the memory used by a SF_QUERY_RESULT_CAPTURE struct. * Note that this only frees the struct itself, and *not* the underlying @@ -775,6 +816,14 @@ snowflake_stmt_get_attr(SF_STMT *sfstmt, SF_STMT_ATTRIBUTE type, void **value); */ SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt); +/** + * Executes a statement asynchronously. + * @param sfstmt SNOWFLAKE_STMT context. + * + * @return 0 if success, otherwise an errno is returned. + */ +SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt); + /** * Executes a statement with capture. * @param sfstmt SNOWFLAKE_STMT context. diff --git a/include/snowflake/platform.h b/include/snowflake/platform.h index 807d6904f0..b5d938c62d 100755 --- a/include/snowflake/platform.h +++ b/include/snowflake/platform.h @@ -142,6 +142,9 @@ void STDCALL sf_memory_error_handler(); // this should be called by application before any calls of sfclient void STDCALL sf_exception_on_memory_failure(); +void sf_sleep_ms(int sleep_ms); + + #ifdef __cplusplus } #endif diff --git a/lib/client.c b/lib/client.c index c7efb91110..1214173a81 100644 --- a/lib/client.c +++ b/lib/client.c @@ -24,6 +24,8 @@ #include #define strncasecmp _strnicmp #define strcasecmp _stricmp +#else +#include #endif #define curl_easier_escape(curl, string) curl_easy_escape(curl, string, 0) @@ -54,12 +56,194 @@ static SF_STATUS STDCALL _reset_connection_parameters(SF_CONNECT *sf, cJSON *parameters, cJSON *session_info, sf_bool do_validate); +static const char* query_status_names[] = { + "ABORTED", + "ABORTING", + "BLOCKED", + "DISCONNECTED", + "FAILED_WITH_ERROR", + "FAILED_WITH_INCIDENT", + "NO_DATA", + "RUNNING", + "QUEUED", + "QUEUED_REPAIRING_WAREHOUSE", + "RESTARTED", + "RESUMING_WAREHOUSE", + "SUCCESS", + "UNKNOWN" +}; + /** * Validate partner application name. * @param application partner application name */ sf_bool validate_application(const char *application); +/** + * Helper function to get SF_QUERY_STATUS given the string representation + * @param query_status the string representation of the query status + */ +SF_QUERY_STATUS get_status_from_string(const char *query_status) { + if (query_status == NULL) { + return SF_QUERY_STATUS_UNKNOWN; + } + int idx = 0, last = 0; + for (idx = 0, last = (int)SF_QUERY_STATUS_UNKNOWN; idx <= last; ++idx) { + size_t len = strlen(query_status_names[idx]); + if (sf_strncasecmp(query_status_names[idx], query_status, len) == 0) { + return (SF_QUERY_STATUS)idx; + } + } + return SF_QUERY_STATUS_NO_DATA; +} + +/** + * Get the metadata of the query + * + * @param sfstmt The SF_STMT context. + * + * The query metadata + */ +char *get_query_metadata(SF_STMT* sfstmt) { + cJSON *resp = NULL; + cJSON *data = NULL; + cJSON *queries = NULL; + char *s_resp = NULL; + size_t url_size = strlen(QUERY_MONITOR_URL) - 2 + strlen(sfstmt->sfqid) + 1; + char *status_query = (char*)SF_CALLOC(1, url_size); + sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, sfstmt->sfqid); + + if (request(sfstmt->connection, &resp, status_query, NULL, 0, NULL, NULL, + GET_REQUEST_TYPE, &sfstmt->error, SF_BOOLEAN_TRUE, + 0, sfstmt->connection->retry_count, get_retry_timeout(sfstmt->connection), + NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { + + s_resp = snowflake_cJSON_Print(resp); + log_trace("Here is JSON response:\n%s", s_resp); + + data = snowflake_cJSON_GetObjectItem(resp, "data"); + + queries = snowflake_cJSON_GetObjectItem(data, "queries"); + cJSON* query = snowflake_cJSON_GetArrayItem(queries, 0); + + char *metadata = snowflake_cJSON_Print(query); + snowflake_cJSON_Delete(resp); + SF_FREE(s_resp); + SF_FREE(status_query); + return metadata; + } + SF_FREE(status_query); + log_info("No query metadata found. Query id: %s", sfstmt->sfqid); + return NULL; +} + + +SF_QUERY_STATUS STDCALL snowflake_get_query_status(SF_STMT *sfstmt) { + SF_QUERY_STATUS ret = SF_QUERY_STATUS_NO_DATA; + char *metadata = get_query_metadata(sfstmt); + if (metadata) { + cJSON* metadataJson = snowflake_cJSON_Parse(metadata); + + cJSON* status = snowflake_cJSON_GetObjectItem(metadataJson, "status"); + if (snowflake_cJSON_IsString(status)) { + char* queryStatus = snowflake_cJSON_GetStringValue(status); + ret = get_status_from_string(queryStatus); + } + snowflake_cJSON_Delete(metadataJson); + } + + return ret; +} + +/** + * Helper function to determine if the query is still running + * @param query_status the query status + */ +sf_bool is_query_still_running(SF_QUERY_STATUS query_status) { + return (query_status == SF_QUERY_STATUS_RUNNING) || + (query_status == SF_QUERY_STATUS_QUEUED) || + (query_status == SF_QUERY_STATUS_RESUMING_WAREHOUSE) || + (query_status == SF_QUERY_STATUS_QUEUED_REPAIRING_WAREHOUSE) || + (query_status == SF_QUERY_STATUS_NO_DATA); +} + +/** + * Get the results of the async query + * @param sfstmt The SF_STMT context + */ +sf_bool get_real_results(SF_STMT *sfstmt) { + //Get status until query is complete or timed out + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); + int retry = 0; + int no_data_retry = 0; + int no_data_max_retries = 30; + int retry_pattern[] = {1, 1, 2, 3, 4, 8, 10}; + int max_retries = 7; + while (query_status != SF_QUERY_STATUS_SUCCESS) { + if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) { + log_error("Query status is done running and did not succeed. Status is %s", + query_status_names[query_status]); + break; + } + if (query_status == SF_QUERY_STATUS_NO_DATA) { + no_data_retry++; + if (no_data_retry >= no_data_max_retries) { + log_error( + "Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", + sfstmt->sfqid); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_GENERAL, + "Cannot retrieve data on the status of this query.", + NULL, + sfstmt->sfqid); + break; + } + } + + int sleep_time = retry_pattern[retry] * 500; + sf_sleep_ms(sleep_time); + if (retry < max_retries) { + retry++; + } + else { + log_error( + "Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); + break; + } + query_status = snowflake_get_query_status(sfstmt); + } + + // Get query results + char query[1024]; + char* query_template = "select * from table(result_scan('%s'))"; + sf_sprintf(query, strlen(query_template) - 2 + strlen(sfstmt->sfqid) + 1, query_template, sfstmt->sfqid); + SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query)); + if (ret != SF_STATUS_SUCCESS) { + snowflake_propagate_error(sfstmt->connection, sfstmt); + return SF_BOOLEAN_FALSE; + } + + // Get query stats + char* metadata_str = get_query_metadata(sfstmt); + if (metadata_str) { + cJSON* metadata = snowflake_cJSON_Parse(metadata_str); + if (metadata && snowflake_cJSON_IsObject(metadata)) { + cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); + if (snowflake_cJSON_IsObject(stats)) { + if (sfstmt->stats) { + SF_FREE(sfstmt->stats); + } + sfstmt->stats = set_stats(stats); + } + log_error( + "Error parsing query stats from query id: %s", sfstmt->sfqid); + } + log_error( + "Error parsing query metadata from query id: %s", sfstmt->sfqid); + } + return SF_BOOLEAN_TRUE; +} + #define _SF_STMT_TYPE_DML 0x3000 #define _SF_STMT_TYPE_INSERT (_SF_STMT_TYPE_DML + 0x100) #define _SF_STMT_TYPE_UPDATE (_SF_STMT_TYPE_DML + 0x200) @@ -1576,6 +1760,23 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf) { return sfstmt; } +SF_STMT *STDCALL snowflake_create_async_query_result(SF_CONNECT *sf, const char *query_id) { + if (!sf) { + return NULL; + } + + SF_STMT *sfstmt = (SF_STMT *)SF_CALLOC(1, sizeof(SF_STMT)); + if (sfstmt) { + _snowflake_stmt_reset(sfstmt); + sfstmt->connection = sf; + sf_strcpy(sfstmt->sfqid, SF_UUID4_LEN, query_id); + sfstmt->is_async = SF_BOOLEAN_TRUE; + sfstmt->is_async_initialized = SF_BOOLEAN_FALSE; + } + + return sfstmt; +} + /** * Initializes an SF_QUERY_RESPONSE_CAPTURE struct. * Note that these need to be released by calling snowflake_query_result_capture_term(). @@ -1751,6 +1952,15 @@ SF_STATUS STDCALL snowflake_fetch(SF_STMT *sfstmt) { return SF_STATUS_ERROR_STATEMENT_NOT_EXIST; } + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return SF_STATUS_ERROR_GENERAL; + } + } + clear_snowflake_error(&sfstmt->error); SF_STATUS ret = SF_STATUS_ERROR_GENERAL; sf_bool get_chunk_success = SF_BOOLEAN_TRUE; @@ -1954,21 +2164,29 @@ snowflake_prepare(SF_STMT *sfstmt, const char *command, size_t command_size) { SF_STATUS STDCALL snowflake_describe_with_capture(SF_STMT *sfstmt, SF_QUERY_RESULT_CAPTURE *result_capture) { - return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE); + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_TRUE, SF_BOOLEAN_FALSE); } SF_STATUS STDCALL snowflake_execute(SF_STMT *sfstmt) { - return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE); + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE); +} + +SF_STATUS STDCALL snowflake_async_execute(SF_STMT *sfstmt) { + if (!sfstmt->is_async) { + sfstmt->is_async = SF_BOOLEAN_TRUE; + } + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), NULL, SF_BOOLEAN_FALSE, SF_BOOLEAN_TRUE); } SF_STATUS STDCALL snowflake_execute_with_capture(SF_STMT *sfstmt, SF_QUERY_RESULT_CAPTURE *result_capture) { - return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE); + return _snowflake_execute_ex(sfstmt, _is_put_get_command(sfstmt->sql_text), result_capture, SF_BOOLEAN_FALSE, SF_BOOLEAN_FALSE); } SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, sf_bool is_put_get_command, SF_QUERY_RESULT_CAPTURE* result_capture, - sf_bool is_describe_only) { + sf_bool is_describe_only, + sf_bool is_async_exec) { if (!sfstmt) { return SF_STATUS_ERROR_STATEMENT_NOT_EXIST; } @@ -2073,6 +2291,9 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, body = create_query_json_body(sfstmt->sql_text, sfstmt->sequence_counter, is_string_empty(sfstmt->connection->directURL) ? NULL : sfstmt->request_id, is_describe_only); + + snowflake_cJSON_AddBoolToObject(body, "asyncExec", is_async_exec); + if (bindings != NULL) { /* binding parameters if exists */ snowflake_cJSON_AddItemToObject(body, "bindings", bindings); @@ -2241,124 +2462,127 @@ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, // Determine query result format and detach rowset object from data. cJSON * qrf = snowflake_cJSON_GetObjectItem(data, "queryResultFormat"); - char * qrf_str = snowflake_cJSON_GetStringValue(qrf); - sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t)); - cJSON * rowset = NULL; + if (qrf) { + char* qrf_str = snowflake_cJSON_GetStringValue(qrf); + sfstmt->qrf = SF_CALLOC(1, sizeof(QueryResultFormat_t)); + cJSON* rowset = NULL; - if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) { + if (strcmp(qrf_str, "arrow") == 0 || strcmp(qrf_str, "arrow_force") == 0) { #ifdef SF_WIN32 SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT, - "Query results were fetched using Arrow, " - "but the client library does not yet support decoding Arrow results", "", - sfstmt->sfqid); + "Query results were fetched using Arrow, " + "but the client library does not yet support decoding Arrow results", "", + sfstmt->sfqid); return SF_STATUS_ERROR_UNSUPPORTED_QUERY_RESULT_FORMAT; #endif - *((QueryResultFormat_t *) sfstmt->qrf) = ARROW_FORMAT; + * ((QueryResultFormat_t*)sfstmt->qrf) = ARROW_FORMAT; rowset = snowflake_cJSON_DetachItemFromObject(data, "rowsetBase64"); if (!rowset) { - log_error("No valid rowset found in response"); - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_BAD_JSON, - "Missing rowset from response. No results found.", - SF_SQLSTATE_APP_REJECT_CONNECTION, - sfstmt->sfqid); - goto cleanup; + log_error("No valid rowset found in response"); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_BAD_JSON, + "Missing rowset from response. No results found.", + SF_SQLSTATE_APP_REJECT_CONNECTION, + sfstmt->sfqid); + goto cleanup; } - } - else if (strcmp(qrf_str, "json") == 0) { - *((QueryResultFormat_t *) sfstmt->qrf) = JSON_FORMAT; - if (json_detach_array_from_object((cJSON **)(&rowset), data, "rowset")) + } + else if (strcmp(qrf_str, "json") == 0) { + *((QueryResultFormat_t*)sfstmt->qrf) = JSON_FORMAT; + if (json_detach_array_from_object((cJSON**)(&rowset), data, "rowset")) { - log_error("No valid rowset found in response"); - SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, - SF_STATUS_ERROR_BAD_JSON, - "Missing rowset from response. No results found.", - SF_SQLSTATE_APP_REJECT_CONNECTION, - sfstmt->sfqid); - goto cleanup; + log_error("No valid rowset found in response"); + SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, + SF_STATUS_ERROR_BAD_JSON, + "Missing rowset from response. No results found.", + SF_SQLSTATE_APP_REJECT_CONNECTION, + sfstmt->sfqid); + goto cleanup; } - } - else { + } + else { log_error("Unsupported query result format: %s", qrf_str); - } + } - // Index starts at 0 and incremented each fetch - sfstmt->total_row_index = 0; + // Index starts at 0 and incremented each fetch + sfstmt->total_row_index = 0; - // When the result set is sufficient large, the server response will contain - // an empty "rowset" object. Instead, it will have a "chunks" object that contains, - // among other fields, a URL from which the result set can be downloaded in chunks. - // In this case, we initialize the chunk downloader, which will download in the - // background as calls to snowflake_fetch() are made. - if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) { + // When the result set is sufficient large, the server response will contain + // an empty "rowset" object. Instead, it will have a "chunks" object that contains, + // among other fields, a URL from which the result set can be downloaded in chunks. + // In this case, we initialize the chunk downloader, which will download in the + // background as calls to snowflake_fetch() are made. + if ((chunks = snowflake_cJSON_GetObjectItem(data, "chunks")) != NULL) { // We don't care if there is no qrmk, so ignore return code json_copy_string(&qrmk, data, "qrmk"); chunk_headers = snowflake_cJSON_GetObjectItem(data, "chunkHeaders"); NON_JSON_RESP* (*callback_create_resp)(void) = NULL; - if (ARROW_FORMAT == *((QueryResultFormat_t *)sfstmt->qrf)) { - callback_create_resp = callback_create_arrow_resp; + if (ARROW_FORMAT == *((QueryResultFormat_t*)sfstmt->qrf)) { + callback_create_resp = callback_create_arrow_resp; } sfstmt->chunk_downloader = chunk_downloader_init( - qrmk, - chunk_headers, - chunks, - 2, // thread count - 4, // fetch slot - &sfstmt->error, - sfstmt->connection->insecure_mode, - sfstmt->connection->ocsp_fail_open, - callback_create_resp, - sfstmt->connection->proxy, - sfstmt->connection->no_proxy, - get_retry_timeout(sfstmt->connection), - sfstmt->connection->retry_count); + qrmk, + chunk_headers, + chunks, + 2, // thread count + 4, // fetch slot + &sfstmt->error, + sfstmt->connection->insecure_mode, + sfstmt->connection->ocsp_fail_open, + callback_create_resp, + sfstmt->connection->proxy, + sfstmt->connection->no_proxy, + get_retry_timeout(sfstmt->connection), + sfstmt->connection->retry_count); if (!sfstmt->chunk_downloader) { - // Unable to create chunk downloader. - // Error is set in chunk_downloader_init function. - goto cleanup; + // Unable to create chunk downloader. + // Error is set in chunk_downloader_init function. + goto cleanup; } // Even when the result set is split into chunks, JSON format will still // response with the first chunk in "rowset", so be sure to include it. sfstmt->result_set = rs_create_with_json_result( - rowset, - sfstmt->desc, - (QueryResultFormat_t *)sfstmt->qrf, - sfstmt->connection->timezone); + rowset, + sfstmt->desc, + (QueryResultFormat_t*)sfstmt->qrf, + sfstmt->connection->timezone); // Update chunk row count. Controls the chunk downloader. sfstmt->chunk_rowcount = rs_get_row_count_in_chunk( - sfstmt->result_set, - (QueryResultFormat_t *) sfstmt->qrf); + sfstmt->result_set, + (QueryResultFormat_t*)sfstmt->qrf); // Update total row count. Used in snowflake_num_rows(). if (json_copy_int(&sfstmt->total_rowcount, data, "total")) { - log_warn( - "No total count found in response. Reverting to using array size of results"); - sfstmt->total_rowcount = sfstmt->chunk_rowcount; + log_warn( + "No total count found in response. Reverting to using array size of results"); + sfstmt->total_rowcount = sfstmt->chunk_rowcount; } - } else { + } + else { // Create a result set object and update the total rowcount. sfstmt->result_set = rs_create_with_json_result( - rowset, - sfstmt->desc, - (QueryResultFormat_t *) sfstmt->qrf, - sfstmt->connection->timezone); + rowset, + sfstmt->desc, + (QueryResultFormat_t*)sfstmt->qrf, + sfstmt->connection->timezone); // Update chunk row count. Controls the chunk downloader. sfstmt->chunk_rowcount = rs_get_row_count_in_chunk( - sfstmt->result_set, - (QueryResultFormat_t *) sfstmt->qrf); + sfstmt->result_set, + (QueryResultFormat_t*)sfstmt->qrf); // Update total row count. Used in snowflake_num_rows(). if (json_copy_int(&sfstmt->total_rowcount, data, "total")) { - log_warn( - "No total count found in response. Reverting to using array size of results"); - sfstmt->total_rowcount = sfstmt->chunk_rowcount; + log_warn( + "No total count found in response. Reverting to using array size of results"); + sfstmt->total_rowcount = sfstmt->chunk_rowcount; } + } } } } else if (json_error != SF_JSON_ERROR_NONE) { @@ -2437,6 +2661,15 @@ int64 STDCALL snowflake_num_rows(SF_STMT *sfstmt) { return -1; } + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return -1; + } + } + return sfstmt->total_rowcount; } @@ -2444,6 +2677,16 @@ int64 STDCALL snowflake_num_fields(SF_STMT *sfstmt) { if (!sfstmt) { return -1; } + + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return -1; + } + } + return sfstmt->total_fieldcount; } @@ -2452,6 +2695,16 @@ uint64 STDCALL snowflake_num_params(SF_STMT *sfstmt) { // TODO change to -1? return 0; } + + if (sfstmt->is_async && !sfstmt->is_async_initialized) { + if (get_real_results(sfstmt)) { + sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; + } + else { + return 0; + } + } + ARRAY_LIST *p = (ARRAY_LIST *) sfstmt->params; return p->used; } diff --git a/lib/client_int.h b/lib/client_int.h index 5a93a9a72c..875bcccb3a 100644 --- a/lib/client_int.h +++ b/lib/client_int.h @@ -29,6 +29,7 @@ #define QUERY_URL "/queries/v1/query-request" #define RENEW_SESSION_URL "/session/token-request" #define DELETE_SESSION_URL "/session" +#define QUERY_MONITOR_URL "/monitoring/queries/%s" // not used for now but add for URL checking on connection requests #define AUTHENTICATOR_URL "/session/authenticator-request" @@ -148,15 +149,17 @@ SF_PUT_GET_RESPONSE *STDCALL sf_put_get_response_allocate(); * @param sfstmt SNOWFLAKE_STMT context. * @param sf_use_application_json_accept type true if this is a put/get command * @param raw_response_buffer optional pointer to an SF_QUERY_RESULT_CAPTURE, - * @param is_describe_only should the statement be executed in describe only mode * if the query response is to be captured. + * @param is_describe_only should the statement be executed in describe only mode + * @param is_async_exec should it execute asynchronously * * @return 0 if success, otherwise an errno is returned. */ SF_STATUS STDCALL _snowflake_execute_ex(SF_STMT *sfstmt, sf_bool use_application_json_accept_type, struct SF_QUERY_RESULT_CAPTURE* result_capture, - sf_bool is_describe_only); + sf_bool is_describe_only, + sf_bool is_async_exec); /** * @return true if this is a put/get command, otherwise false diff --git a/lib/connection.c b/lib/connection.c index 94661ac6d9..b5bf99ef89 100644 --- a/lib/connection.c +++ b/lib/connection.c @@ -212,7 +212,6 @@ cJSON *STDCALL create_query_json_body(const char *sql_text, int64 sequence_id, c #endif body = snowflake_cJSON_CreateObject(); snowflake_cJSON_AddStringToObject(body, "sqlText", sql_text); - snowflake_cJSON_AddBoolToObject(body, "asyncExec", SF_BOOLEAN_FALSE); snowflake_cJSON_AddNumberToObject(body, "sequenceId", (double) sequence_id); snowflake_cJSON_AddNumberToObject(body, "querySubmissionTime", submission_time); snowflake_cJSON_AddBoolToObject(body, "describeOnly", is_describe_only); @@ -414,8 +413,18 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, break; } - while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0 || - strcmp(query_code, QUERY_IN_PROGRESS_ASYNC_CODE) == 0) { + sf_bool isAsyncExec = SF_BOOLEAN_FALSE; + cJSON *json_body = snowflake_cJSON_Parse(body); + if (json_body && snowflake_cJSON_IsObject(json_body)) { + cJSON* async = snowflake_cJSON_GetObjectItem(json_body, "asyncExec"); + if (async && snowflake_cJSON_IsBool(async)) { + isAsyncExec = snowflake_cJSON_IsTrue(async); + } + } + + if (!isAsyncExec) { + while (strcmp(query_code, QUERY_IN_PROGRESS_CODE) == 0 || + strcmp(query_code, QUERY_IN_PROGRESS_ASYNC_CODE) == 0) { // Remove old result URL and query code if this isn't our first rodeo SF_FREE(result_url); memset(query_code, 0, QUERYCODE_LEN); @@ -449,6 +458,7 @@ sf_bool STDCALL curl_post_call(SF_CONNECT *sf, SF_SQLSTATE_UNABLE_TO_CONNECT); break; } + } } if (stop) { diff --git a/lib/http_perform.c b/lib/http_perform.c index b7291e2034..6bf671e349 100644 --- a/lib/http_perform.c +++ b/lib/http_perform.c @@ -36,8 +36,6 @@ dump(const char *text, FILE *stream, unsigned char *ptr, size_t size, static int my_trace(CURL *handle, curl_infotype type, char *data, size_t size, void *userp); -static void my_sleep_ms(uint32 sleepMs); - static void dump(const char *text, FILE *stream, unsigned char *ptr, size_t size, @@ -128,16 +126,6 @@ int my_trace(CURL *handle, curl_infotype type, return 0; } -static -void my_sleep_ms(uint32 sleepMs) -{ -#ifdef _WIN32 - Sleep(sleepMs); -#else - usleep(sleepMs * 1000); // usleep takes sleep time in us (1 millionth of a second) -#endif -} - sf_bool STDCALL http_perform(CURL *curl, SF_REQUEST_TYPE request_type, char *url, @@ -371,7 +359,7 @@ sf_bool STDCALL http_perform(CURL *curl, if ((renew_injection) && (renew_timeout > 0) && elapsed_time && (*elapsed_time <= 0)) { - my_sleep_ms(renew_timeout * 1000); + sf_sleep_ms(renew_timeout * 1000); res = CURLE_OPERATION_TIMEDOUT; } @@ -387,7 +375,7 @@ sf_bool STDCALL http_perform(CURL *curl, "will retry after %d second", curl_retry_ctx.retry_count, next_sleep_in_secs); - my_sleep_ms(next_sleep_in_secs*1000); + sf_sleep_ms(next_sleep_in_secs*1000); } else if ((res == CURLE_OPERATION_TIMEDOUT) && (renew_timeout > 0)) { retry = SF_BOOLEAN_TRUE; } else { @@ -436,7 +424,7 @@ sf_bool STDCALL http_perform(CURL *curl, "will retry after %d seconds", http_code, curl_retry_ctx.retry_count, next_sleep_in_secs); - my_sleep_ms(next_sleep_in_secs * 1000); + sf_sleep_ms(next_sleep_in_secs * 1000); } else { char msg[1024]; diff --git a/lib/platform.c b/lib/platform.c index baf5a0ead3..de76c49975 100755 --- a/lib/platform.c +++ b/lib/platform.c @@ -236,6 +236,15 @@ struct tm* sfchrono_localtime(const time_t *timep, struct tm *tm) } #endif +void sf_sleep_ms(int sleep_ms) +{ +#ifdef _WIN32 + Sleep(sleep_ms); +#else + usleep(sleep_ms * 1000); // usleep takes sleep time in us (1 millionth of a second) +#endif +} + struct tm *STDCALL sf_gmtime(const time_t *timep, struct tm *result) { #ifdef _WIN32 return sfchrono_gmtime(timep, result); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7eab19d7ea..ef730325bb 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -42,6 +42,7 @@ SET(TESTS_C test_get_describe_only_query_result test_stmt_functions test_unit_oauth + test_async test_unit_mfa_auth test_ocsp_fail_open # FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY is internal switch diff --git a/tests/test_async.c b/tests/test_async.c new file mode 100644 index 0000000000..fd20be1d84 --- /dev/null +++ b/tests/test_async.c @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2024 Snowflake Computing, Inc. All rights reserved. + */ +#include +#include "memory.h" +#include "utils/test_setup.h" + + /** + * Test normal query flow with async + */ +void test_select() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select 1;", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + int64 out = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_int64(sfstmt, 1, &out); + assert_int_equal(out, 1); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + +/** + * Test normal getting query status + */ +void test_query_status() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select system$wait(5);", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); + assert_int_equal(query_status, SF_QUERY_STATUS_RUNNING); + + int retries = 0; + while (query_status != SF_QUERY_STATUS_SUCCESS || retries > 5) { + query_status = snowflake_get_query_status(sfstmt); + sf_sleep_ms(2000); + retries++; + } + + /* get results */ + char *out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &out, &value_len, &max_value_size); + assert_string_equal(out, "waited 5 seconds"); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); + SF_FREE(out); +} + +/** + * Test premature fetch + */ +void test_premature_fetch() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select system$wait(5);", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + char* out = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 1); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &out, &value_len, &max_value_size); + assert_string_equal(out, "waited 5 seconds"); + } + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); + SF_FREE(out); +} + +/** + * Test async with new connection + */ +void test_new_connection() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select 1;", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + char* sfqid = (char*)SF_CALLOC(1, SF_UUID4_LEN); + sf_strcpy(sfqid, SF_UUID4_LEN, sfstmt->sfqid); + + snowflake_stmt_term(sfstmt); + snowflake_term(sf); + + /* new connection */ + sf = setup_snowflake_connection(); + status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + SF_STMT* async_sfstmt = snowflake_create_async_query_result(sf, sfqid); + SF_FREE(sfqid); + + /* get results */ + int64 out = 0; + assert_int_equal(snowflake_num_rows(async_sfstmt), 1); + + while ((status = snowflake_fetch(async_sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_int64(async_sfstmt, 1, &out); + assert_int_equal(out, 1); + } + if (status != SF_STATUS_EOF) { + dump_error(&(async_sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); +} + +/** + * Test async query with fake table + */ +void test_fake_table() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select * from my_fake_table;", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + status = snowflake_fetch(sfstmt); + assert_int_equal(status, SF_STATUS_ERROR_GENERAL); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + +/** + * Test async query with invalid query id + */ +void test_invalid_query_id() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + char* fake_sfqid = "fake-query-id"; + SF_STMT* async_sfstmt = snowflake_create_async_query_result(sf, fake_sfqid); + + assert_non_null(async_sfstmt); + assert_non_null(async_sfstmt->connection); + assert_string_equal(async_sfstmt->sfqid, fake_sfqid); + assert_null(async_sfstmt->result_set); +} + +void test_multiple_chunk() { + SF_CONNECT* sf = setup_snowflake_connection(); + SF_STATUS status = snowflake_connect(sf); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sf->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* query */ + SF_STMT* sfstmt = snowflake_stmt(sf); + status = snowflake_prepare(sfstmt, "select randstr(100,random()) from table(generator(rowcount=>2000))", 0); + assert_int_equal(status, SF_STATUS_SUCCESS); + status = snowflake_async_execute(sfstmt); + if (status != SF_STATUS_SUCCESS) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_SUCCESS); + + /* get results */ + char* value = NULL; + size_t value_len = 0; + size_t max_value_size = 0; + assert_int_equal(snowflake_num_rows(sfstmt), 2000); + + while ((status = snowflake_fetch(sfstmt)) == SF_STATUS_SUCCESS) { + snowflake_column_as_str(sfstmt, 1, &value, &value_len, &max_value_size); + assert_int_equal(strlen(value), 100); + } + SF_FREE(value); + if (status != SF_STATUS_EOF) { + dump_error(&(sfstmt->error)); + } + assert_int_equal(status, SF_STATUS_EOF); + snowflake_stmt_term(sfstmt); + snowflake_term(sf); +} + +int main(void) { + initialize_test(SF_BOOLEAN_FALSE); + const struct CMUnitTest tests[] = { + cmocka_unit_test(test_select), + cmocka_unit_test(test_query_status), + cmocka_unit_test(test_premature_fetch), + cmocka_unit_test(test_new_connection), + cmocka_unit_test(test_fake_table), + cmocka_unit_test(test_invalid_query_id), + cmocka_unit_test(test_multiple_chunk), + }; + int ret = cmocka_run_group_tests(tests, NULL, NULL); + snowflake_global_term(); + return ret; +}