对于想了解终止挂起的redispubsub.listen的读者,本文将是一篇不可错过的文章,我们将详细介绍线程,并且为您提供关于AKKA集群中的发布与订阅DistributedPublishSubsc
对于想了解终止挂起的redis pubsub.listen的读者,本文将是一篇不可错过的文章,我们将详细介绍线程,并且为您提供关于AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster、Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster、asp.net – 使用Booksleeve维护开放的Redis PubSub订阅、Basic Tutorials of Redis(7) -Publish and Subscribe的有价值信息。
本文目录一览:- 终止挂起的redis pubsub.listen()线程(停止redis的命令)
- AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster
- Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster
- asp.net – 使用Booksleeve维护开放的Redis PubSub订阅
- Basic Tutorials of Redis(7) -Publish and Subscribe
终止挂起的redis pubsub.listen()线程(停止redis的命令)
与此问题相关,我有以下代码可订阅redispubsub队列,并使用__init__中提供的处理程序将消息提供给处理它们的类:
from threading import Threadimport msgpackclass Subscriber(Thread): def __init__(self, redis_connection, channel_name, handler): super(Subscriber, self).__init__(name="Receiver") self.connection = redis_connection self.pubsub = self.connection.pubsub() self.channel_name = channel_name self.handler = handler self.should_die = False def start(self): self.pubsub.subscribe(self.channel_name) super(Subscriber, self).start() def run(self): for msg in self.pubsub.listen(): if self.should_die: return try: data = msg["data"] unpacked = msgpack.unpackb(data) except TypeError: # stop non-msgpacked, invalid, messages breaking stuff # other validation happens in handler continue self.handler(unpacked) def die(self): self.should_die = True
在上面的链接问题中,请注意,pubsub.listen()
如果断开连接,则永不返回。因此,die()
尽管我的函数可以被调用,但它实际上不会导致线程终止,因为它挂在listen()
对线程内部的调用上run()
。
链接问题的可接受答案提到了黑客入侵redis-py的连接池。我真的不想这样做,并且有一个分支版本的redis-
py(至少在希望该修补程序被母版接受之前),但是无论如何我一直在看一下redis-py代码,并且不要立即看看将在哪里进行更改。
有谁知道如何彻底解决悬挂的redis-py listen()
电话吗?
我将直接使用哪些问题Thread._Thread__stop
?
答案1
小编典典这么多年后才将其关闭。它最终是redis库中的一个bug。我调试了它并提交了PR。它不应该再发生了。
AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster
Distributed Publish Subscribe in Cluster
基本定义
在单机环境下订阅与发布是很常用的,然而在集群环境是比较麻烦和不好实现的;
AKKA已经提供了相应的实现,集群环境各节点之间的actor相互订阅发布感兴的主题的消息,
关键依赖媒介actor: akka.cluster.pubsub.DistributedPubSubMediator

