Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Statement Execution API #185

Merged
merged 2 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Microsoft.Azure.Databricks.Client.Models;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.Databricks.Client.Sample;

internal static partial class SampleProgram
{
private static async Task TestStatementExecutionApi(DatabricksClient client)
{
Console.WriteLine($"Creating new warehouse");

var w = new WarehouseAttributes
{
Name = "Testing Warehouse",
WarehouseType = WarehouseType.PRO,
EnableServerlessCompute = true,
Channel = new Channel { Name = ChannelName.CHANNEL_NAME_UNSPECIFIED },
SpotInstancePolicy = SpotInstancePolicy.POLICY_UNSPECIFIED,
ClusterSize = "2X-Small",
AutoStopMins = 20,
MaxNumClusters = 2,
};

var id = await client.SQL.Warehouse.Create(w);

Console.WriteLine($"Starting warehouse id {id}");
await client.SQL.Warehouse.Start(id);
Thread.Sleep(10 * 1000);

Console.WriteLine($"Querying warehouse id {id}");
var s = SqlStatement.Create("select * from main.information_schema.catalogs", id);

var result = await client.SQL.StatementExecution.Execute(s);
Console.WriteLine(result.Status.State);
Console.WriteLine($"Row count: {result.Manifest.TotalRowCount}");

Console.WriteLine($"Stopping warehouse id {id}");
await client.SQL.Warehouse.Stop(id);

Console.WriteLine($"Deleting warehouse id {id}");
await client.SQL.Warehouse.Delete(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public static async Task Main(string[] args)
//await TestWarehouseApi(client);
//await TestReposApi(client);
//await TestPipelineApi(client);
await TestUnityCatalogApi(client);
//await TestUnityCatalogApi(client);
await TestStatementExecutionApi(client);
}

Console.WriteLine("Press enter to exit");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Microsoft.Azure.Databricks.Client.Models;
using Moq;
using Moq.Contrib.HttpClient;
using System.Net;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace Microsoft.Azure.Databricks.Client.Test;

[TestClass]
public class StatementExecutionApiClientTest : ApiClientTest
{
private static readonly Uri StatementExecutionApiUri = new(BaseApiUri, "2.0/sql/statements");

[TestMethod]
public async Task TestExecute()
{
const string expectedRequest = "{\"statement\":\"string\",\"warehouse_id\":\"string\",\"parameters\":[]}";
const string expectedResponse = @"
{
""statement_id"": ""string"",
""status"": {
""state"": ""SUCCEEDED""
},
""manifest"": {
""format"": ""JSON_ARRAY"",
""schema"": {
""column_count"": 1,
""columns"": [
{
""name"": ""string"",
""position"": 0,
""type_name"": ""string"",
""type_text"": ""string""
}
]
}
},
""result"": {
""chunk_index"": 0,
""row_offset"": 0,
""row_count"": 1,
""data_array"": [
[ ""0"" ]
]
}
}
";

var expected = JsonSerializer.Deserialize<StatementExecution>(expectedResponse, Options);

var handler = CreateMockHandler();
handler
.SetupRequest(HttpMethod.Post, StatementExecutionApiUri)
.ReturnsResponse(HttpStatusCode.OK, JsonSerializer.Serialize(expected, Options), "application/json")
.Verifiable();

var hc = handler.CreateClient();
hc.BaseAddress = BaseApiUri;

using var client = new StatementExecutionApiClient(hc);
var statement = JsonNode.Parse(expectedRequest).Deserialize<SqlStatement>(Options);

var actual = await client.Execute(statement);
Assert.IsTrue(expected.Equals(actual));

Check warning on line 68 in csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 68 in csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

handler.VerifyRequest(
HttpMethod.Post,
StatementExecutionApiUri,
GetMatcher(expectedRequest),
Times.Once()
);
}

[TestMethod]
public async Task TestGet()
{
string testId = "1234-567890-cited123";
string apiUri = $"{StatementExecutionApiUri}/{testId}";

const string expectedResponse = @"
{
""statement_id"": ""string"",
""status"": {
""state"": ""SUCCEEDED""
},
""manifest"": {
""format"": ""JSON_ARRAY"",
""schema"": {
""column_count"": 1,
""columns"": [
{
""name"": ""string"",
""position"": 0,
""type_name"": ""string"",
""type_text"": ""string""
}
]
}
},
""result"": {
""chunk_index"": 0,
""row_offset"": 0,
""row_count"": 1,
""data_array"": [
[ ""0"" ]
]
}
}
";

var expected = JsonSerializer.Deserialize<StatementExecution>(expectedResponse, Options);

var handler = CreateMockHandler();
handler
.SetupRequest(HttpMethod.Get, apiUri)
.ReturnsResponse(HttpStatusCode.OK, expectedResponse, "application/json");

var hc = handler.CreateClient();
hc.BaseAddress = BaseApiUri;

using var client = new StatementExecutionApiClient(hc);
var actual = await client.Get(testId);
Assert.IsTrue(expected.Equals(actual));

Check warning on line 127 in csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 127 in csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
}

[TestMethod]
public async Task TestCancel()
{
string testId = "1234-567890-cited123";
string apiUri = $"{StatementExecutionApiUri}/{testId}/cancel";
var handler = CreateMockHandler();
handler
.SetupRequest(HttpMethod.Post, apiUri)
.ReturnsResponse(HttpStatusCode.OK)
.Verifiable();

var hc = handler.CreateClient();
hc.BaseAddress = BaseApiUri;

using var client = new StatementExecutionApiClient(hc);
await client.Cancel(testId);
handler.VerifyRequest(
HttpMethod.Post,
apiUri,
Times.Once()
);
}

[TestMethod]
public async Task TestGetResultChunk()
{
string testId = "1234-567890-cited123";
int chunkIndex = 0;
string apiUri = $"{StatementExecutionApiUri}/{testId}/result/chunks/{chunkIndex}";

const string expectedResponse = @"
{
""chunk_index"": 0,
""row_offset"": 0,
""row_count"": 1,
""data_array"": [
[ ""0"" ]
]
}
";

var expected = JsonSerializer.Deserialize<StatementExecutionResultChunk>(expectedResponse, Options);

var handler = CreateMockHandler();
handler
.SetupRequest(HttpMethod.Get, apiUri)
.ReturnsResponse(HttpStatusCode.OK, expectedResponse, "application/json");

var hc = handler.CreateClient();
hc.BaseAddress = BaseApiUri;

using var client = new StatementExecutionApiClient(hc);
var actual = await client.GetResultChunk(testId, chunkIndex);
Assert.IsTrue(expected.Equals(actual));

Check warning on line 183 in csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 183 in csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
}
}
31 changes: 31 additions & 0 deletions csharp/Microsoft.Azure.Databricks.Client/ISQLApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Databricks.Client
{
public interface ISQLApi : IDisposable
{
IStatementExecutionApi StatementExecution { get; }
IWarehouseApi Warehouse { get; }
}

Expand Down Expand Up @@ -58,4 +59,34 @@ public interface IWarehouseApi : IDisposable
/// <param name="id">Required. Id of the SQL warehouse.</param>
Task Stop(string id, CancellationToken cancellationToken = default);
}

public interface IStatementExecutionApi : IDisposable
{
/// <summary>
/// Execute a SQL statement.
/// </summary>
Task<StatementExecution> Execute(SqlStatement statement, CancellationToken cancellationToken = default);

/// <summary>
/// Cancel statement execution.
/// </summary>
/// <param name="id">Requried. Id of statement execution.</param>
Task Cancel(string id, CancellationToken cancellationToken = default);

/// <summary>
/// Get status, manifest, and result first chunk.
/// </summary>
/// <param name="id">Requried. Id of statement execution.</param>
Task<StatementExecution> Get(string id, CancellationToken cancellationToken = default);

/// <summary>
/// Get result chunk by index.
/// </summary>
/// <remarks>
/// After the statement execution has SUCCEEDED, this request can be used to fetch any chunk by index. Whereas the first chunk with chunk_index=0 is typically fetched with statementexecution/executestatement or statementexecution/getstatement, this request can be used to fetch subsequent chunks. The response structure is identical to the nested result element described in the statementexecution/getstatement request, and similarly includes the next_chunk_index and next_chunk_internal_link fields for simple iteration through the result set.
/// </remarks>
/// <param name="id">Requried. Id of statement execution.</param>
/// <param name="chunkIndex">Required. The index of the chunk.</param>
Task<StatementExecutionResultChunk> GetResultChunk(string id, int chunkIndex, CancellationToken cancellationToken = default);
}
}
Loading