本文将带您了解关于nodejs完成mqtt服务端的新内容,同时我们还将为您解释nodemcumqtt的相关知识,另外,我们还将为您提供关于CI/CD/开发/生产服务器中的多个NodeJs与单个Node
本文将带您了解关于nodejs 完成 mqtt 服务端的新内容,同时我们还将为您解释nodemcu mqtt的相关知识,另外,我们还将为您提供关于CI/CD/开发/生产服务器中的多个 NodeJs 与单个 NodeJs 版本、io.netty.handler.codec.mqtt.MqttDecoder.DecoderState的实例源码、io.netty.handler.codec.mqtt.MqttDecoder的实例源码、io.netty.handler.codec.mqtt.MqttEncoder的实例源码的实用信息。
本文目录一览:
- nodejs 完成 mqtt 服务端(nodemcu mqtt)
- CI/CD/开发/生产服务器中的多个 NodeJs 与单个 NodeJs 版本
- io.netty.handler.codec.mqtt.MqttDecoder.DecoderState的实例源码
- io.netty.handler.codec.mqtt.MqttDecoder的实例源码
- io.netty.handler.codec.mqtt.MqttEncoder的实例源码
nodejs 完成 mqtt 服务端(nodemcu mqtt)
今天使用 mosca 写了一下基于 MQTT 的消息服务端,用于下一个项目的知识储备;
该功能主要基于 Node.js 的 mosca 模块完成
1. 安装 mosca
npm install mosca --save
2. 创建 mqtt 服务端,端口为:8000
// Create the MQTT broker with mosca, listening on port 8000.
const mosca = require('mosca');
const MqttServer = new mosca.Server({
  port: 8000,
});
3. mqtt 服务端部分逻辑处理
// Log the id of each client reported by the broker's 'clientConnected' event.
MqttServer.on('clientConnected', (client) => {
  console.log('client connected', client.id);
});
/**
 * Listen for messages published on the broker and dispatch on their topic.
 *
 * @param {object} packet - Published packet; `topic` and `payload` are read.
 * @param {object} client - Publishing client (not used by this handler).
 */
MqttServer.on('published', (packet, client) => {
  switch (packet.topic) {
    case 'pubMsg':
      console.log('message-publish', packet.payload.toString());
      // Forward a message on the 'other' MQTT topic.
      MqttServer.publish({ topic: 'other', payload: 'sssss' });
      // YHSocketMap is not defined in this snippet (presumably a Map owned by
      // the host application - verify). Guard the lookup so a missing map
      // does not throw a ReferenceError inside the broker's event handler.
      if (typeof YHSocketMap !== 'undefined') {
        console.log(`HD: ${YHSocketMap.get('1000')}`);
      }
      // Send the packet as a socket.io message:
      // io.sockets.socket(YHSocketMap.get('1000')).emit('subState', packet);
      break;
    case 'other':
      console.log('message-123', packet.payload.toString());
      break;
    default:
      // Other topics are ignored.
      break;
  }
});
// Log once the broker emits 'ready'.
MqttServer.on('ready', () => {
  console.log('mqtt is running...');
});
原文链接:http://86hyy.com/archives/nodejs-mqtt-server-auth2.html
CI/CD/开发/生产服务器中的多个 NodeJs 与单个 NodeJs 版本
如何解决CI/CD/开发/生产服务器中的多个 NodeJs 与单个 NodeJs 版本?
各位高手,我有几个关于NodeJs开发/生产环境的问题。 我们目前使用一个 nodejs 版本(开发环境和生产环境)。 我很难理解升级 NodeJs 的最佳实践是什么。 我知道我必须安装 nodejs 补丁才能获得安全保护。 每次 NodeJs 发布新补丁时,我都必须安装安全补丁吗? 我是否必须确保本地开发环境与生产环境具有相同的 Nodejs 版本? 我可以从这里借用任何最佳实践吗?或某处? 如果支持多版本的 nodejs 是最好的方式,那么我可以在我的开发环境和生产环境以及我的 CI/CD 中使用什么样的工具? (我正在使用 teamcity 和 IIS)
解决方法
暂未找到可以解决该问题的有效方法,小编正在努力寻找和整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
io.netty.handler.codec.mqtt.MqttDecoder.DecoderState的实例源码
/**
 * Passes {@code DecoderState.READ_FIXED_HEADER} to the superclass constructor
 * and stores the given limit in the {@code maxBytesInMessage} field.
 *
 * @param maxBytesInMessage value assigned to {@code this.maxBytesInMessage}
 */
public MqttDecoder(int maxBytesInMessage) { super(DecoderState.READ_FIXED_HEADER); this.maxBytesInMessage = maxBytesInMessage; }
/**
 * Calls {@code checkpoint(DecoderState.BAD_MESSAGE)} and returns the message
 * built by {@code MqttMessageFactory.newInvalidMessage(cause)}.
 *
 * @param cause the failure handed to the message factory
 * @return the message created by the factory for {@code cause}
 */
private MqttMessage invalidMessage(Throwable cause) { checkpoint(DecoderState.BAD_MESSAGE); return MqttMessageFactory.newInvalidMessage(cause); }
/**
 * Passes {@code DecoderState.READ_FIXED_HEADER} to the superclass constructor
 * and stores the given limit in the {@code maxBytesInMessage} field.
 *
 * @param maxBytesInMessage value assigned to {@code this.maxBytesInMessage}
 */
public MqttDecoder(int maxBytesInMessage) { super(DecoderState.READ_FIXED_HEADER); this.maxBytesInMessage = maxBytesInMessage; }
/**
 * Calls {@code checkpoint(DecoderState.BAD_MESSAGE)} and returns the message
 * built by {@code MqttMessageFactory.newInvalidMessage(cause)}.
 *
 * @param cause the failure handed to the message factory
 * @return the message created by the factory for {@code cause}
 */
private MqttMessage invalidMessage(Throwable cause) { checkpoint(DecoderState.BAD_MESSAGE); return MqttMessageFactory.newInvalidMessage(cause); }
/**
 * Passes {@code DecoderState.READ_FIXED_HEADER} to the superclass constructor
 * and stores the given limit in the {@code maxBytesInMessage} field.
 *
 * @param maxBytesInMessage value assigned to {@code this.maxBytesInMessage}
 */
public MqttDecoder(int maxBytesInMessage) { super(DecoderState.READ_FIXED_HEADER); this.maxBytesInMessage = maxBytesInMessage; }
/**
 * Calls {@code checkpoint(DecoderState.BAD_MESSAGE)} and returns the message
 * built by {@code MqttMessageFactory.newInvalidMessage(cause)}.
 *
 * @param cause the failure handed to the message factory
 * @return the message created by the factory for {@code cause}
 */
private MqttMessage invalidMessage(Throwable cause) { checkpoint(DecoderState.BAD_MESSAGE); return MqttMessageFactory.newInvalidMessage(cause); }
io.netty.handler.codec.mqtt.MqttDecoder的实例源码
@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder",new MqttDecoder(MAX_PAYLOAD_SIZE)); pipeline.addLast("encoder",MqttEncoder.INSTANCE); MqttTransportHandler handler = new MqttTransportHandler(msgProducer,deviceService,authService,assetService,assetAuthService,relationService,sslHandler); pipeline.addLast(handler); // ch.closeFuture().addListener(handler); }
/**
 * Binds an MQTT server to {@code port} and blocks until the server channel closes.
 * Both event loop groups are shut down in the {@code finally} block.
 *
 * @param port TCP port to bind
 * @throws Exception if binding or waiting on the channel futures fails
 */
public void start(int port) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MqttDecoder());
                        ch.pipeline().addLast(MqttEncoder.INSTANCE);
                        ch.pipeline().addLast(new MqttInBoundHandler());
                    }
                })
                // Netty spells these constants in upper case: SO_BACKLOG / SO_KEEPALIVE.
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture future = bootstrap.bind(port).sync();
        future.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/*
 * Configures the channel pipeline: when sslHandlerProvider is non-null its
 * SslHandler is appended first.
 * NOTE(review): this snippet looks truncated by extraction - "decoder" is
 * registered with sslHandler, `handler` is used without a visible declaration,
 * and the trailing line comment swallows the closing brace. Compare with the
 * upstream source before reusing it.
 */
@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder",sslHandler); pipeline.addLast(handler); // ch.closeFuture().addListener(handler); }
/**
 * Creates the hardware MQTT server from the settings carried by {@code holder}
 * and prepares the channel initializer used for its connections.
 *
 * @param holder supplies the listen address, port, transport type, idle
 *               timeout and the collaborators handed to the handlers
 */
public MQTTHardwareServer(Holder holder) {
    super(holder.props.getProperty("listen.address"),
            holder.props.getIntProperty("hardware.mqtt.port"),
            holder.transportTypeHolder);

    final int idleTimeoutSecs = holder.limits.hardwareIdleTimeout;
    // These two handler instances are created once and captured by the initializer.
    final MqttHardwareLoginHandler loginHandler = new MqttHardwareLoginHandler(holder);
    final HardwareChannelStateHandler stateHandler =
            new HardwareChannelStateHandler(holder.sessionDao, holder.gcmWrapper);

    channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("MqttIdleStateHandler",
                    new IdleStateHandler(idleTimeoutSecs, idleTimeoutSecs, 0));
            ch.pipeline().addLast(stateHandler);
            ch.pipeline().addLast(new MqttDecoder());
            ch.pipeline().addLast(MqttEncoder.INSTANCE);
            ch.pipeline().addLast(loginHandler);
            ch.pipeline().addLast(new HardwareNotLoggedHandler());
        }
    };

    log.debug("hard.socket.idle.timeout = {}", idleTimeoutSecs);
}
/*
 * Appends, in order, an idle-state handler, the MQTT decoder, the MQTT encoder
 * and an MqttHandler wrapping a new MqttProcessor(server).
 * NOTE(review): IdleStateHandler(90, TimeUnit.SECONDS) matches no Netty
 * constructor (the TimeUnit overloads take three idle times); arguments were
 * probably lost in extraction - confirm against the upstream source.
 */
protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("idleState",new IdleStateHandler(90,TimeUnit.SECONDS)); pipeline.addLast("mqttDecoder",new MqttDecoder()); pipeline.addLast("mqttEncoder",MqttEncoder.INSTANCE); pipeline.addLast("mqttHandler",new MqttHandler(new MqttProcessor(server))); }
private void initChannel(ChannelPipeline pipeline) { pipeline.addBefore("handler","mqttEncoder",MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler","mqttDecoder",new MqttDecoder(this.options.getMaxMessageSize())); } else { // max message size not set,so the default from Netty MQTT codec is used pipeline.addBefore("handler",new MqttDecoder()); } // adding the idle state handler for timeout on CONNECT packet pipeline.addBefore("handler","idle",new IdleStateHandler(this.options.timeoutOnConnect(),0)); pipeline.addBefore("handler","timeoutOnConnect",new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { // as MQTT 3.1.1 describes,if no packet is sent after a "reasonable" time (here CONNECT timeout) // the connection is closed ctx.channel().close(); } } } }); }
/*
 * Client-side pipeline setup: adds the MQTT codec before "handler" and, when
 * auto keep-alive is enabled with a non-zero interval, an idle-state handler
 * plus a "keepAliveHandler" that calls ping() on WRITER_IDLE events.
 * NOTE(review): the snippet is damaged - the leading line comment swallows the
 * rest of the collapsed line, several addBefore calls lack the handler-name
 * argument, and the decoder is only added inside the max-message-size branch.
 * Restore from the upstream source before use.
 */
private void initChannel(ChannelPipeline pipeline) { // add into pipeline netty's (en/de)coder pipeline.addBefore("handler",MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler",new MqttDecoder()); } if (this.options.isAutoKeepAlive() && this.options.getKeepAliveTimeSeconds() != 0) { pipeline.addBefore("handler",new IdleStateHandler(0,this.options.getKeepAliveTimeSeconds(),0)); pipeline.addBefore("handler","keepAliveHandler",new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.WRITER_IDLE) { ping(); } } } }); } }
/*
 * Configures the channel pipeline: optional SSL handler, codec entry, then a
 * MqttTransportHandler that is also registered as a listener on the channel's
 * close future.
 * NOTE(review): "decoder" is registered with MqttEncoder.INSTANCE and no
 * MqttDecoder is added - the decoder/encoder lines appear to have been merged
 * during extraction. Verify against the upstream source.
 */
@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder",MqttEncoder.INSTANCE); MqttTransportHandler handler = new MqttTransportHandler(processor,adaptor,sslHandler); pipeline.addLast(handler); ch.closeFuture().addListener(handler); }
@Override protected void customizePipeline(EventExecutorGroup eventExecutorGroup,ChannelPipeline pipeline) { pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",new MqttEncoder()); // we finally have the chance to add some business logic. pipeline.addLast(eventExecutorGroup,"iotracah-mqtt",new MqttServerHandler((MqttServerImpl) getServerImpl())); }
/**
 * Appends the MQTT encoder, a size-limited MQTT decoder and the protocol
 * handler to the pipeline, in that order.
 *
 * @param pipeline pipeline receiving the three handlers
 */
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
    pipeline
            .addLast(MqttEncoder.INSTANCE)
            .addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE))
            .addLast(new MQTTProtocolHandler(server, this));
}
void init() { String host = "0.0.0.0"; int port = 1883; m_bossGroup = new NioEventLoopGroup(); m_workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(m_bossGroup,m_workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); try { pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",MqttEncoder.INSTANCE); pipeline.addLast("handler",new PublishReceiverHandler()); // pipeline.addLast("decoder",new MqttDecoder()); // pipeline.addLast("encoder",MqttEncoder.INSTANCE); // pipeline.addLast("handler",new NettyPublishReceiverHandler()); } catch (Throwable th) { LOG.error("Severe error during pipeline creation",th); throw th; } } }) .option(ChannelOption.so_BACKLOG,128) .option(ChannelOption.so_REUSEADDR,true) .option(ChannelOption.TCP_NODELAY,true) .childOption(ChannelOption.so_KEEPALIVE,true); try { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host,port); LOG.info("Server binded host: {},port: {}",host,port); f.sync(); } catch (InterruptedException ex) { LOG.error(null,ex); } }
void init() { String host = "0.0.0.0"; int port = 1883; m_bossGroup = new NioEventLoopGroup(); m_workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(m_bossGroup,m_workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); try { pipeline.addFirst("idleStateHandler",new IdleStateHandler(2,0)); pipeline.addAfter("idleStateHandler","idleEventHandler",new MoquetteIdleTimeoutHandler()); pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",MqttEncoder.INSTANCE); pipeline.addLast("handler",new LoopMQTTHandler(state)); } catch (Throwable th) { LOG.error("Severe error during pipeline creation",th); throw th; } } }) .option(ChannelOption.so_BACKLOG,128) .option(ChannelOption.so_REUSEADDR,true) .option(ChannelOption.TCP_NODELAY,true) .childOption(ChannelOption.so_KEEPALIVE,true); try { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host,port); LOG.info("Server binded host: {},port); f.sync(); } catch (InterruptedException ex) { LOG.error(null,ex); } }
private void initializePlainTCPTransport(final NettyMQTTHandler handler,IConfig props) throws IOException { LOG.info("Configuring TCP MQTT transport"); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); String tcpPortProp = props.getProperty(PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(tcpPortProp)) { LOG.info("Property {} has been set to {}. TCP MQTT will be disabled",brokerConstants.PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int port = Integer.parseInt(tcpPortProp); initFactory(host,port,"TCP MQTT",new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) { pipeline.addFirst("idleStateHandler",new IdleStateHandler(nettyChannelTimeoutSeconds,0)); pipeline.addAfter("idleStateHandler",timeoutHandler); // pipeline.addLast("logger",new LoggingHandler("Netty",LogLevel.ERROR)); if (errorsCather.isPresent()) { pipeline.addLast("bugsnagCatcher",errorsCather.get()); } pipeline.addFirst("bytemetrics",new BytesMetricsHandler(m_bytesMetricsCollector)); pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",MqttEncoder.INSTANCE); pipeline.addLast("metrics",new MessageMetricsHandler(m_metricsCollector)); pipeline.addLast("messageLogger",new MQTTMessageLogger()); if (metrics.isPresent()) { pipeline.addLast("wizardMetrics",metrics.get()); } pipeline.addLast("handler",handler); } }); }
/*
 * Configures the WebSocket MQTT transport; returns early when the WebSocket
 * port property equals the disabled-port marker.
 * NOTE(review): this snippet is damaged by extraction - the collapsed line
 * comment swallows the remainder of the line, initFactory is called without
 * the parsed port, and several pipeline calls are missing arguments or whole
 * statements. Do not reuse without restoring it from the upstream source.
 */
private void initializeWebSocketTransport(final NettyMQTTHandler handler,IConfig props) throws IOException { LOG.info("Configuring Websocket MQTT transport"); String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(webSocketPortProp)) { // Do nothing no WebSocket configured LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled",brokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int port = Integer.parseInt(webSocketPortProp); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); initFactory(host,"Websocket MQTT",new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) { pipeline.addLast(new HttpServerCodec()); pipeline.addLast("aggregator",new HttpObjectAggregator(65536)); pipeline.addLast("webSocketHandler",new WebSocketServerProtocolHandler("/mqtt",MQTT_SUBPROTOCOL_CSV_LIST)); pipeline.addLast("ws2bytebufDecoder",new WebSocketFrametoByteBufDecoder()); pipeline.addLast("bytebuf2wsEncoder",new ByteBufToWebSocketFrameEncoder()); pipeline.addFirst("idleStateHandler",timeoutHandler); pipeline.addFirst("bytemetrics",new MQTTMessageLogger()); pipeline.addLast("handler",handler); } }); }
/*
 * Configures the SSL MQTT transport; returns early when the SSL port property
 * equals the disabled-port marker, otherwise reads the client-auth flag and
 * registers a pipeline initializer that starts with the "ssl" handler.
 * NOTE(review): this snippet is damaged by extraction - parentheses in the
 * pipeline section are unbalanced and most handler registrations are missing.
 * Restore from the upstream source before use.
 */
private void initializeSSLTCPTransport(final NettyMQTTHandler handler,IConfig props,final SSLContext sslContext) throws IOException { LOG.info("Configuring SSL MQTT transport"); String sslPortProp = props.getProperty(SSL_PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(sslPortProp)) { // Do nothing no SSL configured LOG.info("Property {} has been set to {}. SSL MQTT will be disabled",brokerConstants.SSL_PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int sslPort = Integer.parseInt(sslPortProp); LOG.info("Starting SSL on port {}",sslPort); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); String sNeedsClientAuth = props.getProperty(brokerConstants.NEED_CLIENT_AUTH,"false"); final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth); initFactory(host,sslPort,"SSL MQTT",new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) throws Exception { pipeline.addLast("ssl",createSslHandler(sslContext,needsClientAuth)); pipeline.addFirst("idleStateHandler",LogLevel.ERROR)); pipeline.addFirst("bytemetrics",handler); } }); }
/*
 * Configures the secure-WebSocket MQTT transport; returns early when the WSS
 * port property equals the disabled-port marker.
 * NOTE(review): this snippet is damaged by extraction - the signature has no
 * `props` parameter although the body reads it, the client-auth default and
 * the initFactory call are fused together, and parentheses are unbalanced.
 * Restore from the upstream source before use.
 */
private void initializeWsstransport(final NettyMQTTHandler handler,final SSLContext sslContext) throws IOException { LOG.info("Configuring secure websocket MQTT transport"); String sslPortProp = props.getProperty(WSS_PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(sslPortProp)) { // Do nothing no SSL configured LOG.info("Property {} has been set to {}. Secure websocket MQTT will be disabled",brokerConstants.WSS_PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int sslPort = Integer.parseInt(sslPortProp); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); String sNeedsClientAuth = props.getProperty(brokerConstants.NEED_CLIENT_AUTH,"Secure websocket",needsClientAuth)); pipeline.addLast("httpEncoder",new HttpResponseEncoder()); pipeline.addLast("httpDecoder",new HttpRequestDecoder()); pipeline.addLast("aggregator",handler); } }); }
/**
 * Entry point: starts the dispatcher on a single-thread executor, then binds
 * an MQTT server to {@code resources.port} and blocks until its channel closes.
 * The worker event loop group is shut down in the {@code finally} block.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if resource setup, binding or waiting fails
 */
public static void main(String[] args) throws Exception {
    Resources resources = new Resources();
    C3P0NativeJdbcExtractor cp30NativeJdbcExtractor = new C3P0NativeJdbcExtractor();
    dispatcher dispatcher = new dispatcher(
            cp30NativeJdbcExtractor.getNativeConnection(resources.postgres.getConnection()));
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(dispatcher);
    Onlinestate state = new Onlinestate();

    ServerBootstrap bootstrap = new ServerBootstrap();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
        bootstrap.group(worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MqttDecoder());
                        ch.pipeline().addLast(MqttEncoder.INSTANCE);
                        // NOTE(review): the snippet had a two-argument IdleStateHandler; the
                        // three-int constructor is assumed with the other timeouts disabled.
                        ch.pipeline().addLast(new IdleStateHandler(resources.maxIdleTime, 0, 0));
                        ch.pipeline().addLast(new MqttHandler(resources.postgres, dispatcher, resources.mongo, state));
                    }
                });
        ChannelFuture future = bootstrap.bind(resources.port).sync();
        future.channel().closeFuture().sync();
    } finally {
        worker.shutdownGracefully();
    }
}
/**
 * Connects to the broker at {@code uri}, sends a CONNECT message and waits on
 * the shared lock for up to the configured response timeout.
 *
 * @return the return code of the received CONNACK, or {@code null} when no
 *         message has been received after the wait
 * @throws InterruptedException if interrupted while connecting or waiting
 */
public MqttConnectReturnCode connect() throws InterruptedException {
    final boolean epollRequested = Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode());
    final Class<? extends SocketChannel> socketChannelClass;
    if (epollRequested) {
        group = new EpollEventLoopGroup(1, new DefaultThreadFactory("client"));
        socketChannelClass = EpollSocketChannel.class;
    } else {
        group = new NioEventLoopGroup(1, new DefaultThreadFactory("client"));
        socketChannelClass = NioSocketChannel.class;
    }

    bootstrap.group(group);
    bootstrap.channel(socketChannelClass);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // TLS is only added for the "mqtts" scheme.
            if ("mqtts".equalsIgnoreCase(uri.getScheme())) {
                SslContext clientSsl = SslContextBuilder.forClient().trustManager(trustManagerFactory).build();
                ch.pipeline().addLast(clientSsl.newHandler(ch.alloc(), uri.getHost(), uri.getPort()));
            }
            ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder());
            ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);
            ch.pipeline().addLast(MqttPacketReceiver.class.getName(),
                    new MqttPacketReceiver(MqttClient.this, receiver, sharedobject));
        }
    });

    channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
    normalizeMessage(options.will());
    send(MqttMessageFactory.connect(options));

    synchronized (sharedobject.locker()) {
        int timeoutSeconds = Settings.INSTANCE.getInt("mqttclient.responseTimeoutSeconds", 15);
        sharedobject.locker().wait(timeoutSeconds * 1000);
    }

    if (sharedobject.receivedMessage() != null) {
        return ((MqttConnAckMessage) sharedobject.receivedMessage()).variableHeader().connectReturnCode();
    }
    return null;
}
@Override protected void initChannel(SocketChannel ch) throws Exception { logger.debug("Initializaing channels..."); ch.pipeline().addLast(ByteCounterCodec.class.getName(),new ByteCounterCodec()); if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) { ch.pipeline().addLast(LoggingHandler.class.getName(),new LoggingHandler(LogLevel.DEBUG)); } if (useSsl) { SslContext sslCtx = SslContextBuilder .forServer(Settings.INSTANCE.certChainFile(),Settings.INSTANCE.privateKeyFile()).build(); logger.debug("SSL Provider : {}",SslContext.defaultServerProvider()); ch.pipeline().addLast(sslCtx.newHandler(ch.alloc())); } if (useWebSocket) { String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path","/"); ch.pipeline().addLast(HttpServerCodec.class.getName(),new HttpServerCodec()); ch.pipeline().addLast(HttpObjectAggregator.class.getName(),new HttpObjectAggregator(1048576)); ch.pipeline().addLast(HttpContentCompressor.class.getName(),new HttpContentCompressor()); ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(),new WebSocketServerProtocolHandler(websocketPath,"mqtt,mqttv3.1,mqttv3.1.1",true,65536)); // [MQTT-6.0.0-3] ch.pipeline().addLast(new MqttWebSocketCodec()); } int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage",8092); ch.pipeline().addLast(MqttDecoder.class.getName(),new MqttDecoder(maxBytesInMessage)); ch.pipeline().addLast(MqttEncoder.class.getName(),MqttEncoder.INSTANCE); ch.pipeline().addLast(ConnectReceiver.class.getName(),ConnectReceiver.INSTANCE); ch.pipeline().addLast(PubAckReceiver.class.getName(),PubAckReceiver.INSTANCE); ch.pipeline().addLast(PublishReceiver.class.getName(),PublishReceiver.INSTANCE); ch.pipeline().addLast(SubscribeReceiver.class.getName(),SubscribeReceiver.INSTANCE); ch.pipeline().addLast(UnsubscribeReceiver.class.getName(),UnsubscribeReceiver.INSTANCE); ch.pipeline().addLast(GenericReceiver.class.getName(),GenericReceiver.INSTANCE); }
io.netty.handler.codec.mqtt.MqttEncoder的实例源码
@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder",new MqttDecoder(MAX_PAYLOAD_SIZE)); pipeline.addLast("encoder",MqttEncoder.INSTANCE); MqttTransportHandler handler = new MqttTransportHandler(msgProducer,deviceService,authService,assetService,assetAuthService,relationService,sslHandler); pipeline.addLast(handler); // ch.closeFuture().addListener(handler); }
/**
 * Binds an MQTT server to {@code port} and blocks until the server channel closes.
 * Both event loop groups are shut down in the {@code finally} block.
 *
 * @param port TCP port to bind
 * @throws Exception if binding or waiting on the channel futures fails
 */
public void start(int port) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MqttDecoder());
                        ch.pipeline().addLast(MqttEncoder.INSTANCE);
                        ch.pipeline().addLast(new MqttInBoundHandler());
                    }
                })
                // Netty spells these constants in upper case: SO_BACKLOG / SO_KEEPALIVE.
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture future = bootstrap.bind(port).sync();
        future.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/*
 * Configures the channel pipeline: when sslHandlerProvider is non-null its
 * SslHandler is appended first.
 * NOTE(review): this snippet looks truncated by extraction - "decoder" is
 * registered with sslHandler, `handler` is used without a visible declaration,
 * and the trailing line comment swallows the closing brace. Compare with the
 * upstream source before reusing it.
 */
@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder",sslHandler); pipeline.addLast(handler); // ch.closeFuture().addListener(handler); }
/**
 * Creates the hardware MQTT server from the settings carried by {@code holder}
 * and prepares the channel initializer used for its connections.
 *
 * @param holder supplies the listen address, port, transport type, idle
 *               timeout and the collaborators handed to the handlers
 */
public MQTTHardwareServer(Holder holder) {
    super(holder.props.getProperty("listen.address"),
            holder.props.getIntProperty("hardware.mqtt.port"),
            holder.transportTypeHolder);

    final int idleTimeoutSecs = holder.limits.hardwareIdleTimeout;
    // These two handler instances are created once and captured by the initializer.
    final MqttHardwareLoginHandler loginHandler = new MqttHardwareLoginHandler(holder);
    final HardwareChannelStateHandler stateHandler =
            new HardwareChannelStateHandler(holder.sessionDao, holder.gcmWrapper);

    channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("MqttIdleStateHandler",
                    new IdleStateHandler(idleTimeoutSecs, idleTimeoutSecs, 0));
            ch.pipeline().addLast(stateHandler);
            ch.pipeline().addLast(new MqttDecoder());
            ch.pipeline().addLast(MqttEncoder.INSTANCE);
            ch.pipeline().addLast(loginHandler);
            ch.pipeline().addLast(new HardwareNotLoggedHandler());
        }
    };

    log.debug("hard.socket.idle.timeout = {}", idleTimeoutSecs);
}
/*
 * Appends, in order, an idle-state handler, the MQTT decoder, the MQTT encoder
 * and an MqttHandler wrapping a new MqttProcessor(server).
 * NOTE(review): IdleStateHandler(90, TimeUnit.SECONDS) matches no Netty
 * constructor (the TimeUnit overloads take three idle times); arguments were
 * probably lost in extraction - confirm against the upstream source.
 */
protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("idleState",new IdleStateHandler(90,TimeUnit.SECONDS)); pipeline.addLast("mqttDecoder",new MqttDecoder()); pipeline.addLast("mqttEncoder",MqttEncoder.INSTANCE); pipeline.addLast("mqttHandler",new MqttHandler(new MqttProcessor(server))); }
private void initChannel(ChannelPipeline pipeline) { pipeline.addBefore("handler","mqttEncoder",MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler","mqttDecoder",new MqttDecoder(this.options.getMaxMessageSize())); } else { // max message size not set,so the default from Netty MQTT codec is used pipeline.addBefore("handler",new MqttDecoder()); } // adding the idle state handler for timeout on CONNECT packet pipeline.addBefore("handler","idle",new IdleStateHandler(this.options.timeoutOnConnect(),0)); pipeline.addBefore("handler","timeoutOnConnect",new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { // as MQTT 3.1.1 describes,if no packet is sent after a "reasonable" time (here CONNECT timeout) // the connection is closed ctx.channel().close(); } } } }); }
/*
 * Client-side pipeline setup: adds the MQTT codec before "handler" and, when
 * auto keep-alive is enabled with a non-zero interval, an idle-state handler
 * plus a "keepAliveHandler" that calls ping() on WRITER_IDLE events.
 * NOTE(review): the snippet is damaged - the leading line comment swallows the
 * rest of the collapsed line, several addBefore calls lack the handler-name
 * argument, and the decoder is only added inside the max-message-size branch.
 * Restore from the upstream source before use.
 */
private void initChannel(ChannelPipeline pipeline) { // add into pipeline netty's (en/de)coder pipeline.addBefore("handler",MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler",new MqttDecoder()); } if (this.options.isAutoKeepAlive() && this.options.getKeepAliveTimeSeconds() != 0) { pipeline.addBefore("handler",new IdleStateHandler(0,this.options.getKeepAliveTimeSeconds(),0)); pipeline.addBefore("handler","keepAliveHandler",new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.WRITER_IDLE) { ping(); } } } }); } }
@Test public void noConnectTest(TestContext context) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("mqttEncoder",MqttEncoder.INSTANCE); } }); // Start the client. ChannelFuture f = bootstrap.connect(MQTT_SERVER_HOST,MQTT_SERVER_PORT).sync(); f.channel().writeAndFlush(createPublishMessage()); // Wait until the connection is closed. f.channel().closeFuture().sync(); context.assertTrue(true); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
/*
 * Configures the channel pipeline: optional SSL handler, codec entry, then a
 * MqttTransportHandler that is also registered as a listener on the channel's
 * close future.
 * NOTE(review): "decoder" is registered with MqttEncoder.INSTANCE and no
 * MqttDecoder is added - the decoder/encoder lines appear to have been merged
 * during extraction. Verify against the upstream source.
 */
@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder",MqttEncoder.INSTANCE); MqttTransportHandler handler = new MqttTransportHandler(processor,adaptor,sslHandler); pipeline.addLast(handler); ch.closeFuture().addListener(handler); }
@Test public void noConnectMessage(TestContext context) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("mqttEncoder",MqttEncoder.INSTANCE); } }); // Start the client. ChannelFuture f = bootstrap.connect(MQTT_BIND_ADDRESS,MQTT_LISTEN_PORT).sync(); f.channel().writeAndFlush(createPublishMessage()); // Wait until the connection is closed. f.channel().closeFuture().sync(); context.assertTrue(true); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
@Override protected void customizePipeline(EventExecutorGroup eventExecutorGroup,ChannelPipeline pipeline) { pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",new MqttEncoder()); // we finally have the chance to add some business logic. pipeline.addLast(eventExecutorGroup,"iotracah-mqtt",new MqttServerHandler((MqttServerImpl) getServerImpl())); }
/**
 * Appends the MQTT encoder, a size-limited MQTT decoder and the protocol
 * handler to the pipeline, in that order.
 *
 * @param pipeline pipeline receiving the three handlers
 */
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
    pipeline
            .addLast(MqttEncoder.INSTANCE)
            .addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE))
            .addLast(new MQTTProtocolHandler(server, this));
}
void init() { String host = "0.0.0.0"; int port = 1883; m_bossGroup = new NioEventLoopGroup(); m_workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(m_bossGroup,m_workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); try { pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",MqttEncoder.INSTANCE); pipeline.addLast("handler",new PublishReceiverHandler()); // pipeline.addLast("decoder",new MqttDecoder()); // pipeline.addLast("encoder",MqttEncoder.INSTANCE); // pipeline.addLast("handler",new NettyPublishReceiverHandler()); } catch (Throwable th) { LOG.error("Severe error during pipeline creation",th); throw th; } } }) .option(ChannelOption.so_BACKLOG,128) .option(ChannelOption.so_REUSEADDR,true) .option(ChannelOption.TCP_NODELAY,true) .childOption(ChannelOption.so_KEEPALIVE,true); try { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host,port); LOG.info("Server binded host: {},port: {}",host,port); f.sync(); } catch (InterruptedException ex) { LOG.error(null,ex); } }
void init() { String host = "0.0.0.0"; int port = 1883; m_bossGroup = new NioEventLoopGroup(); m_workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(m_bossGroup,m_workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); try { pipeline.addFirst("idleStateHandler",new IdleStateHandler(2,0)); pipeline.addAfter("idleStateHandler","idleEventHandler",new MoquetteIdleTimeoutHandler()); pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",MqttEncoder.INSTANCE); pipeline.addLast("handler",new LoopMQTTHandler(state)); } catch (Throwable th) { LOG.error("Severe error during pipeline creation",th); throw th; } } }) .option(ChannelOption.so_BACKLOG,128) .option(ChannelOption.so_REUSEADDR,true) .option(ChannelOption.TCP_NODELAY,true) .childOption(ChannelOption.so_KEEPALIVE,true); try { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host,port); LOG.info("Server binded host: {},port); f.sync(); } catch (InterruptedException ex) { LOG.error(null,ex); } }
private void initializePlainTCPTransport(final NettyMQTTHandler handler,IConfig props) throws IOException { LOG.info("Configuring TCP MQTT transport"); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); String tcpPortProp = props.getProperty(PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(tcpPortProp)) { LOG.info("Property {} has been set to {}. TCP MQTT will be disabled",brokerConstants.PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int port = Integer.parseInt(tcpPortProp); initFactory(host,port,"TCP MQTT",new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) { pipeline.addFirst("idleStateHandler",new IdleStateHandler(nettyChannelTimeoutSeconds,0)); pipeline.addAfter("idleStateHandler",timeoutHandler); // pipeline.addLast("logger",new LoggingHandler("Netty",LogLevel.ERROR)); if (errorsCather.isPresent()) { pipeline.addLast("bugsnagCatcher",errorsCather.get()); } pipeline.addFirst("bytemetrics",new BytesMetricsHandler(m_bytesMetricsCollector)); pipeline.addLast("decoder",new MqttDecoder()); pipeline.addLast("encoder",MqttEncoder.INSTANCE); pipeline.addLast("metrics",new MessageMetricsHandler(m_metricsCollector)); pipeline.addLast("messageLogger",new MQTTMessageLogger()); if (metrics.isPresent()) { pipeline.addLast("wizardMetrics",metrics.get()); } pipeline.addLast("handler",handler); } }); }
private void initializeWebSocketTransport(final NettyMQTTHandler handler,IConfig props) throws IOException { LOG.info("Configuring Websocket MQTT transport"); String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(webSocketPortProp)) { // Do nothing no WebSocket configured LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled",brokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int port = Integer.parseInt(webSocketPortProp); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); initFactory(host,"Websocket MQTT",new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) { pipeline.addLast(new HttpServerCodec()); pipeline.addLast("aggregator",new HttpObjectAggregator(65536)); pipeline.addLast("webSocketHandler",new WebSocketServerProtocolHandler("/mqtt",MQTT_SUBPROTOCOL_CSV_LIST)); pipeline.addLast("ws2bytebufDecoder",new WebSocketFrametoByteBufDecoder()); pipeline.addLast("bytebuf2wsEncoder",new ByteBufToWebSocketFrameEncoder()); pipeline.addFirst("idleStateHandler",timeoutHandler); pipeline.addFirst("bytemetrics",new MQTTMessageLogger()); pipeline.addLast("handler",handler); } }); }
private void initializeSSLTCPTransport(final NettyMQTTHandler handler,IConfig props,final SSLContext sslContext) throws IOException { LOG.info("Configuring SSL MQTT transport"); String sslPortProp = props.getProperty(SSL_PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(sslPortProp)) { // Do nothing no SSL configured LOG.info("Property {} has been set to {}. SSL MQTT will be disabled",brokerConstants.SSL_PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int sslPort = Integer.parseInt(sslPortProp); LOG.info("Starting SSL on port {}",sslPort); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); String sNeedsClientAuth = props.getProperty(brokerConstants.NEED_CLIENT_AUTH,"false"); final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth); initFactory(host,sslPort,"SSL MQTT",new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) throws Exception { pipeline.addLast("ssl",createSslHandler(sslContext,needsClientAuth)); pipeline.addFirst("idleStateHandler",LogLevel.ERROR)); pipeline.addFirst("bytemetrics",handler); } }); }
private void initializeWsstransport(final NettyMQTTHandler handler,final SSLContext sslContext) throws IOException { LOG.info("Configuring secure websocket MQTT transport"); String sslPortProp = props.getProperty(WSS_PORT_PROPERTY_NAME,disABLED_PORT_BIND); if (disABLED_PORT_BIND.equals(sslPortProp)) { // Do nothing no SSL configured LOG.info("Property {} has been set to {}. Secure websocket MQTT will be disabled",brokerConstants.WSS_PORT_PROPERTY_NAME,disABLED_PORT_BIND); return; } int sslPort = Integer.parseInt(sslPortProp); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(brokerConstants.HOST_PROPERTY_NAME); String sNeedsClientAuth = props.getProperty(brokerConstants.NEED_CLIENT_AUTH,"Secure websocket",needsClientAuth)); pipeline.addLast("httpEncoder",new HttpResponseEncoder()); pipeline.addLast("httpDecoder",new HttpRequestDecoder()); pipeline.addLast("aggregator",handler); } }); }
/**
 * Entry point: starts the dispatcher on a single-thread executor, then runs a Netty MQTT
 * server on {@code resources.port} and blocks until its server channel is closed.
 *
 * NOTE(review): the lowercase type names {@code dispatcher} and {@code Onlinestate} look like
 * extraction artifacts; they are kept as found — confirm the real class names.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if resource setup, binding, or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    Resources resources = new Resources();
    C3P0NativeJdbcExtractor cp30NativeJdbcExtractor = new C3P0NativeJdbcExtractor();
    dispatcher dispatcher = new dispatcher(
            cp30NativeJdbcExtractor.getNativeConnection(resources.postgres.getConnection()));
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        executor.execute(dispatcher);
        Onlinestate state = new Onlinestate();
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            bootstrap.group(worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MqttDecoder());
                        ch.pipeline().addLast(MqttEncoder.INSTANCE);
                        // IdleStateHandler takes (reader, writer, all) idle seconds; 0 disables a timer.
                        ch.pipeline().addLast(new IdleStateHandler(resources.maxIdleTime, 0, 0));
                        ch.pipeline().addLast(
                                new MqttHandler(resources.postgres, dispatcher, resources.mongo, state));
                    }
                });
            ChannelFuture future = bootstrap.bind(resources.port).sync();
            future.channel().closeFuture().sync();
        } finally {
            worker.shutdownGracefully();
        }
    } finally {
        // The executor's worker thread is non-daemon; without this the JVM would stay alive
        // after the server has stopped.
        executor.shutdownNow();
    }
}
/**
 * Opens the connection to the broker at {@code uri} and sends the MQTT CONNECT packet.
 *
 * The transport (epoll or NIO) is chosen from the configured Netty transport mode. A TLS
 * handler is put in front of the MQTT codec when the URI scheme is "mqtts". After sending,
 * the call waits on the shared lock for up to {@code mqttclient.responseTimeoutSeconds}
 * seconds (default 15).
 *
 * @return the return code of the received CONNACK, or {@code null} if no message had been
 *         received once the wait ended
 * @throws InterruptedException if interrupted while connecting or while waiting for the reply
 */
public MqttConnectReturnCode connect() throws InterruptedException {
    final boolean epollMode = Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode());
    final Class<? extends SocketChannel> transport;
    if (!epollMode) {
        group = new NioEventLoopGroup(1, new DefaultThreadFactory("client"));
        transport = NioSocketChannel.class;
    } else {
        group = new EpollEventLoopGroup(1, new DefaultThreadFactory("client"));
        transport = EpollSocketChannel.class;
    }

    bootstrap.group(group).channel(transport).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            final boolean secure = "mqtts".equalsIgnoreCase(uri.getScheme());
            if (secure) {
                SslContext tls = SslContextBuilder.forClient().trustManager(trustManagerFactory).build();
                ch.pipeline().addLast(tls.newHandler(ch.alloc(), uri.getHost(), uri.getPort()));
            }

            ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder());
            ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);
            ch.pipeline().addLast(MqttPacketReceiver.class.getName(),
                    new MqttPacketReceiver(MqttClient.this, receiver, sharedobject));
        }
    });

    channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();

    normalizeMessage(options.will());
    send(MqttMessageFactory.connect(options));

    // Park on the shared lock; the wait ends on notification or when the timeout elapses.
    synchronized (sharedobject.locker()) {
        int waitSeconds = Settings.INSTANCE.getInt("mqttclient.responseTimeoutSeconds", 15);
        sharedobject.locker().wait(waitSeconds * 1000);
    }

    if (sharedobject.receivedMessage() != null) {
        MqttConnAckMessage ack = (MqttConnAckMessage) sharedobject.receivedMessage();
        return ack.variableHeader().connectReturnCode();
    }
    return null;
}
@Override protected void initChannel(SocketChannel ch) throws Exception { logger.debug("Initializaing channels..."); ch.pipeline().addLast(ByteCounterCodec.class.getName(),new ByteCounterCodec()); if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) { ch.pipeline().addLast(LoggingHandler.class.getName(),new LoggingHandler(LogLevel.DEBUG)); } if (useSsl) { SslContext sslCtx = SslContextBuilder .forServer(Settings.INSTANCE.certChainFile(),Settings.INSTANCE.privateKeyFile()).build(); logger.debug("SSL Provider : {}",SslContext.defaultServerProvider()); ch.pipeline().addLast(sslCtx.newHandler(ch.alloc())); } if (useWebSocket) { String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path","/"); ch.pipeline().addLast(HttpServerCodec.class.getName(),new HttpServerCodec()); ch.pipeline().addLast(HttpObjectAggregator.class.getName(),new HttpObjectAggregator(1048576)); ch.pipeline().addLast(HttpContentCompressor.class.getName(),new HttpContentCompressor()); ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(),new WebSocketServerProtocolHandler(websocketPath,"mqtt,mqttv3.1,mqttv3.1.1",true,65536)); // [MQTT-6.0.0-3] ch.pipeline().addLast(new MqttWebSocketCodec()); } int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage",8092); ch.pipeline().addLast(MqttDecoder.class.getName(),new MqttDecoder(maxBytesInMessage)); ch.pipeline().addLast(MqttEncoder.class.getName(),MqttEncoder.INSTANCE); ch.pipeline().addLast(ConnectReceiver.class.getName(),ConnectReceiver.INSTANCE); ch.pipeline().addLast(PubAckReceiver.class.getName(),PubAckReceiver.INSTANCE); ch.pipeline().addLast(PublishReceiver.class.getName(),PublishReceiver.INSTANCE); ch.pipeline().addLast(SubscribeReceiver.class.getName(),SubscribeReceiver.INSTANCE); ch.pipeline().addLast(UnsubscribeReceiver.class.getName(),UnsubscribeReceiver.INSTANCE); ch.pipeline().addLast(GenericReceiver.class.getName(),GenericReceiver.INSTANCE); }
@Test public void multipleConnect(TestContext context) throws InterruptedException { // There are should not be any exceptions during the test mqttServer.exceptionHandler(t -> { context.assertTrue(false); }); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("mqttEncoder",MQTT_SERVER_PORT).sync(); long tick = System.currentTimeMillis(); MqttClientOptions options = new MqttClientOptions(); f.channel().writeAndFlush(createConnectPacket(options)).sync(); f.channel().writeAndFlush(createConnectPacket(options)).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); long tock = System.currentTimeMillis(); // Default timeout is 90 seconds // If connection was closed earlier that means that it was a server context.assertTrue((tock - tick) / 1000 < 90); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
关于 nodejs 完成 mqtt 服务端和 nodemcu mqtt 的介绍现已完结，谢谢您的耐心阅读。如果想了解更多关于 CI/CD/开发/生产服务器中的多个 NodeJs 与单个 NodeJs 版本、io.netty.handler.codec.mqtt.MqttDecoder.DecoderState 的实例源码、io.netty.handler.codec.mqtt.MqttDecoder 的实例源码、io.netty.handler.codec.mqtt.MqttEncoder 的实例源码的相关知识，请在本站寻找。
本文标签: