QuestDBFastBuilder.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. using Npgsql;
  2. using NpgsqlTypes;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Data;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace SqlSugar
  10. {
  11. public class QuestDBFastBuilder : FastBuilder, IFastBuilder
  12. {
  13. public static Dictionary<string , NpgsqlDbType> PgSqlType = UtilMethods.EnumToDictionary<NpgsqlDbType>();
  14. private EntityInfo entityInfo;
  15. public QuestDBFastBuilder(EntityInfo entityInfo)
  16. {
  17. this.entityInfo = entityInfo;
  18. }
  19. public override string UpdateSql { get; set; } = @"UPDATE {1} SET {0} FROM {2} AS TE WHERE {3}
  20. ";
  21. //public virtual async Task<int> UpdateByTempAsync(string tableName, string tempName, string[] updateColumns, string[] whereColumns)
  22. //{
  23. // Check.ArgumentNullException(!updateColumns.Any(), "update columns count is 0");
  24. // Check.ArgumentNullException(!whereColumns.Any(), "where columns count is 0");
  25. // var sets = string.Join(",", updateColumns.Select(it => $"TM.{it}=TE.{it}"));
  26. // var wheres = string.Join(",", whereColumns.Select(it => $"TM.{it}=TE.{it}"));
  27. // string sql = string.Format(UpdateSql, sets, tableName, tempName, wheres);
  28. // return await this.Context.Ado.ExecuteCommandAsync(sql);
  29. //}
  30. public async Task<int> ExecuteBulkCopyAsync(DataTable dt)
  31. {
  32. List<string> lsColNames = new List<string>();
  33. for (int i = 0; i < dt.Columns.Count; i++)
  34. {
  35. lsColNames.Add($"\"{dt.Columns[i].ColumnName}\"");
  36. }
  37. string copyString = $"COPY {dt.TableName} ( {string.Join(",", lsColNames) } ) FROM STDIN (FORMAT BINARY)";
  38. NpgsqlConnection conn = (NpgsqlConnection)this.Context.Ado.Connection;
  39. var columns = this.Context.DbMaintenance.GetColumnInfosByTableName(this.entityInfo.DbTableName);
  40. try
  41. {
  42. var identityColumnInfo = this.entityInfo.Columns.FirstOrDefault(it => it.IsIdentity);
  43. if (identityColumnInfo != null)
  44. {
  45. throw new Exception("PgSql bulkcopy no support identity");
  46. }
  47. BulkCopy(dt, copyString, conn, columns);
  48. }
  49. catch (Exception ex)
  50. {
  51. throw ex;
  52. }
  53. finally
  54. {
  55. base.CloseDb();
  56. }
  57. return await Task.FromResult(dt.Rows.Count);
  58. }
  59. private void BulkCopy(DataTable dt, string copyString, NpgsqlConnection conn, List<DbColumnInfo> columns)
  60. {
  61. if (conn.State == ConnectionState.Closed)
  62. conn.Open();
  63. List<ColumnView> columnViews = new List<ColumnView>();
  64. foreach (DataColumn item in dt.Columns)
  65. {
  66. ColumnView result = new ColumnView();
  67. result.DbColumnInfo = columns.FirstOrDefault(it => it.DbColumnName.EqualCase(item.ColumnName));
  68. result.DataColumn = item;
  69. result.EntityColumnInfo=this.entityInfo.Columns.FirstOrDefault(it => it.DbColumnName.EqualCase(item.ColumnName));
  70. var key = result.DbColumnInfo?.DataType?.ToLower();
  71. if (result.DbColumnInfo == null)
  72. {
  73. result.Type = null;
  74. }
  75. else if (PgSqlType.ContainsKey(key))
  76. {
  77. result.Type = PgSqlType[key];
  78. }
  79. else if (key?.First() == '_')
  80. {
  81. var type = PgSqlType[key.Substring(1)];
  82. result.Type = NpgsqlDbType.Array | type;
  83. }
  84. else
  85. {
  86. result.Type = null;
  87. }
  88. columnViews.Add(result);
  89. }
  90. using (var writer = conn.BeginBinaryImport(copyString))
  91. {
  92. foreach (DataRow row in dt.Rows)
  93. {
  94. writer.StartRow();
  95. foreach (var column in columnViews)
  96. {
  97. var value = row[column.DataColumn.ColumnName];
  98. if (value == null)
  99. {
  100. value = DBNull.Value;
  101. }
  102. if (column.Type == null)
  103. {
  104. writer.Write(value);
  105. }
  106. else
  107. {
  108. writer.Write(value, column.Type.Value);
  109. }
  110. }
  111. }
  112. writer.Complete();
  113. }
  114. }
  115. public override async Task<int> UpdateByTempAsync(string tableName, string tempName, string[] updateColumns, string[] whereColumns)
  116. {
  117. var sqlquerybulder= this.Context.Queryable<object>().SqlBuilder;
  118. Check.ArgumentNullException(!updateColumns.Any(), "update columns count is 0");
  119. Check.ArgumentNullException(!whereColumns.Any(), "where columns count is 0");
  120. var sets = string.Join(",", updateColumns.Select(it => $"{sqlquerybulder.GetTranslationColumnName(it)}=TE.{sqlquerybulder.GetTranslationColumnName(it)}"));
  121. var wheres = string.Join(" AND ", whereColumns.Select(it => $"{tableName}.{sqlquerybulder.GetTranslationColumnName(it)}=TE.{sqlquerybulder.GetTranslationColumnName(it)}"));
  122. string sql = string.Format(UpdateSql, sets, tableName, tempName, wheres);
  123. return await this.Context.Ado.ExecuteCommandAsync(sql);
  124. }
  125. public override async Task CreateTempAsync<T>(DataTable dt)
  126. {
  127. await this.Context.Queryable<T>().Where(it => false).AS(dt.TableName).Select(" * into temp mytemptable").ToListAsync();
  128. dt.TableName = "mytemptable";
  129. }
  130. public class ColumnView
  131. {
  132. public DataColumn DataColumn { get; set; }
  133. public EntityColumnInfo EntityColumnInfo { get; set; }
  134. public DbColumnInfo DbColumnInfo { get; set; }
  135. public NpgsqlDbType? Type { get; set; }
  136. }
  137. }
  138. }