GVKun编程网logo

PubSub如何在BookSleeve / Redis中工作?(redis pubsub 原理)

26

在本文中,我们将带你了解PubSub如何在BookSleeve/Redis中工作?在这篇文章中,我们将为您详细介绍PubSub如何在BookSleeve/Redis中工作?的方方面面,并解答redis

在本文中,我们将带你了解PubSub如何在BookSleeve / Redis中工作?在这篇文章中,我们将为您详细介绍PubSub如何在BookSleeve / Redis中工作?的方方面面,并解答redis pubsub 原理常见的疑惑,同时我们还将给您一些技巧,以帮助您实现更有效的asp.net – 使用Booksleeve维护开放的Redis PubSub订阅、c# – 如何使用Booksleeve实现Redis流水线请求?、Distributed Phoenix Chat using Redis PubSub、Faye或Redis Pubsub

本文目录一览:

PubSub如何在BookSleeve / Redis中工作?(redis pubsub 原理)

PubSub如何在BookSleeve / Redis中工作?(redis pubsub 原理)

我不知道最好的方法是使用BookSleeve发布和订阅频道。目前,我实现了几种静态方法(请参见下文),这些方法可以将内容发布到特定的频道,而新创建的频道则存储在中privatestatic Dictionary<string, RedisSubscriberConnection> subscribedChannels;

考虑到我想发布到通道并订阅同一应用程序中的通道,这是正确的方法吗(注意:我的包装器是一个静态类)。即使我想发布和订阅,创建一个频道就足够了吗?显然,我不会在同一应用程序中发布相同的频道。但是我对其进行了测试,并且效果很好:

 RedisClient.SubscribeToChannel("Test").Wait(); RedisClient.Publish("Test", "Test Message");

而且有效。

这是我的问题:

1)设置专用的发布频道和专用的订阅频道,而不是同时使用两者,会更有效吗?

2)从语义上来说,“ channel”和“
PatternSubscription”之间有什么区别?我的理解是,我可以通过PatternSubscription()同一频道订阅多个“主题”,对吗?但是,如果我想为每个“主题”调用不同的回调,则必须为每个正确的主题设置一个通道吗?这样有效吗?还是您会建议这样做?

这里的代码片段。

谢谢!!!

    public static Task<long> Publish(string channel, byte[] message)    {        return connection.Publish(channel, message);    }    public static Task SubscribeToChannel(string channelName)    {        string subscriptionString = ChannelSubscriptionString(channelName);        RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();        subscribedChannels[subscriptionString] = channel;        return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage);    }    public static Task UnsubscribeFromChannel(string channelName)    {        string subscriptionString = ChannelSubscriptionString(channelName);        if (subscribedChannels.Keys.Contains(subscriptionString))        {            RedisSubscriberConnection channel = subscribedChannels[subscriptionString];            Task  task = channel.PatternUnsubscribe(subscriptionString);            //remove channel subscription            channel.Close(true);            subscribedChannels.Remove(subscriptionString);            return task;        }        else        {            return null;        }    }    private static string ChannelSubscriptionString(string channelName)    {        return channelName + "*";    }

答案1

小编典典

1:您的示例(Test)中只有一个频道;通道只是用于特定发布/订阅交换的名称。但是,由于redis API的工作原理的特殊性,有必要使用2个 连接
。具有 任何 订阅的连接不能执行任何其他操作,除了:

  • 听消息
  • 管理自己的订阅(subscribepsubscribeunsubscribepunsubscribe

但是,我不明白这一点:

private static Dictionary<string, RedisSubscriberConnection>

除非您要满足自己的特定需求,否则您不需要多个订阅者连接。单个订阅者连接可以处理任意数量的订阅。快速检查一下clientlist我的一台服务器,并且与(在撰写本文时)23,002个订阅建立了一个连接。可能可以减少,但是:它起作用。

2:模式订阅支持通配符;因此/topic/1/topic/2/您可以订阅而不是订阅等等/topic/*。所使用的 实际
通道的名称publish作为回调签名的一部分提供给接收者。

都可以。应该注意的是,的性能publish受唯一订阅总数的影响-
但坦率地说,即使您有成千上万个使用subscribe而不是订阅的频道,它的速度仍然非常快(如:0ms)psubscribe

但是从 publish

时间复杂度:O(N + M),其中N是订阅接收通道的客户端数,M是订阅模式(任何客户端)的总数。

我建议阅读pub / sub的redis文档。


编辑以下问题:

a)如果我想保证在接收项目时保留从同一发布者发送项目的顺序,我必须同步(使用Result或Wait())进行发布,对吗?

根本没有任何区别;因为您提到了Result/ Wait(),所以我假设您正在谈论BookSleeve-
在这种情况下,多路复用器已经保留了命令顺序。Redis本身是单线程的,并且将始终按顺序在单个连接上处理命令。但是:订阅服务器上的回调可以异步执行,并且可以(单独)传递给工作线程。我目前正在调查是否可以强制从强制进行此操作RedisSubscriberConnection

更新:从1.3.22开始,您可以将设置CompletionModePreserveOrder-,那么所有回调将依次而不是同时完成。

b)根据您的建议进行调整后,无论有效载荷的大小如何,发布少量项目时我的表现都很好。但是,当同一发布者发送100,000个或更多项目时,性能会迅速下降(仅从我的机器发送时会下降到7-8秒)。

首先,这段时间听起来很高-
在本地测试我得到的(对于100,000个出版物,包括等待所有出版物的响应)是1766ms(本地)或1219ms(远程)(听起来可能违反直觉,但是我的“本地”是)
t运行相同版本的redis;在Centos上,我的“远程”是2.6.12;在Windows上,我的“本地”是2.6.8-pre2。

我不能让实际的服务器建立网络更快或速度,而是:如果这是数据包碎片,我已经添加(只为你)一SuspendFlush()/
ResumeFlush()对。这将禁用快速刷新(即,当发送队列为空时;仍会发生其他类型的刷新);您可能会发现这有帮助:

conn.SuspendFlush();try {    // start lots of operations...} finally {    conn.ResumeFlush();}

请注意,Wait直到恢复后才应该这样做,因为在调用之前ResumeFlush(),可能还有一些操作仍在发送缓冲区中。有了这些,我得到了(100,000次操作):

local: 1766ms (eager-flush) vs 1554ms (suspend-flush)remote: 1219ms (eager-flush) vs 796ms (suspend-flush)

如您所见,它可以为远程服务器提供更多帮助,因为它将通过网络放置更少的数据包。

我无法使用事务,因为稍后要发布的项目无法一次全部使用。有没有一种方法可以在记住这些知识的情况下进行优化?

认为 以上已解决了这一问题-但请注意,最近CreateBatch也添加了它。批处理的操作很像交易-
只是:没有交易。同样,这是减少数据包碎片的另一种机制。在您的特定情况下,我怀疑暂停/恢复(同花)是您最好的选择。

您是否建议使用一个常规的RedisConnection和一个RedisSubscriberConnection或任何其他配置来使此类包装器执行所需的功能?

只要你不执行阻塞操作(blpopbrpopbrpoplpush等),或将过大的BLOB沿着电线(潜在延迟等操作,同时它清除),那么每个类型的单个连接通常工作得很好。但是YMMV取决于您的确切使用要求。

asp.net – 使用Booksleeve维护开放的Redis PubSub订阅

asp.net – 使用Booksleeve维护开放的Redis PubSub订阅

我正在使用Redis pubsub通道将工作进程池中的消息发送到我的ASP.NET应用程序.收到消息后,我的应用程序将消息转发到带有SignalR的客户端浏览器.

我发现this solution保持与Redis的开放连接,但它在重新创建连接时不考虑订阅.

我目前正在Global.asax文件中处理Redis pubsub消息:

public class Application : HttpApplication
{
    protected void Application_Start()
    {
        var gateway = Resolve<RedisConnectionGateway>();
        var connection = gateway.GetConnection();
        var channel = connection.GetopenSubscriberChannel();

        channel.PatternSubscribe("workers:job-done:*",OnExecutionCompleted);
    }

    /// <summary>
    /// Handle messages received from workers through Redis.</summary>
    private static void OnExecutionCompleted(string key,byte[] message)
    {
        /* forwarded the response to the client that requested it */
    }
}

当前RedisConnection因任何原因关闭时会发生此问题.最简单的解决方案是在重置连接时从RedisConnectionGateway类触发事件,并使用新的RedisSubscriberChannel重新订阅.但是,在重置连接时发布到通道的任何消息都将丢失.

是否有任何推荐的方法来处理这种情况?

解决方法

是的,如果连接中断(网络不稳定,重新掌握,等等),那么您将需要重新应用您所做的任何订阅.重新连接和重新订阅的事件非常正常,与我们在SE / SO上使用的事件没有什么不同(除了我们通常跟踪更细粒度的订阅,并且有一些处理所有这些的包装代码).

是的,在您的连接中断时发布的任何事件都已消失.这就是redis pub / sub的本质;它不保证交付给断开连接的客户端.要么使用承诺这个的工具,要么使用redis来驱动队列 – 从列表的两端推送/弹出通常是一个合理的选择,并确保不会丢失任何东西(只要你的软件没有丢失)它从列表中弹出后).如果它有帮助,我在我的列表上有一个添加阻塞pop方法的请求 – 它们完全破坏了多路复用器的意图,但它们在某些情况下真的有用,所以我不反对添加它们.

