Skip to content

Commit

Permalink
Add Csv processor (#4374)
Browse files Browse the repository at this point in the history
Relates: #4341

This commit adds the Csv processor to the collection of ingest
processors.

(cherry picked from commit f91ccda)
  • Loading branch information
russcam committed Feb 10, 2020
1 parent 2ef4eee commit 04ea5cb
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/Nest/Ingest/ProcessorFormatter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal class ProcessorFormatter : IJsonFormatter<IProcessor>
{ "drop", 29 },
{ "circle", 30 },
{ "enrich", 31 },
{ "csv", 32 },
};

public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
Expand Down Expand Up @@ -157,6 +158,9 @@ public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver form
case 31:
processor = Deserialize<EnrichProcessor>(ref reader, formatterResolver);
break;
case 32:
processor = Deserialize<CsvProcessor>(ref reader, formatterResolver);
break;
}
}
else
Expand Down Expand Up @@ -185,6 +189,9 @@ public void Serialize(ref JsonWriter writer, IProcessor value, IJsonFormatterRes
case "append":
Serialize<IAppendProcessor>(ref writer, value, formatterResolver);
break;
case "csv":
Serialize<ICsvProcessor>(ref writer, value, formatterResolver);
break;
case "convert":
Serialize<IConvertProcessor>(ref writer, value, formatterResolver);
break;
Expand Down
114 changes: 114 additions & 0 deletions src/Nest/Ingest/Processors/CsvProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Linq.Expressions;
using System.Runtime.Serialization;
using Elasticsearch.Net;
using Elasticsearch.Net.Utf8Json;


namespace Nest
{
/// <summary>
/// Extracts fields from CSV line out of a single text field within a document.
/// Any empty field in CSV will be skipped.
/// <para></para>
/// Available in Elasticsearch 7.6.0+
/// </summary>
[InterfaceDataContract]
public interface ICsvProcessor : IProcessor
{
/// <summary>
/// The field to extract data from
/// </summary>
[DataMember(Name ="field")]
Field Field { get; set; }

/// <summary>
/// The array of fields to assign extracted values to.
/// </summary>
[DataMember(Name ="target_fields")]
Fields TargetFields { get; set; }

/// <summary>
/// Separator used in CSV, has to be single character string. Defaults to <c>,</c>
/// </summary>
[DataMember(Name = "separator")]
string Separator { get; set; }

/// <summary>
/// Quote used in CSV, has to be single character string. Defaults to <c>"</c>
/// </summary>
[DataMember(Name = "quote")]
string Quote { get; set; }

/// <summary>
/// If <c>true</c> and <see cref="Field" /> does not exist or is null,
/// the processor quietly exits without modifying the document. Default is <c>false</c>
/// </summary>
[DataMember(Name = "ignore_missing")]
bool? IgnoreMissing { get; set; }

/// <summary>
/// Trim whitespaces in unquoted fields. Default is <c>false</c>;
/// </summary>
[DataMember(Name = "trim")]
bool? Trim { get; set; }
}

/// <inheritdoc cref="ICsvProcessor"/>
public class CsvProcessor : ProcessorBase, ICsvProcessor
{
/// <inheritdoc />
public Field Field { get; set; }
/// <inheritdoc />
public Fields TargetFields { get; set; }
/// <inheritdoc />
public string Separator { get; set; }
/// <inheritdoc />
public string Quote { get; set; }
/// <inheritdoc />
public bool? IgnoreMissing { get; set; }
/// <inheritdoc />
public bool? Trim { get; set; }
/// <inheritdoc />
protected override string Name => "csv";
}

/// <inheritdoc cref="ICsvProcessor"/>
public class CsvProcessorDescriptor<T> : ProcessorDescriptorBase<CsvProcessorDescriptor<T>, ICsvProcessor>, ICsvProcessor
where T : class
{
protected override string Name => "csv";
Field ICsvProcessor.Field { get; set; }
Fields ICsvProcessor.TargetFields { get; set; }
bool? ICsvProcessor.IgnoreMissing { get; set; }
string ICsvProcessor.Quote { get; set; }
string ICsvProcessor.Separator { get; set; }
bool? ICsvProcessor.Trim { get; set; }

/// <inheritdoc cref="ICsvProcessor.Field" />
public CsvProcessorDescriptor<T> Field(Field field) => Assign(field, (a, v) => a.Field = v);

/// <inheritdoc cref="ICsvProcessor.Field" />
public CsvProcessorDescriptor<T> Field<TValue>(Expression<Func<T, TValue>> objectPath) =>
Assign(objectPath, (a, v) => a.Field = v);

/// <inheritdoc cref="ICsvProcessor.TargetFields" />
public CsvProcessorDescriptor<T> TargetFields(Func<FieldsDescriptor<T>, IPromise<Fields>> targetFields) =>
Assign(targetFields, (a, v) => a.TargetFields = v?.Invoke(new FieldsDescriptor<T>())?.Value);

/// <inheritdoc cref="ICsvProcessor.TargetFields" />
public CsvProcessorDescriptor<T> TargetFields(Fields targetFields) => Assign(targetFields, (a, v) => a.TargetFields = v);

/// <inheritdoc cref="ICsvProcessor.IgnoreMissing" />
public CsvProcessorDescriptor<T> IgnoreMissing(bool? ignoreMissing = true) => Assign(ignoreMissing, (a, v) => a.IgnoreMissing = v);

/// <inheritdoc cref="ICsvProcessor.Trim" />
public CsvProcessorDescriptor<T> Trim(bool? trim = true) => Assign(trim, (a, v) => a.Trim = v);

/// <inheritdoc cref="ICsvProcessor.Quote" />
public CsvProcessorDescriptor<T> Quote(string quote) => Assign(quote, (a, v) => a.Quote = v);

/// <inheritdoc cref="ICsvProcessor.Separator" />
public CsvProcessorDescriptor<T> Separator(string separator) => Assign(separator, (a, v) => a.Separator = v);
}
}
5 changes: 5 additions & 0 deletions src/Nest/Ingest/ProcessorsDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public ProcessorsDescriptor Attachment<T>(Func<AttachmentProcessorDescriptor<T>,
public ProcessorsDescriptor Append<T>(Func<AppendProcessorDescriptor<T>, IAppendProcessor> selector) where T : class =>
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new AppendProcessorDescriptor<T>())));

/// <inheritdoc cref="ICsvProcessor"/>
public ProcessorsDescriptor Csv<T>(Func<CsvProcessorDescriptor<T>, ICsvProcessor> selector) where T : class =>
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new CsvProcessorDescriptor<T>())));


/// <inheritdoc cref="IConvertProcessor"/>
public ProcessorsDescriptor Convert<T>(Func<ConvertProcessorDescriptor<T>, IConvertProcessor> selector) where T : class =>
Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new ConvertProcessorDescriptor<T>())));
Expand Down
24 changes: 24 additions & 0 deletions tests/Tests/Ingest/ProcessorAssertions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ public class Append : ProcessorAssertion
public override string Key => "append";
}

[SkipVersion("<7.6.0", "Introduced in Elasticsearch 7.6.0+")]
public class Csv : ProcessorAssertion
{
public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d
.Csv<Project>(c => c
.Field(p => p.Name)
.TargetFields(new[] { "targetField1", "targetField2" })
);

public override IProcessor Initializer => new CsvProcessor
{
Field = "name",
TargetFields = new[] { "targetField1", "targetField2" },
};

public override object Json => new
{
field = "name",
target_fields = new[] { "targetField1", "targetField2" },
};

public override string Key => "csv";
}

public class Convert : ProcessorAssertion
{
public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d
Expand Down

0 comments on commit 04ea5cb

Please sign in to comment.