forked from liyaopinner/mxshop_sources
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalipay.py
152 lines (126 loc) · 5.77 KB
/
alipay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# -*- coding: utf-8 -*-
# pip install pycryptodome
__author__ = 'bobby'
from datetime import datetime
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
from base64 import b64encode, b64decode
from urllib.parse import quote_plus
from urllib.parse import urlparse, parse_qs
from urllib.request import urlopen
from base64 import decodebytes, encodebytes
import json
class AliPay(object):
    """
    Alipay payment interface.

    Builds RSA2 (SHA-256 with RSA, PKCS#1 v1.5) signed query strings for
    Alipay gateway requests and verifies the signature on parameters that
    Alipay sends back.
    """

    def __init__(self, appid, app_notify_url, app_private_key_path,
                 alipay_public_key_path, return_url, debug=False):
        """
        Load both RSA keys from disk and remember the application settings.

        :param appid: application id sent as ``app_id`` in every request.
        :param app_notify_url: URL sent as ``notify_url``.
        :param app_private_key_path: path of the application's private key
            file, used for signing outgoing requests.
        :param alipay_public_key_path: path of Alipay's public key file,
            used for verifying returned parameters.
        :param return_url: default URL sent as ``return_url``.
        :param debug: when exactly ``True`` the ``openapi.alipaydev.com``
            gateway is selected, otherwise ``openapi.alipay.com``.
        :raises OSError: if either key file cannot be opened.
        """
        self.appid = appid
        self.app_notify_url = app_notify_url
        self.app_private_key_path = app_private_key_path
        self.alipay_public_key_path = alipay_public_key_path
        self.return_url = return_url

        with open(self.app_private_key_path) as fp:
            self.app_private_key = RSA.import_key(fp.read())
        with open(self.alipay_public_key_path) as fp:
            self.alipay_public_key = RSA.import_key(fp.read())

        # Stored as a name-mangled private attribute; nothing in this class
        # reads it back.
        if debug is True:
            self.__gateway = "https://openapi.alipaydev.com/gateway.do"
        else:
            self.__gateway = "https://openapi.alipay.com/gateway.do"

    def direct_pay(self, subject, out_trade_no, total_amount, return_url=None, **kwargs):
        """
        Build the signed query string for an ``alipay.trade.page.pay`` request.

        :param subject: order title.
        :param out_trade_no: merchant-side order number.
        :param total_amount: order amount.
        :param return_url: per-call override of the return URL; when
            ``None`` the instance-level ``return_url`` is used.
        :param kwargs: extra entries merged into ``biz_content``.
        :return: URL-encoded, signed query string (without the gateway URL).
        """
        biz_content = {
            "subject": subject,
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "product_code": "FAST_INSTANT_TRADE_PAY",
            # "qr_pay_mode":4
        }
        biz_content.update(kwargs)
        # Honour the caller's override instead of silently ignoring it.
        if return_url is None:
            return_url = self.return_url
        data = self.build_body("alipay.trade.page.pay", biz_content, return_url)
        return self.sign_data(data)

    def build_body(self, method, biz_content, return_url=None):
        """
        Assemble the common request parameters for ``method``.

        :param method: Alipay API method name.
        :param biz_content: business parameters (a dict is serialised to
            JSON later by ``ordered_data``).
        :param return_url: when not ``None``, it is sent as ``return_url``
            and the instance's notify URL is sent as ``notify_url``.
        :return: dict of unsigned request parameters.
        """
        data = {
            "app_id": self.appid,
            "method": method,
            "charset": "utf-8",
            "sign_type": "RSA2",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "version": "1.0",
            "biz_content": biz_content
        }
        if return_url is not None:
            data["notify_url"] = self.app_notify_url
            # Use the URL that was passed in, not unconditionally the default.
            data["return_url"] = return_url
        return data

    def sign_data(self, data):
        """
        Sign ``data`` and return the final URL-encoded query string.

        The ``sign`` key, if present, is excluded from the signed content.
        ``data`` itself is not modified.

        :param data: request parameters as produced by ``build_body``.
        :return: ``k=v&...&sign=...`` with every value URL-quoted.
        """
        unsigned = {k: v for k, v in data.items() if k != "sign"}
        # Sorted (key, value) pairs; computed once and reused for both strings.
        ordered_items = self.ordered_data(unsigned)
        unsigned_string = "&".join("{0}={1}".format(k, v) for k, v in ordered_items)
        sign = self.sign(unsigned_string.encode("utf-8"))
        quoted_string = "&".join("{0}={1}".format(k, quote_plus(v)) for k, v in ordered_items)
        # Final order-information string.
        return quoted_string + "&sign=" + quote_plus(sign)

    def ordered_data(self, data):
        """
        Return the items of ``data`` as a list of pairs sorted by key.

        Dict values are serialised to compact JSON (no spaces after the
        separators). The input mapping is left untouched.

        :param data: mapping of parameter name to value.
        :return: sorted list of ``(key, value)`` tuples.
        """
        items = []
        for key, value in data.items():
            if isinstance(value, dict):
                # Dump dict-typed values to a JSON string.
                value = json.dumps(value, separators=(',', ':'))
            items.append((key, value))
        return sorted(items)

    def sign(self, unsigned_string):
        """
        Sign ``unsigned_string`` with the application's private key.

        :param unsigned_string: bytes to sign.
        :return: base64 text of the signature, on a single line.
        """
        signer = PKCS1_v1_5.new(self.app_private_key)
        signature = signer.sign(SHA256.new(unsigned_string))
        # b64encode emits no line breaks, so nothing has to be stripped.
        return b64encode(signature).decode("utf8")

    def _verify(self, raw_content, signature):
        """
        Check ``signature`` against ``raw_content`` using Alipay's public key.

        :param raw_content: the reconstructed unsigned string.
        :param signature: base64 text of the signature.
        :return: ``True`` if the signature matches, ``False`` otherwise,
            including when ``signature`` is not decodable base64.
        """
        try:
            raw_signature = decodebytes(signature.encode("utf8"))
        except ValueError:
            # binascii.Error (e.g. bad padding) is a ValueError subclass; a
            # malformed signature is simply an invalid one.
            return False
        verifier = PKCS1_v1_5.new(self.alipay_public_key)
        digest = SHA256.new(raw_content.encode("utf8"))
        return bool(verifier.verify(digest, raw_signature))

    def verify(self, data, signature):
        """
        Verify the signature of parameters returned by Alipay.

        ``sign`` and ``sign_type`` are excluded from the checked content.
        ``data`` itself is not modified.

        :param data: mapping of returned parameter names to values.
        :param signature: base64 text of the ``sign`` parameter.
        :return: ``True`` if the signature is valid, else ``False``.
        """
        unsigned = {k: v for k, v in data.items() if k not in ("sign", "sign_type")}
        # String built from the sorted pairs.
        unsigned_items = self.ordered_data(unsigned)
        message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items)
        return self._verify(message, signature)
