本章是關(guān)于在網(wǎng)絡應用和分布式應用中使用的各種主題。主題劃分為使用 Python 編寫客戶端程序來訪問已有的服務,以及使用 Python 實現(xiàn)網(wǎng)絡服務端程序。也給出了一些常見的技術(shù),用于編寫涉及協(xié)同或通信的的代碼。
你需要通過 HTTP 協(xié)議以客戶端的方式訪問多種服務。例如,下載數(shù)據(jù)或者與基于 REST 的 API 進行交互。
對于簡單的事情來說,通常使用 urllib.request
模塊就夠了。例如,發(fā)送一個簡單的 HTTP GET 請求到遠程的服務上,可以這樣做:
from urllib import request, parse
# Base URL being accessed
url = 'http://httpbin.org/get'
# Dictionary of query parameters (if any)
parms = {
'name1' : 'value1',
'name2' : 'value2'
}
# Encode the query string
querystring = parse.urlencode(parms)
# Make a GET request and read the response
u = request.urlopen(url+'?' + querystring)
resp = u.read()
如果你需要使用 POST 方法在請求主體中發(fā)送查詢參數(shù),可以將參數(shù)編碼后作為可選參數(shù)提供給 urlopen()
函數(shù),就像這樣:
from urllib import request, parse
# Base URL being accessed
url = 'http://httpbin.org/post'
# Dictionary of query parameters (if any)
parms = {
'name1' : 'value1',
'name2' : 'value2'
}
# Encode the query string
querystring = parse.urlencode(parms)
# Make a POST request and read the response
u = request.urlopen(url, querystring.encode('ascii'))
resp = u.read()
如果你需要在發(fā)出的請求中提供一些自定義的 HTTP 頭,例如修改 user-agent
字段,可以創(chuàng)建一個包含字段值的字典,并創(chuàng)建一個 Request 實例然后將其傳給 urlopen()
,如下:
from urllib import request, parse
...
# Extra headers
headers = {
'User-agent' : 'none/ofyourbusiness',
'Spam' : 'Eggs'
}
req = request.Request(url, querystring.encode('ascii'), headers=headers)
# Make a request and read the response
u = request.urlopen(req)
resp = u.read()
如果需要交互的服務比上面的例子都要復雜,也許應該去看看 requests 庫(https://pypi.python.org/pypi/requests)。例如,下面這個示例采用 requests 庫重新實現(xiàn)了上面的操作:
import requests
# Base URL being accessed
url = 'http://httpbin.org/post'
# Dictionary of query parameters (if any)
parms = {
'name1' : 'value1',
'name2' : 'value2'
}
# Extra headers
headers = {
'User-agent' : 'none/ofyourbusiness',
'Spam' : 'Eggs'
}
resp = requests.post(url, data=parms, headers=headers)
# Decoded text returned by the request
text = resp.text
關(guān)于 requests 庫,一個值得一提的特性就是它能以多種方式從請求中返回響應結(jié)果的內(nèi)容。從上面的代碼來看, resp.text
帶給我們的是以 Unicode 解碼的響應文本。但是,如果去訪問 resp.content
,就會得到原始的二進制數(shù)據(jù)。另一方面,如果訪問 resp.json
,那么就會得到 JSON 格式的響應內(nèi)容。
下面這個示例利用 requests
庫發(fā)起一個 HEAD 請求,并從響應中提取出一些 HTTP 頭數(shù)據(jù)的字段:
import requests
resp = requests.head('http://www.python.org/index.html')
status = resp.status_code
last_modified = resp.headers['last-modified']
content_type = resp.headers['content-type']
content_length = resp.headers['content-length']
Here is a requests example that executes a login into the Python Package index using
basic authentication:
import requests
resp = requests.get('http://pypi.python.org/pypi?:action=login',
auth=('user','password'))
Here is an example of using requests to pass HTTP cookies from one request to the
next:
import requests
# First request
resp1 = requests.get(url)
...
# Second requests with cookies received on first requests
resp2 = requests.get(url, cookies=resp1.cookies)
Last, but not least, here is an example of using requests to upload content:
import requests
url = 'http://httpbin.org/post'
files = { 'file': ('data.csv', open('data.csv', 'rb')) }
r = requests.post(url, files=files)
對于真的很簡單 HTTP 客戶端代碼,用內(nèi)置的 urllib
模塊通常就足夠了。但是,如果你要做的不僅僅只是簡單的 GET 或 POST 請求,那就真的不能再依賴它的功能了。這時候就是第三方模塊比如 requests
大顯身手的時候了。
例如,如果你決定堅持使用標準的程序庫而不考慮像 requests
這樣的第三方庫,那么也許就不得不使用底層的 http.client
模塊來實現(xiàn)自己的代碼。比方說,下面的代碼展示了如何執(zhí)行一個 HEAD 請求:
from http.client import HTTPConnection
from urllib import parse
c = HTTPConnection('www.python.org', 80)
c.request('HEAD', '/index.html')
resp = c.getresponse()
print('Status', resp.status)
for name, value in resp.getheaders():
print(name, value)
同樣地,如果必須編寫涉及代理、認證、cookies 以及其他一些細節(jié)方面的代碼,那么使用 urllib
就顯得特別別扭和啰嗦。比方說,下面這個示例實現(xiàn)在 Python 包索引上的認證:
import urllib.request
auth = urllib.request.HTTPBasicAuthHandler()
auth.add_password('pypi','http://pypi.python.org','username','password')
opener = urllib.request.build_opener(auth)
r = urllib.request.Request('http://pypi.python.org/pypi?:action=login')
u = opener.open(r)
resp = u.read()
# From here. You can access more pages using opener
...
坦白說,所有的這些操作在 requests
庫中都變得簡單的多。
在開發(fā)過程中測試 HTTP 客戶端代碼常常是很令人沮喪的,因為所有棘手的細節(jié)問題都需要考慮(例如 cookies、認證、HTTP 頭、編碼方式等)。要完成這些任務,考慮使用 httpbin 服務(http://httpbin.org)。這個站點會接收發(fā)出的請求,然后以 JSON 的形式將相應信息回傳回來。下面是一個交互式的例子:
>>> import requests
>>> r = requests.get('http://httpbin.org/get?name=Dave&n=37',
... headers = { 'User-agent': 'goaway/1.0' })
>>> resp = r.json
>>> resp['headers']
{'User-Agent': 'goaway/1.0', 'Content-Length': '', 'Content-Type': '',
'Accept-Encoding': 'gzip, deflate, compress', 'Connection':
'keep-alive', 'Host': 'httpbin.org', 'Accept': '*/*'}
>>> resp['args']
{'name': 'Dave', 'n': '37'}
>>>
在要同一個真正的站點進行交互前,先在 httpbin.org 這樣的網(wǎng)站上做實驗常常是可取的辦法。尤其是當我們面對3次登錄失敗就會關(guān)閉賬戶這樣的風險時尤為有用(不要嘗試自己編寫 HTTP 認證客戶端來登錄你的銀行賬戶)。
盡管本節(jié)沒有涉及, request
庫還對許多高級的 HTTP 客戶端協(xié)議提供了支持,比如 OAuth。 requests
模塊的文檔(http://docs.python-requests.org)質(zhì)量很高(坦白說比在這短短的一節(jié)的篇幅中所提供的任何信息都好),可以參考文檔以獲得更多地信息。
你想實現(xiàn)一個服務器,通過 TCP 協(xié)議和客戶端通信。
創(chuàng)建一個 TCP 服務器的一個簡單方法是使用 socketserver
庫。例如,下面是一個簡單的應答服務器:
from socketserver import BaseRequestHandler, TCPServer
class EchoHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
while True:
msg = self.request.recv(8192)
if not msg:
break
self.request.send(msg)
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
在這段代碼中,你定義了一個特殊的處理類,實現(xiàn)了一個 handle()
方法,用來為客戶端連接服務。request
屬性是客戶端 socket,client_address
有客戶端地址。 為了測試這個服務器,運行它并打開另外一個 Python 進程連接這個服務器:
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>
很多時候,可以很容易的定義一個不同的處理器。下面是一個使用 StreamRequestHandler
基類將一個類文件接口放置在底層 socket 上的例子:
from socketserver import StreamRequestHandler, TCPServer
class EchoHandler(StreamRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
# self.rfile is a file-like object for reading
for line in self.rfile:
# self.wfile is a file-like object for writing
self.wfile.write(line)
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
socketserver
可以讓我們很容易的創(chuàng)建簡單的 TCP 服務器。 但是,你需要注意的是,默認情況下這種服務器是單線程的,一次只能為一個客戶端連接服務。 如果你想處理多個客戶端,可以初始化一個 ForkingTCPServer
或者是 ThreadingTCPServer
對象。例如:
from socketserver import ThreadingTCPServer
if __name__ == '__main__':
serv = ThreadingTCPServer(('', 20000), EchoHandler)
serv.serve_forever()
使用 fork 或線程服務器有個潛在問題就是它們會為每個客戶端連接創(chuàng)建一個新的進程或線程。 由于客戶端連接數(shù)是沒有限制的,因此一個惡意的黑客可以同時發(fā)送大量的連接讓你的服務器奔潰。
如果你擔心這個問題,你可以創(chuàng)建一個預先分配大小的工作線程池或進程池。 你先創(chuàng)建一個普通的非線程服務器,然后在一個線程池中使用 serve_forever()
方法來啟動它們。
if __name__ == '__main__':
from threading import Thread
NWORKERS = 16
serv = TCPServer(('', 20000), EchoHandler)
for n in range(NWORKERS):
t = Thread(target=serv.serve_forever)
t.daemon = True
t.start()
serv.serve_forever()
一般來講,一個 TCPServer
在實例化的時候會綁定并激活相應的 socket
。 不過,有時候你想通過設置某些選項去調(diào)整底下的 socket
,可以設置參數(shù) bind_and_activate=False
。如下:
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
# Set up various socket options
serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# Bind and activate
serv.server_bind()
serv.server_activate()
serv.serve_forever()
上面的socket
選項是一個非常普遍的配置項,它允許服務器重新綁定一個之前使用過的端口號。 由于要被經(jīng)常使用到,它被放置到類變量中,可以直接在 TCPServer
上面設置。 在實例化服務器的時候去設置它的值,如下所示:
if __name__ == '__main__':
TCPServer.allow_reuse_address = True
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
在上面示例中,我們演示了兩種不同的處理器基類( BaseRequestHandler
和 StreamRequestHandler
)。 StreamRequestHandler
更加靈活點,能通過設置其他的類變量來支持一些新的特性。比如:
import socket
class EchoHandler(StreamRequestHandler):
# Optional settings (defaults shown)
timeout = 5 # Timeout on all socket operations
rbufsize = -1 # Read buffer size
wbufsize = 0 # Write buffer size
disable_nagle_algorithm = False # Sets TCP_NODELAY socket option
def handle(self):
print('Got connection from', self.client_address)
try:
for line in self.rfile:
# self.wfile is a file-like object for writing
self.wfile.write(line)
except socket.timeout:
print('Timed out!')
最后,還需要注意的是巨大部分 Python 的高層網(wǎng)絡模塊(比如 HTTP、XML-RPC 等)都是建立在 socketserver
功能之上。 也就是說,直接使用 socket
庫來實現(xiàn)服務器也并不是很難。 下面是一個使用 socket
直接編程實現(xiàn)的一個服務器簡單例子:
from socket import socket, AF_INET, SOCK_STREAM
def echo_handler(address, client_sock):
print('Got connection from {}'.format(address))
while True:
msg = client_sock.recv(8192)
if not msg:
break
client_sock.sendall(msg)
client_sock.close()
def echo_server(address, backlog=5):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(address)
sock.listen(backlog)
while True:
client_sock, client_addr = sock.accept()
echo_handler(client_addr, client_sock)
if __name__ == '__main__':
echo_server(('', 20000))
你想實現(xiàn)一個基于 UDP 協(xié)議的服務器來與客戶端通信。
跟 TCP 一樣,UDP 服務器也可以通過使用 socketserver
庫很容易的被創(chuàng)建。 例如,下面是一個簡單的時間服務器:
from socketserver import BaseRequestHandler, UDPServer
import time
class TimeHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
# Get message and client socket
msg, sock = self.request
resp = time.ctime()
sock.sendto(resp.encode('ascii'), self.client_address)
if __name__ == '__main__':
serv = UDPServer(('', 20000), TimeHandler)
serv.serve_forever()
跟之前一樣,你先定義一個實現(xiàn) handle()
特殊方法的類,為客戶端連接服務。 這個類的 request
屬性是一個包含了數(shù)據(jù)報和底層 socket 對象的元組。client_address
包含了客戶端地址。
我們來測試下這個服務器,首先運行它,然后打開另外一個 Python 進程向服務器發(fā)送消息:
>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Wed Aug 15 20:35:08 2012', ('127.0.0.1', 20000))
>>>
一個典型的 UPD 服務器接收到達的數(shù)據(jù)報(消息)和客戶端地址。如果服務器需要做應答, 它要給客戶端回發(fā)一個數(shù)據(jù)報。對于數(shù)據(jù)報的傳送, 你應該使用 socket 的 sendto()
和 recvfrom()
方法。 盡管傳統(tǒng)的 send()
和 recv()
也可以達到同樣的效果, 但是前面的兩個方法對于 UDP 連接而言更普遍。
由于沒有底層的連接,UPD 服務器相對于 TCP 服務器來講實現(xiàn)起來更加簡單。 不過,UDP 天生是不可靠的(因為通信沒有建立連接,消息可能丟失)。 因此需要由你自己來決定該怎樣處理丟失消息的情況。這個已經(jīng)不在本書討論范圍內(nèi)了, 不過通常來說,如果可靠性對于你程序很重要,你需要借助于序列號、重試、超時以及一些其他方法來保證。 UDP 通常被用在那些對于可靠傳輸要求不是很高的場合。例如,在實時應用如多媒體流以及游戲領域, 無需返回恢復丟失的數(shù)據(jù)包(程序只需簡單的忽略它并繼續(xù)向前運行)。
UDPServer
類是單線程的,也就是說一次只能為一個客戶端連接服務。 實際使用中,這個無論是對于 UDP 還是 TCP 都不是什么大問題。 如果你想要并發(fā)操作,可以實例化一個 ForkingUDPServer
或 ThreadingUDPServer
對象:
from socketserver import ThreadingUDPServer
if __name__ == '__main__':
serv = ThreadingUDPServer(('',20000), TimeHandler)
serv.serve_forever()
直接使用 socket
來是想一個 UDP 服務器也不難,下面是一個例子:
from socket import socket, AF_INET, SOCK_DGRAM
import time
def time_server(address):
sock = socket(AF_INET, SOCK_DGRAM)
sock.bind(address)
while True:
msg, addr = sock.recvfrom(8192)
print('Got message from', addr)
resp = time.ctime()
sock.sendto(resp.encode('ascii'), addr)
if __name__ == '__main__':
time_server(('', 20000))
你有一個 CIDR 網(wǎng)絡地址比如“123.45.67.89/27”,你想將其轉(zhuǎn)換成它所代表的所有 IP (比如,“123.45.67.64”, “123.45.67.65”, …, “123.45.67.95”))
可以使用 ipaddress
模塊很容易的實現(xiàn)這樣的計算。例如:
>>> import ipaddress
>>> net = ipaddress.ip_network('123.45.67.64/27')
>>> net
IPv4Network('123.45.67.64/27')
>>> for a in net:
... print(a)
...
123.45.67.64
123.45.67.65
123.45.67.66
123.45.67.67
123.45.67.68
...
123.45.67.95
>>>
>>> net6 = ipaddress.ip_network('12:3456:78:90ab:cd:ef01:23:30/125')
>>> net6
IPv6Network('12:3456:78:90ab:cd:ef01:23:30/125')
>>> for a in net6:
... print(a)
...
12:3456:78:90ab:cd:ef01:23:30
12:3456:78:90ab:cd:ef01:23:31
12:3456:78:90ab:cd:ef01:23:32
12:3456:78:90ab:cd:ef01:23:33
12:3456:78:90ab:cd:ef01:23:34
12:3456:78:90ab:cd:ef01:23:35
12:3456:78:90ab:cd:ef01:23:36
12:3456:78:90ab:cd:ef01:23:37
>>>
Network
也允許像數(shù)組一樣的索引取值,例如:
>>> net.num_addresses
32
>>> net[0]
IPv4Address('123.45.67.64')
>>> net[1]
IPv4Address('123.45.67.65')
>>> net[-1]
IPv4Address('123.45.67.95')
>>> net[-2]
IPv4Address('123.45.67.94')
>>>
另外,你還可以執(zhí)行網(wǎng)絡成員檢查之類的操作:
>>> a = ipaddress.ip_address('123.45.67.69')
>>> a in net
True
>>> b = ipaddress.ip_address('123.45.67.123')
>>> b in net
False
>>>
一個 IP 地址和網(wǎng)絡地址能通過一個 IP 接口來指定,例如:
>>> inet = ipaddress.ip_interface('123.45.67.73/27')
>>> inet.network
IPv4Network('123.45.67.64/27')
>>> inet.ip
IPv4Address('123.45.67.73')
>>>
ipaddress
模塊有很多類可以表示 IP 地址、網(wǎng)絡和接口。 當你需要操作網(wǎng)絡地址(比如解析、打印、驗證等)的時候會很有用。
要注意的是,ipaddress
模塊跟其他一些和網(wǎng)絡相關(guān)的模塊比如 socket
庫交集很少。 所以,你不能使用 IPv4Address
的實例來代替一個地址字符串,你首先得顯式的使用str()
轉(zhuǎn)換它。例如:
>>> a = ipaddress.ip_address('127.0.0.1')
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect((a, 8080))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: Can't convert 'IPv4Address' object to str implicitly
>>> s.connect((str(a), 8080))
>>>
更多相關(guān)內(nèi)容,請參考 An Introduction to the ipaddress Module
你想使用一個簡單的 REST 接口通過網(wǎng)絡遠程控制或訪問你的應用程序,但是你又不想自己去安裝一個完整的 web 框架。
構(gòu)建一個 REST 風格的接口最簡單的方法是創(chuàng)建一個基于 WSGI 標準(PEP 3333)的很小的庫,下面是一個例子:
# resty.py
import cgi
def notfound_404(environ, start_response):
start_response('404 Not Found', [ ('Content-type', 'text/plain') ])
return [b'Not Found']
class PathDispatcher:
def __init__(self):
self.pathmap = { }
def __call__(self, environ, start_response):
path = environ['PATH_INFO']
params = cgi.FieldStorage(environ['wsgi.input'],
environ=environ)
method = environ['REQUEST_METHOD'].lower()
environ['params'] = { key: params.getvalue(key) for key in params }
handler = self.pathmap.get((method,path), notfound_404)
return handler(environ, start_response)
def register(self, method, path, function):
self.pathmap[method.lower(), path] = function
return function
為了使用這個調(diào)度器,你只需要編寫不同的處理器,就像下面這樣:
import time
_hello_resp = '''\
<html>
<head>
<title>Hello {name}</title>
</head>
<body>
<h1>Hello {name}!</h1>
</body>
</html>'''
def hello_world(environ, start_response):
start_response('200 OK', [ ('Content-type','text/html')])
params = environ['params']
resp = _hello_resp.format(name=params.get('name'))
yield resp.encode('utf-8')
_localtime_resp = '''\
<?xml version="1.0"?>
<time>
<year>{t.tm_year}</year>
<month>{t.tm_mon}</month>
<day>{t.tm_mday}</day>
<hour>{t.tm_hour}</hour>
<minute>{t.tm_min}</minute>
<second>{t.tm_sec}</second>
</time>'''
def localtime(environ, start_response):
start_response('200 OK', [ ('Content-type', 'application/xml') ])
resp = _localtime_resp.format(t=time.localtime())
yield resp.encode('utf-8')
if __name__ == '__main__':
from resty import PathDispatcher
from wsgiref.simple_server import make_server
# Create the dispatcher and register functions
dispatcher = PathDispatcher()
dispatcher.register('GET', '/hello', hello_world)
dispatcher.register('GET', '/localtime', localtime)
# Launch a basic server
httpd = make_server('', 8080, dispatcher)
print('Serving on port 8080...')
httpd.serve_forever()
要測試下這個服務器,你可以使用一個瀏覽器或 urllib
和它交互。例如:
>>> u = urlopen('http://localhost:8080/hello?name=Guido')
>>> print(u.read().decode('utf-8'))
<html>
<head>
<title>Hello Guido</title>
</head>
<body>
<h1>Hello Guido!</h1>
</body>
</html>
>>> u = urlopen('http://localhost:8080/localtime')
>>> print(u.read().decode('utf-8'))
<?xml version="1.0"?>
<time>
<year>2012</year>
<month>11</month>
<day>24</day>
<hour>14</hour>
<minute>49</minute>
<second>17</second>
</time>
>>>
在編寫 REST 接口時,通常都是服務于普通的 HTTP 請求。但是跟那些功能完整的網(wǎng)站相比,你通常只需要處理數(shù)據(jù)。 這些數(shù)據(jù)以各種標準格式編碼,比如 XML、JSON 或 CSV。 盡管程序看上去很簡單,但是以這種方式提供的 API 對于很多應用程序來講是非常有用的。
例如,長期運行的程序可能會使用一個 REST API 來實現(xiàn)監(jiān)控或診斷。 大數(shù)據(jù)應用程序可以使用 REST 來構(gòu)建一個數(shù)據(jù)查詢或提取系統(tǒng)。 REST 還能用來控制硬件設備比如機器人、傳感器、工廠或燈泡。 更重要的是,REST API 已經(jīng)被大量客戶端編程環(huán)境所支持,比如 Javascript, Android, iOS 等。 因此,利用這種接口可以讓你開發(fā)出更加復雜的應用程序。
為了實現(xiàn)一個簡單的 REST 接口,你只需讓你的程序代碼滿足 Python 的 WSGI 標準即可。 WSGI 被標準庫支持,同時也被絕大部分第三方 web 框架支持。 因此,如果你的代碼遵循這個標準,在后面的使用過程中就會更加的靈活!
在 WSGI 中,你可以像下面這樣約定的方式以一個可調(diào)用對象形式來實現(xiàn)你的程序。
import cgi
def wsgi_app(environ, start_response):
pass
environ
屬性是一個字典,包含了從 web 服務器如 Apache [參考 Internet RFC 3875]提供的 CGI 接口中獲取的值。 要將這些不同的值提取出來,你可以像這么這樣寫:
def wsgi_app(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
# Parse the query parameters
params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)
我們展示了一些常見的值。environ['REQUEST_METHOD']
代表請求類型如 GET、POST、HEAD 等。 environ['PATH_INFO']
表示被請求資源的路徑。 調(diào)用 cgi.FieldStorage()
可以從請求中提取查詢參數(shù)并將它們放入一個類字典對象中以便后面使用。
start_response
參數(shù)是一個為了初始化一個請求對象而必須被調(diào)用的函數(shù)。 第一個參數(shù)是返回的 HTTP 狀態(tài)值,第二個參數(shù)是一個(名,值)元組列表,用來構(gòu)建返回的 HTTP 頭。例如:
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
為了返回數(shù)據(jù),一個 WSGI 程序必須返回一個字節(jié)字符串序列??梢韵裣旅孢@樣使用一個列表來完成:
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
resp = []
resp.append(b'Hello World\n')
resp.append(b'Goodbye!\n')
return resp
或者,你還可以使用yield
:
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
yield b'Hello World\n'
yield b'Goodbye!\n'
這里要強調(diào)的一點是最后返回的必須是字節(jié)字符串。如果返回結(jié)果包含文本字符串,必須先將其編碼成字節(jié)。 當然,并沒有要求你返回的一點是文本,你可以很輕松的編寫一個生成圖片的程序。
盡管 WSGI 程序通常被定義成一個函數(shù),不過你也可以使用類實例來實現(xiàn),只要它實現(xiàn)了合適的 __call__()
方法。例如:
class WSGIApplication:
def __init__(self):
...
def __call__(self, environ, start_response)
...
我們已經(jīng)在上面使用這種技術(shù)創(chuàng)建 PathDispatcher
類。 這個分發(fā)器僅僅只是管理一個字典,將(方法,路徑)對映射到處理器函數(shù)上面。 當一個請求到來時,它的方法和路徑被提取出來,然后被分發(fā)到對應的處理器上面去。 另外,任何查詢變量會被解析后放到一個字典中,以 environ['params']
形式存儲。 后面這個步驟太常見,所以建議你在分發(fā)器里面完成,這樣可以省掉很多重復代碼。 使用分發(fā)器的時候,你只需簡單的創(chuàng)建一個實例,然后通過它注冊各種 WSGI 形式的函數(shù)。 編寫這些函數(shù)應該超級簡單了,只要你遵循 start_response()
函數(shù)的編寫規(guī)則,并且最后返回字節(jié)字符串即可。
當編寫這種函數(shù)的時候還需注意的一點就是對于字符串模板的使用。 沒人愿意寫那種到處混合著 print()
函數(shù) 、XML 和大量格式化操作的代碼。 我們上面使用了三引號包含的預先定義好的字符串模板。 這種方式的可以讓我們很容易的在以后修改輸出格式(只需要修改模板本身,而不用動任何使用它的地方)。
最后,使用 WSGI 還有一個很重要的部分就是沒有什么地方是針對特定 web 服務器的。 因為標準對于服務器和框架是中立的,你可以將你的程序放入任何類型服務器中。 我們使用下面的代碼測試測試本節(jié)代碼:
if __name__ == '__main__':
from wsgiref.simple_server import make_server
# Create the dispatcher and register functions
dispatcher = PathDispatcher()
pass
# Launch a basic server
httpd = make_server('', 8080, dispatcher)
print('Serving on port 8080...')
httpd.serve_forever()
上面代碼創(chuàng)建了一個簡單的服務器,然后你就可以來測試下你的實現(xiàn)是否能正常工作。 最后,當你準備進一步擴展你的程序的時候,你可以修改這個代碼,讓它可以為特定服務器工作。
WSGI 本身是一個很小的標準。因此它并沒有提供一些高級的特性比如認證、cookies、重定向等。 這些你自己實現(xiàn)起來也不難。不過如果你想要更多的支持,可以考慮第三方庫,比如WebOb
或者Paste
你想找到一個簡單的方式去執(zhí)行運行在遠程機器上面的 Python 程序中的函數(shù)或方法。
實現(xiàn)一個遠程方法調(diào)用的最簡單方式是使用 XML-RPC。下面我們演示一下一個實現(xiàn)了鍵-值存儲功能的簡單服務器:
from xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer:
_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
def __init__(self, address):
self._data = {}
self._serv = SimpleXMLRPCServer(address, allow_none=True)
for name in self._rpc_methods_:
self._serv.register_function(getattr(self, name))
def get(self, name):
return self._data[name]
def set(self, name, value):
self._data[name] = value
def delete(self, name):
del self._data[name]
def exists(self, name):
return name in self._data
def keys(self):
return list(self._data)
def serve_forever(self):
self._serv.serve_forever()
# Example
if __name__ == '__main__':
kvserv = KeyValueServer(('', 15000))
kvserv.serve_forever()
下面我們從一個客戶端機器上面來訪問服務器:
>>> from xmlrpc.client import ServerProxy
>>> s = ServerProxy('http://localhost:15000', allow_none=True)
>>> s.set('foo', 'bar')
>>> s.set('spam', [1, 2, 3])
>>> s.keys()
['spam', 'foo']
>>> s.get('foo')
'bar'
>>> s.get('spam')
[1, 2, 3]
>>> s.delete('spam')
>>> s.exists('spam')
False
>>>
XML-RPC 可以讓我們很容易的構(gòu)造一個簡單的遠程調(diào)用服務。你所需要做的僅僅是創(chuàng)建一個服務器實例, 通過它的方法 register_function()
來注冊函數(shù),然后使用方法 serve_forever()
啟動它。 在上面我們將這些步驟放在一起寫到一個類中,不夠這并不是必須的。比如你還可以像下面這樣創(chuàng)建一個服務器:
from xmlrpc.server import SimpleXMLRPCServer
def add(x,y):
return x+y
serv = SimpleXMLRPCServer(('', 15000))
serv.register_function(add)
serv.serve_forever()
XML-RPC 暴露出來的函數(shù)只能適用于部分數(shù)據(jù)類型,比如字符串、整形、列表和字典。 對于其他類型就得需要做些額外的功課了。 例如,如果你想通過 XML-RPC 傳遞一個對象實例,實際上只有他的實例字典被處理:
>>> class Point:
... def __init__(self, x, y):
... self.x = x
... self.y = y
...
>>> p = Point(2, 3)
>>> s.set('foo', p)
>>> s.get('foo')
{'x': 2, 'y': 3}
>>>
類似的,對于二進制數(shù)據(jù)的處理也跟你想象的不太一樣:
>>> s.set('foo', b'Hello World')
>>> s.get('foo')
<xmlrpc.client.Binary object at 0x10131d410>
>>> _.data
b'Hello World'
>>>
一般來講,你不應該將 XML-RPC 服務以公共 API 的方式暴露出來。 對于這種情況,通常分布式應用程序會是一個更好的選擇。
XML-RPC 的一個缺點是它的性能。SimpleXMLRPCServer
的實現(xiàn)是單線程的, 所以它不適合于大型程序,盡管我們在11.2小節(jié)中演示過它是可以通過多線程來執(zhí)行的。 另外,由于 XML-RPC 將所有數(shù)據(jù)都序列化為 XML 格式,所以它會比其他的方式運行的慢一些。 但是它也有優(yōu)點,這種方式的編碼可以被絕大部分其他編程語言支持。 通過使用這種方式,其他語言的客戶端程序都能訪問你的服務。
雖然 XML-RPC 有很多缺點,但是如果你需要快速構(gòu)建一個簡單遠程過程調(diào)用系統(tǒng)的話,它仍然值得去學習的。 有時候,簡單的方案就已經(jīng)足夠了。
你在不同的機器上面運行著多個 Python 解釋器實例,并希望能夠在這些解釋器之間通過消息來交換數(shù)據(jù)。
通過使用 multiprocessing.connection
模塊可以很容易的實現(xiàn)解釋器之間的通信。 下面是一個簡單的應答服務器例子:
from multiprocessing.connection import Listener
import traceback
def echo_client(conn):
try:
while True:
msg = conn.recv()
conn.send(msg)
except EOFError:
print('Connection closed')
def echo_server(address, authkey):
serv = Listener(address, authkey=authkey)
while True:
try:
client = serv.accept()
echo_client(client)
except Exception:
traceback.print_exc()
echo_server(('', 25000), authkey=b'peekaboo')
然后客戶端連接服務器并發(fā)送消息的簡單示例:
>>> from multiprocessing.connection import Client
>>> c = Client(('localhost', 25000), authkey=b'peekaboo')
>>> c.send('hello')
>>> c.recv()
'hello'
>>> c.send(42)
>>> c.recv()
42
>>> c.send([1, 2, 3, 4, 5])
>>> c.recv()
[1, 2, 3, 4, 5]
>>>
跟底層 socket 不同的是,每個消息會完整保存(每一個通過 send()發(fā)送的對象能通過 recv()來完整接受)。 另外,所有對象會通過 pickle 序列化。因此,任何兼容 pickle 的對象都能在此連接上面被發(fā)送和接受。
目前有很多用來實現(xiàn)各種消息傳輸?shù)陌秃瘮?shù)庫,比如 ZeroMQ、Celery 等。 你還有另外一種選擇就是自己在底層 socket 基礎之上來實現(xiàn)一個消息傳輸層。 但是你想要簡單一點的方案,那么這時候 multiprocessing.connection
就派上用場了。 僅僅使用一些簡單的語句即可實現(xiàn)多個解釋器之間的消息通信。
如果你的解釋器運行在同一臺機器上面,那么你可以使用另外的通信機制,比如 Unix 域套接字或者是 Windows 命名管道。 要想使用 UNIX 域套接字來創(chuàng)建一個連接,只需簡單的將地址改寫一個文件名即可:
s = Listener('/tmp/myconn', authkey=b'peekaboo')
要想使用 Windows 命名管道來創(chuàng)建連接,只需像下面這樣使用一個文件名:
s = Listener(r'\\.\pipe\myconn', authkey=b'peekaboo')
一個通用準則是,你不要使用 multiprocessing
來實現(xiàn)一個對外的公共服務。 Client()
和 Listener()
中的 authkey
參數(shù)用來認證發(fā)起連接的終端用戶。 如果密鑰不對會產(chǎn)生一個異常。此外,該模塊最適合用來建立長連接(而不是大量的短連接), 例如,兩個解釋器之間啟動后就開始建立連接并在處理某個問題過程中會一直保持連接狀態(tài)。
如果你需要對底層連接做更多的控制,比如需要支持超時、非阻塞 I/O 或其他類似的特性, 你最好使用另外的庫或者是在高層 socket 上來實現(xiàn)這些特性。
你想在一個消息傳輸層如 sockets
、multiprocessing connections
或 ZeroMQ
的基礎之上實現(xiàn)一個簡單的遠程過程調(diào)用(RPC)。
將函數(shù)請求、參數(shù)和返回值使用 pickle 編碼后,在不同的解釋器直接傳送 pickle 字節(jié)字符串,可以很容易的實現(xiàn) RPC。 下面是一個簡單的 PRC 處理器,可以被整合到一個服務器中去:
# rpcserver.py
import pickle
class RPCHandler:
def __init__(self):
self._functions = { }
def register_function(self, func):
self._functions[func.__name__] = func
def handle_connection(self, connection):
try:
while True:
# Receive a message
func_name, args, kwargs = pickle.loads(connection.recv())
# Run the RPC and send a response
try:
r = self._functions[func_name](*args,**kwargs)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except EOFError:
pass
要使用這個處理器,你需要將它加入到一個消息服務器中。你有很多種選擇, 但是使用 multiprocessing
庫是最簡單的。下面是一個 RPC 服務器例子:
from multiprocessing.connection import Listener
from threading import Thread
def rpc_server(handler, address, authkey):
sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args=(client,))
t.daemon = True
t.start()
# Some remote functions
def add(x, y):
return x + y
def sub(x, y):
return x - y
# Register with a handler
handler = RPCHandler()
handler.register_function(add)
handler.register_function(sub)
# Run the server
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')
為了從一個遠程客戶端訪問服務器,你需要創(chuàng)建一個對應的用來傳送請求的 RPC 代理類。例如
import pickle
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
if isinstance(result, Exception):
raise result
return result
return do_rpc
要使用這個代理類,你需要將其包裝到一個服務器的連接上面,例如:
>>> from multiprocessing.connection import Client
>>> c = Client(('localhost', 17000), authkey=b'peekaboo')
>>> proxy = RPCProxy(c)
>>> proxy.add(2, 3)
5
>>> proxy.sub(2, 3)
-1
>>> proxy.sub([1, 2], 4)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "rpcserver.py", line 37, in do_rpc
raise result
TypeError: unsupported operand type(s) for -: 'list' and 'int'
>>>
要注意的是很多消息層(比如 multiprocessing
)已經(jīng)使用 pickle 序列化了數(shù)據(jù)。 如果是這樣的話,對 pickle.dumps()
和 pickle.loads()
的調(diào)用要去掉。
RPCHandler
和 RPCProxy
的基本思路是很比較簡單的。 如果一個客戶端想要調(diào)用一個遠程函數(shù),比如 foo(1, 2, z=3)
,代理類創(chuàng)建一個包含了函數(shù)名和參數(shù)的元組 ('foo', (1, 2), {'z': 3})
。 這個元組被 pickle 序列化后通過網(wǎng)絡連接發(fā)生出去。 這一步在 RPCProxy
的__getattr__()
方法返回的 do_rpc()
閉包中完成。 服務器接收后通過 pickle 反序列化消息,查找函數(shù)名看看是否已經(jīng)注冊過,然后執(zhí)行相應的函數(shù)。 執(zhí)行結(jié)果(或異常)被pickle序列化后返回發(fā)送給客戶端。我們的實例需要依賴 multiprocessing
進行通信。 不過,這種方式可以適用于其他任何消息系統(tǒng)。例如,如果你想在 ZeroMQ 之上實習 RPC, 僅僅只需要將連接對象換成合適的 ZeroMQ 的 socket 對象即可。
由于底層需要依賴 pickle,那么安全問題就需要考慮了 (因為一個聰明的黑客可以創(chuàng)建特定的消息,能夠讓任意函數(shù)通過 pickle 反序列化后被執(zhí)行)。 因此你永遠不要允許來自不信任或未認證的客戶端的 RPC。特別是你絕對不要允許來自 Internet 的任意機器的訪問, 這種只能在內(nèi)部被使用,位于防火墻后面并且不要對外暴露。
作為 pickle 的替代,你也許可以考慮使用 JSON、XML 或一些其他的編碼格式來序列化消息。 例如,本機實例可以很容易的改寫成 JSON 編碼方案。還需要將 pickle.loads()
和 pickle.dumps()
替換成 json.loads()
和 json.dumps()
即可:
# jsonrpcserver.py
import json
class RPCHandler:
def __init__(self):
self._functions = { }
def register_function(self, func):
self._functions[func.__name__] = func
def handle_connection(self, connection):
try:
while True:
# Receive a message
func_name, args, kwargs = json.loads(connection.recv())
# Run the RPC and send a response
try:
r = self._functions[func_name](*args,**kwargs)
connection.send(json.dumps(r))
except Exception as e:
connection.send(json.dumps(str(e)))
except EOFError:
pass
# jsonrpcclient.py
import json
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
def do_rpc(*args, **kwargs):
self._connection.send(json.dumps((name, args, kwargs)))
result = json.loads(self._connection.recv())
return result
return do_rpc
實現(xiàn) RPC 的一個比較復雜的問題是如何去處理異常。至少,當方法產(chǎn)生異常時服務器不應該奔潰。 因此,返回給客戶端的異常所代表的含義就要好好設計了。 如果你使用 pickle,異常對象實例在客戶端能被反序列化并拋出。如果你使用其他的協(xié)議,那得想想另外的方法了。 不過至少,你應該在響應中返回異常字符串。我們在 JSON 的例子中就是使用的這種方式。
對于其他的 RPC 實現(xiàn)例子,我推薦你看看在 XML-RPC 中使用的 SimpleXMLRPCServer
和 ServerProxy
的實現(xiàn), 也就是11.6小節(jié)中的內(nèi)容。