HouseConsumer.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using Apache.NMS;
  7. using Apache.NMS.ActiveMQ;
  8. using Apache.NMS.Util;
  9. using Serilog;
  10. using wms.util.Http;
  11. namespace wms.console
  12. {
  /// <summary>
  /// Listens on a single ActiveMQ queue. For every text message that carries a
  /// <c>serviceCode</c> property, the message body is passed to <c>HttpUtil.PostRequest</c>
  /// using the URL configured under <c>ApiUrl:{serviceCode}</c>.
  /// </summary>
  public class HouseConsumer
  {
      /// <summary>Name of the message property that selects the target API URL.</summary>
      private const string ServiceCodeProperty = "serviceCode";

      /// <summary>
      /// Third argument handed to <c>HttpUtil.PostRequest</c>.
      /// NOTE(review): presumably a timeout in milliseconds - confirm against HttpUtil.
      /// </summary>
      private const int PostRequestTimeout = 30000;

      private readonly string _queuesName;
      private readonly string _clientId;
      private readonly IConnectionFactory _connectionFactory = null;
      private IConnection _connection = null;
      private Apache.NMS.ISession _session = null;
      private IMessageConsumer _consumer = null;

      /// <summary>
      /// Builds the connection factory for the broker that serves <paramref name="queuesName"/>.
      /// No network connection is opened until <see cref="CreateConsumer"/> is called.
      /// </summary>
      /// <param name="queuesName">Queue to consume from; also used to derive the client id.</param>
      /// <exception cref="ArgumentNullException"><paramref name="queuesName"/> is null.</exception>
      public HouseConsumer(string queuesName)
      {
          if (queuesName == null)
          {
              throw new ArgumentNullException(nameof(queuesName));
          }

          _queuesName = queuesName;
          // The client id is the queue name with every '.' replaced by '3'.
          _clientId = _queuesName.Replace(".", "3");

          string settingKey = GetBrokerSettingKey(_queuesName);
          string confuri = settingKey == null ? "" : Util.GetSettings(settingKey);
          if (string.IsNullOrEmpty(confuri))
          {
              // Behaviour is unchanged (the URI is still built with an empty broker address),
              // but the misconfiguration is now visible in the log instead of failing later
              // with an unrelated-looking connection error.
              Log.Warning("未找到队列 {QueueName:l} 对应的MQ地址配置", _queuesName);
          }

          // Resulting shape: activemq:failover:(<broker-uri>?wireFormat.maxInactivityDuration=3000)?randomize=false
          Uri brokerUri = new Uri("activemq:failover:(" + confuri + "?wireFormat.maxInactivityDuration=3000)?randomize=false");
          _connectionFactory = new ConnectionFactory(brokerUri);
      }

      /// <summary>
      /// Opens the connection and session, subscribes to the queue and starts dispatching
      /// messages to the internal handler. Failures are logged, any partially created
      /// resources are released, and the exception is not rethrown.
      /// </summary>
      public void CreateConsumer()
      {
          try
          {
              // NOTE(review): broker credentials are hard-coded in source; consider moving
              // them to configuration.
              _connection = _connectionFactory.CreateConnection("esbadminrw", "esbadminrw");
              _connection.ClientId = _clientId;
              _connection.ExceptionListener += _connection_ExceptionListener;
              _session = _connection.CreateSession();
              _connection.Start();
              IDestination destination = SessionUtil.GetDestination(_session, _queuesName);
              _consumer = _session.CreateConsumer(destination);
              _consumer.Listener += HandleMessage;
              Console.WriteLine("消费者启动成功:" + _connection.ClientId);
          }
          catch (Exception ex)
          {
              Log.Error(ex, "消费者创建异常:{Message:l}", ex.Message);
              // Do not leave a half-open connection/session behind after a failed start.
              ReleaseResources();
          }
      }

      /// <summary>
      /// Closes the consumer, session and connection. Safe to call when the consumer was
      /// never started or failed to start; errors while closing are logged, not thrown.
      /// </summary>
      public void Stop()
      {
          ReleaseResources();
      }

      /// <summary>
      /// Maps a queue name to the configuration key that holds its broker address.
      /// Returns null for queue names that have no mapping.
      /// </summary>
      private static string GetBrokerSettingKey(string queueName)
      {
          switch (queueName)
          {
              case "ESB.OUT.163LFJ.Q":
                  return "MqTcpUri:fjurl";
              case "ESB.OUT.163KSX.Q":
                  return "MqTcpUri:sxurl";
              case "ESB.OUT.163JCP.Q":
                  return "MqTcpUri:cpurl";
              // NOTE(review): 163IPT resolves to "cpurl" while 163DHJ resolves to "pturl".
              // This is kept exactly as before, but it looks like a possible copy-paste
              // slip - confirm against the broker configuration.
              case "ESB.OUT.163IPT.Q":
                  return "MqTcpUri:cpurl";
              case "ESB.OUT.163DHJ.Q":
                  return "MqTcpUri:pturl";
              default:
                  return null;
          }
      }

      /// <summary>
      /// Handles one delivered message: logs the body and, when a non-null
      /// <c>serviceCode</c> property is present, posts the body to the configured URL.
      /// </summary>
      private void HandleMessage(IMessage message)
      {
          ITextMessage msg = message as ITextMessage;
          if (msg == null)
          {
              // A hard cast here would throw inside the NMS dispatch callback.
              Log.Warning("忽略非文本消息:{MessageType:l}", message?.GetType().FullName ?? "null");
              return;
          }

          // Payloads are passed as template arguments, never concatenated into the template:
          // text containing '{' or '}' (e.g. JSON) would otherwise be parsed as template holes.
          // The ":l" specifier renders strings without surrounding quotes.
          Log.Information("接收消息:{Text:l}", msg.Text);

          if (!msg.Properties.Contains(ServiceCodeProperty))
          {
              return;
          }

          object rawServiceCode = msg.Properties[ServiceCodeProperty];
          if (rawServiceCode == null)
          {
              return;
          }

          string serviceCode = Convert.ToString(rawServiceCode);
          // Looked up once per message and reused for logging and for the request.
          string apiUrl = Util.GetSettings("ApiUrl:" + serviceCode);
          if (apiUrl != null)
          {
              try
              {
                  Log.Information("消费post调用地址:{Url:l}|请求参数:{Body:l}", apiUrl, msg.Text);
                  var res = HttpUtil.PostRequest(apiUrl, msg.Text, PostRequestTimeout);
                  Log.Information("消费post调用返回:{Response:l}", Convert.ToString(res));
              }
              catch (Exception ex)
              {
                  Log.Error(ex, "Post请求异常:{Message:l}--------------地址:{Url:l}----------------请求参数:{Body:l}", ex.Message, apiUrl, msg.Text);
              }
          }

          Log.Information("接收消息:{ServiceCode:l}", serviceCode);
      }

      /// <summary>
      /// Best-effort teardown: every step runs even if an earlier one throws, and fields
      /// are cleared first so a repeated call is a no-op.
      /// </summary>
      private void ReleaseResources()
      {
          IMessageConsumer consumer = _consumer;
          Apache.NMS.ISession session = _session;
          IConnection connection = _connection;
          _consumer = null;
          _session = null;
          _connection = null;

          if (consumer != null)
          {
              TryRelease("consumer.Close", () => consumer.Close());
          }

          if (session != null)
          {
              TryRelease("session.Close", () => session.Close());
          }

          if (connection != null)
          {
              TryRelease("connection.ExceptionListener", () => connection.ExceptionListener -= _connection_ExceptionListener);
              TryRelease("connection.Stop", () => connection.Stop());
              TryRelease("connection.Close", () => connection.Close());
              TryRelease("connection.Dispose", () => connection.Dispose());
          }
      }

      /// <summary>Runs one teardown step and logs, rather than propagates, any failure.</summary>
      private static void TryRelease(string step, Action release)
      {
          try
          {
              release();
          }
          catch (Exception ex)
          {
              Log.Warning(ex, "释放资源失败:{Step:l}", step);
          }
      }

      /// <summary>Logs asynchronous errors reported by the connection.</summary>
      private void _connection_ExceptionListener(Exception exception)
      {
          Log.Error(exception, "消费者发生异常:{Message:l}", exception?.Message);
      }
  }
  109. }