Skip to content
This repository has been archived by the owner on Oct 27, 2023. It is now read-only.

Commit

Permalink
clickhouse ado
Browse files Browse the repository at this point in the history
  • Loading branch information
YahuiWong committed Jun 10, 2021
1 parent 2a837a5 commit 3ecef5b
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<RepositoryUrl>https://github.com/YahuiWong/Dapper.Contrib.BulkInsert.git</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>clickhouse,dapper,bulk,mysql,sqlserver</PackageTags>
<Version>0.1.5</Version>
<Version>0.1.7</Version>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<PackageLicenseUrl>https://github.com/YahuiWong/Dapper.Contrib.BulkInsert/blob/master/LICENSE</PackageLicenseUrl>
<PackageId>Dapper.Contrib.BulkInsert</PackageId>
Expand Down
15 changes: 13 additions & 2 deletions src/Dapper.Contrib.BulkInsert/SqlMapperExtensions.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static async Task InsertBulkAsync<T>(this IDbConnection connection, IEnum
var wasClosed = connection.State == ConnectionState.Closed;
if (wasClosed) connection.Open();

await connection.ExecuteAsync(cmd.Item1,cmd.Item2, null, commandTimeout);
await connection.ExecuteAsync(cmd.Item1,cmd.Item2, null, commandTimeout, CommandType.Text);
if (wasClosed) connection.Close();
}
/// <summary>
Expand Down Expand Up @@ -127,8 +127,19 @@ public static async Task InsertBulkAsync<T>(this ClickHouse.Client.ADO.ClickHous


if (wasClosed) connection.Close();
}
/// <summary>
/// Inserts an entity into table "Ts" and returns identity id or number of inserted rows if inserting a list.
/// </summary>
/// <param name="connection">Open SqlConnection</param>
/// <param name="entityToInsert">Entity to insert, can be list of entities</param>
/// <returns>Identity of inserted entity, or number of inserted rows if inserting a list</returns>
public static async Task InsertBulkAsync<T>(this ClickHouse.Ado.ClickHouseConnection connection, IEnumerable<T> entityToInsert, int? commandTimeout = null)
{
await Task.Run(() => InsertBulk(connection, entityToInsert, commandTimeout)
);

}

}


Expand Down
134 changes: 131 additions & 3 deletions src/Dapper.Contrib.BulkInsert/SqlMapperExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,18 @@ private static (string, DynamicParameters) GenerateBulkSql<T>(IDbConnection conn
return (cmd,parameters);
}


/// <summary>
/// Inserts an entity into table "Ts" and returns identity id or number of inserted rows if inserting a list.
/// </summary>
/// <param name="connection">Open SqlConnection</param>
/// <param name="entityToInsert">Entity to insert, can be list of entities</param>
/// <returns>Identity of inserted entity, or number of inserted rows if inserting a list</returns>
public static void InsertBulk<T>(this ClickHouse.Client.ADO.ClickHouseConnection connection, IEnumerable<T> entityToInsert, int? commandTimeout = null)
{
InsertBulkAsync(connection, entityToInsert, commandTimeout).GetAwaiter().GetResult();
}