c# – 如何使用Booksleeve实现Redis流水线请求?

c# – 如何使用Booksleeve实现Redis流水线请求?

关于Redis事务和管道之间的区别以及最终如何使用Booksleeve的管道,我有点混淆了.我看到 Booksleeve支持Redis transaction feature(MULTI / EXEC),但其API /测试中没有提及流水线功能.但是,在其他实现中很明显,管道和事务之间存在区别,即在 atomicity中,如下面的redis-ruby版本所示,但在某些 places中,这些术语似乎可以互换使用.

redis-ruby实现:

r.pipelined {
  # these commands will be pipelined
  r.get("insensitive_key")
}

r.multi {
  # these commands will be executed atomically
  r.set("sensitive_key")
}

我只是使用MULTI / EXEC,但它们似乎阻止了所有其他用户,直到交易完成(在我的情况下不是必需的),所以我担心他们的表现.有没有人使用Booksleeve的管道或有任何关于如何实现它们的想法?

解决方法

在BookSleeve中,一切都是流水线的.没有同步操作.不是一个人.因此,每个操作都返回某种形式的Task(可能是一个vanilla任务,可能是Task< string>,Task< long>等),这在将来的某个时候(即redis响应时)将具有一个值.您可以在调用代码中使用Wait来执行同步等待,或使用ContinueWith / await(C#5语言功能)来执行异步回调.

交易没有什么不同;它们是流水线的.事务的唯一微妙变化是它们在呼叫站点被额外缓冲直到完成(因为它是一个多路复用器,我们无法开始管道与事务相关的消息,直到我们有一个完整的工作单元,因为它会对同一多路复用器上的其他呼叫者产生不利影响).

所以:没有明确的.pipelined的原因是所有内容都是流水线和异步的.

Distributed Phoenix Chat using Redis PubSub

Distributed Phoenix Chat using Redis PubSub

 
转自:https://www.poeticoding.com/distributed-phoenix-chat-using-redis-pubsub/

In the previous article, Create a High-Availability Kubernetes Cluster on AWS with Kops, we have seen how to create a Kubernetes cluster and how to deploy the Phoenix Chat app. The single node configuration worked well, but when we tried to scale out we saw that the messages were not sent to all the browsers.

In the image below, we see a configuration where the two chat servers are isolated. When the load-balancer routes the two WebSocket connections into two different servers, there is no way for the two browsers to send messages one another.

Two Phoenix Chat containers

This problem is often solved using an external component, like Redis, which helps to broadcast the messages to all the nodes in the cluster. 
With Elixir is also possible to get rid of this component, purely relying on the communication between Elixir nodes (we will see this in the next article).

In this article we’ll see how we can scale the Phoenix Chat horizontally, running multiple servers and leveraging on Redis PubSub.

Redis PubSub

If you want to test this out on your local machine, you can find this version of the app on the poeticoding/phoenix_chat_example GitHub repo, under the pubsub_redis branch.

We are now going to use the Redis PubSub messaging implementation, so we can broadcast the messages to all the Chat nodes. But .. What is PubSub messaging? I think the AWS definition is simple and explanatory.

Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic.

AWS – Pub/Sub Messaging

To understand better how PubSub works, let’s see it in action with Redis. The easiest way to run a Redis server on our local machine is using Docker.

$ docker run --name redis -d redis

In this way we run Redis in background, starting a container we call redis. In the Docker container we’ve just started, we now find the redis-cli which we can use to do some experiments subscribing to a channel and publishing messages on it. 
To execute the redis-cli process in the redis container, we use the docker exec command, passing the -it option to enable the interactive mode. The cli will connect to the local redis server by default.

$ docker exec -it redis redis-cli
Redis PubSub

In the image above, we run 4 different clients. Three of them subscribe to my_channel and one publishes a message to the channel. We see how Redis sends this message to all the clients subscribed to the channel.

Phoenix PubSub Redis adapter

Connecting Phoenix Chat servers to Redis

To be able to leverage on this Redis PubSub functionality in our app, we are going to use the phoenix_pubsub_redis adapter. This adapter is really easy to integrate: we just need to add it in the dependencies and configure it in our phoenix app.

#mix.exs
defp deps do
[
  ...
  {:phoenix_pubsub_redis, "~> 2.1"}
]
end
#config/config.exs
config :chat, Chat.Endpoint,
  url: [host: "localhost"],
  root: Path.expand("..", __DIR__),
  secret_key_base: ".......",
  debug_errors: false,
  pubsub: [
    name: Chat.PubSub,
    adapter: Phoenix.PubSub.Redis,
    host: "localhost", port: 6379,
    node_name: System.get_env("NODE")
 ]

In this case we just manually set the host and the port to localhost and 6379, but if you are running a different Redis setup you maybe need to change this setting. We also need to set a node_name for each chat server we run, so we differentiate the Elixir nodes inside the Redis PubSub channel. We’ll pass the node_name using the NODE environment variable.

That’s it, ready to roll! We start two chat servers, one on 4000 called n1 and the other one on 4001 called n2.

# Terminal 1
$ NODE=n1 PORT=4000 mix phx.server

# Terminal 2
$ NODE=n2 PORT=4001 mix phx.server
Messages Broadcasted correctly using Phoenix PubSub Redis

The messages are now correctly sent to all the browsers connected to the channel, regardless of whether they are connected to different chat servers or not.

Inspecting the Phoenix PubSub messages in Redis

To understand better what’s going on, we can inspect the messages sent to Redis by the nodes. All the chat nodes publish messages on the phx:Elixir.Chat.PubSubRedis channel, but these messages are binary encoded using the :erlang.term_to_binary function, so we can’t simply use the redis-cli to properly see them.

In the repo, always under the pubsub_redis branch and in the redis_printdirectory, I’ve put a super-small Elixir app we can use to subscribe and decode the binary messages.

defmodule RedisPrint do
  def subscribe(host,port,channel) do
    {:ok, pubsub} = Redix.PubSub.start_link(host: host, port: port)
    {:ok, ref} = Redix.PubSub.subscribe(pubsub, channel, self())
    receive_messages(pubsub,ref)
  end

  def receive_messages(pubsub,ref) do
    receive do
      {:redix_pubsub, ^pubsub, ^ref, :message, %{channel: _, payload: payload}} ->
        :erlang.binary_to_term(payload) |> IO.inspect()
    end
    receive_messages(pubsub,ref)
  end
end

The RedisPrint.subscribe/3function starts a PubSub process which connects to Redis and subscribes to a specific channel. It then start receiving messages, recursively calling receive_messages/2 and decoding the payload with :erlang.binary_to_term

Let’s test again the chat with two browsers and two servers, this time inspecting the messages in a separate iex session.

# Terminal 1
$ NODE=n1 PORT=4000 mix phx.server

# Terminal 2
$ NODE=n2 PORT=4001 mix phx.server

# redis_print
$ iex -S mix
...
%{
  __struct__: Phoenix.Socket.Broadcast,
  event: "new:msg",
  payload: %{body: "hello from u1", user: "u1"},
  topic: "rooms:lobby"
}

%{
  __struct__: Phoenix.Socket.Broadcast,
  event: "new:msg",
  payload: %{body: "hello from u2", user: "u2"},
  topic: "rooms:lobby"
}

When the user u1, connected to n1 on port 4000 , sends the message “hello from u1”, this message is sent through the WebSocket connection and once n1receives it, it’s then published to the phx:Elixir.Chat.PubSub Redis channel.

Wrap Up

As I said at the beginning, this is just one way to make our Phoenix Chat app distributed. We’ll see in further articles how, thanks to how Elixir nodes can communicate, we connect and broadcast the messages using another Phoenix PubSub adapter.

 
 
 
 

Faye或Redis Pubsub

Faye或Redis Pubsub

我以为我了解这项技术,但也许我不了解。两者有什么区别?为什么要选择一个?

用例:〜实时更新。

今天关于PubSub如何在BookSleeve / Redis中工作?redis pubsub 原理的分享就到这里,希望大家有所收获,若想了解更多关于asp.net – 使用Booksleeve维护开放的Redis PubSub订阅、c# – 如何使用Booksleeve实现Redis流水线请求?、Distributed Phoenix Chat using Redis PubSub、Faye或Redis Pubsub等相关知识,可以在本站进行查询。

本文标签: