关于Nodejs实战心得之eventproxy模块控制并发和nodejsevent的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于com.jcraft.jsch.agentproxy.Ag
关于Nodejs实战心得之eventproxy模块控制并发和nodejs event的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于com.jcraft.jsch.agentproxy.AgentProxyException的实例源码、eventproxy、EventProxy详细讲解、Golang TcpProxy和Nodejs TcpProxy等相关知识的信息别忘了在本站进行查找喔。
本文目录一览:- Nodejs实战心得之eventproxy模块控制并发(nodejs event)
- com.jcraft.jsch.agentproxy.AgentProxyException的实例源码
- eventproxy
- EventProxy详细讲解
- Golang TcpProxy和Nodejs TcpProxy
Nodejs实战心得之eventproxy模块控制并发(nodejs event)
目标
建立一个 lesson4 项目,在其中编写代码。
代码的入口是 app.js,当调用 node app.js 时,它会输出 CNode(https://cnodejs.org/ ) 社区首页的所有主题的标题,链接和第一条评论,以 json 的格式。
输出示例:
[ { "title": "【公告】发招聘帖的同学留意一下这里", "href": "http://cnodejs.org/topic/541ed2d05e28155f24676a12", "comment1": "呵呵呵呵" }, { "title": "发布一款 Sublime Text 下的 JavaScript 语法高亮插件", "href": "http://cnodejs.org/topic/54207e2efffeb6de3d61f68f", "comment1": "沙发!" } ]
挑战
以上文目标为基础,输出 comment1 的作者,以及他在 cnode 社区的积分值。
示例:
[ { "title": "【公告】发招聘帖的同学留意一下这里", "href": "http://cnodejs.org/topic/541ed2d05e28155f24676a12", "comment1": "呵呵呵呵", "author1": "auser", "score1": 80 }, ... ]
知识点
体会 Node.js 的 callback hell 之美
学习使用 eventproxy 这一利器控制并发
课程内容
这一章我们来到了 Node.js 最牛逼的地方——异步并发的内容了。
上一课我们介绍了如何使用 superagent 和 cheerio 来取主页内容,那只需要发起一次 http get 请求就能办到。但这次,我们需要取出每个主题的第一条评论,这就要求我们对每个主题的链接发起请求,并用 cheerio 去取出其中的第一条评论。
CNode 目前每一页有 40 个主题,于是我们就需要发起 1 + 40 个请求,来达到我们这一课的目标。
后者的 40 个请求,我们并发地发起:),而且不会遇到多线程啊锁什么的,Node.js 的并发模型跟多线程不同,抛却那些观念。更具体一点的话,比如异步到底为何异步,Node.js 为何单线程却能并发这类走近科学的问题,我就不打算讲了。对于这方面有兴趣的同学,强烈推荐 @朴灵 的 《九浅一深Node.js》: http://book.douban.com/subject/25768396/ 。
有些逼格比较高的朋友可能听说过 promise 和 generator 这类概念。不过我呢,只会讲 callback,主要原因是我个人只喜欢 callback。
这次课程我们需要用到三个库:superagent cheerio eventproxy(https://github.com/JacksonTian/eventproxy )
手脚架的工作各位自己来,我们一步一步来一起写出这个程序。
首先 app.js 应该长这样
var eventproxy = require(''eventproxy''); var superagent = require(''superagent''); var cheerio = require(''cheerio''); // url 模块是 Node.js 标准库里面的 // http://nodejs.org/api/url.html var url = require(''url''); var cnodeUrl = ''https://cnodejs.org/''; superagent.get(cnodeUrl) .end(function (err, res) { if (err) { return console.error(err); } var topicUrls = []; var $ = cheerio.load(res.text); // 获取首页所有的链接 $(''#topic_list .topic_title'').each(function (idx, element) { var $element = $(element); // $element.attr(''href'') 本来的样子是 /topic/542acd7d5d28233425538b04 // 我们用 url.resolve 来自动推断出完整 url,变成 // https://cnodejs.org/topic/542acd7d5d28233425538b04 的形式 // 具体请看 http://nodejs.org/api/url.html#url_url_resolve_from_to 的示例 var href = url.resolve(cnodeUrl, $element.attr(''href'')); topicUrls.push(href); }); console.log(topicUrls); });
运行 node app.js
输出如下图:
OK,这时候我们已经得到所有 url 的地址了,接下来,我们把这些地址都抓取一遍,就完成了,Node.js 就是这么简单。
抓取之前,还是得介绍一下 eventproxy 这个库。
用 js 写过异步的同学应该都知道,如果你要并发异步获取两三个地址的数据,并且要在获取到数据之后,对这些数据一起进行利用的话,常规的写法是自己维护一个计数器。
先定义一个 var count = 0,然后每次抓取成功以后,就 count++。如果你是要抓取三个源的数据,由于你根本不知道这些异步操作到底谁先完成,那么每次当抓取成功的时候,就判断一下 count === 3。当值为真时,使用另一个函数继续完成操作。
而 eventproxy 就起到了这个计数器的作用,它来帮你管理到底这些异步操作是否完成,完成之后,它会自动调用你提供的处理函数,并将抓取到的数据当参数传过来。
假设我们不使用 eventproxy 也不使用计数器时,抓取三个源的写法是这样的:
// 参考 jquery 的 $.get 的方法
$.get("http://data1_source", function (data1) { // something $.get("http://data2_source", function (data2) { // something $.get("http://data3_source", function (data3) { // something var html = fuck(data1, data2, data3); render(html); }); }); });
上述的代码大家都写过吧。先获取 data1,获取完成之后获取 data2,然后再获取 data3,然后 fuck 它们,进行输出。
但大家应该也想到了,其实这三个源的数据,是可以并行去获取的,data2 的获取并不依赖 data1 的完成,data3 同理也不依赖 data2。
于是我们用计数器来写,会写成这样:
(function () { var count = 0; var result = {}; $.get(''http://data1_source'', function (data) { result.data1 = data; count++; handle(); }); $.get(''http://data2_source'', function (data) { result.data2 = data; count++; handle(); }); $.get(''http://data3_source'', function (data) { result.data3 = data; count++; handle(); }); function handle() { if (count === 3) { var html = fuck(result.data1, result.data2, result.data3); render(html); } } })();
丑的一逼,也不算丑,主要我写代码好看。
如果我们用 eventproxy,写出来是这样的:
var ep = new eventproxy(); ep.all(''data1_event'', ''data2_event'', ''data3_event'', function (data1, data2, data3) { var html = fuck(data1, data2, data3); render(html); }); $.get(''http://data1_source'', function (data) { ep.emit(''data1_event'', data); }); $.get(''http://data2_source'', function (data) { ep.emit(''data2_event'', data); }); $.get(''http://data3_source'', function (data) { ep.emit(''data3_event'', data); });
好看多了是吧,也就是个高等计数器嘛。
ep.all(''data1_event'', ''data2_event'', ''data3_event'', function (data1, data2, data3) {});
这一句,监听了三个事件,分别是 data1_event, data2_event, data3_event,每次当一个源的数据抓取完成时,就通过 ep.emit() 来告诉 ep 自己,某某事件已经完成了。
当三个事件未同时完成时,ep.emit() 调用之后不会做任何事;当三个事件都完成的时候,就会调用末尾的那个回调函数,来对它们进行统一处理。
eventproxy 提供了不少其他场景所需的 API,但最最常用的用法就是以上的这种,即:
先 var ep = new eventproxy(); 得到一个 eventproxy 实例。
告诉它你要监听哪些事件,并给它一个回调函数。ep.all(''event1'', ''event2'', function (result1, result2) {})。
在适当的时候 ep.emit(''event_name'', eventData)。
eventproxy 这套处理异步并发的思路,我一直觉得就像是汇编里面的 goto 语句一样,程序逻辑在代码中随处跳跃。本来代码已经执行到 100 行了,突然 80 行的那个回调函数又开始工作了。如果你异步逻辑复杂点的话,80 行的这个函数完成之后,又激活了 60 行的另外一个函数。并发和嵌套的问题虽然解决了,但老祖宗们消灭了几十年的 goto 语句又回来了。
至于这套思想糟糕不糟糕,我个人倒是觉得还是不糟糕,用熟了看起来蛮清晰的。不过 js 这门渣渣语言本来就乱嘛,什么变量提升(http://www.cnblogs.com/damonlan/archive/2012/07/01/2553425.html )啊,没有 main 函数啊,变量作用域啊,数据类型常常简单得只有数字、字符串、哈希、数组啊,这一系列的问题,都不是事儿。
编程语言美丑啥的,咱心中有佛就好。
回到正题,之前我们已经得到了一个长度为 40 的 topicUrls 数组,里面包含了每条主题的链接。那么意味着,我们接下来要发出 40 个并发请求。我们需要用到 eventproxy 的 #after API。
大家自行学习一下这个 API 吧:https://github.com/JacksonTian/eventproxy#%E9%87%8D%E5%A4%8D%E5%BC%82%E6%AD%A5%E5%8D%8F%E4%BD%9C
我代码就直接贴了哈。
// 得到 topicUrls 之后 // 得到一个 eventproxy 的实例 var ep = new eventproxy(); // 命令 ep 重复监听 topicUrls.length 次(在这里也就是 40 次) `topic_html` 事件再行动 ep.after(''topic_html'', topicUrls.length, function (topics) { // topics 是个数组,包含了 40 次 ep.emit(''topic_html'', pair) 中的那 40 个 pair // 开始行动 topics = topics.map(function (topicPair) { // 接下来都是 jquery 的用法了 var topicUrl = topicPair[0]; var topicHtml = topicPair[1]; var $ = cheerio.load(topicHtml); return ({ title: $(''.topic_full_title'').text().trim(), href: topicUrl, comment1: $(''.reply_content'').eq(0).text().trim(), }); }); console.log(''final:''); console.log(topics); }); topicUrls.forEach(function (topicUrl) { superagent.get(topicUrl) .end(function (err, res) { console.log(''fetch '' + topicUrl + '' successful''); ep.emit(''topic_html'', [topicUrl, res.text]); }); });
输出长这样:
完整的代码请查看 lesson4 目录下的 app.js 文件
总结
今天介绍的 eventproxy 模块是控制并发用的,有时我们需要同时发送 N 个 http 请求,然后利用得到的数据进行后期的处理工作,如何方便地判断数据已经全部并发获取得到,就可以用到该模块了。而模块不仅可以在服务端使用,也可以应用在客户端
- Nodejs探秘之深入理解单线程实现高并发原理
- Nodejs爬虫进阶教程之异步并发控制
- 深入浅析NodeJs并发异步的回调处理
- Nodejs高并发原理示例详解
com.jcraft.jsch.agentproxy.AgentProxyException的实例源码
public OpenSSHAgentAuthenticator() { try { proxy = new AgentProxy(new SSHAgentConnector(new JNAUSocketFactory())); } catch(AgentProxyException e) { log.warn(String.format("Agent proxy %s Failed with %s",this,e)); } }
public PageantAuthenticator() { try { this.proxy = new AgentProxy(new PageantConnector()); } catch(AgentProxyException e) { log.warn(String.format("Agent proxy %s Failed with %s",e)); } }
private Session newSession() throws JSchException { JSch jSch = new JSch(); try { jSch.setConfig("PreferredAuthentications","publickey"); if (hostInfo.isDoHostKeyChecks()) { jSch.setKNownHosts(userInfo.sshFolderLocation() + File.separator + "kNown_hosts"); } else { jSch.setHostKeyRepository(new FakeHostKeyRepository()); } if (userInfo.isUseAgentIdentities()) { Connector connector = ConnectorFactory.getDefault().createConnector(); if (connector != null) { IdentityRepository identityRepository = new RemoteIdentityRepository(connector); jSch.setIdentityRepository(identityRepository); } } // add private key to the IdentityRepository. If using agent identities,this will add the private // key to the agent,if it is not already present. jSch.addIdentity(userInfo.privateKeyLocation().getAbsolutePath()); session = jSch.getSession(userInfo.getUserName(),hostInfo.getHostname(),hostInfo.getPort()); Long timeout = TimeUnit.SECONDS.toMillis(hostInfo.getTimeoutSeconds()); session.setTimeout(timeout.intValue()); session.setUserInfo(new PasswordlessEnabledUser(userInfo.getpassphrase())); return session; } catch (JSchException | AgentProxyException e) { String msg = ExecutionFailedException.userFriendlyCause(e.getMessage(),userInfo); throw new ExecutionFailedException(msg,e); } }
public RemoteClient(Environment environment,DeploymentContext context,Logger logger) throws RemoteClientException { RemoteClient instance = this; Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { instance.trydisconnect(); } }); this.logger = logger; try { JSch shellClient = new JSch(); shellClient.setKNownHosts(new FileInputStream(new File(System.getenv("HOME") + "/.ssh/kNown_hosts"))); ConnectorFactory connectorFactory = ConnectorFactory.getDefault(); Connector connector = connectorFactory.createConnector(); Properties config = new java.util.Properties(); config.put("PreferredAuthentications","publickey"); if (context.project().options().containsKey("check_host_keys")) { String option = (boolean) context.project().options().get("check_host_keys") ? "yes" : "no"; JSch.setConfig("StrictHostKeyChecking",option); if (option.equals("no")) { logger.warn("WARNING: host key check is disabled!"); } } if (connector != null) { IdentityRepository remoteIdentityRepository = new RemoteIdentityRepository(connector); shellClient.setIdentityRepository(remoteIdentityRepository); } for (Target target : environment.targets()) { sessions.add(new RemoteTarget(connect(shellClient,target,context,config),environment)); } } catch (AgentProxyException | FileNotFoundException | JSchException e) { throw new RemoteClientException(e.getMessage()); } }
@Nullable private static Connector createSshAgentConnector() { Connector result = null; try { result = ConnectorFactory.getDefault().createConnector(); } catch (AgentProxyException e) { LOG.info("Could not create ssh agent connector",e); } return result; }
private Session initSessionSshAgent(String username,String socketPath,JSch jsch) throws JSchException { final Session session = jsch.getSession(username,myHost,myPort); session.setConfig("PreferredAuthentications","publickey"); try { ConnectorFactory cf = ConnectorFactory.getDefault(); cf.setUSocketPath(socketPath); Connector con = cf.createConnector(); IdentityRepository irepo = new RemoteIdentityRepository(con); jsch.setIdentityRepository(irepo); return session; } catch (AgentProxyException e) { throw new JSchException("Failed to connect to ssh agent.",e); } }
public static SshClient.Factory defaultSshClientFactory() { return new SshClient.Factory() { int timeout = SystemUtils2.getIntegerProperty("org.excalibur.ssh.default.connection.timeout.ms",60000); Optional<Connector> agentConnector = getAgentConnector(); @Override public boolean isAgentAvailable() { return agentConnector.isPresent(); } @Override public SshClient create(HostAndPort socket,LoginCredentials credentials) { return new JschSshClient(socket,credentials,timeout,agentConnector,new BackoffLimitedRetryHandler()); } Optional<Connector> getAgentConnector() { try { return Optional.of(ConnectorFactory.getDefault().createConnector()); } catch (final AgentProxyException e) { return Optional.absent(); } } }; }
public static void main(String[] args) throws AgentProxyException,IOException,JSchException { HostAndPort hostAndPort = HostAndPort.fromParts("ec2-54-83-158-5.compute-1.amazonaws.com",22); LoginCredentials loginCredentials = LoginCredentials.builder().privateKey(System.getProperty("user.home") + "/.ec2/leite.pem").user("ubuntu") .build(); SshClient client = SshClientFactory.defaultSshClientFactory().create(hostAndPort,loginCredentials); client.connect(); ExecutableResponse response = client.execute("uname -a && date && uptime && who"); System.out.println(response); // final OnlineChannel shell = client.shell(); // shell.getinput().write("sudo apt-get install maven -y\n".getBytes()); // shell.getinput().flush(); // // Thread t = new Thread(new Runnable() // { // @Override // public void run() // { // while (true) // { // BufferedReader reader = new BufferedReader(new InputStreamReader(shell.getoutput(),Charsets.US_ASCII)); // String line; // try // { // while ((line = reader.readLine()) != null) // { // System.out.println(line); // } // } // catch (IOException e) // { // e.printstacktrace(); // } // } // } // }); // t.setDaemon(true); // t.start(); client.put("/home/ubuntu/hi.txt","Hi node!"); client.disconnect(); // JSch jsch = new JSch(); // jsch.addIdentity(loginCredentials.getPrivateKey()); // // Session session = jsch.getSession(loginCredentials.getUser(),hostAndPort.getHostText()); // // Properties config = new Properties(); // config.put("StrictHostKeyChecking","no"); // // session.setConfig(config); // session.connect(); // // Channel shell = session.openChannel("shell"); // shell.setInputStream(system.in); // shell.setoutputStream(System.out); // shell.connect(0); }
eventproxy
eventproxy 介绍
这个世界上不存在所谓回调函数深度嵌套的问题。 —— Jackson Tian
世界上本没有嵌套回调,写得人多了,也便有了}}}}}}}}}}}}。 —— fengmk2
EventProxy 仅仅是一个很轻量的工具,但是能够带来一种事件式编程的思维变化。有几个特点:
利用事件机制解耦复杂业务逻辑
移除被广为诟病的深度callback嵌套问题
将串行等待变成并行等待,提升多异步协作场景下的执行效率
友好的Error handling
无平台依赖,适合前后端,能用于浏览器和Node.js
兼容CMD,AMD以及Commonjs模块环境
现在的,无深度嵌套的,并行的
var ep = EventProxy.create("template","data","l10n",function (template,data,l10n) {
_.template(template,l10n);
});
$.get("template",function (template) {
// something
ep.emit("template",template);
});
$.get("data",function (data) {
// something
ep.emit("data",data);
});
$.get("l10n",function (l10n) {
// something
ep.emit("l10n",l10n);
});
过去的,深度嵌套的,串行的。
var render = function (template,data) {
_.template(template,data);
};
$.get("template",function (template) {
// something
$.get("data",function (data) {
// something
$.get("l10n",function (l10n) {
// something
render(template,l10n);
});
});
});
安装
Node用户
通过npm安装即可使用:npm install eventproxy调用:var EventProxy = require(''eventproxy'');spm
spm install eventproxy
Component
component install JacksonTian/eventproxy
网站地址:http://html5ify.com/eventproxy
GitHub:https://github.com/JacksonTian/eventproxy
网站描述:基于任务/事件的异步模式实现
eventproxy官方网站
官方网站:http://html5ify.com/eventproxy
如果觉得小编网站内容还不错,欢迎将小编网站 推荐给程序员好友。
EventProxy详细讲解
Golang TcpProxy和Nodejs TcpProxy
自己平时的工作基本都在PHP和nodejs之间徘徊,但是目前面对python和java的猛烈攻击呢,其实内心有一种隐隐的痛“PHP是世界上最好的语言“,”nodejs在cpu密集时服务彻底瘫痪"。。。
看了半个月python真实发现,其实它太像PHP语言了,所以基本不用怎么理解就会了。golang看了1个多月了真的得多写多看源代码才能收获,别看才30几个关键字但是内容真的很多,golang的性能是真的高可以大大缩减服务器开销,举个例子web服务中PHP需要100台机器,那么golang可能只需要10台甚至更少!
最近在研究MysqL proxy,其实MysqL本身是支持代理的,但是想自己尝试下这样就会很灵活:
- 灵活slb MysqL负载均衡
- 读写直接通过proxy直接进行判断
- 提前预警或拒绝危险性sql
- ...太多太多太多好处
以下是golang MysqL proxy的代码
package main import ( "net" "fmt" "time" ) const ( MysqL_ADDRESS = "mysq-host" MysqL_PORT = "3306" ) func main() { listener,err := net.Listen("tcp",":1234") if err != nil { fmt.Println("tcp",err.Error()) return } else { fmt.Println("tcp",listener.Addr(),"success") } for { user,err := listener.Accept() if err != nil { fmt.Println("accept error: ",err.Error()) continue } go proxyRequest(user) } } //打理用户请求 func proxyRequest(user net.Conn) { fmt.Println(user.LocalAddr()) bytes := make([]byte,10240) conn,err := net.Dial("tcp",MysqL_ADDRESS + ":" + MysqL_PORT) if err != nil { fmt.Println(conn.RemoteAddr(),"error:",err.Error()) conn.Close() return } ch := make(chan bool,1) go proxyResponse(user,conn,ch) for { n,err := user.Read(bytes) if err != nil { break } fmt.Println(string(bytes[:n])) conn.Write(bytes[:n]) } defer close(ch) select { case <-ch: conn.Close() user.Close() fmt.Println("proxy over") case <-time.After(time.Second * 60): fmt.Println("proxy timeout") } } //代理服务的返回给用户 func proxyResponse(user net.Conn,service net.Conn,ch chan bool) { bytes := make([]byte,10240) for { n,err := service.Read(bytes) if err != nil { break } user.Write(bytes[:n]) } ch <- true }
以下是nodejs简单的proxy
var net = require('net'); var model = require('../../models/proxy'); var trace = require('../../libs/trace'); //代理表 var proxys = []; //tcp server var server = null; //proxy information var information = {count: 0,success: 0,error: 0}; /** * 启动服务 * @param info array * @param callback function */ exports.start = (info,callback) => { model.getProxyListFromServer(info.id,(err,result)=> { if (err) { callback(err,err); } else { proxys = result; initServer(info.port,info.to_host,info.to_port); callback(null); } }); }; /** * 停止服务. * @return bool */ exports.stop = function () { if (server && server.listening) { server.close(); server = null; return true; } return false; }; /** * 获取信息 * @return object */ exports.getInfo = () => { return information; }; /** * 初始化tcp proxy server. * @param port * @param toHost * @param toPort */ function initServer(port,toHost,toPort) { server = net.createServer((client)=> { information.count++; client.on('end',() => { connect.end(); }); var connect = net.createConnection({host: toHost,port: toPort},(err)=> { }); connect.on('error',(err)=> { information.error++; }); connect.on('end',()=> { information.success++; }); client.pipe(connect); connect.pipe(client); // client.on('data',function (data) { // var buf = Buffer.from(data); // console.log('data: ' + buf.toString()); // }); }); server.listen(port,(err) => { if (err) { trace.log('proxy server error',err); process.exit(); } trace.log('proxy server started',port); }); }
我们今天的关于Nodejs实战心得之eventproxy模块控制并发和nodejs event的分享已经告一段落,感谢您的关注,如果您想了解更多关于com.jcraft.jsch.agentproxy.AgentProxyException的实例源码、eventproxy、EventProxy详细讲解、Golang TcpProxy和Nodejs TcpProxy的相关信息,请在本站查询。
本文标签: