123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using Apache.NMS;
- using Apache.NMS.ActiveMQ;
- using Apache.NMS.Util;
- using Serilog;
- using wms.util.Http;
namespace wms.console
{
    /// <summary>
    /// Consumes text messages from one ActiveMQ queue and forwards each message body,
    /// via HTTP POST, to the API address configured for the message's
    /// <c>serviceCode</c> property (setting key <c>ApiUrl:&lt;serviceCode&gt;</c>).
    /// </summary>
    public class HouseConsumer
    {
        /// <summary>Name of the message property that selects the target API address.</summary>
        private const string ServiceCodeProperty = "serviceCode";

        /// <summary>
        /// Third argument handed to <c>HttpUtil.PostRequest</c>.
        /// NOTE(review): presumably a timeout in milliseconds - confirm against HttpUtil.
        /// </summary>
        private const int PostTimeout = 30000;

        // NOTE(review): broker credentials are hard-coded in source. They are kept as-is
        // because no configuration key for them is visible here; consider moving them
        // to configuration.
        private const string BrokerUser = "esbadminrw";
        private const string BrokerPassword = "esbadminrw";

        private readonly string _queuesName;
        private readonly string _clientId;
        private readonly IConnectionFactory _connectionFactory;
        private IConnection _connection;
        private Apache.NMS.ISession _session;
        private IMessageConsumer _consumer;

        /// <summary>
        /// Prepares a connection factory for the broker that is configured for the given queue.
        /// No connection is opened until <see cref="CreateConsumer"/> is called.
        /// </summary>
        /// <param name="queuesName">Queue name; also used (with '.' replaced by '3') as the client id.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queuesName"/> is null.</exception>
        public HouseConsumer(string queuesName)
        {
            if (queuesName == null)
            {
                throw new ArgumentNullException("queuesName");
            }

            _queuesName = queuesName;
            _clientId = queuesName.Replace(".", "3");

            string settingKey = ResolveBrokerSettingKey(queuesName);
            string brokerAddress = settingKey == null ? "" : Util.GetSettings(settingKey);
            if (string.IsNullOrEmpty(brokerAddress))
            {
                // The URI is still built (as before) so construction does not start failing
                // for existing callers, but the misconfiguration is now visible in the log.
                Log.Warning("未找到队列对应的MQ地址配置:{QueueName:l}", queuesName);
            }

            Uri brokerUri = new Uri("activemq:failover:(" + brokerAddress + "?wireFormat.maxInactivityDuration=3000)?randomize=false");
            _connectionFactory = new ConnectionFactory(brokerUri);
        }

        /// <summary>
        /// Maps a queue name to the settings key that holds its broker address.
        /// </summary>
        /// <returns>The settings key, or null when the queue name is not recognised.</returns>
        private static string ResolveBrokerSettingKey(string queuesName)
        {
            switch (queuesName)
            {
                case "ESB.OUT.163LFJ.Q":
                    return "MqTcpUri:fjurl";
                case "ESB.OUT.163KSX.Q":
                    return "MqTcpUri:sxurl";
                case "ESB.OUT.163JCP.Q":
                    return "MqTcpUri:cpurl";
                case "ESB.OUT.163IPT.Q":
                    // NOTE(review): shares "cpurl" with the JCP queue while the DHJ queue
                    // below uses "pturl" - looks like a possible copy/paste slip; confirm.
                    return "MqTcpUri:cpurl";
                case "ESB.OUT.163DHJ.Q":
                    return "MqTcpUri:pturl";
                default:
                    return null;
            }
        }

        /// <summary>
        /// Opens the connection and session, then subscribes to the queue.
        /// Failures are logged rather than thrown; anything that was partially
        /// created is released again.
        /// </summary>
        public void CreateConsumer()
        {
            try
            {
                _connection = _connectionFactory.CreateConnection(BrokerUser, BrokerPassword);
                _connection.ClientId = _clientId;
                _connection.ExceptionListener += _connection_ExceptionListener;
                _session = _connection.CreateSession();
                _connection.Start();

                IDestination destination = SessionUtil.GetDestination(_session, _queuesName);
                _consumer = _session.CreateConsumer(destination);
                _consumer.Listener += OnMessage;
                Console.WriteLine("消费者启动成功:" + _connection.ClientId);
            }
            catch (Exception ex)
            {
                Log.Error(ex, "消费者创建异常");
                // Do not leave a half-open connection/session behind.
                Stop();
            }
        }

        /// <summary>
        /// Handles one delivered message: logs it, resolves the API address from the
        /// <c>serviceCode</c> property and POSTs the message text there. HTTP failures
        /// are logged and do not propagate to the messaging layer.
        /// </summary>
        private void OnMessage(IMessage message)
        {
            ITextMessage textMessage = message as ITextMessage;
            if (textMessage == null)
            {
                // A hard cast here would throw on the delivery thread for non-text messages.
                Log.Warning("忽略非文本消息:{MessageType:l}", message == null ? "null" : message.GetType().FullName);
                return;
            }

            string body = textMessage.Text;
            // The body is passed as a property (":l" = render without quotes) instead of being
            // concatenated into the template, so braces in the payload are not parsed by Serilog.
            Log.Information("接收消息:{Body:l}", body);

            if (!textMessage.Properties.Contains(ServiceCodeProperty))
            {
                return;
            }

            object serviceCode = textMessage.Properties[ServiceCodeProperty];
            if (serviceCode == null)
            {
                return;
            }

            string apiUrl = Util.GetSettings("ApiUrl:" + serviceCode.ToString());
            if (!string.IsNullOrEmpty(apiUrl))
            {
                try
                {
                    Log.Information("消费post调用地址:{Url:l}|请求参数:{Body:l}", apiUrl, body);
                    var response = HttpUtil.PostRequest(apiUrl, body, PostTimeout);
                    Log.Information("消费post调用返回:{Response:l}", Convert.ToString(response));
                }
                catch (Exception ex)
                {
                    Log.Error(ex, "Post请求异常--------------地址:{Url:l}----------------请求参数:{Body:l}", apiUrl, body);
                }
            }
            else
            {
                Log.Warning("未找到serviceCode对应的ApiUrl配置:{ServiceCode:l}", serviceCode.ToString());
            }

            Log.Information("接收消息:{ServiceCode:l}", serviceCode.ToString());
        }

        /// <summary>
        /// Closes the consumer, session and connection. Safe to call when
        /// <see cref="CreateConsumer"/> was never called or failed part-way; a failure
        /// while closing one resource is logged and does not prevent the others
        /// from being released.
        /// </summary>
        public void Stop()
        {
            if (_consumer != null)
            {
                try
                {
                    _consumer.Listener -= OnMessage;
                    _consumer.Close();
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "消费者关闭异常");
                }
                _consumer = null;
            }

            if (_session != null)
            {
                try
                {
                    _session.Close();
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "会话关闭异常");
                }
                _session = null;
            }

            if (_connection != null)
            {
                try
                {
                    _connection.ExceptionListener -= _connection_ExceptionListener;
                    _connection.Stop();
                    _connection.Close();
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "连接关闭异常");
                }

                try
                {
                    _connection.Dispose();
                }
                catch (Exception ex)
                {
                    Log.Warning(ex, "连接释放异常");
                }
                _connection = null;
            }
        }

        /// <summary>Logs exceptions raised asynchronously by the connection.</summary>
        private void _connection_ExceptionListener(Exception exception)
        {
            Log.Error(exception, "消费者发生异常");
        }
    }
}
|