using MessagePack; using MessagePack.Resolvers; using Newtonsoft.Json; using System.Collections.Concurrent; using System.Diagnostics; using System.Linq.Dynamic.Core; using WCS.Core; using WCS.Core.BaseExtensions; using WCS.Core.DataTrans; using WCS.Core.DbHelper; using WCS.Core.Redis; using WCS.Entity; using WCS.Entity.Protocol; namespace WCS.Service { /// /// 代理协议 /// public class ProtocolProxy : ProtocolProxyBase { public ProtocolProxy(string id, WCS_DATABLOCK db, ushort start, WCS_DEVICEPROTOCOL protocol) : base(id, db, start, protocol) { } private static readonly ConcurrentDictionary LastDatas = new(); /// /// 获取新的数据 /// /// /// protected override WCS_PROTOCOLDATA GetLastData(Db db) { if (!LastDatas.ContainsKey(this.ProtocolDataType)) { LastDatas[this.ProtocolDataType] = db.Default.Queryable().AS($"{this.ProtocolDataType.Name}", "it").Where("ISLAST=0").OrderBy("ID").ToArray(); } dynamic data = LastDatas[this.ProtocolDataType]; var list = new List(); foreach (var itemData in data) { try { if (itemData.DEVICECOD == Protocol.DEVICE.CODE) list.Add(itemData); } catch { // ignored } } if (list.Count > 1) { for (var i = 0; i < list.Count - 1; i++) { var obj = list[i]; db.Default.Insertable((object)obj).ExecuteCommand(); obj.ISLAST = false; } } var res = list.LastOrDefault(); return res; } private static int _total; protected override WCS_PROTOCOLDATA SaveNewData(Db db, WCS_PROTOCOLDATA last, WCS_PROTOCOLDATA newobj, string user) { _total++; if (last != null) { var dc = TypeExtension.EntityClassToDictionary((object)last); db.Default.Insertable(dc).AS($"{last.GetType().Name}").ExecuteCommand(); last.ISLAST = false; } newobj.DEVICECOD = Protocol.DEVICE.CODE; newobj.ISLAST = true; newobj.UPDATETIME = DateTime.Now; newobj.UPDATEUSER = user; newobj.FRAME = LogicHandler.Frame; var typeName = newobj.GetType().Name; var dc1 = TypeExtension.EntityClassToDictionary((object)newobj); db.Default.Insertable(dc1).AS($"{typeName}").ExecuteCommand(); return newobj; } static ProtocolProxy() { MessagePackSerializer.DefaultOptions = StandardResolver.Options.WithCompression(MessagePackCompression.Lz4Block); #region 初始化Redis连接 var redisConnectionStrings = RedisHelper.Default.Check("RedisConnectionStrings") ?? throw new Exception("请在Redis中配置RedisConnectionStrings"); Configs.RedisConnectionStrings = JsonConvert.DeserializeObject>(redisConnectionStrings); if (Configs.RedisConnectionStrings != null) { if (Configs.RedisConnectionStrings.All(v => v.Key != "Monitor")) throw new Exception("请在RedisConnectionStrings中配置监控RedisDB库连接字符串"); } foreach (var redisConnection in Configs.RedisConnectionStrings!) { RedisHelper.CreateContext(redisConnection.ConnectionString, redisConnection.Key); switch (redisConnection.Key) { case "Monitor": RedisHelper.SetMonitorContextType(redisConnection.Key); RedisHelper.Monitor.Serialize = obj => { var bytes = MessagePackSerializer.Serialize(obj); return bytes; }; RedisHelper.Monitor.DeserializeRaw = (bytes, type) => { var obj = MessagePackSerializer.Deserialize(type, bytes); return obj; }; break; } } #endregion 初始化Redis连接 } public override void Publish(string code, WCS_PROTOCOLDATA obj) { try { var datas = AllDatas; if (code.StartsWith("SRM")) { if (!datas.ContainsKey(code)) datas[code] = new SCData { Code = code }; } else if (code.StartsWith("RGV")) { if (!datas.ContainsKey(code)) datas[code] = new RGVData { Code = code }; } else if (code == "Robot") { if (!datas.ContainsKey(code)) datas[code] = new RobotData { Code = code }; } else if (code.Length == 4) { if (!datas.ContainsKey(code)) datas[code] = new StationData { Code = code }; } if (!datas.ContainsKey(code)) return; var data = datas[code]; data.Frame = LogicHandler.Frame; var p = data.GetType().GetProperties().FirstOrDefault(v => v.PropertyType == obj.GetType()); if (p == null) { Console.WriteLine("类型" + data.GetType().Name + "不包含类型为" + obj.GetType().Name + "的属性"); } else { p.SetValue(data, obj); } } catch (Exception ex) { Console.WriteLine(ex.Message); } } /// /// 所有的设备数据 /// public static ConcurrentDictionary AllDatas = new(); public static void Do() { Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"更改:{_total}"); Console.ResetColor(); _total = 0; try { var gs = AllDatas.GroupBy(v => v.Value.GetType()); var pack = new DeviceDataPack { Frame = LogicHandler.Frame }; foreach (var g in gs) { var value = g.Select(v => v.Value).ToArray(); var etype = g.Key; var type = typeof(DeviceDataCollection<>).MakeGenericType(etype); var coll = Activator.CreateInstance(type, LogicHandler.Frame, value); var p = pack.GetType().GetProperties().First(v => v.PropertyType == type); p.SetValue(pack, coll); } #region 存入Redis var sw = new Stopwatch(); sw.Start(); RedisHelper.Monitor.Set(nameof(DeviceDataPack), pack); RedisHelper.Monitor.RPush("Packs", pack); sw.Stop(); Console.ForegroundColor = ConsoleColor.Blue; Console.WriteLine($"Redis耗时{sw.ElapsedMilliseconds}"); Console.ResetColor(); var len = RedisHelper.Monitor.LLen("Packs"); if (len > 150000) { RedisHelper.Monitor.LTrim("Packs", 20000, len); } #endregion 存入Redis var converter = new System.Text.UnicodeEncoding(); var plcRawData = new PlcRawData { CONTENT = converter.GetBytes(JsonConvert.SerializeObject(pack)), WAREHOUSE = "1" }; #region 存入PGSql var sw1 = new Stopwatch(); sw1.Start(); Core.DbHelper.Db.Do(db => { db.Context("WCSDlog").Insertable(plcRawData).ExecuteCommand(); }); sw1.Stop(); Console.ForegroundColor = ConsoleColor.Blue; Console.WriteLine($"PGSql耗时{sw1.ElapsedMilliseconds}"); Console.ResetColor(); RedisHelper.Default.RPush("Packs", pack); #endregion 存入PGSql #region 存入sqlServe var sw2 = new Stopwatch(); sw2.Start(); Core.DbHelper.Db.Do(db => { db.Default.Insertable(plcRawData).ExecuteCommand(); }); sw2.Stop(); Console.ForegroundColor = ConsoleColor.Blue; Console.WriteLine($"sqlServe耗时{sw2.ElapsedMilliseconds}"); Console.ResetColor(); RedisHelper.Default.RPush("Packs", pack); #endregion 存入sqlServe foreach (var data in AllDatas) { data.Value.Info = ""; if (data.Value is not ProdLineData pld) continue; pld.TaskList.Clear(); pld.Frame = LogicHandler.Frame; pld.Code = data.Key; } } catch (Exception) { // ignored } } } public class PackInfo { public DateTime Frame { get; set; } public byte[] Data { get; set; } } }