using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;
using Serilog;
using wms.util.Http;

namespace wms.console
{
    /// <summary>
    /// Consumes text messages from one ActiveMQ queue and forwards each message body
    /// via HTTP POST to the URL configured under <c>ApiUrl:&lt;serviceCode&gt;</c>,
    /// where <c>serviceCode</c> is read from the message properties.
    /// </summary>
    public class HouseConsumer
    {
        /// <summary>Name of the message property that selects the target API URL.</summary>
        private const string ServiceCodeProperty = "serviceCode";

        /// <summary>
        /// Third argument passed to <c>HttpUtil.PostRequest</c>.
        /// NOTE(review): presumably a timeout in milliseconds - confirm against HttpUtil.
        /// </summary>
        private const int PostRequestTimeout = 30000;

        // NOTE(review): broker credentials are hard-coded in source; consider moving
        // them to configuration / a secret store.
        private const string BrokerUser = "esbadminrw";
        private const string BrokerPassword = "esbadminrw";

        private readonly string _queuesName;
        private readonly string _clientId;
        private readonly IConnectionFactory _connectionFactory = null;
        private IConnection _connection = null;
        private Apache.NMS.ISession _session = null;
        private IMessageConsumer _consumer = null;

        /// <summary>
        /// Builds the connection factory for the given queue. No connection is opened
        /// until <see cref="CreateConsumer"/> is called.
        /// </summary>
        /// <param name="queuesName">
        /// Queue name. It selects the broker address setting and, with every "."
        /// replaced by "3", is used as the connection's client id.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="queuesName"/> is null.</exception>
        public HouseConsumer(string queuesName)
        {
            if (queuesName == null)
            {
                throw new ArgumentNullException("queuesName");
            }

            _queuesName = queuesName;
            _clientId = _queuesName.Replace(".", "3");

            string confuri = ResolveBrokerAddress(_queuesName);
            if (string.IsNullOrEmpty(confuri))
            {
                // Previously this case was silent and only surfaced later as a
                // connection failure; keep it non-throwing but make it visible.
                Log.Warning("未找到队列对应的MQ地址配置:{QueueName:l}", _queuesName);
            }

            // Failover transport around the configured broker address, with
            // wireFormat.maxInactivityDuration=3000 on the inner URI and
            // randomize=false on the failover URI.
            Uri brokerUri = new Uri("activemq:failover:(" + confuri + "?wireFormat.maxInactivityDuration=3000)?randomize=false");
            _connectionFactory = new ConnectionFactory(brokerUri);
        }

        /// <summary>
        /// Opens the connection and session, subscribes to the queue and starts
        /// dispatching incoming messages to the message handler. Failures are logged,
        /// not thrown; any partially created resources are released.
        /// </summary>
        public void CreateConsumer()
        {
            try
            {
                _connection = _connectionFactory.CreateConnection(BrokerUser, BrokerPassword);
                _connection.ClientId = _clientId;
                _connection.ExceptionListener += _connection_ExceptionListener;
                _session = _connection.CreateSession();
                _connection.Start();

                IDestination destination = SessionUtil.GetDestination(_session, _queuesName);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += HandleMessage;

                Console.WriteLine("消费者启动成功:" + _connection.ClientId);
            }
            catch (Exception ex)
            {
                Log.Error(ex, "消费者创建异常");

                // Do not leave a half-open connection/session behind. Cleanup errors
                // must not escape: this method has never thrown to its callers.
                try
                {
                    ReleaseResources();
                }
                catch (Exception cleanupEx)
                {
                    Log.Warning(cleanupEx, "消费者资源释放异常");
                }
            }
        }

        /// <summary>
        /// Closes the consumer, session and connection. Safe to call when
        /// <see cref="CreateConsumer"/> was never called or failed, and safe to call
        /// more than once.
        /// </summary>
        public void Stop()
        {
            ReleaseResources();
        }