if __name__ == "__main__":
    # Manual smoke test: verify the signature on a sample Alipay return URL,
    # then print a signed payment URL for a test order.
    # The query string must contain "&timestamp="; a garbled separator merges
    # the timestamp into total_amount and changes the signed content.
    return_url = 'http://47.92.87.172:8000/?total_amount=0.01&timestamp=2017-08-15+17%3A15%3A13&sign=jnnA1dGO2iu2ltMpxrF4MBKE20Akyn%2FLdYrFDkQ6ckY3Qz24P3DTxIvt%2BBTnR6nRk%2BPAiLjdS4sa%2BC9JomsdNGlrc2Flg6v6qtNzTWI%2FEM5WL0Ver9OqIJSTwamxT6dW9uYF5sc2Ivk1fHYvPuMfysd90lOAP%2FdwnCA12VoiHnflsLBAsdhJazbvquFP%2Bs1QWts29C2%2BXEtIlHxNgIgt3gHXpnYgsidHqfUYwZkasiDGAJt0EgkJ17Dzcljhzccb1oYPSbt%2FS5lnf9IMi%2BN0ZYo9%2FDa2HfvR6HG3WW1K%2FlJfdbLMBk4owomyu0sMY1l%2Fj0iTJniW%2BH4ftIfMOtADHA%3D%3D&trade_no=2017081521001004340200204114&sign_type=RSA2&auth_app_id=2016080600180695&charset=utf-8&seller_id=2088102170208070&method=alipay.trade.page.pay.return&app_id=2016080600180695&out_trade_no=201702021222&version=1.0'
    alipay = AliPay(
        appid="2016080600180695",
        app_notify_url="http://projectsedus.com/",
        app_private_key_path=u"H:/VueShop/RSA/private_2048.txt",
        # Alipay's public key, used to verify messages returned by Alipay;
        # this is not your own public key.
        alipay_public_key_path="H:/VueShop/RSA/ali_pub.txt",
        debug=True,  # defaults to False
        return_url="http://47.92.87.172:8000/"
    )

    # parse_qs maps every key to a list of values; keep the first of each.
    o = urlparse(return_url)
    query = parse_qs(o.query)
    ali_sign = query.pop("sign")[0]
    processed_query = {key: value[0] for key, value in query.items()}
    print(alipay.verify(processed_query, ali_sign))

    url = alipay.direct_pay(
        subject="测试订单",
        out_trade_no="201702021222",
        total_amount=0.01
    )
    re_url = "https://openapi.alipaydev.com/gateway.do?{data}".format(data=url)
    print(re_url)