Skip to content

Commit

Permalink
Experimental object creation for Databricks statements (#273)
Browse files Browse the repository at this point in the history
* Convert timestamp

* Bump version

* Update docs

* Add support for object creation

* Update documentation

* Add release notes and bump package version

* Fix license header

* Fix documentation and add remark

* Adjust naming and document method

* Adjust naming

* Add requirements for the record/class

* Throw exception if constructor mismatch

* Enhance with exceptions if there is a mismatch

* Format documentation for readability

* Reduce allocation

* Review suggestion

Co-authored-by: Dan Stenrøjl <[email protected]>

---------

Co-authored-by: Henrik Sommer <[email protected]>
Co-authored-by: Dan Stenrøjl <[email protected]>
  • Loading branch information
3 people authored Nov 16, 2023
1 parent 769f9bc commit e5d0229
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 2 deletions.
29 changes: 29 additions & 0 deletions source/Databricks/documents/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,35 @@ var rowCount = await result.CountAsync();
rowCount.Should().Be(2);
```

#### Experimental - object creation

It's possible to create objects from the result of a query. This is done by annotating the properties of a record class with ArrowField attributes.

The creation is very limited. It only applies to records that are constructed from constructor parameters. The constructor order property of the ArrowField attribute is used to map the columns of the query to the constructor parameters.

Example usage:

```c#
public record Person(
[property: ArrowField("name", 1)] string Name,
[property: ArrowField("age", 2)] int Age);

// Create person objects
var statement = DatabricksStatement.FromRawSql(@"SELECT * FROM VALUES
('Zen Hui', 25),
('Anil B' , 18),
('Shone S', 16),
('Mike A' , 25),
('John A' , 18),
('Jack N' , 16) AS data(name, age)")
.Build();


var result = client.ExecuteStatementAsync<Person>(statement);
await foreach (var person in result)
Console.WriteLine(person);
```

### Health checks

The package contains functionality to do health checks of the status of the Databricks Sql Statement Execution API.
Expand Down
4 changes: 4 additions & 0 deletions source/Databricks/documents/release-notes/release-notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Databricks Release Notes

## Version 8.1.0

- Create objects from statements as an alternative to dynamic

## Version 8.0.2

- Fix bug in databricks timestamp handling
Expand Down
2 changes: 1 addition & 1 deletion source/Databricks/source/Jobs/Jobs.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ limitations under the License.

<PropertyGroup>
<PackageId>Energinet.DataHub.Core.Databricks.Jobs</PackageId>
<PackageVersion>8.0.2$(VersionSuffix)</PackageVersion>
<PackageVersion>8.1.0$(VersionSuffix)</PackageVersion>
<Title>Databricks Jobs</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Energinet.DataHub.Core.Databricks.SqlStatementExecution.Formats;
using Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTests.Fixtures;
using FluentAssertions;

namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTests.Client;

/// <summary>
/// Integration tests for the strongly typed <c>ExecuteStatementAsync&lt;T&gt;</c> overload,
/// executed against the SQL warehouse provided by <see cref="DatabricksSqlWarehouseFixture"/>.
/// </summary>
public class ObjectCreationTests : IClassFixture<DatabricksSqlWarehouseFixture>
{
    private readonly DatabricksSqlWarehouseFixture _fixture;

    public ObjectCreationTests(DatabricksSqlWarehouseFixture sqlWarehouseFixture)
        => _fixture = sqlWarehouseFixture;

    /// <summary>
    /// A statement selecting six inline (name, age) rows. A new statement is built on every access.
    /// </summary>
    private static DatabricksStatement PersonsStatement
    {
        get
        {
            // The continuation lines of the verbatim literal are part of the SQL text and are kept unindented.
            var builder = DatabricksStatement.FromRawSql(@"SELECT * FROM VALUES
('Zen Hui', 25),
('Anil B' , 18),
('Shone S', 16),
('Mike A' , 25),
('John A' , 18),
('Jack N' , 16) AS data(name, age)");

            return builder.Build();
        }
    }

    /// <summary>
    /// The rows of <see cref="PersonsStatement"/> can be materialized as <see cref="Person"/> records.
    /// </summary>
    [Fact]
    public async Task CanMapToRecord()
    {
        // Arrange
        var sut = _fixture.CreateSqlStatementClient();
        var expected = new Person("John A", 18);

        // Act
        var persons = await sut.ExecuteStatementAsync<Person>(PersonsStatement).ToListAsync();

        // Assert
        persons.Should().Contain(expected);
    }

    /// <summary>
    /// Enumerating the result for <see cref="BadPerson"/> (a type with two constructors)
    /// must fail with <see cref="InvalidOperationException"/>.
    /// </summary>
    [Fact]
    public async Task GivenAClassWithMultipleConstructors_WhenConstructingObject_ThenExceptionIsThrown()
    {
        // Arrange
        var sut = _fixture.CreateSqlStatementClient();

        // Act
        var rows = sut.ExecuteStatementAsync<BadPerson>(PersonsStatement);
        Func<Task> enumerate = async () =>
        {
            await rows.ToListAsync();
        };

        // Assert
        await enumerate.Should().ThrowAsync<InvalidOperationException>();
    }

    /// <summary>
    /// Enumerating the result for <see cref="ReallyBadPerson"/> (two constructor parameters,
    /// only one annotated property) must fail with <see cref="ArgumentException"/>.
    /// </summary>
    [Fact]
    public async Task GivenAClassWithTwoParameters_WhenOnlyOneIsMapped_ThenExceptionIsThrown()
    {
        // Arrange
        var sut = _fixture.CreateSqlStatementClient();

        // Act
        var rows = sut.ExecuteStatementAsync<ReallyBadPerson>(PersonsStatement);
        Func<Task> enumerate = async () =>
        {
            await rows.ToListAsync();
        };

        // Assert
        await enumerate.Should().ThrowAsync<ArgumentException>();
    }

    /// <summary>
    /// Negative fixture: the constructor takes two parameters, but only <see cref="Age"/>
    /// carries an <see cref="ArrowFieldAttribute"/>.
    /// </summary>
    public class ReallyBadPerson
    {
        public string Name { get; private set; }

        [ArrowField("age", 2)]
        public int Age { get; private set; }

        public ReallyBadPerson(string name, int age)
            => (Name, Age) = (name, age);
    }

    /// <summary>
    /// Negative fixture: declares two public constructors and no annotated properties.
    /// </summary>
    public class BadPerson
    {
        public BadPerson()
            : this(string.Empty)
        {
        }

        public BadPerson(string name)
        {
            Name = name;
        }

        public string Name { get; set; }
    }

    /// <summary>
    /// Positive fixture: a positional record whose generated properties are both annotated.
    /// </summary>
    public record Person(
        [property: ArrowField("name", 1)] string Name,
        [property: ArrowField("age", 2)] int Age);
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,52 @@ public virtual async IAsyncEnumerable<dynamic> ExecuteStatementAsync(DatabricksS
}
}

/// <summary>
/// Asynchronously executes a parameterized SQL query on Databricks and streams the result back as a collection of strongly typed objects.
/// </summary>
/// <param name="statement">The SQL query to be executed, with collection of <see cref="QueryParameter"/> parameters.</param>
/// <typeparam name="T">Target type</typeparam>
/// <returns>An asynchronous enumerable of <typeparamref name="T"/> representing the result of the query.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown during enumeration, before the statement is submitted, when the HTTP client has no <c>BaseAddress</c> configured.
/// </exception>
/// <remarks>
/// This is an experimental feature and may be removed in a future version.
/// <br/><br/>
/// Requirements for <typeparamref name="T"/>:<br/>
/// - Must be a reference type<br/>
/// - Must have a public constructor with parameters matching the columns in the result set<br/>
/// - Must only have a single constructor<br/>
/// - Must be annotated with <see cref="ArrowFieldAttribute"/> to indicate the order of the constructor parameters
/// <br/><br/>
/// The method is an async iterator: nothing is executed until the returned sequence is enumerated.
/// For every chunk in the manifest only the first external link is downloaded.
/// </remarks>
public virtual async IAsyncEnumerable<T> ExecuteStatementAsync<T>(DatabricksStatement statement)
    where T : class
{
    var strategy = new StronglyTypedApacheArrowFormat(_options);
    var request = strategy.GetStatementRequest(statement);

    // Fail fast: verify the client configuration before the statement is submitted to the warehouse,
    // rather than after the (potentially long running) execution has completed.
    if (_httpClient.BaseAddress is null)
    {
        throw new InvalidOperationException(
            "The HTTP client used for Databricks SQL statement execution has no BaseAddress configured.");
    }

    var response = await request.WaitForSqlWarehouseResultAsync(_httpClient, StatementsEndpointPath);

    // An empty result has no rows to stream.
    if (response.manifest.total_row_count <= 0)
    {
        yield break;
    }

    foreach (var chunk in response.manifest.chunks)
    {
        var uri = StatementsEndpointPath +
                  $"/{response.statement_id}/result/chunks/{chunk.chunk_index}?row_offset={chunk.row_offset}";
        var chunkResponse = await _httpClient.GetFromJsonAsync<ManifestChunk>(uri);

        // Chunks without external links carry no downloadable data and are skipped.
        if (chunkResponse?.external_links == null)
        {
            continue;
        }

        // The chunk payload is fetched with the dedicated external client and disposed once the chunk is consumed.
        await using var stream = await _externalHttpClient.GetStreamAsync(chunkResponse.external_links[0].external_link);

        await foreach (var row in strategy.ExecuteAsync<T>(stream))
        {
            yield return row;
        }
    }
}

private async IAsyncEnumerable<dynamic> DoExecuteStatementAsync(DatabricksStatement statement, Format format)
{
var strategy = format.GetStrategy(_options);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution.Formats;

/// <summary>
/// Annotates a property with a field name and a constructor order, for use by the
/// experimental object creation from query results.
/// </summary>
[AttributeUsage(AttributeTargets.Property)]
public sealed class ArrowFieldAttribute : Attribute
{
    /// <summary>
    /// Creates the attribute from a field name and a constructor order.
    /// </summary>
    /// <param name="name">Value exposed through <see cref="Name"/>.</param>
    /// <param name="constructorOrder">Value exposed through <see cref="ConstructorOrder"/>.</param>
    public ArrowFieldAttribute(string name, int constructorOrder)
        => (Name, ConstructorOrder) = (name, constructorOrder);

    /// <summary>
    /// The field name supplied to the constructor.
    /// </summary>
    public string Name { get; }

    /// <summary>
    /// The constructor order supplied to the constructor.
    /// </summary>
    public int ConstructorOrder { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Linq;
using Apache.Arrow;

namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution.Formats;

/// <summary>
/// Extension methods for reading typed objects out of an Arrow <see cref="RecordBatch"/>.
/// </summary>
internal static class RecordBatchExtensions
{
    /// <summary>
    /// Creates a <typeparamref name="T"/> from a single row of the batch.
    /// </summary>
    /// <param name="batch">The batch to read from.</param>
    /// <param name="row">Index of the row whose values are read.</param>
    /// <typeparam name="T">The reference type to create.</typeparam>
    /// <returns>
    /// The instance returned by <c>Reflections.CreateInstance</c> for the values read from the columns
    /// named by <c>Reflections.GetArrowFieldNames</c>, in the order those names are returned.
    /// </returns>
    public static T ReadRecord<T>(this RecordBatch batch, int row)
        where T : class
        => Reflections.CreateInstance<T>(
            Reflections.GetArrowFieldNames<T>()
                .Select(columnName => batch.Column(columnName).GetValue(row))
                .ToArray());
}
Loading

0 comments on commit e5d0229

Please sign in to comment.