/// <summary>
/// Inserts an entity into table "Ts" and returns identity id or number of inserted rows if inserting a list.
/// </summary>
Expand All @@ -339,17 +351,133 @@ public static void InsertBulk<T>(this IDbConnection connection, IEnumerable<T> e
connection.Execute(cmd.Item1, cmd.Item2, null, commandTimeout);
if (wasClosed) connection.Close();
}
private static (string, List<dynamic[]>) GenerateCHBulkSql<T>(IDbConnection connection, IEnumerable<T> entityToInsert)
{
var type = entityToInsert.GetType();
var typeInfo = type.GetTypeInfo();
bool implementsGenericIEnumerableOrIsGenericIEnumerable =
typeInfo.ImplementedInterfaces.Any(ti =>
ti.IsGenericType() && ti.GetGenericTypeDefinition() == typeof(IEnumerable<>)) ||
typeInfo.GetGenericTypeDefinition() == typeof(IEnumerable<>);

if (implementsGenericIEnumerableOrIsGenericIEnumerable)
{
type = type.GetGenericArguments()[0];
}

var name = GetTableName(type);
var sbColumnList = new StringBuilder(null);
var allProperties = TypePropertiesCache(type);
var keyProperties = KeyPropertiesCache(type);
var computedProperties = ComputedPropertiesCache(type);
var allPropertiesExceptKeyAndComputed =
allProperties.Except(keyProperties.Union(computedProperties)).ToList();

for (var i = 0; i < allPropertiesExceptKeyAndComputed.Count; i++)
{
var property = allPropertiesExceptKeyAndComputed[i];
sbColumnList.AppendFormat("{0}", GetColumnName(property));
if (i < allPropertiesExceptKeyAndComputed.Count - 1)
sbColumnList.Append(", ");
}

//var sbParameterList = new StringBuilder(null);
List<dynamic[]> dynamics = new List<dynamic[]>();

for (int j = 0, length = Enumerable.Count(entityToInsert); j < length; j++)
{
var item = Enumerable.ElementAt(entityToInsert, j);
{
List<dynamic> dynamicsParams = new List<dynamic>();
//sbParameterList.Append("(");
for (int i = 0,count= allPropertiesExceptKeyAndComputed.Count; i < count; i++)
{

var property = allPropertiesExceptKeyAndComputed[i];

var val = property.GetValue(item);
//var columnName = $"@{GetColumnName(property)}{j}{i}";
//sbParameterList.Append(columnName);
if (property.PropertyType.IsValueType)
{
if (property.PropertyType == typeof(DateTime))
{
var datetimevalue = Convert.ToDateTime(val);
if (property.GetCustomAttribute<DateAttribute>() != null)
{
//parameters.Add(columnName, datetimevalue.Date, DbType.Date);
dynamicsParams.Add(datetimevalue.Date);
//sbParameterList.AppendFormat("'{0:yyyy-MM-dd}'", val);
}
else
{
dynamicsParams.Add(datetimevalue);
//parameters.Add(columnName, datetimevalue, DbType.DateTime);
//sbParameterList.AppendFormat("'{0:yyyy-MM-dd HH:mm:ss}'", val);
}
}
else if (property.PropertyType == typeof(Boolean))
{
var boolvalue = Convert.ToBoolean(val);

{
//parameters.Add(columnName, boolvalue ? 1 : 0, DbType.Int32);
dynamicsParams.Add(boolvalue);
//sbParameterList.AppendFormat("'{0:yyyy-MM-dd HH:mm:ss}'", val);
}
}
else
{
dynamicsParams.Add(val);
//sbParameterList.AppendFormat("{0}", FixValue(val));
}
}
else
{
dynamicsParams.Add(val);
///sbParameterList.AppendFormat("'{0}'", FixValue(val));
}

//if (i < allPropertiesExceptKeyAndComputed.Count - 1)
// sbParameterList.Append(", ");

}
dynamics.Add(dynamicsParams.ToArray());
//sbParameterList.Append("),");
}
}


//sbParameterList.Remove(sbParameterList.Length - 1, 1);
//sbParameterList.Append(";");
//insert list of entities
var cmd = $"insert into {name} ({sbColumnList}) values @bulk; ";
return (cmd, dynamics);
}
/// <summary>
/// Inserts an entity into table "Ts" and returns identity id or number of inserted rows if inserting a list.
/// </summary>
/// <param name="connection">Open SqlConnection</param>
/// <param name="entityToInsert">Entity to insert, can be list of entities</param>
/// <returns>Identity of inserted entity, or number of inserted rows if inserting a list</returns>
public static void InsertBulk<T>(this ClickHouse.Client.ADO.ClickHouseConnection connection, IEnumerable<T> entityToInsert, int? commandTimeout = null)
public static void InsertBulk<T>(this ClickHouse.Ado.ClickHouseConnection connection, IEnumerable<T> entityToInsert, int? commandTimeout = null)
{
InsertBulkAsync(connection, entityToInsert, commandTimeout).GetAwaiter().GetResult();
}
var cmd = GenerateCHBulkSql(connection, entityToInsert);

var wasClosed = connection.State == ConnectionState.Closed;
if (wasClosed) connection.Open();

var command = connection.CreateCommand();
command.CommandText = cmd.Item1;
command.Parameters.Add(new ClickHouse.Ado.ClickHouseParameter
{
ParameterName = "bulk",
Value = cmd.Item2
});
command.ExecuteNonQuery();
if (wasClosed) connection.Close();

}

/// <summary>
/// Specifies a custom callback that detects the database type instead of relying on the default strategy (the name of the connection type object).
Expand Down

0 comments on commit 3ecef5b

Please sign in to comment.