forked from filswan/swan-provider
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathswan_miner_deal_importer.py
179 lines (141 loc) · 7.25 KB
/
swan_miner_deal_importer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import logging.config
import re
import subprocess
import sys
import time
sys.path.append("../")
from common.config import read_config
from common.swan_client import SwanClient
from common.logging import get_logger
DEAL_STATUS_FAILED = "ImportFailed"
DEAL_STATUS_READY = "ReadyForImport"
DEAL_STATUS_FILE_IMPORTING = "FileImporting"
DEAL_STATUS_FILE_IMPORTED = "FileImported"
DEAL_STATUS_ACTIVE = 'DealActive'
ONCHAIN_DEAL_STATUS_ERROR = "StorageDealError"
ONCHAIN_DEAL_STATUS_ACTIVE = "StorageDealActive"
ONCHAIN_DEAL_STATUS_NOTFOUND = "StorageDealNotFound"
ONCHAIN_DEAL_STATUS_WAITTING = "StorageDealWaitingForData"
ONCHAIN_DEAL_STATUS_ACCEPT = "StorageDealAcceptWait"
# Max number of deals to be imported at a time
IMPORT_NUMNBER = "20"
def get_deal_on_chain_status(deal_cid: str):
cmd = "lotus-miner storage-deals list -v | grep " + deal_cid
try:
pipe = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = pipe.communicate()
if stderr != b'':
return stderr.decode("utf-8")
if stdout == b'':
return ONCHAIN_DEAL_STATUS_NOTFOUND
stdout = stdout.decode("utf-8")
# Deal status starts with StorageDeal, such as StorageDealError, StorageDealSealing etc.
deal_status_index = stdout.find("StorageDeal", 0)
return stdout[deal_status_index:stdout.find(' ', deal_status_index)]
except Exception as error:
return str(error)
def update_offline_deal_status(status: str, note: str, deal_id: str, client: SwanClient):
logger = get_logger("swan_miner_deal_importer")
try:
client.update_offline_deal_details(status, note, deal_id)
except Exception as e:
logger.error("Failed to update offline deal status.")
logger.error(str(e))
def importer():
config = read_config()
logger = get_logger("swan_miner_deal_importer")
api_url = config['main']['api_url']
api_key = config['main']['api_key']
access_token = config['main']['access_token']
import_interval = config['main']['import_interval']
expected_sealing_time = config['main']['expected_sealing_time']
miner_fid = config['main']['miner_fid']
client = SwanClient(api_url, api_key, access_token)
while True:
client = SwanClient(api_url, api_key, access_token)
deals = client.get_offline_deals(miner_fid, DEAL_STATUS_READY, IMPORT_NUMNBER)
if deals is None or isinstance(deals, Exception):
if isinstance(deals, Exception):
logger.error(str(deals))
logger.error("Failed to get offline deals.")
logger.info("Sleeping...")
time.sleep(import_interval)
continue
if len(deals) == 0:
logger.info("No pending offline deals found.")
logger.info("Sleeping...")
time.sleep(import_interval)
continue
for deal in deals:
logger.info('')
logger.info("Deal CID: %s. File Path: %s", deal["deal_cid"], deal["file_path"], )
on_chain_status = get_deal_on_chain_status(deal["deal_cid"])
if on_chain_status.startswith("StorageDeal") is False:
logger.error(on_chain_status)
logger.error("Failed to get deal on chain status, please check if lotus-miner is running properly.")
logger.info("Sleeping...")
time.sleep(import_interval)
break
logger.info("Deal on chain status: %s.", on_chain_status)
if on_chain_status == ONCHAIN_DEAL_STATUS_ERROR:
logger.info("Deal on chain status is error before importing.")
note = "Deal error before importing."
update_offline_deal_status(DEAL_STATUS_FAILED, note, str(deal["id"]), client)
continue
if on_chain_status == ONCHAIN_DEAL_STATUS_ACTIVE:
logger.info("Deal on chain status is active before importing.")
note = "Deal active before importing."
update_offline_deal_status(DEAL_STATUS_ACTIVE, note, str(deal["id"]), client)
continue
if on_chain_status == ONCHAIN_DEAL_STATUS_ACCEPT:
logger.info("Deal on chain status is StorageDealAcceptWait. Deal will be ready shortly.")
continue
if on_chain_status == ONCHAIN_DEAL_STATUS_NOTFOUND:
logger.info("Deal on chain status not found.")
note = "Deal not found."
update_offline_deal_status(DEAL_STATUS_FAILED, note, str(deal["id"]), client)
continue
if on_chain_status != ONCHAIN_DEAL_STATUS_WAITTING:
logger.info("Deal is already imported, please check.")
note = on_chain_status
update_offline_deal_status(DEAL_STATUS_FILE_IMPORTED, note, str(deal["id"]), client)
continue
try:
info_proving = subprocess.run(['lotus-miner', 'proving', 'info'], stdout=subprocess.PIPE).stdout.decode(
'utf-8')
current_epoch = int(re.search("(?<=Current Epoch: {11})[0-9]+", info_proving).group(0))
except Exception as e:
logger.error("Failed to get current epoch. Please check if miner is running properly.")
logger.error(str(e))
logger.info("Sleeping...")
time.sleep(import_interval)
break
logger.info("Current epoch: %s. Deal starting epoch: %s", current_epoch, deal.get("start_epoch"))
try:
if deal.get("start_epoch") - current_epoch < expected_sealing_time:
logger.info("Deal will start too soon. Do not import this deal.")
note = "Deal expired."
update_offline_deal_status(DEAL_STATUS_FAILED, note, str(deal["id"]), client)
continue
command = "lotus-miner storage-deals import-data " + deal.get("deal_cid") + " " + deal.get("file_path")
logger.info('Command: %s' % command)
note = ""
update_offline_deal_status(DEAL_STATUS_FILE_IMPORTING, note, str(deal["id"]), client)
pipe = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, err = pipe.communicate()
# There should be no output if everything goes well
if out != b'':
update_offline_deal_status(DEAL_STATUS_FAILED, str(out), str(deal["id"]), client)
logger.error("Import deal failed. CID: %s. Error message: %s", deal["deal_cid"], str(out))
continue
update_offline_deal_status(DEAL_STATUS_FILE_IMPORTED, "", str(deal["id"]), client)
logger.info("Deal CID %s imported.", deal["deal_cid"])
logger.info("Sleeping...")
time.sleep(import_interval)
except Exception as e:
logger.error("Import deal failed. CID: %s. Error message: %s", deal["deal_cid"], str(e))
note = str(e)
update_offline_deal_status(DEAL_STATUS_FAILED, note, str(deal["id"]), client)
continue
logger.info("Sleeping...")
time.sleep(import_interval)