Skip to content

Commit

Permalink
Fixes error handling for error responses from STS (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
garrett528 authored Oct 20, 2021
1 parent 91fd461 commit ceb3ee0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
27 changes: 13 additions & 14 deletions src/rdkafka_aws.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static size_t rd_kafka_aws_curl_write_callback(char *ptr, size_t size, size_t nm
size_t realsize = size * nmemb;
curl_in_mem_buf *req = (curl_in_mem_buf *) userdata;

printf("receive chunk of %zu bytes\n", realsize);
printf("received chunk of %zu bytes\n", realsize);

while (req->buflen < req->len + realsize + 1)
{
Expand Down Expand Up @@ -401,8 +401,6 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential,
const char *signed_headers,
const char *request_parameters,
const EVP_MD *md) {
int r = 1;

char *canonical_request = rd_kafka_aws_build_canonical_request(
host,
method,
Expand Down Expand Up @@ -515,51 +513,52 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential,

res = curl_easy_perform(curl);
if (res != CURLE_OK) {
/* add errstr handling */
fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
return -1;
}

xmlDoc *document;
xmlNode *cur;
document = xmlReadMemory(req.buffer, req.len, "assume_role_response.xml", NULL, 0);
document = xmlReadMemory((char *)req.buffer, req.len, "assume_role_response.xml", NULL, 0);
if (document == NULL) {
/* add errstr handling */
fprintf(stderr, "Failed to parse document\n");
// return -1;
return -1;
}
cur = xmlDocGetRootElement(document);
if (xmlStrcmp(cur->name, (const xmlChar *)"ErrorResponse") == 0) {
fprintf(stderr, "Error occurred in AssumeRole call: %s\n", req.buffer);
return -1;
}
cur = cur->children;
while (cur != NULL) {
if ((!xmlStrcmp(cur->name, (const xmlChar *)"AssumeRoleResult"))) {
if (!xmlStrcmp(cur->name, (const xmlChar *)"AssumeRoleResult")) {
break;
}
cur = cur->next;
}

cur = cur->children;
while (cur != NULL) {
if ((!xmlStrcmp(cur->name, (const xmlChar *)"Credentials"))) {
if (!xmlStrcmp(cur->name, (const xmlChar *)"Credentials")) {
break;
}
cur = cur->next;
}

cur = cur->children;
while (cur != NULL) {
if ((!xmlStrcmp(cur->name, (const xmlChar *)"AccessKeyId"))) {
if (!xmlStrcmp(cur->name, (const xmlChar *)"AccessKeyId")) {
xmlChar *content = xmlNodeListGetString(document, cur->children, 1);
credential->aws_access_key_id = rd_strdup((const char *)content);
xmlFree(content);
}

if ((!xmlStrcmp(cur->name, (const xmlChar *)"SecretAccessKey"))) {
if (!xmlStrcmp(cur->name, (const xmlChar *)"SecretAccessKey")) {
xmlChar *content = xmlNodeListGetString(document, cur->children, 1);
credential->aws_secret_access_key = rd_strdup((const char *)content);
xmlFree(content);
}

if ((!xmlStrcmp(cur->name, (const xmlChar *)"SessionToken"))) {
if (!xmlStrcmp(cur->name, (const xmlChar *)"SessionToken")) {
xmlChar *content = xmlNodeListGetString(document, cur->children, 1);
credential->aws_security_token = rd_strdup((const char *)content);
xmlFree(content);
Expand All @@ -581,7 +580,7 @@ int rd_kafka_aws_send_request (rd_kafka_aws_credential_t *credential,
}
curl_easy_cleanup(curl);

return r;
return 1;
}

/**
Expand Down
17 changes: 10 additions & 7 deletions src/rdkafka_sasl_aws_msk_iam.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ rd_kafka_aws_msk_iam_credential_refresh0 (
str_builder_t *sb;
sb = str_builder_create();

int r = 1;
char *handle_aws_access_key_id;
char *handle_aws_secret_access_key;
char *handle_aws_region;
Expand Down Expand Up @@ -349,7 +348,8 @@ rd_kafka_aws_msk_iam_credential_refresh0 (

credential->aws_region = rd_strdup(handle_aws_region);
credential->md_lifetime_ms = now_wallclock_ms + conf->sasl.duration_sec * 1000;
rd_kafka_aws_send_request(credential,
rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Sending refresh request to STS");
if (rd_kafka_aws_send_request(credential,
ymd,
hms,
host,
Expand All @@ -363,12 +363,15 @@ rd_kafka_aws_msk_iam_credential_refresh0 (
canonical_headers,
signed_headers,
request_parameters,
md);

if (r == -1) {
md) == -1) {
rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "AWS credential retrieval and parsing failed");
rd_kafka_sasl_aws_msk_iam_credential_free(credential);

return -1;
}

rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "New AWS credentials retrieved from STS");

RD_IF_FREE(handle_aws_access_key_id, rd_free);
RD_IF_FREE(handle_aws_secret_access_key, rd_free);
RD_IF_FREE(handle_aws_region, rd_free);
Expand All @@ -380,7 +383,7 @@ rd_kafka_aws_msk_iam_credential_refresh0 (
RD_IF_FREE(canonical_headers, rd_free);
RD_IF_FREE(request_parameters, rd_free);

return r;
return 1;
}

/**
Expand All @@ -396,7 +399,7 @@ rd_kafka_aws_msk_iam_credential_refresh (rd_kafka_t *rk, void *opaque) {
char errstr[512];
rd_kafka_aws_credential_t credential = RD_ZERO_INIT;

rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Checking to refreshing AWS credentials");
rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Checking whether to refresh AWS credentials");

if (rk->rk_conf.sasl.enable_use_sts) {
rd_kafka_dbg(rk, SECURITY, "SASLAWSMSKIAM", "Use STS enabled, will refresh credentials");
Expand Down

0 comments on commit ceb3ee0

Please sign in to comment.