订阅:
DistributedPubSubMediator.Subscribe方法将actor注册到本地中介者。
成功的订阅和取消订阅由DistributedPubSubMediator.SubscribeAck和DistributedPubSubMediator.UnsubscribeAck应答确认。这个确认消息意味着订阅已经注册了,但是它仍然需要花费一些时间复制到其它的节点上。节点之间发现与注册会有一定延迟,可能造成消息不会立即送达!
发布:
你通过向本地的中介者发送DistributedPubSubMediator.Publish消息来发布消息。
当actor终止时,它们会自动从注册表移除,或者你可以明确的使用DistributedPubSubMediator.Unsubscribe移除。
实现示例
package pubsub
import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import org.slf4j.LoggerFactory
import scala.PartialFunction
import scala.runtime.BoxedUnit
import akka.cluster.pubsub.DistributedPubSubMediator
import akka.actor.Nobody.tell
import akka.actor.Props
import java.time.Clock.system
import akka.cluster.pubsub.DistributedPubSub
import akka.actor.Nobody.tell
import com.typesafe.config.ConfigFactory
/**
* Created by: tankx
* Date: 2019/7/16
* Description: 发布订阅模式
*/
/**
* 定义发布者
*/
class Pub() : AbstractActor() {
private var log = LoggerFactory.getLogger(Pub::class.java)
var mediator: ActorRef = DistributedPubSub.get(context.system).mediator()
override fun createReceive(): Receive {
return receiveBuilder().matchAny(this::receive).build()
}
private fun receive(msg: Any) {
log.info("派发事件:$msg")
if (msg is String) {
mediator.tell(DistributedPubSubMediator.Publish(topA, msg), self)
}
}
}
/**
* 定义订阅者
*/
class Sub() : AbstractActor() {
private var log = LoggerFactory.getLogger(Sub::class.java)
override fun preStart() {
//注册订阅
var mediator = DistributedPubSub.get(getContext().system()).mediator()
mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), self)
println("注册订阅")
//ActorRef.noSender()不会接收订阅信息DistributedPubSubMediator.SubscribeAck
//mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), ActorRef.noSender())
//移除订阅
//DistributedPubSub.get(getContext().system()).mediator().tell(DistributedPubSubMediator.Unsubscribe(topA, self), ActorRef.noSender())
}
override fun createReceive(): Receive {
return receiveBuilder().matchAny(this::receive).build()
}
private fun receive(msg: Any) {
when (msg) {
is String -> log.info("收到事件: $msg")
is DistributedPubSubMediator.SubscribeAck -> log.info("订阅事件:$msg")
else -> log.info("无对应类型")
}
}
}
//定义主题
var topA: String = "topa"
fun getSystem(port: Int): ActorSystem {
val config = ConfigFactory.parseString(
"akka.remote.netty.tcp.port=$port"
).withFallback(
ConfigFactory.load("application_pub.conf")
)
var actorSystem = ActorSystem.create("custerPubSystem", config);
return actorSystem
}
fun main() {
var system = getSystem(3660);
var subActor = system.actorOf(Props.create(Sub::class.java))
Thread.sleep(1000)//让sub 完全起来
// var pubActor = system.actorOf(Props.create(Pub::class.java))
// pubActor.tell("hello", ActorRef.noSender())
//
// pubActor.tell("world", ActorRef.noSender())
//
// Thread.sleep(3000)
}
上面订阅启动后,再启动一个节点派发事件
package pubsub
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator
/**
* Created by: tankx
* Date: 2019/7/18
* Description:
*/
fun main() {
var system = getSystem(3661);
Thread.sleep(3000)
var mediator: ActorRef = DistributedPubSub.get(system).mediator()
for (i in 1..1000) {
mediator.tell(DistributedPubSubMediator.Publish(topA, "消息XXXXXX"), ActorRef.noSender())
Thread.sleep(2000)
}
}
配置文件
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
cluster {
seed-nodes = [
"akka.tcp://custerPubSystem@127.0.0.1:3660"
]
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
依赖JAR
compile("com.typesafe.akka:akka-actor_2.13:$akkaVersion")
compile("com.typesafe.akka:akka-remote_2.13:$akkaVersion")
compile("com.typesafe.akka:akka-cluster-tools_2.13:$akkaVersion")
结果:
2019-07-18 20:19:55.941 [custerPubSystem-akka.actor.default-dispatcher-4] INFO pubsub.Sub 77- 收到事件: 消息XXXXXX
2019-07-18 20:19:55.942 [custerPubSystem-akka.actor.default-dispatcher-4] INFO pubsub.Sub 77- 收到事件: 消息XXXXXX
结论:
AKKA 集群中的发布与订阅在节点之间的Actor之间广播消息,监听自己关心的主题消息做相应逻辑,是非常方便与很多场景适用的
原文出处:https://www.cnblogs.com/tankaixiong/p/11209895.html
Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster
在ClusterClient源码分析中,我们知道,他是依托于“Distributed Publish Subscribe in Cluster”来实现消息的转发的,那本文就来分析一下Pub/Sub是如何实现的。
还记得之前分析Cluster源码的文章吗?其实Cluster只是把集群内各个节点的信息通过gossip协议公布出来,并把节点的信息分发出来。但各个actor的地址还是需要开发者自行获取或设计的,比如我要跟worker通信,那就需要知道这个actor在哪个节点,通过actorPath或actorRef通信。
“Distributed Publish Subscribe”就是用来屏蔽Actor位置的一个组件,通过它你可以给actor发消息而不需要知道actor的网咯位置。其实就是提供了一个类似kafka的消息发布、订阅的机制,其实吧,如果这个功能让你实现,你准备怎么做?肯定是在集群层面提供一个proxy,来屏蔽目标actor的网络位置啊。简单来说,就是提供一个通用的actor,来对消息进行转发,发送者只需要提供目标actor的路径就好了(比如/user/serviceA)。不过还是那句话,akka的都是对的,akka的都是好的。akka帮你实现这个事儿,就不用你自己考虑通用、稳定的问题啦。
消息订阅发布模式提供了一个中继actor:akka.cluster.pubsub.DistributedPubSubMediator。它管理actor的注册引用、分发实例引用给端actor,而且必须在所有的节点或一组节点内启动。它可以通过DistributedPubSub扩展启动,也可以像普通actor那样启动。
服务actor的注册是最终一致的,也就是说服务信息在变化时并不能立即通知给其他节点,过一段时间参会分发给所有节点。当然了每次都是以增量的信息分发这些信息。
消息的发送有两种模式:Send和Publish。简单来说就是点对点、广播。
Publish模式下,只有注册到命名的topic的actor才会收到消息,topic是啥?。其实这才是真正的订阅、发布模式。
class Subscriber extends Actor with ActorLogging {
import DistributedPubSubMediator.{ Subscribe, SubscribeAck }
val mediator = DistributedPubSub(context.system).mediator
// subscribe to the topic named "content"
mediator ! Subscribe("content", self)
def receive = {
case s: String ⇒
log.info("Got {}", s)
case SubscribeAck(Subscribe("content", None, `self`)) ⇒
log.info("subscribing")
}
}
class Publisher extends Actor {
import DistributedPubSubMediator.Publish
// activate the extension
val mediator = DistributedPubSub(context.system).mediator
def receive = {
case in: String ⇒
val out = in.toUpperCase
mediator ! Publish("content", out)
}
}
上面是官方的demo,可以看出,订阅者actor订阅了名为“content”的topic,在发布者actor发送指定topic的消息时,会自动收到对应的消息。怎么样,是不是很简单。其实吧,mediator只需要维护一个topic到订阅者的映射列表就好了,当收到对应topic的消息时,取出对应的订阅者(也就是ActorRef或actorSelection)把消息转发给他就好了。
Send模式就是一个点对点模式,每个消息被发送给一个目标,而不用知道这个目标actors的位置。既然之前我们说了,这是通过ActorPath发送的,那如果集群中同时有多个节点命中了这个ActorPath怎么办呢?那就路由呗,提供一个RoutingLogic
路由策略。默认策略是随机发送,当然了我们是可以修改这个策略的。与Publish模式不同,这里注册服务actor是通过Put消息实现的。不过实现原理都差不多,反正都要维护列表。
class Destination extends Actor with ActorLogging {
import DistributedPubSubMediator.Put
val mediator = DistributedPubSub(context.system).mediator
// register to the path
mediator ! Put(self)
def receive = {
case s: String ⇒
log.info("Got {}", s)
}
}
class Sender extends Actor {
import DistributedPubSubMediator.Send
// activate the extension
val mediator = DistributedPubSub(context.system).mediator
def receive = {
case in: String ⇒
val out = in.toUpperCase
mediator ! Send(path = "/user/destination", msg = out, localAffinity = true)
}
}
当然了,我们还是可以通过SendToAll把消息发送给所有命中指定path的actor的。
这里需要注意的是,官方的订阅发布组件只能保证至少一次投递,想想都是这样的,哈哈。废话不多说了,上代码。
object DistributedPubSub extends ExtensionId[DistributedPubSub] with ExtensionIdProvider {
override def get(system: ActorSystem): DistributedPubSub = super.get(system)
override def lookup = DistributedPubSub
override def createExtension(system: ExtendedActorSystem): DistributedPubSub =
new DistributedPubSub(system)
}
很显然DistributedPubSub这个扩展也是可以通过配置直接实例化的,不需要我们自行写代码实例化。由于其源码非常简单就是定义并创建了mediator这个actor(DistributedPubSubMediator),下面直接转到DistributedPubSubMediator源码的分析。
/**
* This actor manages a registry of actor references and replicates
* the entries to peer actors among all cluster nodes or a group of nodes
* tagged with a specific role.
*
* The `DistributedPubSubMediator` actor is supposed to be started on all nodes,
* or all nodes with specified role, in the cluster. The mediator can be
* started with the [[DistributedPubSub]] extension or as an ordinary actor.
*
* Changes are only performed in the own part of the registry and those changes
* are versioned. Deltas are disseminated in a scalable way to other nodes with
* a gossip protocol. The registry is eventually consistent, i.e. changes are not
* immediately visible at other nodes, but typically they will be fully replicated
* to all other nodes after a few seconds.
*
* You can send messages via the mediator on any node to registered actors on
* any other node. There is three modes of message delivery.
*
* You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or
* [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and
* `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same
* local actor system as the mediator. `Subscribe` is used together with `Publish`.
* Actors are automatically removed from the registry when they are terminated, or you
* can explicitly remove entries with [[DistributedPubSubMediator.Remove]] or
* [[DistributedPubSubMediator.Unsubscribe]].
*
* Successful `Subscribe` and `Unsubscribe` is acknowledged with
* [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]]
* replies.
*
* Not intended for subclassing by user code.
*/
@DoNotInherit
class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor with ActorLogging with PerGroupingBuffer
PerGroupingBuffer这个trait不再分析源码,从代码和命名来看,就是给每个group提供一个消息缓存的列表。其实这个actor最重要的功能是要能够感知集群节点的变化和对应服务actor的变化,并及时的把这些信息分发给其他DistributedPubSubMediator,还有就是能够把消息路由给指定的订阅者。为了简化分析,我们忽略第一个功能点,只分析是如何路由消息的。分析这点需要关注几个消息的处理逻辑:Put、Subscribe、Publish、Send、SendToAll。
先来看Subscribe 。
case msg @ Subscribe(topic, _, _) ⇒
// each topic is managed by a child actor with the same name as the topic
val encTopic = encName(topic)
bufferOr(mkKey(self.path / encTopic), msg, sender()) {
context.child(encTopic) match {
case Some(t) ⇒ t forward msg
case None ⇒ newTopicActor(encTopic) forward msg
}
}
Subscribe消息表明某个actor需要订阅某个topic的消息,简单来说就是先判断是否需要缓存,不需要的话就执行{}代码块。很显然,刚开始的时候是不需要缓存的。上面的逻辑就是从当前的children中查找encTopic的一个actor,然后把消息转发给它;不存在则创建之后再转发给它。那猜一下这个子actor的功能?其实吧,它应该是一个actor负责维护某个topic与所有订阅者的关系,所有发给这个topic的消息都会转发给所有的订阅者。
def newTopicActor(encTopic: String): ActorRef = {
val t = context.actorOf(Props(classOf[Topic], removedTimeToLive, routingLogic), name = encTopic)
registerTopic(t)
t
}
很显然newTopicActor创建了Topic这个actor,名字就是topic的值,并传入了两个参数:removedTimeToLive、routingLogic。第二个是路由策略。
def registerTopic(ref: ActorRef): Unit = {
put(mkKey(ref), Some(ref))
context.watch(ref)
}
put这个函数的功能我们先略过,其功能大概是把这个actor注册到系统内,把它与当前地址、版本号做关联并保存,在适当的时机分发出去。
Topic这个actor只有两个方法,所以还需要去看下TopicLike的代码。
可以看到TopicLike中有一个subscribers列表,这也是预期之中的。这个actor的消息会被business和defaultReceive处理,business在Topic中重新实现了,且会优先处理。
case msg @ Subscribe(_, Some(group), _) ⇒
val encGroup = encName(group)
bufferOr(mkKey(self.path / encGroup), msg, sender()) {
context.child(encGroup) match {
case Some(g) ⇒ g forward msg
case None ⇒ newGroupActor(encGroup) forward msg
}
}
pruneDeadline = None
收到Subscribe消息后,做了跟DistributedPubSubMediator类似的逻辑,又创建了一个子actor(Group),并把消息转发给了它。其实这一点在官方也有说过,也就是说,topic也是可以分组的,一个消息并不一定会发给所有订阅者,可以发给一组订阅者,其实吧,这一点我不太喜欢,感觉功能有点过了,如果要对topic划分子topic,用户自定义实现好了啊,搞得现在源码这么复杂。
class Group(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike {
def business = {
case SendToOneSubscriber(msg) ⇒
if (subscribers.nonEmpty)
Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(wrapIfNeeded(msg), sender())
}
}
Group的代码还这么简单,它又把Subscribe发给了TopicLike的defaultReceive
def defaultReceive: Receive = {
case msg @ Subscribe(_, _, ref) ⇒
context watch ref
subscribers += ref
pruneDeadline = None
context.parent ! Subscribed(SubscribeAck(msg), sender())
上面是defaultReceive对Subscribe消息的处理,就是watch,然后把订阅者添加到subscribers列表中,再告诉父actor(就是Topic这个actor)订阅成功了。
聪明的读者可能会问了,为啥topic还需要弄个消息缓存呢?其实吧,如果是我实现,肯定不搞这么麻烦啊。消息丢了就丢了啊,没有订阅者的时候,消息缓存起来等有订阅者的时候再发送出去?哈哈,有点浪费内存啊。不过为了稳定性、功能性、完善性,akka还是做了很多额外努力的。不过吧,建议还是把这个队列的大小调小一点,要不然太浪费内存了。不过很不幸的告诉你,目前没有这个开关。
既然订阅topic的逻辑跟我们的猜测差不多,那么发布消息的逻辑就应该也符合我们的猜测喽。其实就是获取某个topic对应的订阅者,然后foreach把消息发出去。
case Publish(topic, msg, sendOneMessageToEachGroup) ⇒
if (sendOneMessageToEachGroup)
publishToEachGroup(mkKey(self.path / encName(topic)), msg)
else
publish(mkKey(self.path / encName(topic)), msg)
简单起见,我们只分析消息不分组的情况
def publish(path: String, msg: Any, allButSelf: Boolean = false): Unit = {
val refs = for {
(address, bucket) ← registry
if !(allButSelf && address == selfAddress) // if we should skip sender() node and current address == self address => skip
valueHolder ← bucket.content.get(path)
ref ← valueHolder.ref
} yield ref
if (refs.isEmpty) ignoreOrSendToDeadLetters(msg)
else refs.foreach(_.forward(msg))
}
还记得registry什么时候赋值的嘛?如果忘了,可以翻翻registerTopic的代码,因为我没有分析,哈哈。不过不重要了,其实就是获取当前的Topic的Group的ActorRef,然后把消息转发给它。
case msg ⇒
subscribers foreach { _ forward msg }
Group继承的TopicLike中的defaultReceive方法处理了消息,其实就是把消息转发给所有的subscribers。
pub/sub的逻辑就分析到这里了,其实这里面的逻辑还是有点复杂的,当然了有一部分是因为topic分组带来的,其他的都是gossip协议分发订阅者、发布者的相关信息带来的。
下面分析Send模式。从Put消息的处理入手。
case Put(ref: ActorRef) ⇒
if (ref.path.address.hasGlobalScope)
log.warning("Registered actor must be local: [{}]", ref)
else {
put(mkKey(ref), Some(ref))
context.watch(ref)
}
这就有点简单了,就是把ref注册一下,然后watch。这个ref的key是ActorRef值,其实就是ActorPath.toString
case Send(path, msg, localAffinity) ⇒
val routees = registry(selfAddress).content.get(path) match {
case Some(valueHolder) if localAffinity ⇒
(for {
routee ← valueHolder.routee
} yield routee).toVector
case _ ⇒
(for {
(_, bucket) ← registry
valueHolder ← bucket.content.get(path)
routee ← valueHolder.routee
} yield routee).toVector
}
if (routees.isEmpty) ignoreOrSendToDeadLetters(msg)
else Router(routingLogic, routees).route(wrapIfNeeded(msg), sender())
其实就是从registry中优先找当前节点的订阅者,然后通过Router和指定的策略把消息发送出去,这个比pub/sub模式稍微简单点。wrapIfNeeded的功能不再分析,其实就是为了防止与用户本身的路由消息发生冲突。
关于节点信息同步,感兴趣的读者可以自行阅读源码,不过我看下来还是有几个问题的。比如当前注册信息的版本是通过时间戳来标志的,如果节点间时间不同步,会发生意外的结果啊;另外所谓的gossip协议,其实就是随机把注册信息发送给其他节点,也就是说集群内的节点都会把消息按照心跳时间,把注册信息随机发送给本身节点以外的节点,达到最终注册信息的同步。如果是我来实现,直接就是粗暴的广播注册信息,哈哈,不过这在集群规模比较大的时候比较耗时,啊哈哈。
Distributed Publish Subscribe in Cluster
asp.net – 使用Booksleeve维护开放的Redis PubSub订阅
我发现this solution保持与Redis的开放连接,但它在重新创建连接时不考虑订阅.
我目前正在Global.asax文件中处理Redis pubsub消息:
public class Application : HttpApplication { protected void Application_Start() { var gateway = Resolve<RedisConnectionGateway>(); var connection = gateway.GetConnection(); var channel = connection.GetopenSubscriberChannel(); channel.PatternSubscribe("workers:job-done:*",OnExecutionCompleted); } /// <summary> /// Handle messages received from workers through Redis.</summary> private static void OnExecutionCompleted(string key,byte[] message) { /* forwarded the response to the client that requested it */ } }
当前RedisConnection因任何原因关闭时会发生此问题.最简单的解决方案是在重置连接时从RedisConnectionGateway类触发事件,并使用新的RedisSubscriberChannel重新订阅.但是,在重置连接时发布到通道的任何消息都将丢失.
是否有任何推荐的方法来处理这种情况?
解决方法
是的,在您的连接中断时发布的任何事件都已消失.这就是redis pub / sub的本质;它不保证交付给断开连接的客户端.要么使用承诺这个的工具,要么使用redis来驱动队列 – 从列表的两端推送/弹出通常是一个合理的选择,并确保不会丢失任何东西(只要你的软件没有丢失)它从列表中弹出后).如果它有帮助,我在我的列表上有一个添加阻塞pop方法的请求 – 它们完全破坏了多路复用器的意图,但它们在某些情况下真的有用,所以我不反对添加它们.
Basic Tutorials of Redis(7) -Publish and Subscribe
This post is mainly about the publishment and subscription in Redis.I think you may subscribe some offiial
accounts on wechat,and when the authors published something new to their accounts,you will get them in
your wechat.The instance can make you understand this pattern easily.You will find there only 6 commands
for publishment and subscription.
This post introduces the basic usages as well.I will show you how to publish a message to a channel.publish
is the command to publish sometings.I publish a channel named news-nba with the message nba.And the client
return me a integer 0.What is the meaning of the return value?It means that there is nobody subscribe this channel.
I open a new client for subscriber.In this client,I subscribe the above channel news-nba.It will return something
about this channel.
OK,let's publish a new message to the news-nba to see the changes in the both clients.After publishing lakers to
the news-nba channel,this client returns 1.It means that the channel has a subscriber.
Let's turn to the subscriber's client.You will find the message lakers was already in the client.so amazing it is.
publish a new message to the news-nba,it returns 2 meaning ...(you understand it)
All right,let's see the other commands of this feature in Redis.Seeing the sql first:
channels channelname To execute this sql,you will get all of the channel whoes
name start with news.Redis can also
The subscriber's client will receive the message from the channel news-tech.
Again!!Publishing a message to a new channel.
As you can see,the client receives this message as well.
following code demonstrates the usage in C#.
今天关于终止挂起的redis pubsub.listen和线程的讲解已经结束,谢谢您的阅读,如果想了解更多关于AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster、Akka源码分析-Cluster-Distributed Publish Subscribe in Cluster、asp.net – 使用Booksleeve维护开放的Redis PubSub订阅、Basic Tutorials of Redis(7) -Publish and Subscribe的相关知识,请在本站搜索。
本文标签: