这篇文章主要围绕RabbitMQ消息队列和七:适用于云计算集群的远程调用(RPC)展开,旨在为您提供一份详细的参考资料。我们将全面介绍RabbitMQ消息队列的优缺点,解答七:适用于云计算集群的远程调
这篇文章主要围绕RabbitMQ消息队列和七:适用于云计算集群的远程调用(RPC)展开,旨在为您提供一份详细的参考资料。我们将全面介绍RabbitMQ消息队列的优缺点,解答七:适用于云计算集群的远程调用(RPC)的相关问题,同时也会为您带来.Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信、AMQP协议详解与RabbitMQ,MQ消息队列的应用场景,如何避免消息丢失等消息队列常见问题、C# 封装RabbitMQ消息队列处理、openstack共享组件--rabbitmq消息队列(1)的实用方法。
本文目录一览:- RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)
- .Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信
- AMQP协议详解与RabbitMQ,MQ消息队列的应用场景,如何避免消息丢失等消息队列常见问题
- C# 封装RabbitMQ消息队列处理
- openstack共享组件--rabbitmq消息队列(1)
RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)
1. 客户端接口 Client interface
为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果。代码如下:
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
2. 回调函数队列 Callback queue
总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client端发送请求的Message,然后server端返回响应结果。为了收到server端响应client端的请求,我们在client端设置publish message时需要提供一个”callback“(回调)的queue地址。code如下:
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='''',
routing_key=''rpc_queue'',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
2.1 Message properties
AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:
- delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。
- content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
- reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
- correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。
3. 相关id Correlation id
一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id
属性,这样客户端在回调队列中根据correlation_id
字段的值就可以分辨此响应属于哪个请求。
在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。
这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。
4. 总结

工作流程:
- 当客户端启动时,它创建了匿名的exclusive callback queue.
- 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
- 请求将被发送到an rpc_queue queue.
- RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
- client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。
5. 最终实现
The code for rpc_client.py:
root@ansible:~/workspace/rabbitmq/sixth_rpc# cat rpc_client.py
# coding:utf-8
import pika
import sys
import uuid
"""
客户端主要作用是发送message,然后通过订阅得到server断返回的消息
"""
class FibonacciRpcClient(object):
def __init__(self):
"""
客户端启动时,创建回调队列,并开启会话 用于发送RPC请求以及接受响应
"""
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=''localhost''))
# 建立一个会话,每个channel代表一个会话任务
self.channel = self.connection.channel()
# durable表示queue的持久化
# channel.queue_declare(queue=''hello'',durable=True)
# 声明回调队列,再次声明的原因,服务器和客户端不知道谁先启动,该声明是幂等性的,多次声明只生效一次
result = self.channel.queue_declare(exclusive=True)
# 回调队列名字
self.callback_queue_name = result.method.queue
# 客户端订阅回调队列,当回调队列中有相应时,调用''on_response''方法对响应进行处理
# 调用回调函数
self.channel.basic_consume(self.on_response,no_ack=True,queue=self.callback_queue_name)
# 定义回调函数,对回调队列中的响应进行处理的函数
def on_response(self,ch,method,props,body):
if self.corr_id == props.correlation_id:
self.reponse = body
def call(self,n):
# 初始化response
self.reponse = None
# 生成correlation_id
self.corr_id = str(uuid.uuid4())
# 发送RPC请求内容到RPC请求队列rpc_queue,同时发送reply_to 和 correlation_id
self.channel.basic_publish(exchange='''',routing_key=''rpc_queue'',
properties=pika.BasicProperties(reply_to=self.callback_queue_name,
correlation_id=self.corr_id,),
body=str(n))
while self.reponse is None:
# 当没有收到consumer 消息时,循环等待consumer的消息然后处理数据,这个是阻塞的
self.connection.process_data_events()
return int(self.reponse)
fibonacci_rpc = FibonacciRpcClient()
# 发送rpc请求
print "[x] requesting fib(30)"
response = fibonacci_rpc.call(30)
print "[.] got %r" %response
The code for rpc_server.py:
root@ansible:~/workspace/rabbitmq/sixth_rpc# cat rpc_server.py
# coding:utf-8
import pika
import time
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host=''127.0.0.1''))
channel = connection.channel()
channel.queue_declare(queue=''rpc_queue'')
# 数据处理方法
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
# 对rpc请求队列中的请求进行处理
def on_request(ch,method,props,body):
n = int(body)
print "[.] fib(%s)" %n
# 调用处理方法
response = fib(n)
# 将处理结果(响应) 发送到回调队列中
ch.basic_publish(exchange='''',routing_key=props.reply_to,
properties = pika.BasicProperties(correlation_id=props.correlation_id),
body = str(response),
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue=''rpc_queue'')
# 循环监听
print "[*] Waiting RPC request..."
channel.start_consuming()
以下是我自己总结RPC代码流程:
- 当客户端启动时,它创建了匿名的exclusive callback queue.
- 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.这里的client会等待server端返回的响应(这里的client端会订阅server端的queue(callback queue))
- 请求将被发送到an rpc_queue queue. server端会订阅rpc_queue,等待client端请求过来
- RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue响应一个message给client。
- client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应
执行测试一下吧:
root@ansible:~/workspace/rabbitmq/sixth_rpc# python rpc_server.py
root@ansible:~/workspace/rabbitmq/sixth_rpc# python rpc_client.py
.Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信
RabbitMQ是什么,怎么使用我就不介绍了,大家可以到园子里搜一下教程。本篇的重点在于实现服务与服务之间的异步通信。
首先说一下为什么要使用消息队列来实现服务通信:1.提高接口并发能力。 2.保证服务各方数据最终一致。 3.解耦。
使用消息队列通信的优点就是直接调用的缺点,比如在直接调用过程中发生未知错误,很可能就会出现数据不一致的问题,这个时候就需要人工修补数据,如果有过这个经历的同学一定是可怜的,人工修补数据简直痛苦!!再比如高并发情况下接口直接挂点,这就更直白了,接口挂了,功能就挂了,事故报告写起来!!而消息队列可以轻松解决上面两个问题,接口发生错误,不要紧,MQ重试一下,再不行,人工重试MQ;在使用消息队列的时候,请求实际是被串行化,简单说就是排队,所以再也不用担心因为并发导致数据不一致或者接口直接挂掉的问题。
我现在公司使用的消息队列排队的请求最高的有上万个,所以完全不需要担心MQ的性能。
OK,我们来实现一下微服务里如何使用消息队列,主要思路是这样的:
【提供消费者注册界面,用于绑定RoutingKey和队列;消息发布后,根据RoutingKey去Redis中查找对应的服务地址,然后异步调用。】
上面这句话就是消息队列的主体思路,也是我司现在使用的方式,话不多说,代码敲起来。
首先看下我们的项目结构:
首先我们需要先建三个这样的类库,这里面有些东西是用不到的,当然最最主要的就是标记出来的消息队列部分,现在暂时提供了两个方法,分别是发布(Publish)和订阅(Subscribe)。
首先新增消息·队列接口类IEventBus,这个将来用于在业务系统中注入使用,这里提供了发布订阅方法:
public interface IEventBus
{
void Publish(string RoutingKey, object Model);
void Subscribe(string QueueName, string RoutingKey);
}
新增RabbitMQ操作接口类IRabbitMQPersistentConnection,这个用来检查RabbitMQ的连接和释放:
public interface IRabbitMQPersistentConnection : IDisposable
{
bool IsConnected { get; }
bool TryConnect();
IModel CreateModel();
}
新增IRabbitMQPersistentConnection的实现类DefaultRabbitMQPersistentConnection,这个是RabbitMQ连接和释放方法的具体实现,这个没什么可说的,大家一看就知道了,就是检查RabbitMQ的连接状态,没有连接创建连接,发生错误的捕捉错误重新连接,这里用到了Polly的重新策略:
public class DefaultRabbitMQPersistentConnection:IRabbitMQPersistentConnection
{
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
private readonly int _retryCount;
IConnection _connection;
bool _disposed;
object sync_root = new object();
public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_retryCount = retryCount;
}
public bool IsConnected
{
get
{
return _connection != null && _connection.IsOpen && !_disposed;
}
}
public IModel CreateModel()
{
if (!IsConnected)
{
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
}
return _connection.CreateModel();
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try
{
_connection.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex.ToString());
}
}
public bool TryConnect()
{
_logger.LogInformation("RabbitMQ Client is trying to connect");
lock (sync_root)
{
var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex.ToString());
});
policy.Execute(() =>
{
_connection = _connectionFactory
.CreateConnection();
});
if (IsConnected)
{
_connection.ConnectionShutdown += OnConnectionShutdown;
_connection.CallbackException += OnCallbackException;
_connection.ConnectionBlocked += OnConnectionBlocked;
_logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
return true;
}
else
{
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false;
}
}
}
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
TryConnect();
}
void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
TryConnect();
}
void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed) return;
_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
TryConnect();
}
}
接下来是最重要,IEventBus的实现类EventBusRabbitMQ,在这个类里我们实现了消息的发布、订阅、消费,首先把代码展示出来,然后一个一个的介绍:
public class EventBusRabbitMQ : IEventBus, IDisposable
{
const string BROKER_NAME = "mi_event_bus";
private readonly IRabbitMQPersistentConnection _persistentConnection;
private readonly ILogger<EventBusRabbitMQ> _logger;
private readonly ILifetimeScope _autofac;
private readonly IApiHelperService _apiHelperService;
private readonly string AUTOFAC_SCOPE_NAME = "mi_event_bus";
private readonly int _retryCount;
private IModel _consumerChannel;
private string _queueName;
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,ILogger<EventBusRabbitMQ> logger,
ILifetimeScope autofac, IApiHelperService apiHelperService, string queueName=null,int retryCount=5)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_queueName = queueName;
_consumerChannel = CreateConsumerChannel();
_autofac = autofac;
_retryCount = retryCount;
_apiHelperService = apiHelperService;
}
/// <summary>
/// 发布消息
/// </summary>
public void Publish(string routingKey,object Model)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var policy = RetryPolicy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex.ToString());
});
using (var channel = _persistentConnection.CreateModel())
{
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
var message = JsonConvert.SerializeObject(Model);
var body = Encoding.UTF8.GetBytes(message);
policy.Execute(() =>
{
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //持久化
channel.BasicPublish(exchange: BROKER_NAME, routingKey: routingKey, mandatory: true, basicProperties: properties, body: body);
});
}
}
/// <summary>
/// 订阅(绑定RoutingKey和队列)
/// </summary>
public void Subscribe(string QueueName, string RoutingKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey);
}
}
/// <summary>
/// 创建消费者并投递消息
/// </summary>
/// <returns></returns>
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(ea.RoutingKey, message);
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
channel.CallbackException += (sender, ea) =>
{
_consumerChannel.Dispose();
_consumerChannel = CreateConsumerChannel();
};
return channel;
}
/// <summary>
/// 发送MQ数据到指定服务接口
/// </summary>
private async Task ProcessEvent(string routingKey, string message)
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
//获取绑定该routingKey的服务地址集合
var subscriptions = await StackRedis.Current.GetAllList(routingKey);
foreach(var apiUrl in subscriptions)
{
_logger.LogInformation(message);
await _apiHelperService.PostAsync(apiUrl, message);
}
}
}
public void Dispose()
{
_consumerChannel?.Dispose();
}
}
首先是发布方法,接受一个字符串类型的RoutingKey和Object类型的MQ数据,然后根据RoutingKey将数据发布到指定的队列,这里RoutingKey发布到队列的方式用的是direct模式,生产环境下我们通常会使用Topic模式,后面真正使用的时候这里也会改掉;同时在MQ发布方面也采用了Polly的重试策略。
接下来是订阅Subscribe方法,这个比较简单,就是包RoutingKey和Queue进行绑定,这里会提供一个专门的注册界面,用于配置RoutingKey、Queue、ExChange和服务接口地址之间的对应关系,用的就是这个方法。
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey);
}
然后是消费者的创建和消费方式方法CreateConsumerChannel,这个是最重要一个,在这个方法里真正实现了消息的消费,消息的消费通过委托实现,我们需要关注的是下面这个地方:
var channel = _persistentConnection.CreateModel();
channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
await ProcessEvent(ea.RoutingKey, message);
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
解释下这段代码,首先创建消息通道,并为它绑定交换器Exchange和队列Queue,然后在这条消息通道上创建消费者Consumer,为这个消费者的接受消息的委托注册一个处理方法。
当消息被路由到当前队列Queue上时,就会触发这个消息的处理方法,处理完成后,自动发送ack确认。
ProcessEvent是消息的具体处理方法,大体流程是这样的,它接受一个RoutingKey和消息数据message,根据RoutingKey从Redis中拿到对应的服务地址,我们前面说过会有一个专门的页面用于绑定RoutingKey和服务地址的关系,拿到地址集合之后循环调用,即Api调用。
/// <summary>
/// 发送MQ到指定服务接口
/// </summary>
private async Task ProcessEvent(string routingKey, string message)
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
//获取绑定该routingKey的服务地址集合
var subscriptions = await StackRedis.Current.GetAllList(routingKey);
foreach(var apiUrl in subscriptions)
{
_logger.LogInformation(message);
await _apiHelperService.PostAsync(apiUrl, message);
}
}
}
这里用到了Api调用的帮助类,前面已经写过了,只不过把它放到了这个公共的地方,还是贴下代码:
public interface IApiHelperService
{
Task<T> PostAsync<T>(string url, object Model);
Task<T> GetAsync<T>(string url);
Task PostAsync(string url, string requestMessage);
}
public class ApiHelperService : IApiHelperService
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<ApiHelperService> _logger;
public ApiHelperService(ILogger<ApiHelperService> _logger, IHttpClientFactory _httpClientFactory)
{
this._httpClientFactory = _httpClientFactory;
this._logger = _logger;
}
/// <summary>
/// HttpClient实现Post请求
/// </summary>
public async Task<T> PostAsync<T>(string url, object Model)
{
var http = _httpClientFactory.CreateClient("MI.Web");
//添加Token
var token = await GetToken();
http.SetBearerToken(token);
//使用FormUrlEncodedContent做HttpContent
var httpContent = new StringContent(JsonConvert.SerializeObject(Model), Encoding.UTF8, "application/json");
//await异步等待回应
var response = await http.PostAsync(url, httpContent);
//确保HTTP成功状态值
response.EnsureSuccessStatusCode();
//await异步读取
string Result = await response.Content.ReadAsStringAsync();
var Item = JsonConvert.DeserializeObject<T>(Result);
return Item;
}
/// <summary>
/// HttpClient实现Post请求(用于MQ发布功能 无返回)
/// </summary>
public async Task PostAsync(string url, string requestMessage)
{
var http = _httpClientFactory.CreateClient();
//添加Token
var token = await GetToken();
http.SetBearerToken(token);
//使用FormUrlEncodedContent做HttpContent
var httpContent = new StringContent(requestMessage, Encoding.UTF8, "application/json");
//await异步等待回应
var response = await http.PostAsync(url, httpContent);
//确保HTTP成功状态值
response.EnsureSuccessStatusCode();
}
/// <summary>
/// HttpClient实现Get请求
/// </summary>
public async Task<T> GetAsync<T>(string url)
{
var http = _httpClientFactory.CreateClient("MI.Web");
//添加Token
var token = await GetToken();
http.SetBearerToken(token);
//await异步等待回应
var response = await http.GetAsync(url);
//确保HTTP成功状态值
response.EnsureSuccessStatusCode();
var Result = await response.Content.ReadAsStringAsync();
var Items = JsonConvert.DeserializeObject<T>(Result);
return Items;
}
/// <summary>
/// 转换URL
/// </summary>
/// <param name="str"></param>
/// <returns></returns>
public static string UrlEncode(string str)
{
StringBuilder sb = new StringBuilder();
byte[] byStr = System.Text.Encoding.UTF8.GetBytes(str);
for (int i = 0; i < byStr.Length; i++)
{
sb.Append(@"%" + Convert.ToString(byStr[i], 16));
}
return (sb.ToString());
}
//获取Token
//获取Token
public async Task<string> GetToken()
{
var client = _httpClientFactory.CreateClient("MI.Web");
string token = await Untity.StackRedis.Current.Get("ApiToken");
if (!string.IsNullOrEmpty(token))
{
return token;
}
try
{
//DiscoveryClient类:IdentityModel提供给我们通过基础地址(如:http://localhost:5000)就可以访问令牌服务端;
//当然可以根据上面的restful api里面的url自行构建;上面就是通过基础地址,获取一个TokenClient;(对应restful的url:token_endpoint "http://localhost:5000/connect/token")
//RequestClientCredentialsAsync方法:请求令牌;
//获取令牌后,就可以通过构建http请求访问API接口;这里使用HttpClient构建请求,获取内容;
var cache = new DiscoveryCache("http://localhost:7000");
var disco = await cache.GetAsync();
if (disco.IsError) throw new Exception(disco.Error);
var tokenResponse = await client.RequestClientCredentialsTokenAsync(new ClientCredentialsTokenRequest
{
Address = disco.TokenEndpoint,
ClientId = "MI.Web",
ClientSecret = "miwebsecret",
Scope = "MI.Service"
});
if (tokenResponse.IsError)
{
throw new Exception(tokenResponse.Error);
}
token = tokenResponse.AccessToken;
await Untity.StackRedis.Current.Set("ApiToken", token, (int)TimeSpan.FromSeconds(tokenResponse.ExpiresIn).TotalMinutes);
}
catch (Exception ex)
{
throw new Exception(ex.Message);
}
return token;
}
}
然后Redis帮助类的代码也贴一下,Redis这里大家可以根据自己习惯,如何使用没什么区别:
public class StackRedis : IDisposable
{
#region 配置属性 基于 StackExchange.Redis 封装
//连接串 (注:IP:端口,属性=,属性=)
//public string _ConnectionString = "47.99.92.76:6379,password=shenniubuxing3";
public string _ConnectionString = "47.99.92.76:6379";
//操作的库(注:默认0库)
public int _Db = 0;
#endregion
#region 管理器对象
/// <summary>
/// 获取redis操作类对象
/// </summary>
private static StackRedis _StackRedis;
private static object _locker_StackRedis = new object();
public static StackRedis Current
{
get
{
if (_StackRedis == null)
{
lock (_locker_StackRedis)
{
_StackRedis = _StackRedis ?? new StackRedis();
return _StackRedis;
}
}
return _StackRedis;
}
}
/// <summary>
/// 获取并发链接管理器对象
/// </summary>
private static ConnectionMultiplexer _redis;
private static object _locker = new object();
public ConnectionMultiplexer Manager
{
get
{
if (_redis == null)
{
lock (_locker)
{
_redis = _redis ?? GetManager(_ConnectionString);
return _redis;
}
}
return _redis;
}
}
/// <summary>
/// 获取链接管理器
/// </summary>
/// <param name="connectionString"></param>
/// <returns></returns>
public ConnectionMultiplexer GetManager(string connectionString)
{
return ConnectionMultiplexer.Connect(connectionString);
}
/// <summary>
/// 获取操作数据库对象
/// </summary>
/// <returns></returns>
public IDatabase GetDb()
{
return Manager.GetDatabase(_Db);
}
#endregion
#region 操作方法
#region string 操作
/// <summary>
/// 根据Key移除
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<bool> Remove(string key)
{
var db = this.GetDb();
return await db.KeyDeleteAsync(key);
}
/// <summary>
/// 根据key获取string结果
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public async Task<string> Get(string key)
{
var db = this.GetDb();
return await db.StringGetAsync(key);
}
/// <summary>
/// 根据key获取string中的对象
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <returns></returns>
public async Task<T> Get<T>(string key)
{
var t = default(T);
try
{
var _str = await this.Get(key);
if (string.IsNullOrWhiteSpace(_str)) { return t; }
t = JsonConvert.DeserializeObject<T>(_str);
}
catch (Exception ex) { }
return t;
}
/// <summary>
/// 存储string数据
/// </summary>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="expireMinutes"></param>
/// <returns></returns>
public async Task<bool> Set(string key, string value, int expireMinutes = 0)
{
var db = this.GetDb();
if (expireMinutes > 0)
{
return db.StringSet(key, value, TimeSpan.FromMinutes(expireMinutes));
}
return await db.StringSetAsync(key, value);
}
/// <summary>
/// 存储对象数据到string
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <param name="value"></param>
/// <param name="expireMinutes"></param>
/// <returns></returns>
public async Task<bool> Set<T>(string key, T value, int expireMinutes = 0)
{
try
{
var jsonOption = new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
};
var _str = JsonConvert.SerializeObject(value, jsonOption);
if (string.IsNullOrWhiteSpace(_str)) { return false; }
return await this.Set(key, _str, expireMinutes);
}
catch (Exception ex) { }
return false;
}
#endregion
#region List操作(注:可以当做队列使用)
/// <summary>
/// list长度
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <returns></returns>
public async Task<long> GetListLen<T>(string key)
{
try
{
var db = this.GetDb();
return await db.ListLengthAsync(key);
}
catch (Exception ex) { }
return 0;
}
/// <summary>
/// 获取队列出口数据并移除
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <returns></returns>
public async Task<T> GetListAndPop<T>(string key)
{
var t = default(T);
try
{
var db = this.GetDb();
var _str = await db.ListRightPopAsync(key);
if (string.IsNullOrWhiteSpace(_str)) { return t; }
t = JsonConvert.DeserializeObject<T>(_str);
}
catch (Exception ex) { }
return t;
}
/// <summary>
/// 集合对象添加到list左边
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <param name="values"></param>
/// <returns></returns>
public async Task<long> SetLists<T>(string key, List<T> values)
{
var result = 0L;
try
{
var jsonOption = new JsonSerializerSettings()
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
};
var db = this.GetDb();
foreach (var item in values)
{
var _str = JsonConvert.SerializeObject(item, jsonOption);
result += await db.ListLeftPushAsync(key, _str);
}
return result;
}
catch (Exception ex) { }
return result;
}
/// <summary>
/// 单个对象添加到list左边
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="key"></param>
/// <param name="value"></param>
/// <returns></returns>
public async Task<long> SetList<T>(string key, T value)
{
var result = 0L;
try
{
result = await this.SetLists(key, new List<T> { value });
}
catch (Exception ex) { }
return result;
}
/// <summary>
/// 获取List所有数据
/// </summary>
public async Task<List<string>> GetAllList(string list)
{
var db = this.GetDb();
var redisList = await db.ListRangeAsync(list);
List<string> listMembers = new List<string>();
foreach (var item in redisList)
{
listMembers.Add(JsonConvert.DeserializeObject<string>(item));
}
return listMembers;
}
#endregion
#region 额外扩展
/// <summary>
/// 手动回收管理器对象
/// </summary>
public void Dispose()
{
this.Dispose(_redis);
}
public void Dispose(ConnectionMultiplexer con)
{
if (con != null)
{
con.Close();
con.Dispose();
}
}
#endregion
#endregion
}
OK,核心代码部分介绍到这里,具体来看怎么使用,推送当前类库到自己的Nuget包,不知道怎么建Nuget服务器的可以看下我之前的那篇文章。
打开MI.Web项目,在Startup中注册RabbitMQ的相关信息:
/// <summary>
/// 消息总线RabbitMQ
/// </summary>
private void RegisterEventBus(IServiceCollection services)
{
#region 加载RabbitMQ账户
services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
};
if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
{
factory.UserName = Configuration["EventBusUserName"];
}
if (!string.IsNullOrEmpty(Configuration["EventBusPassword"]))
{
factory.Password = Configuration["EventBusPassword"];
}
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});
#endregion
var subscriptionClientName = Configuration["SubscriptionClientName"];
services.AddSingleton<IEventBus, EventBusRabbitMQ.EventBusRabbitMQ>(sp =>
{
var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ.EventBusRabbitMQ>>();
var apiHelper = sp.GetRequiredService<IApiHelperService>();
var retryCount = 5;
if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
{
retryCount = int.Parse(Configuration["EventBusRetryCount"]);
}
return new EventBusRabbitMQ.EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, apiHelper, subscriptionClientName, retryCount);
});
}
这里暂时还没做出专门用于注册RoutingKey的界面,所以暂时用在这里用方法注册下,后面再修改,这里的RoutingKey用于用户注册使用:
//绑定RoutingKey与队列
private void ConfigureEventBus(IApplicationBuilder app)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe(Configuration["SubscriptionClientName"], "UserRegister");
}
上面用的都是appsettings.json里的配置,贴下代码,标蓝的部分是需要用到的:
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Warning"
}
},
"ConnectionStrings": {
"ElasticSearchServerAddress": "",
"Redis": "47.99.92.76:6379"
},
"ServiceAddress": {
"Service.Identity": "http://localhost:7000",
"Service.Account": "http://localhost:7001",
"Service.Ocelot": "http://localhost:7003",
"Service.Picture": "http://localhost:7005"
},
"MehtodName": {
"Account.MiUser.SSOLogin": "/Account/MiUser/SSOLogin", //登录
"Identity.Connect.Token": "/connect/token", //获取token
"Picture.QueryPicture.QueryStartProduct": "/Picture/QueryPicture/QueryStartProduct", //查询明星产品
"Picture.QueryPicture.QuerySlideImg": "/Picture/QueryPicture/QuerySlideImg", //查询轮播图
"Picture.QueryPicture.QueryHadrWare": "/Picture/QueryPicture/QueryHadrWare" //查询智能硬件表数据
},
"EventBusConnection": "******", //RabbitMQ地址
"EventBusUserName": "guest",
"EventBusPassword": "guest",
"EventBusRetryCount": 5,
"SubscriptionClientName": "RabbitMQ_Bus_MI"
}
OK,配置部分算是完成了,接下我们就要去发送MQ了,我们这里使用IEventBus对象调用发布方法,用于发送用户的注册信息,最终最调用新增用户接口:
private readonly IEventBus _eventBus;
public LoginController(IEventBus _eventBus)
{
this._eventBus = _eventBus;
}
public JsonResult RegisterUser(string UserName, string UserPwd)
{
try
{
if (!string.IsNullOrEmpty(UserName) && !string.IsNullOrEmpty(UserPwd))
{
RegisterRequest request = new RegisterRequest
{
UserName = UserName,
Password = UserPwd
};
_eventBus.Publish("UserRegister", request);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "注册失败!");
}
return Json("");
}
最终会新增当前传入的用户信息。
当然,这不是消息队列的最终使用方式,后面会继续修改,这里的问题在于发布和消费都耦合再了业务层,对于业务系统来说这是一种负担,举个例子,我们公司当前队列消息多的能达到上百万个,如果把消息的消费和业务系统放在一起可能会影响,所以使用的时候会把消费端单独拿出来做成Windows服务,并添加自动重试和补偿机制,毕竟RabbitMQ也不是没有错误的,比如调用Api出现问题,迟迟无法返回ack确认,这个时候就会报出 wait ack timeout的错误。
OK,今天先到这里,我去煮包泡面吃。。。
AMQP协议详解与RabbitMQ,MQ消息队列的应用场景,如何避免消息丢失等消息队列常见问题
什么是AMQP?
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,
AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
AMQP 中包含的主要元素
生产者(Producer):向Exchange发布消息的应用。
消费者(Consumer):从消息队列queue中消费消息的应用。
消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。
Queue:消息载体;每个消息都会被投入到一个或多个队列。
消息(Message):传输的内容。
交换器(exchange):路由组件,接收Producer发送的消息,并根据Routing Key转发给消息队列queue。
Routing Key:路由关键字,exchange根据这个Routing Key进行消息投递到队列queue。
虚拟主机(Virtual Host): 用作不同用户的权限分离;一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的权限的 vhost 中)
Broker :AMQP的服务端称为Broker。
连接(Connection):一个网络连接,比如TCP/IP套接字连接;应用程序与Rabbit之间建立连接的管理器,程序代码中使用ConnectionFactory(连接管理器)。
信道(Channel):消息通道,在客户端的每个Connection连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。
绑定器(Binding):把exchange和queue按照路由规则绑定起来。
exchange 与 Queue 的路由机制
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中;
生产者发消息不需要指定Queue,消费者可以指定Queue绑定到某个RoutingKey和某个Exchange,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息。
Exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange :
1. Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
2. Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。
3. topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。
TopicExchange的匹配符号:
#:匹配多个词
*: 匹配一个词
需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。
RabbitMQ中一个核心的原则是,消息不能直接投递到Queue中。Producer只能将自己的消息投递到Exchange中,由Exchange按照路由规则将消息投递到对应的Queue中。
在Consumer中,声明自己对哪个Exchange感兴趣,并将自己的Queue绑定到自己感兴趣的路由关键字上,建立相应的映射关系;第二,在Producer中,将消息投递一个Exchange中,并指明它的路由关键字。
AMQP 如何实现通信的
(1)建立连接Connection。由producer和consumer分别连接到broker的物理节点上。
(2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel;producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
(3)发送消息。由Producer发送消息到Broker中的exchange中。
(4)路由转发。exchange收到消息后,根据一定的路由策略routing key,将消息转发到相应的queue中去。
(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。 至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
消息队列的使用大概过程
(1)客户端连接Connection到消息队列服务器Broker,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
RabbitMQ中 exchange、route、queue的关系
MessageQueue、Exchange和Binding构成了AMQP协议的核心。
声明MessageQueue
在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。
(重点) 这里有一点需要明确:
- 如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的,所以一个队列如果已经存在了,比如消费者如果再次尝试建立已存在的队列,是无效的
- 比如,你通过SpringBoot程序已经建立了一个queueA,再通过另外一个SpringBoot程序想要更改其queue属性,比如设置队列持久化durable=="true",就再次建立了一个queueA设置属性,是无效的
如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
b) Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c) Durable:持久化。
d) 其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
exchange 与 Queue 的路由机制
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个RoutingKey和某个Exchange,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息。
exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。
Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。
topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。
需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。
AMQP的应用场景
AMQP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:
异步处理
应用解耦
死信队列
分布式事务
流量缓冲
日志处理
SpringBoot+RabbitMQ的简单demo
https://www.cnblogs.com/theRhyme/p/10071781.html
RabbitMQ死信队列的应用场景和代码实现
https://www.cnblogs.com/theRhyme/p/10874409.html
RabbitMQ延迟队列代码实现和应用场景
场景: 订单下单30min如果没有付款就删除该订单
通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列(重定向队列),实现延迟功能;
使用 rabbitmq_delayed_message_exchange 插件实现延迟功能。
代码:https://www.cnblogs.com/theRhyme/p/10986409.html
rabbitmq 怎么避免消息丢失?
- 生产者Confirm机制(异步,推荐)或是事务方式(同步,不推荐)
- MQ服务端将消息持久化
- 消费者给MQ回复ACK,确认机制
- MQ服务端设置集群镜像模式
- 消费者消费消息补偿机制(如死信队列)
如果生产者弄丢了数据
RabbitMQ 生产者将数据发送到 RabbitMQ 的时候,可能数据在网络传输中搞丢了,这个时候 RabbitMQ 收不到消息,消息就丢了。
RabbitMQ 提供了两种方式来解决这个问题:
事务方式:在生产者发送消息之前,通过`channel.txSelect`开启一个事务,接着发送消息。
如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚`channel.txRollback`,然后重新发送。假如 RabbitMQ 收到了这个消息,就可以提交事务`channel.txCommit`。
但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。
另外一种方式就是通过 Confirm 机制:这个 Confirm 模式是在生产者那里设置的,就是每次发消息的时候会分配一个唯一的 ID,然后 RabbitMQ服务端 收到之后会回传一个 ACK,告诉生产者这个消息 OK 了。
如果 RabbitMQ 没有处理到这个消息,那么就回调一个 Nack 的接口,这个时候生产者就可以重发。
事务机制和 Confirm 机制最大的不同在于事务机制是同步的,提交一个事务之后会阻塞在那儿。
但是 Confirm 机制是异步的,发送一个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 Confirm 机制的。
要保证消息持久化成功的条件有哪些?
- 声明队列必须设置持久化 durable 设置为 true.
- 消息推送投递模式必须设置持久化,deliveryMode 设置为 2(持久)。
- 消息已经到达持久化交换器。
- 消息已经到达持久化队列。
以上四个条件都满足才能保证消息持久化成功。
rabbitmq 持久化有什么缺点?
持久化的缺点就是降低了服务器的吞吐量,因为使用的是磁盘而非内存存储,从而降低了吞吐量。可尽量使用 ssd 硬盘来缓解吞吐量的问题。
RabbitMQ如何保证同一个队列中的消息被顺序消费?
TODO待写
来源:
https://blog.csdn.net/letempsar/article/details/52565020
https://blog.csdn.net/ztx114/article/details/78410727
https://www.cnblogs.com/linkenpark/p/5393666.html
http://techblog.ppdai.com/2018/07/17/20180717/
https://www.toutiao.com/a6698312611185820171/?timestamp=1559696015&app=news_article&group_id=6698312611185820171&tdsourcetag=s_pctim_aiomsg&req_id=2019060508533401002506701591332CD
https://mp.weixin.qq.com/s?__biz=MjM5ODI5Njc2MA==&mid=2655825391&idx=1&sn=f7523195ff08a51085012c736bc002a8&chksm=bd74e0388a03692e49ca3967a03dd2e8164e02741a75cabc8bd56e5b70ba6f2cc19f6fe10fd2&scene=0&xtrack=1&key=1c855a3d2871be72b53c28efecdb6c847aa5d9daffdac4207cc93d6a62948c3ac03b6e8813a35eaa72a54f7668de41b31fb1265ff3066312574ca210769ad2b726b9932dd21a296f9ea91fd6cf367dd7&ascene=1&uin=ODEzMzE3OTc%3D&devicetype=Windows+10&version=62060833&lang=zh_CN&pass_ticket=ZLGuBJ0cY2BIuQpqK%2Be08dQVFm3Htt7htVVelbWP8XE%3D
C# 封装RabbitMQ消息队列处理
现在使用.net领域使用RabbitMQ有很多解决方案,我自己使用过的有两个,一个是EasyNetQ,一个是CAP,都挺好用的,尤其是CAP,懒人推荐使用,怎么使用的文章也很多,小伙伴可以自行搜索。
最近我自己尝试根据目前手头项目的需要,自行封装一下基于RabbitMQ的使用,下面开搞,贴上我自己写的代码。
首先定义消息发布者/生产者接口:
1 using System.Threading.Tasks;
2
3 namespace fx.MQ
4 {
5 public interface IPublisher
6 {
7 /// <summary>
8 /// 释放资源。
9 /// </summary>
10 void Dispose();
11 /// <summary>
12 ///
13 /// </summary>
14 /// <typeparam name="T"></typeparam>
15 /// <param name="message"></param>
16 void Publish<T>(T message) where T : class;
17 /// <summary>
18 ///
19 /// </summary>
20 /// <param name="message"></param>
21 /// <param name="channelName"></param>
22 void Publish(string message, string channelName);
23 /// <summary>
24 ///
25 /// </summary>
26 /// <typeparam name="T"></typeparam>
27 /// <param name="message"></param>
28 /// <returns></returns>
29 Task PublishAsync<T>(T message) where T : class;
30 }
31 }
定义订阅者/消费者接口:
1 using System;
2 using System.Threading.Tasks;
3
4 namespace fx.MQ
5 {
6 public interface ISubscriber
7 {
8 /// <summary>
9 ///
10 /// </summary>
11 void Dispose();
12 /// <summary>
13 ///
14 /// </summary>
15 /// <typeparam name="T"></typeparam>
16 /// <param name="channelName"></param>
17 /// <returns></returns>
18 void Subscribe(string channelName, Action<string> callback);
19 /// <summary>
20 ///
21 /// </summary>
22 /// <typeparam name="T"></typeparam>
23 /// <param name="channelName"></param>
24 /// <returns></returns>
25 Task<T> SubscribeAsync<T>(string channelName) where T : class;
26 }
27 }
定义RabbmitMQProvider
1 using RabbitMQ.Client;
2 using System;
3 using System.Collections.Generic;
4 using System.Text;
5
6 namespace fx.MQ
7 {
8 public class RabbitMQProvider
9 {
10 private readonly string _ipAddress;
11 private readonly int? _port;
12 private readonly string _username;
13 private readonly string _password;
14
15 public RabbitMQProvider(string ipAddress, int? port, string username, string password)
16 {
17 _ipAddress = ipAddress ?? throw new ArgumentException("IP地址不能为空!");
18 _port = port ?? throw new ArgumentException("端口不能为空");
19 _username = username ?? throw new ArgumentException("用户名不能为空");
20 _password = password ?? throw new ArgumentException("密码不能为空");
21
22 ConnectionFactory = new ConnectionFactory//创建连接工厂对象
23 {
24 HostName = _ipAddress,//IP地址
25 Port = (int)_port,//端口号
26 UserName = _username,//用户账号
27 Password = _password//用户密码
28 };
29 }
30
31 public IConnectionFactory ConnectionFactory { get; }
32
33 }
34 }
实现生产者:
1 using Newtonsoft.Json;
2 using RabbitMQ.Client;
3 using System;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace fx.MQ
8 {
9 /// <summary>
10 /// 消息发布者。
11 /// </summary>
12 public class RabbitMQPublisher : IPublisher
13 {
14
15 private readonly RabbitMQProvider _provider;
16 private IConnection _connection;
17 public RabbitMQPublisher(RabbitMQProvider provider)
18 {
19 _provider = provider;
20 _connection = _provider.ConnectionFactory.CreateConnection();
21 }
22
23 public IConnection Connection
24 {
25 get
26 {
27 if (_connection != null)
28 return _connection;
29 return _connection = _provider.ConnectionFactory.CreateConnection();
30 }
31 }
32
33 private IModel _channel;
34 public IModel Channel
35 {
36 get
37 {
38 if (_channel != null)
39 return _channel;
40 else
41 return _channel = _connection.CreateModel();
42 }
43 }
44
45 public void Dispose()
46 {
47 if (Channel != null)
48 {
49 if (Channel.IsOpen)
50 Channel.Close();
51 Channel.Abort();
52 Channel.Dispose();
53 }
54
55 if (Connection != null)
56 {
57 if (Connection.IsOpen)
58 Connection.Close();
59 }
60 }
61
62 public void Publish<T>(T message) where T : class
63 {
64 var channelName = typeof(T).Name;
65 Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, null);
66
67 var msgContent = JsonConvert.SerializeObject(message);
68 var msgByte = Encoding.UTF8.GetBytes(msgContent);
69 Channel.BasicPublish
70 (
71 exchange: channelName,
72 routingKey: string.Empty,
73 mandatory: false,
74 basicProperties: null,
75 body: msgByte
76 );
77 }
78
79
80 public void Publish(string message, string channelName)
81 {
82 Channel.ExchangeDeclare(exchange: channelName, type: "fanout", durable: false, autoDelete: false, null);
83
84 var msgByte = Encoding.UTF8.GetBytes(message);
85 Channel.BasicPublish
86 (
87 exchange: channelName,
88 routingKey: string.Empty,
89 mandatory: false,
90 basicProperties: null,
91 body: msgByte
92 );
93 }
94
95 public Task PublishAsync<T>(T message) where T : class
96 {
97 throw new NotImplementedException();
98 }
99 }
100 }
实现消费者:
1 using RabbitMQ.Client;
2 using RabbitMQ.Client.Events;
3 using System;
4 using System.Collections.Generic;
5 using System.Text;
6 using System.Threading.Tasks;
7
8 namespace fx.MQ
9 {
10 /// <summary>
11 /// 消息订阅者/消费者。
12 /// </summary>
13 public class RabbitMQSubscriber : ISubscriber
14 {
15 private readonly RabbitMQProvider _provider;
16 private IConnection _connection;
17 public RabbitMQSubscriber(RabbitMQProvider provider)
18 {
19 _provider = provider;
20 _connection = _provider.ConnectionFactory.CreateConnection();
21 }
22
23 public IConnection Connection
24 {
25 get
26 {
27 if (_connection != null)
28 return _connection;
29 return _connection = _provider.ConnectionFactory.CreateConnection();
30 }
31 }
32
33 private IModel _channel;
34 public IModel Channel
35 {
36 get
37 {
38 if (_channel != null)
39 return _channel;
40 else
41 return _channel = _connection.CreateModel();
42 }
43 }
44
45
46 public void Dispose()
47 {
48 if (_channel != null)
49 {
50 _channel.Abort();
51 if (_channel.IsOpen)
52 _channel.Close();
53
54 _channel.Dispose();
55 }
56
57 if (_connection != null)
58 {
59 if (_connection.IsOpen)
60 _connection.Close();
61
62 _connection.Dispose();
63 }
64 }
65
66 /// <summary>
67 /// 消费消息,并执行回调。
68 /// </summary>
69 /// <param name="channelName"></param>
70 /// <param name="callback"></param>
71 public void Subscribe(string channelName, Action<string> callback)
72 {
73 //声明交换机
74 Channel.ExchangeDeclare(exchange: channelName, type: "fanout");
75 //消息队列名称
76 var queueName = channelName + "_" + Guid.NewGuid().ToString().Replace("-", "");
77 //声明队列
78 Channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
79 //将队列与交换机进行绑定
80 Channel.QueueBind(queue: queueName, exchange: channelName, routingKey: "");
81 //声明为手动确认,每次只消费1条消息。
82 Channel.BasicQos(0, 1, false);
83 //定义消费者
84 var consumer = new EventingBasicConsumer(Channel);
85 //接收事件
86 consumer.Received += (eventSender, args) =>
87 {
88 var message = args.Body;//接收到的消息
89
90 callback(Encoding.UTF8.GetString(message));
91 //返回消息确认
92 Channel.BasicAck(args.DeliveryTag, true);
93 };
94 //开启监听
95 Channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
96
97 }
98
99 public Task<T> SubscribeAsync<T>(string channelName) where T : class
100 {
101 throw new NotImplementedException();
102 }
103 }
104 }
到这里为止,简单的实现消息队列的接受,发送,已经满足我自己当前项目的需要了。这里我用的exchange进行消息队列的生产消费,并且用fanout模式,就是一个生产者对应多个消费者,有点类似于消息广播,另外还有两种模式,可以根据需要修改。
下面是测试代码:
1 using System;
2 using System.Windows.Forms;
3
4 namespace fx.MQ.TestForm
5 {
6 public partial class Form1 : Form
7 {
8 private readonly RabbitMQProvider _provider;
9 private readonly RabbitMQPublisher _publisher;
10 private readonly RabbitMQSubscriber _subscriber;
11 delegate void Callback(string msg);
12
13 public Form1()
14 {
15 _provider = new RabbitMQProvider("192.168.101.199", 5672, "admin", "admin");
16 _publisher = new RabbitMQPublisher(_provider);
17 _subscriber = new RabbitMQSubscriber(_provider);
18 //callback = new Callback(ShowMessage);
19 InitializeComponent();
20 }
21
22 private void button1_Click(object sender, EventArgs e)
23 {
24 _publisher.Publish(textBox1.Text, "public");
25 }
26
27 private void Form1_Load(object sender, EventArgs e)
28 {
29
30 _subscriber.Subscribe("public", c=> {
31 ShowMessage(c);
32 });
33 }
34
35
36 private void ShowMessage(string msg)
37 {
38 if (this.richTextBox1.InvokeRequired)
39 {
40 var cb = new Callback(ShowMessage);
41 this.Invoke(cb, new object[] { msg });
42 }
43 else
44 {
45 this.richTextBox1.Text = msg;
46 }
47 }
48 }
49 }
运行效果如图所示:
OK,没有问题。
另外注意,退出程序时消息发布者和订阅者都需要Dispose()来释放连接。
openstack共享组件--rabbitmq消息队列(1)
一、MQ 全称为 Message Queue, 消息队列( MQ )
是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。
消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。udp
二、AMQP 即 Advanced Message Queuing Protocol
高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件(用于两个或多个软件之间的软件)主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布 / 订阅)、可靠性、安全。通过rabbitmq实现
三、 Rabbitmq概念:
属于一个流行的开源消息队列系统。属于AMQP( 高级消息队列协议 ) 标准的一个 实现。是应用层协议的一个开放标准,为面向消息的中间件设计。用于在分布式系统中存储转发消息,在 易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ特点:
使用Erlang编写
支持持久化
支持HA
提供C# , erlang,java,perl,python,ruby等的client开发端
四、什么是耦合、解耦合
一、耦合
1、耦合是指两个或两个以上的体系或两种运动形式间通过相互作用而彼此影响以至联合起来的现象。
2、在软件工程中,对象之间的耦合度就是对象之间的依赖性。对象之间的耦合越高,维护成本越高,因此对象的设计应使类和构件之间的耦合最小。
3、分类:有软硬件之间的耦合,还有软件各模块之间的耦合。耦合性是程序结构中各个模块之间相互关联的度量。它取决于各个模块之间的接口的复杂程度、调用模块的方式以及哪些信息通过接口。
二、解耦
1、解耦,字面意思就是解除耦合关系。
2、在软件工程中,降低耦合度即可以理解为解耦,模块间有依赖关系必然存在耦合,理论上的绝对零耦合是做不到的,但可以通过一些现有的方法将耦合度降至最低。
3、设计的核心思想:尽可能减少代码耦合,如果发现代码耦合,就要采取解耦技术。让数据模型,业务逻辑和视图显示三层之间彼此降低耦合,把关联依赖降到最低,而不至于牵一发而动全身。原则就是A功能的代码不要写在B的功能代码中,如果两者之间需要交互,可以通过接口,通过消息,甚至可以引入框架,但总之就是不要直接交叉写。
五、RabbitMQ中的概念名词
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字, exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
六、RabbitMQ工作原理
MQ 是消费 - 生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。 MQ 则是遵循了 AMQP协议的具体实现和产品。在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
( 1)客户端连接到消息队列服务器,打开一个channel。
( 2)客户端声明一个exchange,并设置相关属性。
( 3)客户端声明一个queue,并设置相关属性。
( 4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
( 5)客户端投递消息到exchange。
( 6) exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里
七、Rabbitmq 的 metadata(元数据)
元数据可以持久化在 RAM 或 Disc. 从这个角度可以把 RabbitMQ 集群中的节点分成两种 :RAM Node和 Disk Node.
RAM Node 只会将元数据存放在RAM
Disk node 会将元数据持久化到磁盘。
单节点系统就没有什么选择了 , 只允许 disk node, 否则由于没有数据冗余一旦重启就会丢掉所有的配置信息 . 但在集群环境中可以选择哪些节点是 RAM node.在集群中声明(declare) 创建 exchange queue binding, 这类操作要等到所有的节点都完成创建才会返回 :
如果是内存节点就要修改内存数据 ,
如果是 disk node 就要等待写磁盘 , 节点过多这里的速度就会被大大的拖慢 .
有些场景 exchang queue 相当固定 , 变动很少 ,那即使全都是 disc node, 也没有什么影响 . 如果使用 Rabbitmq 做 RPC( RPC :Remote Procedure Call—远程过程调用), RPC 或者类似 RPC 的场景这个问题就严重了 , 频繁创建销毁临时队列 , 磁盘读写能力就很快成为性能瓶颈了。所以 , 大多数情况下 , 我们尽量把 Node 创建为RAM Node. 这里就有一个问题了 , 要想集群重启后元数据可以恢复就需要把集群元数据持久化到磁盘 , 那需要规划 RabbitMQ 集群中的 RAM Node 和 Disc Node 。
只要有一个节点是 Disc Node 就能提供条件把集群元数据写到磁盘 ,RabbitMQ 的确也是这样要求的 : 集群中只要有一个 disk node 就可以 , 其它的都可以是 RAM node. 节点加入或退出集群一定至少要通知集群中的一个 disk node 。
如果集群中 disk node 都宕掉 , 就不要变动集群的元数据 . 声明 exchange queue 修改用户权限 , 添加用户等等这些变动在节点重启之后无法恢复 。
有一种情况要求所有的 disk node 都要在线情况在才能操作 , 那就是增加或者移除节点 .RAM node 启动的时候会连接到预设的 disk node 下载最新的集群元数据 . 如果你有两个 disk node(d1 d2), 一个 RAM node 加入的时候你只告诉 d1, 而恰好这个 RAM node 重启的时候 d1 并没有启动 , 重启就会失败 . 所以加入 RAM 节点的时候 , 把所有的disk node 信息都告诉它 ,RAM node 会把 disk node 的信息持久化到磁盘以便后续启动可以按图索骥 .
八、Rabbitmq 集群部署
192.168.253.135 bb 192.168.253.171 aa 192.168.253.153 cc
yum install -y erlang rabbitmq-server.noarch systemctl enable rabbitmq-server.service systemctl start rabbitmq-server.service systemctl status rabbitmq-server.service
rabbitmqctl change_password guest admin #更改密码为admin,用户名不能数字开头

RABBITMQ_NODE_PORT=5672 ulimit -S -n 4096 RABBITMQ_SERVER_ERL_ARGS="+K true +A30 +P 1048576 -kernel inet_default_connect_options [{nodelay,true},{raw,6,18,<<5000:64/native>>}] -kernel inet_default_listen_options [{raw,6,18,<<5000:64/native>>}]" RABBITMQ_NODE_IP_ADDRESS=192.168.253.135 #修改为本地节点的ip

scp /etc/rabbitmq/rabbitmq-env.conf aa:/etc/rabbitmq/ scp /etc/rabbitmq/rabbitmq-env.conf cc:/etc/rabbitmq/
/usr/lib/rabbitmq/bin/rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_management

node1:
rabbitmqctl add_user mama admin
rabbitmqctl set_permissions mama ".*" ".*" ".*" #.*所有权限 rabbitmqctl set_user_tags mama administrator #设为管理员才可以登陆
scp /var/lib/rabbitmq/.erlang.cookie aa:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie cc:/var/lib/rabbitmq/.erlang.cookie

systemctl restart rabbitmq-server.service rabbitmqctl stop_app rabbitmqctl join_cluster --ram rabbit@bb rabbitmqctl start_app



rabbitmqctl stop_app
rabbitmqctl change_cluster_node_type disc (ram)
Turning rabbit@cc into a ram node ...
Error: Mnesia is still running on node rabbit@cc.
Please stop the node with rabbitmqctl stop_app first. #可以看到需要先停止应用
停止后再次执行
[root@cc rabbitmq]# rabbitmqctl change_cluster_node_type ram
Turning rabbit@cc into a ram node ...
rabbitmqctl start_app


rabbitmqctl stop_app
rabbitmqctl reset
----》 Resetting node rabbit@cc ...
[root@cc rabbitmq]# rabbitmqctl cluster_status #查看状态发现节点cc脱离了集群 Cluster status of node rabbit@cc ... [{nodes,[{disc,[rabbit@cc]}]}, #脱离集群后的节点自动会变成disc的方式,原因在下方 1 {running_nodes,[rabbit@cc]}, {cluster_name,<<"rabbit@cc">>}, #并没有其他节点的名称说明不在集群内了 {partitions,[]}, {alarms,[{rabbit@cc,[]}]}]

rabbitmqctl forget_cluster_node rabbit@node3
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
今天关于RabbitMQ消息队列和七:适用于云计算集群的远程调用(RPC)的分享就到这里,希望大家有所收获,若想了解更多关于.Net Core 商城微服务项目系列(七):使用消息队列(RabbitMQ)实现服务异步通信、AMQP协议详解与RabbitMQ,MQ消息队列的应用场景,如何避免消息丢失等消息队列常见问题、C# 封装RabbitMQ消息队列处理、openstack共享组件--rabbitmq消息队列(1)等相关知识,可以在本站进行查询。
本文标签: