Skip to content

Commit

Permalink
Merge pull request #14 from Prooksius/patch-22
Browse files Browse the repository at this point in the history
Bybit Websocket connect
  • Loading branch information
zhouaini528 authored Dec 2, 2022
2 parents 25928ca + f088c95 commit 9e17e56
Show file tree
Hide file tree
Showing 6 changed files with 1,123 additions and 1 deletion.
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
},
"require": {
"php": ">=7.0",
"guzzlehttp/guzzle": "*"
"guzzlehttp/guzzle": "*",
"workerman/workerman": "*",
"workerman/globaldata": "*"
}
}
247 changes: 247 additions & 0 deletions src/Api/WebSocket/MySocketClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
<?php
/**
* @author lin <[email protected]>
* */

namespace Lin\Bybit\Api\WebSocket;

use Exception;
use Lin\Bybit\Api\WebSocket\SocketGlobal;
use Lin\Bybit\Api\WebSocket\SocketFunction;

use Workerman\Lib\Timer;
use Workerman\Worker;

class SocketClient
{
use SocketGlobal;
use SocketFunction;

private $config=[];
private $keysecret=[];

private $worker = null;

function __construct(array $config=[]) {
$this->config = $config;

$this->client();

$this->init();

}

protected function init() {
//初始化全局变量
$this->add('global_key',[]);//保存全局变量key

$this->add('all_sub',[]);//目前总共订阅的频道

$this->add('add_sub',[]);//正在订阅的频道

$this->add('del_sub',[]);//正在删除的频道

$this->add('keysecret',[]);//目前总共key

$this->add('global_local',[]);//临时存储数据

$this->add('debug',[]);

}

function keysecret(array $keysecret=[]) {
$this->keysecret = $keysecret;
return $this;
}

/**
* @param array $sub
*/
public function subscribe(array $sub=[]) {
// 是否又私有频道订阅
if (!empty($this->keysecret)) {
$keysecret = $this->get('keysecret');

if (!isset($keysecret[$this->keysecret['key']]['connection'])) {
$this->keysecretInit($this->keysecret, [
'connection' => 0,
]);
}
}

//$this->save('add_sub',$sub);
$add_sub = $this->get('add_sub');
if (!empty($sub) && !empty($add_sub)) {
$this->save('add_sub',array_merge($sub, $add_sub));
} elseif (!empty($add_sub)) {
$this->save('add_sub', $add_sub);
} elseif (!empty($sub)) {
$this->save('add_sub', $sub);
}
}

/**
* @param array $sub
*/
public function unsubscribe(array $sub=[]){
// 是否又私有频道订阅
if (!empty($this->keysecret)) {
if (!isset($keysecret[$this->keysecret['key']]['connection']))
$this->keysecretInit($this->keysecret, [
'connection_close' => 1,
]);
}

if (!empty($sub)) $this->save('del_sub',$sub);
}

/**
* @param array $sub 默认获取所有public订阅的数据,private数据需要设置keysecret
* @param null $callback
* @param bool $daemon
* @return mixed
*/
public function getSubscribe(array $sub, $callback = null, $daemon = false){
if ($daemon) $this->daemon($callback,$sub);

return $this->getData($this, $callback, $sub, $this->user_id);
}

/**
* 返回订阅的所有数据
* @param null $callback
* @param bool $daemon
* @return array
*/
public function getSubscribes($callback = null, $daemon = false){
if($daemon) $this->daemon($callback);

return $this->getData($this, $callback, [], $this->user_id);
}

protected function daemon($callback = null, $sub = []){
$this->worker = new Worker();
$this->worker->name = 'Keys scanning, slot' . (isset($this->config['crypto_slot']) ? 'No: ' . $this->config['crypto_slot'] : 'unknown');

$this->worker->onWorkerStart = function() use($callback, $sub) {
$global = $this->client();

$time = isset($this->config['data_time']) ? $this->config['data_time'] : 0.1 ;

Timer::add($time, function() use ($global, $callback, $sub) {
$this->getData($global, $callback, $sub);
});
};
Worker::runAll();
}

/**
* @param $global
* @param null $callback
* @param array $sub 返回规定的频道
* @return array
*/
protected function getData($global, $callback = null, $sub=[])
{
$all_sub = $global->get('all_sub');
if (empty($all_sub)) return [];

$global_local = $global->get('global_local');
$temp = [];

//默认返回所有数据
if (empty($sub)) {
foreach ($all_sub as $k => $v){
if (is_array($v)) {
if (empty($this->keysecret) || $this->keysecret['key'] != $k) continue;

foreach ($v as $vv) {
$data = $global->getQueue($vv);
$temp[$vv] = $data;
}
} else {
//$data = $global->get($v);
if (isset($global_local['public'][$v])) $temp[$v] = $global_local['public'][$v];
}
}
} else {
//返回规定的数据
if (!empty($this->keysecret)) {
//是否有私有数据
if (isset($all_sub[$this->keysecret['key']])) $sub = array_merge($sub, $all_sub[$this->keysecret['key']]);

}
//print_r($sub); die;
foreach ($sub as $k => $v){
//判断私有数据是否需要走队列数据
$temp_v = explode(self::$USER_DELIMITER, $v);
if (count($temp_v) > 1){
//private
$data = $global->getQueue($v);
} else {
//public
//$data = $global->get($v);
if (isset($global_local['public'][$v])) $data = $global_local['public'][$v];
}

if (empty($data)) continue;

$temp[$v] = $data;
}
}

if ($callback !== null){
call_user_func_array($callback, array($temp));
}

return $temp;
}

/*
*
* */
function reconPrivate(string $key){
$debug = $this->client->debug;
if (empty($debug)){
$this->client->debug = [
'private' => [$key => $key],
];
} else {
$this->client->debug = ['private' => array_merge($this->client->debug['private'], [$key => $key])];
}
}

function reconPublic(){
$this->client->debug = [
'public' => ['market' => 'close', 'kline' => 'close'],
];
}

function test(){
print_r($this->client->all_sub);
print_r($this->client->add_sub);
print_r($this->client->del_sub);
print_r($this->client->keysecret);
print_r($this->client->global_key);
}

function test2(){
$global_key=$this->client->global_key;
foreach ($global_key as $k=>$v){
echo count($this->client->$v).'----'.$k.PHP_EOL;
echo json_encode($this->client->$v).PHP_EOL;
}
}

function test_reconnection(){
$this->reconPublic();
}

function test_reconnection2(){
$this->client->debug2=1;
}

function test_reconnection3($key){
$this->reconPrivate($key);
}
}
93 changes: 93 additions & 0 deletions src/Api/WebSocket/MySocketFunction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?php
/**
* @author lin <[email protected]>
* */

namespace Lin\Bybit\Api\WebSocket;

trait SocketFunction
{
//标记分隔符
static $USER_DELIMITER = '===';

/**
* @param $global
* @param $tag
* @param $data
* @param string $keysecret
*/
protected function errorMessage($global, $tag, $data, $keysecret = '')
{
$all_sub = $global->get('all_sub');
if (empty($all_sub)) return;

if ($tag == 'public') {
//查询 message 是否包含了关键词。并把错误信息写入频道记录
foreach ($all_sub as $k => $v){
if (is_array($v)) continue;
$sub = strtolower($v);
if (stristr(strtolower($data['message']), $v) !== false) $global->save($sub, $data);
}
} else {
//如果是用户单独订阅,则该用户所有相关的订阅都显示该错误
/*foreach ($all_sub as $k=>$v){
if(!is_array($v)) continue;
$sub=strtolower($v[0]);
$global->add($keysecret['key'].$sub,$data);
}*/
}
}

protected function log($message)
{
if (!is_string($message)) $message=json_encode($message);

$time = time();
$tiemdate = date('d.m.Y H:i:s', $time);

$message = $tiemdate . ' ' . $message . PHP_EOL;

if (isset($this->config['log'])) {
if (is_array($this->config['log']) && isset($this->config['log']['filename'])) {
$filename = $this->config['log']['filename'] . '--' . date('d.m.Y', $time) . '.log';
} else {
$filename = date('d.m.Y', $time) . '.log';
}

file_put_contents($filename, $message, FILE_APPEND);
}

echo $message;
}

/**
* 设置用户key
* @param $keysecret
*/
protected function userKey(array $keysecret, string $sub){
return $keysecret['key'] . self::$USER_DELIMITER . $sub;
}

/**
* 重新订阅
*/
private function reconnection($global, $type = 'public', array $keysecret = [])
{
$all_sub = $global->get('all_sub');
if (empty($all_sub)) return;

$temp = [];
if ($type == 'public'){
foreach ($all_sub as $v){
if (!is_array($v)) $temp[] = $v;
}
//$global->save('add_sub',$temp);
} else {

}

$add_sub = $global->get('add_sub');
if (empty($add_sub)) $global->save('add_sub', $temp);
else $global->save('add_sub', array_merge($temp, $add_sub));
}
}
Loading

0 comments on commit 9e17e56

Please sign in to comment.