        /// <summary>Maps a queue name to its configured broker address; null/empty when unknown.</summary>
        private static string ResolveBrokerAddress(string queueName)
        {
            // NOTE(review): "ESB.OUT.163IPT.Q" reads "cpurl" and "ESB.OUT.163DHJ.Q"
            // reads "pturl". Kept exactly as found, but this looks like a possible
            // copy/paste slip - confirm against the configuration file.
            switch (queueName)
            {
                case "ESB.OUT.163LFJ.Q":
                    return Util.GetSettings("MqTcpUri:fjurl");
                case "ESB.OUT.163KSX.Q":
                    return Util.GetSettings("MqTcpUri:sxurl");
                case "ESB.OUT.163JCP.Q":
                    return Util.GetSettings("MqTcpUri:cpurl");
                case "ESB.OUT.163IPT.Q":
                    return Util.GetSettings("MqTcpUri:cpurl");
                case "ESB.OUT.163DHJ.Q":
                    return Util.GetSettings("MqTcpUri:pturl");
                default:
                    return "";
            }
        }

        /// <summary>
        /// Listener callback: logs the message body and, when the message carries a
        /// non-null <c>serviceCode</c> property with a configured <c>ApiUrl</c> entry,
        /// POSTs the body to that URL. POST failures are logged and swallowed.
        /// </summary>
        private void HandleMessage(IMessage message)
        {
            ITextMessage msg = message as ITextMessage;
            if (msg == null)
            {
                // A hard cast here would throw InvalidCastException inside the listener.
                Log.Warning("忽略非文本消息:{MessageType:l}", message == null ? "null" : message.GetType().FullName);
                return;
            }

            // Payloads are passed as template properties (":l" = render the string
            // without quotes) so braces in the body are never parsed as template holes.
            Log.Information("接收消息:{Payload:l}", msg.Text);

            foreach (string key in msg.Properties.Keys.Cast<string>())
            {
                if (key != ServiceCodeProperty)
                {
                    continue;
                }

                object keyvalue = msg.Properties[key];
                if (keyvalue == null)
                {
                    continue;
                }

                string serviceCode = keyvalue.ToString();

                // Look the URL up once and reuse it for logging and the request.
                string apiUrl = Util.GetSettings("ApiUrl:" + serviceCode);
                if (apiUrl != null)
                {
                    try
                    {
                        Log.Information("消费post调用地址:{Url:l}|请求参数:{Payload:l}", apiUrl, msg.Text);
                        var res = HttpUtil.PostRequest(apiUrl, msg.Text, PostRequestTimeout);
                        // No ":l" here: the return type of PostRequest is not visible
                        // from this file, so let Serilog pick the rendering.
                        Log.Information("消费post调用返回:{Response}", res);
                    }
                    catch (Exception ex)
                    {
                        // Best-effort delivery: log with the exception attached and carry on.
                        Log.Error(ex, "Post请求异常--------------地址:{Url:l}----------------请求参数:{Payload:l}", apiUrl, msg.Text);
                    }
                }

                Log.Information("接收消息:{ServiceCode:l}", serviceCode);
            }
        }

        /// <summary>
        /// Closes whatever has been created so far, in consumer, session, connection
        /// order. Fields are cleared first so a later call is a no-op even if a close
        /// call throws.
        /// </summary>
        private void ReleaseResources()
        {
            IMessageConsumer consumer = _consumer;
            Apache.NMS.ISession session = _session;
            IConnection connection = _connection;

            _consumer = null;
            _session = null;
            _connection = null;

            try
            {
                if (consumer != null)
                {
                    consumer.Close();
                }

                if (session != null)
                {
                    session.Close();
                }

                if (connection != null)
                {
                    connection.Stop();
                    connection.Close();
                }
            }
            finally
            {
                if (connection != null)
                {
                    connection.Dispose();
                }
            }
        }

        /// <summary>Logs asynchronous connection-level errors reported by the NMS connection.</summary>
        private void _connection_ExceptionListener(Exception exception)
        {
            Log.Error(exception, "消费者发生异常");
        }
    }
}