Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Associate an external id with a task, so that it can be cancelled later. #3277

Merged
merged 3 commits into from
Jun 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/Elasticsearch.Net/Configuration/RequestConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public interface IRequestConfiguration
/// </summary>
string ContentType { get; set; }

/// <summary>
/// Associate an Id with this user-initiated task, such that it can be located in the cluster task list.
/// Valid only for Elasticsearch 6.2.0+
/// </summary>
string OpaqueId { get; set; }

/// <summary>
/// Force a different Accept header on the request
/// </summary>
Expand Down Expand Up @@ -95,6 +101,7 @@ public class RequestConfiguration : IRequestConfiguration
public TimeSpan? RequestTimeout { get; set; }
public TimeSpan? PingTimeout { get; set; }
public string ContentType { get; set; }
public string OpaqueId { get; set; }
public string Accept { get; set; }
public int? MaxRetries { get; set; }
public Uri ForceNode { get; set; }
Expand All @@ -121,6 +128,7 @@ public class RequestConfigurationDescriptor : IRequestConfiguration
TimeSpan? IRequestConfiguration.RequestTimeout { get; set; }
TimeSpan? IRequestConfiguration.PingTimeout { get; set; }
string IRequestConfiguration.ContentType { get; set; }
string IRequestConfiguration.OpaqueId { get; set; }
string IRequestConfiguration.Accept { get; set; }

int? IRequestConfiguration.MaxRetries { get; set; }
Expand Down Expand Up @@ -152,6 +160,7 @@ public RequestConfigurationDescriptor(IRequestConfiguration config)
Self.RunAs = config?.RunAs;
Self.ClientCertificates = config?.ClientCertificates;
Self.ThrowExceptions = config?.ThrowExceptions;
Self.OpaqueId = config?.OpaqueId;
}

/// <summary>
Expand All @@ -170,6 +179,16 @@ public RequestConfigurationDescriptor RequestTimeout(TimeSpan requestTimeout)
return this;
}

/// <summary>
/// Associate an Id with this user-initiated task, such that it can be located in the cluster task list.
/// Valid only for Elasticsearch 6.2.0+
/// </summary>
public RequestConfigurationDescriptor OpaqueId(string opaqueId)
{
Self.OpaqueId = opaqueId;
return this;
}

public RequestConfigurationDescriptor PingTimeout(TimeSpan pingTimeout)
{
Self.PingTimeout = pingTimeout;
Expand Down
5 changes: 5 additions & 0 deletions src/Elasticsearch.Net/Transport/Pipeline/RequestData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class RequestData
{
public const string MimeType = "application/json";
public const string RunAsSecurityHeader = "es-security-runas-user";
public const string OpaqueIdHeader = "X-Opaque-Id";

public Uri Uri => this.Node != null ? new Uri(this.Node.Uri, this.PathAndQuery) : null;

Expand Down Expand Up @@ -77,6 +78,10 @@ private RequestData(
this.RequestMimeType = local?.ContentType ?? MimeType;
this.Accept = local?.Accept ?? MimeType;
this.Headers = global.Headers != null ? new NameValueCollection(global.Headers) : new NameValueCollection();

if (!string.IsNullOrEmpty(local?.OpaqueId))
this.Headers.Add(OpaqueIdHeader, local.OpaqueId);

this.RunAs = local?.RunAs;
this.SkipDeserializationForStatusCodes = global?.SkipDeserializationForStatusCodes;
this.ThrowExceptions = local?.ThrowExceptions ?? global.ThrowExceptions;
Expand Down
6 changes: 6 additions & 0 deletions src/Nest/Cluster/TaskManagement/GetTask/TaskInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,11 @@ public class TaskInfo

[JsonProperty("cancellable")]
public bool Cancellable { get; internal set; }

[JsonProperty("headers")]
public IReadOnlyDictionary<string, string> Headers { get; internal set; } = EmptyReadOnly<string, string>.Dictionary;

[JsonProperty("children")]
public IReadOnlyCollection<TaskInfo> Children { get; internal set; } = EmptyReadOnly<TaskInfo>.Collection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public class TaskState

[JsonProperty("parent_task_id")]
public TaskId ParentTaskId { get; internal set; }

[JsonProperty("cancellable")]
public bool Cancellable { get; internal set; }

[JsonProperty("headers")]
public IReadOnlyDictionary<string, string> Headers { get; internal set; } = EmptyReadOnly<string, string>.Dictionary;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, do we know what headers are possibly returned here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly not (from the source code I read) but certainly X-Opaque-Id.

I had considered a helper method to expose this value, but just left to the user for the time being...

}

public class TaskStatus
Expand Down
53 changes: 53 additions & 0 deletions src/Tests/Search/Search/SearchApiTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -351,4 +351,57 @@ protected override void ExpectResponse(ISearchResponse<Project> response)
response.ShouldBeValid();
}
}

public class OpaqueIdApiTests : ApiIntegrationTestBase<ReadOnlyCluster, ISearchResponse<Project>, ISearchRequest, SearchDescriptor<Project>, SearchRequest<Project>>
{
private const string OpaqueId = "123456";

public OpaqueIdApiTests(ReadOnlyCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override object ExpectJson => new { };

protected override int ExpectStatusCode => 200;
protected override bool ExpectIsValid => true;
protected override HttpMethod HttpMethod => HttpMethod.POST;
protected override string UrlPath => $"/project/doc/_search?scroll=10m";

protected override Func<SearchDescriptor<Project>, ISearchRequest> Fluent => s => s
.RequestConfiguration(r => r.OpaqueId(OpaqueId))
.Query(q => q)
.Scroll("10m"); // Create a scroll in order to keep the task around.

protected override SearchRequest<Project> Initializer => new SearchRequest<Project>()
{
RequestConfiguration = new RequestConfiguration
{
OpaqueId = OpaqueId
}
};

protected override LazyResponses ClientUsage() => Calls(
fluent: (c, f) => c.Search(f),
fluentAsync: (c, f) => c.SearchAsync(f),
request: (c, r) => c.Search<Project>(r),
requestAsync: (c, r) => c.SearchAsync<Project>(r)
);

protected override void OnAfterCall(IElasticClient client)
{
var tasks = client.ListTasks(d => d.RequestConfiguration(r => r.OpaqueId(OpaqueId)));
tasks.Should().NotBeNull();
foreach (var node in tasks.Nodes)
{
foreach (var task in node.Value.Tasks)
{
task.Value.Headers[RequestData.OpaqueIdHeader].Should().Be(OpaqueId);
}
}
base.OnAfterCall(client);
}

protected override void ExpectResponse(ISearchResponse<Project> response)
{
response.ShouldBeValid();
}
}
}