在本文中,我们将带你了解PubSub如何在BookSleeve/Redis中工作?在这篇文章中,我们将为您详细介绍PubSub如何在BookSleeve/Redis中工作?的方方面面,并解答redis
在本文中,我们将带你了解PubSub如何在BookSleeve / Redis中工作?在这篇文章中,我们将为您详细介绍PubSub如何在BookSleeve / Redis中工作?的方方面面,并解答redis pubsub 原理常见的疑惑,同时我们还将给您一些技巧,以帮助您实现更有效的asp.net – 使用Booksleeve维护开放的Redis PubSub订阅、c# – 如何使用Booksleeve实现Redis流水线请求?、Distributed Phoenix Chat using Redis PubSub、Faye或Redis Pubsub。
本文目录一览:- PubSub如何在BookSleeve / Redis中工作?(redis pubsub 原理)
- asp.net – 使用Booksleeve维护开放的Redis PubSub订阅
- c# – 如何使用Booksleeve实现Redis流水线请求?
- Distributed Phoenix Chat using Redis PubSub
- Faye或Redis Pubsub
PubSub如何在BookSleeve / Redis中工作?(redis pubsub 原理)
我不知道最好的方法是使用BookSleeve发布和订阅频道。目前,我实现了几种静态方法(请参见下文),这些方法可以将内容发布到特定的频道,而新创建的频道则存储在中privatestatic Dictionary<string, RedisSubscriberConnection> subscribedChannels;
。
考虑到我想发布到通道并订阅同一应用程序中的通道,这是正确的方法吗(注意:我的包装器是一个静态类)。即使我想发布和订阅,创建一个频道就足够了吗?显然,我不会在同一应用程序中发布相同的频道。但是我对其进行了测试,并且效果很好:
RedisClient.SubscribeToChannel("Test").Wait(); RedisClient.Publish("Test", "Test Message");
而且有效。
这是我的问题:
1)设置专用的发布频道和专用的订阅频道,而不是同时使用两者,会更有效吗?
2)从语义上来说,“ channel”和“
PatternSubscription”之间有什么区别?我的理解是,我可以通过PatternSubscription()
同一频道订阅多个“主题”,对吗?但是,如果我想为每个“主题”调用不同的回调,则必须为每个正确的主题设置一个通道吗?这样有效吗?还是您会建议这样做?
这里的代码片段。
谢谢!!!
public static Task<long> Publish(string channel, byte[] message) { return connection.Publish(channel, message); } public static Task SubscribeToChannel(string channelName) { string subscriptionString = ChannelSubscriptionString(channelName); RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel(); subscribedChannels[subscriptionString] = channel; return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage); } public static Task UnsubscribeFromChannel(string channelName) { string subscriptionString = ChannelSubscriptionString(channelName); if (subscribedChannels.Keys.Contains(subscriptionString)) { RedisSubscriberConnection channel = subscribedChannels[subscriptionString]; Task task = channel.PatternUnsubscribe(subscriptionString); //remove channel subscription channel.Close(true); subscribedChannels.Remove(subscriptionString); return task; } else { return null; } } private static string ChannelSubscriptionString(string channelName) { return channelName + "*"; }
答案1
小编典典1:您的示例(Test
)中只有一个频道;通道只是用于特定发布/订阅交换的名称。但是,由于redis API的工作原理的特殊性,有必要使用2个 连接
。具有 任何 订阅的连接不能执行任何其他操作,除了:
- 听消息
- 管理自己的订阅(
subscribe
,psubscribe
,unsubscribe
,punsubscribe
)
但是,我不明白这一点:
private static Dictionary<string, RedisSubscriberConnection>
除非您要满足自己的特定需求,否则您不需要多个订阅者连接。单个订阅者连接可以处理任意数量的订阅。快速检查一下clientlist
我的一台服务器,并且与(在撰写本文时)23,002个订阅建立了一个连接。可能可以减少,但是:它起作用。
2:模式订阅支持通配符;因此/topic/1
,/topic/2/
您可以订阅而不是订阅等等/topic/*
。所使用的 实际
通道的名称publish
作为回调签名的一部分提供给接收者。
都可以。应该注意的是,的性能publish
受唯一订阅总数的影响-
但坦率地说,即使您有成千上万个使用subscribe
而不是订阅的频道,它的速度仍然非常快(如:0ms)psubscribe
。
但是从 publish
时间复杂度:O(N + M),其中N是订阅接收通道的客户端数,M是订阅模式(任何客户端)的总数。
我建议阅读pub / sub的redis文档。
编辑以下问题:
a)如果我想保证在接收项目时保留从同一发布者发送项目的顺序,我必须同步(使用Result或Wait())进行发布,对吗?
根本没有任何区别;因为您提到了Result
/ Wait()
,所以我假设您正在谈论BookSleeve-
在这种情况下,多路复用器已经保留了命令顺序。Redis本身是单线程的,并且将始终按顺序在单个连接上处理命令。但是:订阅服务器上的回调可以异步执行,并且可以(单独)传递给工作线程。我目前正在调查是否可以强制从强制进行此操作RedisSubscriberConnection
。
更新:从1.3.22开始,您可以将设置CompletionMode
为PreserveOrder
-,那么所有回调将依次而不是同时完成。
b)根据您的建议进行调整后,无论有效载荷的大小如何,发布少量项目时我的表现都很好。但是,当同一发布者发送100,000个或更多项目时,性能会迅速下降(仅从我的机器发送时会下降到7-8秒)。
首先,这段时间听起来很高-
在本地测试我得到的(对于100,000个出版物,包括等待所有出版物的响应)是1766ms(本地)或1219ms(远程)(听起来可能违反直觉,但是我的“本地”是)
t运行相同版本的redis;在Centos上,我的“远程”是2.6.12;在Windows上,我的“本地”是2.6.8-pre2。
我不能让实际的服务器建立网络更快或速度,而是:如果这是数据包碎片,我已经添加(只为你)一SuspendFlush()
/ResumeFlush()
对。这将禁用快速刷新(即,当发送队列为空时;仍会发生其他类型的刷新);您可能会发现这有帮助:
conn.SuspendFlush();try { // start lots of operations...} finally { conn.ResumeFlush();}
请注意,Wait
直到恢复后才应该这样做,因为在调用之前ResumeFlush()
,可能还有一些操作仍在发送缓冲区中。有了这些,我得到了(100,000次操作):
local: 1766ms (eager-flush) vs 1554ms (suspend-flush)remote: 1219ms (eager-flush) vs 796ms (suspend-flush)
如您所见,它可以为远程服务器提供更多帮助,因为它将通过网络放置更少的数据包。
我无法使用事务,因为稍后要发布的项目无法一次全部使用。有没有一种方法可以在记住这些知识的情况下进行优化?
我 认为 以上已解决了这一问题-但请注意,最近CreateBatch
也添加了它。批处理的操作很像交易-
只是:没有交易。同样,这是减少数据包碎片的另一种机制。在您的特定情况下,我怀疑暂停/恢复(同花)是您最好的选择。
您是否建议使用一个常规的RedisConnection和一个RedisSubscriberConnection或任何其他配置来使此类包装器执行所需的功能?
只要你不执行阻塞操作(blpop
,brpop
,brpoplpush
等),或将过大的BLOB沿着电线(潜在延迟等操作,同时它清除),那么每个类型的单个连接通常工作得很好。但是YMMV取决于您的确切使用要求。
asp.net – 使用Booksleeve维护开放的Redis PubSub订阅
我发现this solution保持与Redis的开放连接,但它在重新创建连接时不考虑订阅.
我目前正在Global.asax文件中处理Redis pubsub消息:
public class Application : HttpApplication { protected void Application_Start() { var gateway = Resolve<RedisConnectionGateway>(); var connection = gateway.GetConnection(); var channel = connection.GetopenSubscriberChannel(); channel.PatternSubscribe("workers:job-done:*",OnExecutionCompleted); } /// <summary> /// Handle messages received from workers through Redis.</summary> private static void OnExecutionCompleted(string key,byte[] message) { /* forwarded the response to the client that requested it */ } }
当前RedisConnection因任何原因关闭时会发生此问题.最简单的解决方案是在重置连接时从RedisConnectionGateway类触发事件,并使用新的RedisSubscriberChannel重新订阅.但是,在重置连接时发布到通道的任何消息都将丢失.
是否有任何推荐的方法来处理这种情况?
解决方法
是的,在您的连接中断时发布的任何事件都已消失.这就是redis pub / sub的本质;它不保证交付给断开连接的客户端.要么使用承诺这个的工具,要么使用redis来驱动队列 – 从列表的两端推送/弹出通常是一个合理的选择,并确保不会丢失任何东西(只要你的软件没有丢失)它从列表中弹出后).如果它有帮助,我在我的列表上有一个添加阻塞pop方法的请求 – 它们完全破坏了多路复用器的意图,但它们在某些情况下真的有用,所以我不反对添加它们.
c# – 如何使用Booksleeve实现Redis流水线请求?
redis-ruby实现:
r.pipelined { # these commands will be pipelined r.get("insensitive_key") } r.multi { # these commands will be executed atomically r.set("sensitive_key") }
我只是使用MULTI / EXEC,但它们似乎阻止了所有其他用户,直到交易完成(在我的情况下不是必需的),所以我担心他们的表现.有没有人使用Booksleeve的管道或有任何关于如何实现它们的想法?
解决方法
交易没有什么不同;它们是流水线的.事务的唯一微妙变化是它们在呼叫站点被额外缓冲直到完成(因为它是一个多路复用器,我们无法开始管道与事务相关的消息,直到我们有一个完整的工作单元,因为它会对同一多路复用器上的其他呼叫者产生不利影响).
所以:没有明确的.pipelined的原因是所有内容都是流水线和异步的.
Distributed Phoenix Chat using Redis PubSub

In the previous article, Create a High-Availability Kubernetes Cluster on AWS with Kops, we have seen how to create a Kubernetes cluster and how to deploy the Phoenix Chat app. The single node configuration worked well, but when we tried to scale out we saw that the messages were not sent to all the browsers.
In the image below, we see a configuration where the two chat servers are isolated. When the load-balancer routes the two WebSocket connections into two different servers, there is no way for the two browsers to send messages one another.

This problem is often solved using an external component, like Redis, which helps to broadcast the messages to all the nodes in the cluster.
With Elixir is also possible to get rid of this component, purely relying on the communication between Elixir nodes (we will see this in the next article).
In this article we’ll see how we can scale the Phoenix Chat horizontally, running multiple servers and leveraging on Redis PubSub.
Redis PubSub
If you want to test this out on your local machine, you can find this version of the app on the poeticoding/phoenix_chat_example GitHub repo, under the pubsub_redis
branch.
We are now going to use the Redis PubSub messaging implementation, so we can broadcast the messages to all the Chat nodes. But .. What is PubSub messaging? I think the AWS definition is simple and explanatory.
Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic.
AWS – Pub/Sub Messaging
To understand better how PubSub works, let’s see it in action with Redis. The easiest way to run a Redis server on our local machine is using Docker.
$ docker run --name redis -d redis
In this way we run Redis in background, starting a container we call redis
. In the Docker container we’ve just started, we now find the redis-cli
which we can use to do some experiments subscribing to a channel and publishing messages on it.
To execute the redis-cli
process in the redis
container, we use the docker exec
command, passing the -it
option to enable the interactive mode. The cli will connect to the local redis server by default.
$ docker exec -it redis redis-cli


In the image above, we run 4 different clients. Three of them subscribe to my_channel
and one publishes a message to the channel. We see how Redis sends this message to all the clients subscribed to the channel.
Phoenix PubSub Redis adapter

To be able to leverage on this Redis PubSub functionality in our app, we are going to use the phoenix_pubsub_redis adapter. This adapter is really easy to integrate: we just need to add it in the dependencies and configure it in our phoenix app.
#mix.exs
defp deps do
[
...
{:phoenix_pubsub_redis, "~> 2.1"}
]
end
#config/config.exs
config :chat, Chat.Endpoint,
url: [host: "localhost"],
root: Path.expand("..", __DIR__),
secret_key_base: ".......",
debug_errors: false,
pubsub: [
name: Chat.PubSub,
adapter: Phoenix.PubSub.Redis,
host: "localhost", port: 6379,
node_name: System.get_env("NODE")
]
In this case we just manually set the host and the port to localhost
and 6379
, but if you are running a different Redis setup you maybe need to change this setting. We also need to set a node_name
for each chat server we run, so we differentiate the Elixir nodes inside the Redis PubSub channel. We’ll pass the node_name
using the NODE
environment variable.
That’s it, ready to roll! We start two chat servers, one on 4000
called n1
and the other one on 4001
called n2
.
# Terminal 1
$ NODE=n1 PORT=4000 mix phx.server
# Terminal 2
$ NODE=n2 PORT=4001 mix phx.server

The messages are now correctly sent to all the browsers connected to the channel, regardless of whether they are connected to different chat servers or not.
Inspecting the Phoenix PubSub messages in Redis
To understand better what’s going on, we can inspect the messages sent to Redis by the nodes. All the chat nodes publish messages on the phx:Elixir.Chat.PubSub
Redis channel, but these messages are binary encoded using the :erlang.term_to_binary
function, so we can’t simply use the redis-cli to properly see them.
In the repo, always under the pubsub_redis
branch and in the redis_print
directory, I’ve put a super-small Elixir app we can use to subscribe and decode the binary messages.
defmodule RedisPrint do
def subscribe(host,port,channel) do
{:ok, pubsub} = Redix.PubSub.start_link(host: host, port: port)
{:ok, ref} = Redix.PubSub.subscribe(pubsub, channel, self())
receive_messages(pubsub,ref)
end
def receive_messages(pubsub,ref) do
receive do
{:redix_pubsub, ^pubsub, ^ref, :message, %{channel: _, payload: payload}} ->
:erlang.binary_to_term(payload) |> IO.inspect()
end
receive_messages(pubsub,ref)
end
end
The RedisPrint.subscribe/3
function starts a PubSub process which connects to Redis and subscribes to a specific channel. It then start receiving messages, recursively calling receive_messages/2
and decoding the payload with :erlang.binary_to_term
Let’s test again the chat with two browsers and two servers, this time inspecting the messages in a separate iex
session.
# Terminal 1
$ NODE=n1 PORT=4000 mix phx.server
# Terminal 2
$ NODE=n2 PORT=4001 mix phx.server
# redis_print
$ iex -S mix
...
%{
__struct__: Phoenix.Socket.Broadcast,
event: "new:msg",
payload: %{body: "hello from u1", user: "u1"},
topic: "rooms:lobby"
}
%{
__struct__: Phoenix.Socket.Broadcast,
event: "new:msg",
payload: %{body: "hello from u2", user: "u2"},
topic: "rooms:lobby"
}
When the user u1
, connected to n1
on port 4000
, sends the message “hello from u1”, this message is sent through the WebSocket connection and once n1
receives it, it’s then published to the phx:Elixir.Chat.PubSub
Redis channel.
Wrap Up
As I said at the beginning, this is just one way to make our Phoenix Chat app distributed. We’ll see in further articles how, thanks to how Elixir nodes can communicate, we connect and broadcast the messages using another Phoenix PubSub adapter.
Faye或Redis Pubsub
我以为我了解这项技术,但也许我不了解。两者有什么区别?为什么要选择一个?
用例:〜实时更新。
今天关于PubSub如何在BookSleeve / Redis中工作?和redis pubsub 原理的分享就到这里,希望大家有所收获,若想了解更多关于asp.net – 使用Booksleeve维护开放的Redis PubSub订阅、c# – 如何使用Booksleeve实现Redis流水线请求?、Distributed Phoenix Chat using Redis PubSub、Faye或Redis Pubsub等相关知识,可以在本站进行查询。
本文标签: