对于想了解可以将HTTPS功能添加到pythonflaskWeb服务器吗?的读者,本文将提供新的信息,我们将详细介绍https可以再加端口吗,并且为您提供关于400行python实现http/http
对于想了解可以将HTTPS功能添加到python flask Web服务器吗?的读者,本文将提供新的信息,我们将详细介绍https可以再加端口吗,并且为您提供关于400行python 实现http/https 代理服务器、Flask Python中带有Http的Https、flask 设置https请求 访问flask服务器、Flask-RESTful和Flask-HTTPAuth: Python web应用程序中的用户身份验证和授权的最佳实践的有价值信息。
本文目录一览:- 可以将HTTPS功能添加到python flask Web服务器吗?(https可以再加端口吗)
- 400行python 实现http/https 代理服务器
- Flask Python中带有Http的Https
- flask 设置https请求 访问flask服务器
- Flask-RESTful和Flask-HTTPAuth: Python web应用程序中的用户身份验证和授权的最佳实践
可以将HTTPS功能添加到python flask Web服务器吗?(https可以再加端口吗)
我正在尝试构建一个Web界面,以在该网络设备使用摘要式身份验证和HTTPS的网络设备上模拟一个安静的接口。我想出了如何将摘要式身份验证集成到Web服务器中,但是如果您可以向我展示如何对下面的代码进行注释,我似乎无法找出如何使用FLASK获取https的方法。
from flask import Flask, jsonifyapp = Flask(__name__)@app.route(''/'')def index(): return ''Flask is running!''@app.route(''/data'')def names(): data = {"names": ["John", "Jacob", "Julie", "Jennifer"]} return jsonify(data)if __name__ == ''__main__'': app.run()
答案1
小编典典这也适用于紧要关头
from flask import Flask, jsonifyfrom OpenSSL import SSLcontext = SSL.Context(SSL.PROTOCOL_TLSv1_2)context.use_privatekey_file(''server.key'')context.use_certificate_file(''server.crt'')app = Flask(__name__)@app.route(''/'')def index(): return ''Flask is running!''@app.route(''/data'')def names(): data = {"names": ["John", "Jacob", "Julie", "Jennifer"]} return jsonify(data)#if __name__ == ''__main__'':# app.run()if __name__ == ''__main__'': app.run(host=''127.0.0.1'', debug=True, ssl_context=context)
400行python 实现http/https 代理服务器
下面是小编 jb51.cc 通过网络收集整理的代码片段。
小编小编现在分享给大家,也给大家做个参考。
#!/usr/bin/python #-*- coding:utf-8 -*- import socket,logging import select,errno import os import sys import traceback import gzip from StringIO import StringIO import Queue import threading import time import thread import cgi from cgi import parse_qs import json import imp from os.path import join,getsize import re import ssl ##################user config ################## logger = logging.getLogger("network-server") ############################################# def getTraceStackMsg(): tb = sys.exc_info()[2] msg = '' for i in traceback.format_tb(tb): msg += i return msg def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) def clearfdpro(epoll_fd,params,fd): try: fd_check = int(fd) except Exception,e: print "fd error" sys.exit(1) try: #print "pid:%s,close fd:%s" % (os.getpid(),fd) epoll_fd.unregister(fd) except Exception,e: #print str(e)+getTraceStackMsg() pass try: param = params[fd] try: addr = param["addr"] if "next" in param: print "close sock,%s:%s" % (addr[0],addr[1]) except Exception,e: pass param["connections"].shutdown(socket.SHUT_RDWR) param["connections"].close() f = param.get("f",None) if f != None: f.close() rc = param.get("rc",None) if rc != None: rc.close() if "read_cache_name" in param: os.remove(param["read_cache_name"]) except Exception,e: #print str(e)+getTraceStackMsg() pass try: del params[fd] #logger.error(getTraceStackMsg()) #logger.error("clear fd:%s" % fd) except Exception,e: #print str(e)+getTraceStackMsg() pass def clearfd(epoll_fd,fd): try: param = params[fd] if "nextfd" in param: nextfd = param["nextfd"] next_param = params[nextfd] del param["nextfd"] del next_param["nextfd"] if not "next" in param: #masterfd clearfdpro(epoll_fd,nextfd) else: # nextfd if not "writedata" in next_param or len(next_param["writedata"]) == 0: clearfdpro(epoll_fd,nextfd) else: next_param["sendandclose"] = "true" clearfdpro(epoll_fd,fd) except Exception,e: #print str(e)+getTraceStackMsg() pass def FindHostPort(datas): host_s = -1 host_e = -1 host_str = None host = "" port = "" if not datas.startswith("CONNECT"): host_s = datas.find("Host:") if host_s < 0: host_s = datas.find("host:") if host_s > 0: host_e = datas.find("\r\n",host_s) if host_s > 0 and host_e > 0: host_str = datas[host_s+5:host_e].strip() add_list = host_str.split(":") if len(add_list) == 2: host = add_list[0] port = add_list[1] else: host = add_list[0] port = 80 first_seg = datas.find("\r\n") first_line = datas[0:first_seg] first_line = first_line.replace(" http://%s" % host_str," ") datas = first_line + datas[first_seg:] else: first_seg = datas.find("\r\n") head_e = datas.find("\r\n\r\n") if first_seg > 0 and head_e > 0: first_line = datas[0:first_seg] com,host_str,http_version = re.split('\s+',first_line) add_list = host_str.split(":") if len(add_list) == 2: host = add_list[0] port = add_list[1] else: host = add_list[0] port = 443 host_s = 1 host_e = 1 return host_str,host_s,host_e,host,port,datas def connect_pro(params,param,epoll_fd,datas,fd,cur_time,port): try: nextfd = socket.socket(socket.AF_INET,socket.soCK_STREAM,0) nextfd.setsockopt(socket.soL_SOCKET,socket.so_REUSEADDR,1) nextfd.settimeout(5) try: nextfd.connect((host,int(port))) except Exception,e: print "########%s,%s connect fail" % (host,port) nextfd.setblocking(0) next_fileno = nextfd.fileno() print "pid:%s,connect %s:%s fd:%s" % (os.getpid(),next_fileno) if next_fileno in params: print "fileno exist" sys.exit(1) if not datas.startswith("CONNECT"): next_param = {"addr":[host,port],"writelen":0,"connections":nextfd,"time":cur_time,"nextfd":fd} param["nextfd"] = next_fileno next_param["writedata"] = datas next_param["writelen"] = 0 next_param["next"] = "true" param["read_len"] = 0 param["readdata"] = "" params[next_fileno] = next_param epoll_fd.register(next_fileno,select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP) epoll_fd.modify(fd,select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP) else: next_param = {"addr":[host,"nextfd":fd} param["nextfd"] = next_fileno next_param["next"] = "true" params[next_fileno] = next_param epoll_fd.register(next_fileno,select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP) param["read_len"] = 0 param["readdata"] = "" param["writedata"] = "HTTP/1.1 200 Connection Established\r\nConnection: close\r\n\r\n" param["writelen"] = 0 param["reuse"] = "true" epoll_fd.modify(fd,select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP) except socket.error,msg: clearfd(epoll_fd,fd) def process_datas(process_status,read_len,cur_time): if process_status == "close": clearfd(epoll_fd,fd) else: need_connect = False host_str = None host_s = -1 host_e = -1 if "reuse" in param and "next" not in param: if not datas.startswith("CONNECT") and \ not datas.startswith("GET") and \ not datas.startswith("POST") and \ not datas.startswith("PUT"): del param["reuse"] else: host_str,datas = FindHostPort(datas) host_s = int(host_s) host_e = int(host_e) next_fileno = param["nextfd"] next_param = params[next_fileno] addr = next_param["addr"] if host_s > 0 and host_e > 0: if host != addr[0] or str(port) != str(addr[1]): print "%s,%s neq %s,%s" % (host,addr[0],addr[1]) need_connect = True del param["nextfd"] del next_param["nextfd"] clearfd(epoll_fd,next_fileno) del param["reuse"] else: param["read_len"] = read_len param["readdata"] = datas return None if need_connect or not "nextfd" in param: if host_str == None or not host_s > 0 or not host_e > 0: host_str,datas = FindHostPort(datas) host_s = int(host_s) host_e = int(host_e) if host_s > 0 and host_e > 0: if not datas.startswith("CONNECT"): epoll_fd.modify(fd,select.EPOLLERR | select.EPOLLHUP) # 简单处理,http连接时把读去掉,避免内存攻击 thread.start_new_thread(connect_pro,(params,port)) else: param["read_len"] = read_len param["readdata"] = datas else: next_fileno = param["nextfd"] next_param = params[next_fileno] if "next" in param: next_param["reuse"] = "true" write_data = next_param.get("writedata","") write_data += datas next_param["writedata"] = write_data param["read_len"] = 0 param["readdata"] = "" epoll_fd.modify(next_fileno,select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP) if process_status == "close_after_process": print "close after process" clearfd(epoll_fd,fd) def run_main(listen_fd): try: epoll_fd = select.epoll() epoll_fd.register(listen_fd.fileno(),select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP) print "listen_fd:%s" % listen_fd.fileno() except select.error,msg: logger.error(msg) params = {} last_min_time = -1 while True: epoll_list = epoll_fd.poll() cur_time = time.time() for fd,events in epoll_list: if fd == listen_fd.fileno(): while True: try: conn,addr = listen_fd.accept() conn.setblocking(0) epoll_fd.register(conn.fileno(),select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP) conn.setsockopt(socket.soL_SOCKET,1) #conn.setsockopt(socket.IPPROTO_TCP,socket.TCP_NODELAY,True) params[conn.fileno()] = {"addr":addr,"connections":conn,"time":cur_time} except socket.error,msg: break elif select.EPOLLIN & events: param = params.get(fd,None) if param == None: continue param["time"] = cur_time datas = param.get("readdata","") cur_sock = params[fd]["connections"] read_len = param.get("read_len",0) process_status = "close" while True: try: data = cur_sock.recv(102400) if not data: if datas == "": break else: raise Exception("close after process") else: datas += data read_len += len(data) except socket.error,msg: if msg.errno == errno.EAGAIN: process_status = "process" break else: break except Exception,e: process_status = "close_after_process" break process_datas(process_status,cur_time) elif select.EPOLLHUP & events or select.EPOLLERR & events: clearfd(epoll_fd,fd) logger.error("sock: %s error" % fd) elif select.EPOLLOUT & events: param = params.get(fd,None) if param == None: continue param["time"] = cur_time sendLen = param.get("writelen",0) writedata = param.get("writedata","") total_write_len = len(writedata) cur_sock = param["connections"] f = param.get("f",None) totalsenlen = param.get("totalsenlen",None) if writedata == "": clearfd(epoll_fd,fd) continue while True: try: sendLen += cur_sock.send(writedata[sendLen:]) if sendLen == total_write_len: if f != None and totalsenlen != None: readmorelen = 102400 if readmorelen > totalsenlen: readmorelen = totalsenlen morefiledata = "" if readmorelen > 0: morefiledata = f.read(readmorelen) if morefiledata != "": writedata = morefiledata sendLen = 0 total_write_len = len(writedata) totalsenlen -= total_write_len param["writedata"] = writedata param["totalsenlen"] = totalsenlen continue else: f.close() del param["f"] del param["totalsenlen"] if not "sendandclose" in param: param["writedata"] = "" param["writelen"] = 0 epoll_fd.modify(fd,select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP) else: clearfd(epoll_fd,fd) break except socket.error,msg: if msg.errno == errno.EAGAIN: param["writelen"] = sendLen break clearfd(epoll_fd,fd) break else: continue #check time out if cur_time - last_min_time > 20: last_min_time = cur_time objs = params.items() for (key_fd,value) in objs: fd_time = value.get("time",0) del_time = cur_time - fd_time if del_time > 20: clearfd(epoll_fd,key_fd) elif fd_time < last_min_time: last_min_time = fd_time if __name__ == "__main__": reload(sys) sys.setdefaultencoding('utf8') InitLog() port = int(sys.argv[1]) try: listen_fd = socket.socket(socket.AF_INET,0) except socket.error,msg: logger.error("create socket Failed") try: listen_fd.setsockopt(socket.soL_SOCKET,1) except socket.error,msg: logger.error("setsocketopt SO_REUSEADDR Failed") try: listen_fd.bind(('',port)) except socket.error,msg: logger.error("bind Failed") try: listen_fd.listen(10240) listen_fd.setblocking(0) except socket.error,msg: logger.error(msg) child_num = 19 c = 0 while c < child_num: c = c + 1 newpid = os.fork() if newpid == 0: run_main(listen_fd) run_main(listen_fd)
以上是小编(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。
如果觉得小编网站内容还不错,欢迎将小编网站推荐给程序员好友。
Flask Python中带有Http的Https
如何解决Flask Python中带有Http的Https?
第一件事:不要使用flask中内置的Web服务器进行繁重的工作。您应该使用真实的Web服务器,例如apache(mod_wsgi)nginex + gunicore等。这些服务器包含有关如何同时运行http和https的文档
解决方法
我有一个客户端服务器应用程序。我设法使他们使用SSl加密通过https连接
context = SSL.Context(SSL.SSLv3_METHOD)
context.use_privatekey_file(''/path_to_key/key.key'')
context.use_certificate_file(''/path_to_cert/cert.crt'')
app.run(use_reloader=True,host=''0.0.0.0'',port=9020,ssl_context = context)
现在,我想同时使用http和https运行服务器。有什么可行的方法吗?
flask 设置https请求 访问flask服务器
学习过程中想要学教程中一样,做个假的微信公众号推送,不过去了微信开发文档怎么一直说需要https的请求(教学中没有说需要https,一直是http)
但是我的服务器只能使用http请求访问,如果硬是要在url中添加https 则会显示没有这个网站
于是我就想法让https请求也能请求到我的服务器域名上
我的域名是在阿里云买的,所以我就直接在阿里云上购买了证书,购买使用的是免费的1年证书
流程如下
购买之后显示,但是还没有绑定域名,所以需要申请证书绑定这个域名
点击申请,根据弹出的页面进行一个填写 我买的域名是 www.zengyimin.xyz
根据下一步不断填写信息,等待审核,出现如下画面表示绑定域名成功
点击下载 ,会下载一个压缩包
包名是绑定的域名
包中含有pem和key 证书和密匙,解压
接下来在放置 nginx 的服务器上进行操作 ,进入nginx的配置目录 (我是ubuntu)
cd /etc/nginx
该目录新建一个文件夹 放置证书和key
mkdir cert
将解压的文件上传至该文件夹
进入nginx的配置文件 (我的nginx版本不同,有些操作和你们不同,以下我的个人讲解你们看看就好 )
其中nginx.conf 是全部的配置文件集合, sites-acailable 和 siter-enabled 文件夹含有一些服务器的配置
如果你在sites中配置完成,启动nginx 时 nginx会带上nginx.conf 配置文件启动,配置文件会include导入 sites 中的配置文文件
同时sites 中的文件貌似是镜像文件? 更改一个另一个也会变
我的配置文件 进入 sites-enabled vim更改配置文件 default
这是我原本 http 80 端口的server服务配置 http不需要证书,所以不需要证书设置
https 443 的配置 ,在该文件中添加一个 server 服务
重启ngxin
service nginx restart
可以尝试在url中使用https请求nginx绑定的域名(我是一台机,其实就是flask服务器绑定的域名)了
免费证书会在google浏览器访问时提示有问题,所以需要证书还是购买有服务的比较好
http://www.zengyimin.xyz/api/v1.0/log
https://www.zengyimin.xyz/api/v1.0/log
都是一样的访问,不过http和https的请求都能请求,区别也不大...
除非用户输入http请求会自动跳转https的请求
所以我个人的最终方案
# 所有的往http80端口的请求都会被 rewrite 进行一个拦截 重定向 将http 请求改为https
server {
listen 80;
server_name www.zengyimin.xyz;
rewrite ^(.*)$ https://$host$1 permanent; # <----------------- 重点 这是重定向
location / {
index index.html index.htm;
}
}
# 这是新增的 https 的server
server {
listen 443;
server_name www.zengyimin.xyz;
ssl on;
root html;
index index.html index.htm;
ssl_certificate cert/2568867_www.zengyimin.xyz.pem; # 证书
ssl_certificate_key cert/2568867_www.zengyimin.xyz.key; # 密匙
ssl_session_timeout 5m;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_prefer_server_ciphers on;
location / {
proxy_pass http://127.0.0.1:5001; # https请求转发到服务器ip地址
proxy_redirect off;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
最终所有前往nginx服务器的请求都会被重定向到https请求
https://help.aliyun.com/document_detail/98728.html?spm=5176.2020520163.0.0.7bb9d5OKd5OKj5
https://lufficc.com/blog/configure-nginx-as-a-web-server
Flask-RESTful和Flask-HTTPAuth: Python web应用程序中的用户身份验证和授权的最佳实践
随着 web 应用程序的普及,用户身份验证和授权已经变得越来越重要。这些功能可以保护 web 应用程序中的敏感数据和重要功能,使其只有授权用户才能访问。python 语言提供了许多库和框架,使得在 web 应用中实现用户身份验证和授权变得更加容易。本篇文章将介绍两个 python 库:flask-restful 和 flask-httpauth,它们是 web 应用程序中实现身份验证和授权的最佳实践。
- 什么是 Flask-RESTful?
Flask-RESTful 是一个基于 Flask 框架的 RESTful 库,可以方便地创建 RESTful API。它是建立在 Flask、Werkzeug 和 Jinja2 之上的,提供了一个轻量级的 RESTful 框架和一个简单的路由系统,它可以完美地与 Flask 配合使用,使开发者能够快速地创建 RESTful API。 - 什么是 Flask-HTTPAuth?
Flask-HTTPAuth 是一个为 Flask Web 应用提供用户身份验证和授权的库,它可以通过 HTTP 基本认证、cookie、token 或其他自定义方式进行身份验证。Flask-HTTPAuth 不需要额外的数据库,它将用户认证信息保存在应用程序的配置文件中。这使得身份验证过程变得更加简单方便。 - 使用 Flask-RESTful 和 Flask-HTTPAuth 实现身份验证和授权
在使用 Flask-RESTful 和 Flask-HTTPAuth 实现身份验证和授权之前,需要对应用程序进行一些设置。首先,需要安装 Flask、Flask-RESTful 和 Flask-HTTPAuth 库:
pip install Flask pip install Flask-RESTful pip install Flask-HTTPAuth
接下来,需要在 Flask 应用程序中导入 Flask-RESTful 和 Flask-HTTPAuth:
from flask import Flask from flask_restful import Api, Resource from flask_httpauth import HTTPBasicAuth app = Flask(__name__) api = Api(app) auth = HTTPBasicAuth()
然后,可以创建注册用户,用于身份验证:
users = { "john": "hello", "susan": "bye" } @auth.get_password def get_password(username): if username in users: return users.get(username) return None
这里定义了一个简单的用户字典(用户名和密码),并通过 get_password 回调函数将该字典传递给了 Flask-HTTPAuth。在 get_password 函数中,首先判断请求中的用户名是否在用户字典中,如果存在,则返回该用户的密码;否则返回 None,表示认证失败。
现在,可以将 Flask-RESTful 和 Flask-HTTPAuth 应用于应用程序,实现对 API 的身份验证和授权。下面是一个示例:
立即学习“Python免费学习笔记(深入)”;
class PrivateResource(Resource): @auth.login_required def get(self): return {"message": "Hello, %s!" % auth.username()}, 200 api.add_resource(PrivateResource, ''/private'')
在上面的代码中,创建了一个受保护的资源 PrivateResource,使用了 Flask-HTTPAuth 中的 login_required 装饰器进行身份验证。如果请求中的用户名和密码匹配,那么调用 get 方法返回一个包含用户名称的 JSON 对象。
创建路由之后,可以运行 Flask 应用程序,并使用用户名和密码验证身份:
$ curl -u john:hello http://localhost:5000/private {"message": "Hello, john!"} $ curl -u susan:bye http://localhost:5000/private {"message": "Hello, susan!"}
- 总结
在本文中,我们介绍了如何使用 Flask-RESTful 和 Flask-HTTPAuth 库实现 Python Web 应用程序中的身份验证和授权。通过设置 Python 库和使用上述代码示例,您可以保护自己的 Web 应用程序中的敏感数据和重要功能,确保只有授权用户才能访问。Python 是一种功能强大、易于学习和使用的语言,在数据科学和 Web 程序开发等领域都非常受欢迎,它的生态系统也非常丰富,提供了许多库和框架,使得开发更加容易和快速。
以上就是Flask-RESTful和Flask-HTTPAuth: Python web应用程序中的用户身份验证和授权的最佳实践的详细内容,更多请关注php中文网其它相关文章!
关于可以将HTTPS功能添加到python flask Web服务器吗?和https可以再加端口吗的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于400行python 实现http/https 代理服务器、Flask Python中带有Http的Https、flask 设置https请求 访问flask服务器、Flask-RESTful和Flask-HTTPAuth: Python web应用程序中的用户身份验证和授权的最佳实践的相关知识,请在本站寻找。
本文标签: