-
Notifications
You must be signed in to change notification settings - Fork 28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-692968: Async queries support #787
base: master
Are you sure you want to change the base?
Changes from 1 commit
763153f
974613b
a2f2701
3829b60
d947159
8ab3758
43c5653
76ac1f1
c4fa76d
387a59f
cb067ec
6cd5c78
582913e
98cb876
d3b94a0
f84fc25
97531c8
f32ada0
942ae0a
ed33af6
75e6565
9e457ae
2aaea2a
9003f03
eb4ae08
a793549
45fa5a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -98,26 +98,28 @@ SF_QUERY_STATUS get_status_from_string(const char *query_status) { | |
|
||
/** | ||
* Get the metadata of the query | ||
* @param sf the SF_CONNECT context | ||
* @param query_id the query id | ||
* | ||
* @param sfstmt The SF_STMT context. | ||
* | ||
* The query metadata | ||
*/ | ||
char *get_query_metadata(SF_CONNECT *sf, const char *query_id) { | ||
char *get_query_metadata(SF_STMT* sfstmt) { | ||
cJSON *resp = NULL; | ||
cJSON *data = NULL; | ||
cJSON *queries = NULL; | ||
char *s_resp = NULL; | ||
const char *error_msg; | ||
size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(query_id) + 1; | ||
size_t url_size = strlen(QUERY_MONITOR_URL) -2 + strlen(sfstmt->sfqid) + 1; | ||
sfc-gh-dprzybysz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
char *status_query = (char*)SF_CALLOC(1, url_size); | ||
sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, query_id); | ||
sf_sprintf(status_query, url_size, QUERY_MONITOR_URL, sfstmt->sfqid); | ||
|
||
if (request(sf, &resp, status_query, NULL, 0, NULL, NULL, | ||
GET_REQUEST_TYPE, &sf->error, SF_BOOLEAN_TRUE, | ||
0, sf->retry_count, get_retry_timeout(sf), | ||
if (request(sfstmt->connection, &resp, status_query, NULL, 0, NULL, NULL, | ||
GET_REQUEST_TYPE, &sfstmt->error, SF_BOOLEAN_TRUE, | ||
0, sfstmt->connection->retry_count, get_retry_timeout(sfstmt->connection), | ||
NULL, NULL, NULL, SF_BOOLEAN_FALSE)) { | ||
|
||
s_resp = snowflake_cJSON_Print(resp); | ||
log_info("Here is JSON response:\n%s", s_resp); | ||
log_trace("Here is JSON response:\n%s", s_resp); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please be more descriptive in logs, maybe: log_trace("GET %s returned response: %s", status_query, s_resp) |
||
|
||
data = snowflake_cJSON_GetObjectItem(resp, "data"); | ||
|
||
|
@@ -131,29 +133,38 @@ char *get_query_metadata(SF_CONNECT *sf, const char *query_id) { | |
return metadata; | ||
} | ||
SF_FREE(status_query); | ||
log_trace("Error getting query metadata."); | ||
log_error("Error getting query metadata. Query id: %s", sfstmt->sfqid); | ||
return NULL; | ||
} | ||
|
||
/** | ||
* Get the status of the query | ||
* @param sf the SF_CONNECT context | ||
* @param query_id the query id | ||
*/ | ||
SF_QUERY_STATUS get_query_status(SF_CONNECT *sf, const char *query_id) { | ||
|
||
SF_QUERY_STATUS snowflake_get_query_status(SF_STMT *sfstmt) { | ||
SF_QUERY_STATUS ret = SF_QUERY_STATUS_NO_DATA; | ||
char *metadata = get_query_metadata(sf, query_id); | ||
char *metadata = get_query_metadata(sfstmt); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not free the metadata |
||
if (metadata) { | ||
cJSON* metadataJson = snowflake_cJSON_Parse(metadata); | ||
|
||
cJSON* status = snowflake_cJSON_GetObjectItem(metadataJson, "status"); | ||
if (snowflake_cJSON_IsString(status)) | ||
{ | ||
if (snowflake_cJSON_IsString(status)) { | ||
char* queryStatus = snowflake_cJSON_GetStringValue(status); | ||
ret = get_status_from_string(queryStatus); | ||
} | ||
else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After seeing the logic in |
||
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, | ||
SF_STATUS_ERROR_GENERAL, | ||
"Error retrieving the status from the metadata.", | ||
NULL, | ||
sfstmt->sfqid); | ||
} | ||
snowflake_cJSON_Delete(metadataJson); | ||
} | ||
else { | ||
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, | ||
SF_STATUS_ERROR_GENERAL, | ||
"Error retrieving query metadata.", | ||
NULL, | ||
sfstmt->sfqid); | ||
} | ||
|
||
return ret; | ||
} | ||
|
@@ -174,23 +185,26 @@ sf_bool is_query_still_running(SF_QUERY_STATUS query_status) { | |
* Get the results of the async query | ||
* @param sfstmt The SF_STMT context | ||
*/ | ||
void get_real_results(SF_STMT * sfstmt) { | ||
SF_QUERY_STATUS query_status = get_query_status(sfstmt->connection, sfstmt->sfqid); | ||
void get_real_results(SF_STMT *sfstmt) { | ||
//Get status until query is complete or timed out | ||
SF_QUERY_STATUS query_status = snowflake_get_query_status(sfstmt); | ||
int retry = 0; | ||
int no_data_retry = 0; | ||
int no_data_max_retries = 30; | ||
int retry_pattern[] = {1, 1, 2, 3, 4, 8, 10}; | ||
int max_retries = 7; | ||
while (query_status != SF_QUERY_STATUS_SUCCESS) { | ||
if (!is_query_still_running(query_status) && query_status != SF_QUERY_STATUS_SUCCESS) { | ||
log_error("Query status is done running and did not succeed. Status is %s", query_status_names[query_status]); | ||
log_error("Query status is done running and did not succeed. Status is %s", | ||
query_status_names[query_status]); | ||
return; | ||
} | ||
if (query_status == SF_QUERY_STATUS_NO_DATA) { | ||
no_data_retry++; | ||
if (no_data_retry >= no_data_max_retries) { | ||
log_error( | ||
"Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", sfstmt->sfqid); | ||
"Cannot retrieve data on the status of this query. No information returned from server for queryID=%s", | ||
sfstmt->sfqid); | ||
SET_SNOWFLAKE_STMT_ERROR(&sfstmt->error, | ||
SF_STATUS_ERROR_GENERAL, | ||
"Cannot retrieve data on the status of this query.", | ||
|
@@ -199,28 +213,44 @@ void get_real_results(SF_STMT * sfstmt) { | |
return; | ||
} | ||
} | ||
} | ||
int sleep_time = retry_pattern[retry] * 500; | ||
|
||
int sleep_time = retry_pattern[retry] * 500; | ||
#ifdef _WIN32 | ||
Sleep(sleep_time); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. define this as a common function in platform.c? I think same logic being used in http_perform.c as well. |
||
Sleep(sleep_time); | ||
#else | ||
usleep(sleep_time * 1000); | ||
usleep(sleep_time * 1000); | ||
#endif | ||
if (retry < max_retries) { | ||
retry++; | ||
} else { | ||
log_error( | ||
"Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); | ||
if (retry < max_retries) { | ||
retry++; | ||
} | ||
else { | ||
log_error( | ||
"Cannot retrieve data on the status of this query. Max retries hit with queryID=%s", sfstmt->sfqid); | ||
} | ||
query_status = snowflake_get_query_status(sfstmt); | ||
} | ||
query_status = get_query_status(sfstmt->connection, sfstmt->sfqid); | ||
|
||
// Get query results | ||
char query[1024]; | ||
char* query_template = "select * from table(result_scan('%s'))"; | ||
sf_sprintf(query, strlen(query_template) - 2 + strlen(sfstmt->sfqid) + 1, query_template, sfstmt->sfqid); | ||
SF_STATUS ret = snowflake_query(sfstmt, query, strlen(query)); | ||
if (ret != SF_STATUS_SUCCESS) { | ||
snowflake_propagate_error(sfstmt->connection, sfstmt); | ||
} | ||
|
||
// Get query stats | ||
char* metadata_str = get_query_metadata(sfstmt); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not free metadata_str? |
||
if (metadata_str) { | ||
cJSON* metadata = snowflake_cJSON_Parse(metadata_str); | ||
sfc-gh-jszczerbinski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We assume it's an object? |
||
if (snowflake_cJSON_IsObject(stats)) { | ||
if (sfstmt->stats) { | ||
SF_FREE(sfstmt->stats); | ||
} | ||
sfstmt->stats = set_stats(stats); | ||
} | ||
} | ||
} | ||
|
||
#define _SF_STMT_TYPE_DML 0x3000 | ||
|
@@ -1739,7 +1769,7 @@ SF_STMT *STDCALL snowflake_stmt(SF_CONNECT *sf) { | |
return sfstmt; | ||
} | ||
|
||
SF_STMT *STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id) { | ||
SF_STMT *STDCALL snowflake_create_async_query_result(SF_CONNECT *sf, const char *query_id) { | ||
if (!sf) { | ||
return NULL; | ||
} | ||
|
@@ -1749,18 +1779,8 @@ SF_STMT *STDCALL snowflake_async_stmt(SF_CONNECT *sf, const char *query_id) { | |
_snowflake_stmt_reset(sfstmt); | ||
sfstmt->connection = sf; | ||
sf_strcpy(sfstmt->sfqid, SF_UUID4_LEN, query_id); | ||
} | ||
|
||
get_real_results(sfstmt); | ||
|
||
char *metadata_str = get_query_metadata(sfstmt->connection, query_id); | ||
if (metadata_str) { | ||
cJSON* metadata = snowflake_cJSON_Parse(metadata_str); | ||
cJSON* stats = snowflake_cJSON_GetObjectItem(metadata, "stats"); | ||
if (snowflake_cJSON_IsObject(stats)) { | ||
_snowflake_stmt_row_metadata_reset(sfstmt); | ||
sfstmt->stats = set_stats(stats); | ||
} | ||
sfstmt->is_async = SF_BOOLEAN_TRUE; | ||
sfstmt->is_async_initialized = SF_BOOLEAN_FALSE; | ||
} | ||
|
||
return sfstmt; | ||
|
@@ -1941,6 +1961,11 @@ SF_STATUS STDCALL snowflake_fetch(SF_STMT *sfstmt) { | |
return SF_STATUS_ERROR_STATEMENT_NOT_EXIST; | ||
} | ||
|
||
if (sfstmt->is_async && !sfstmt->is_async_initialized) { | ||
get_real_results(sfstmt); | ||
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; | ||
} | ||
|
||
clear_snowflake_error(&sfstmt->error); | ||
SF_STATUS ret = SF_STATUS_ERROR_GENERAL; | ||
sf_bool get_chunk_success = SF_BOOLEAN_TRUE; | ||
|
@@ -2634,13 +2659,24 @@ int64 STDCALL snowflake_num_rows(SF_STMT *sfstmt) { | |
return -1; | ||
} | ||
|
||
if (sfstmt->is_async && !sfstmt->is_async_initialized) { | ||
get_real_results(sfstmt); | ||
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; | ||
} | ||
|
||
return sfstmt->total_rowcount; | ||
} | ||
|
||
int64 STDCALL snowflake_num_fields(SF_STMT *sfstmt) { | ||
if (!sfstmt) { | ||
return -1; | ||
} | ||
|
||
if (sfstmt->is_async && !sfstmt->is_async_initialized) { | ||
get_real_results(sfstmt); | ||
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; | ||
} | ||
|
||
return sfstmt->total_fieldcount; | ||
} | ||
|
||
|
@@ -2649,6 +2685,12 @@ uint64 STDCALL snowflake_num_params(SF_STMT *sfstmt) { | |
// TODO change to -1? | ||
return 0; | ||
} | ||
|
||
if (sfstmt->is_async && !sfstmt->is_async_initialized) { | ||
get_real_results(sfstmt); | ||
sfstmt->is_async_initialized = SF_BOOLEAN_TRUE; | ||
} | ||
|
||
ARRAY_LIST *p = (ARRAY_LIST *) sfstmt->params; | ||
return p->used; | ||
} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add test cases to cover typical use case, checking query status before calling any fetch/metadata API. Take JDBC doc as example, https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-using#examples-of-asynchronous-queries |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this function should return a striuct with parsed metadata instead of char array containing JSON. Consider moving parsing and validation into this function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Especially since stringify it in this function and parse it again when it's used