-
Notifications
You must be signed in to change notification settings - Fork 335
事务消息使用简介
unixliang edited this page Oct 10, 2018
·
4 revisions
支持事务消息的分支:https://github.com/Tencent/phxqueue/tree/tx
用类似2PC的方式执行分布式事务,不仅对主从事务解耦,保证主事务稳定,还能保证主从事务两者最终一致。
主从事务的概念如下:
主事务一般是核心逻辑,逻辑重,同步调用
从事务一般是次要逻辑,逻辑轻,异步调用
PhxQueue通过二阶段提交保证主从事务最终一致。
使用者先通过 Prepare,将消息暂存到事件中心,该消息对外部不可见;
Prepare后,使用者执行主事务,成功就执行 Commit,PhxQueue 投递消息到从事务;失败就执行 Rollback,PhxQueue 丢弃消息不做投递。
网络分区、业务系统重启等原因,都会导致 Commit/Rollback 丢失,这时要靠反查机制来恢复整个分布式事务的上下文。
PhxQueue 先查内部状态,看使用者是否曾经执行过 Commit 或 Rollback,如果都没,才通过 TxQuery 反问业务。
有了这一套机制,业务系统仅用一个无状态的逻辑层,就能实现分布式事务,这个模式对业务非常便利。
Prepare 成功后,消息会进入 Prepare队列,由 Prepare队列 来驱动反查。 Commit 成功后,消息会转而进入 Commit队列,消息从 Commit队列 出队后将会被推送给订阅者。
按 中文部署手册 完成部署
{
"topic":
{
"topic_id": "1000",
"handle_ids": ["1", "2", "3", "4"],
"store_paxos_batch_count": "80",
"store_paxos_batch_delay_time_ms": "30"
},
"queue_infos":
[
{
"queue_info_id": "1",
"ranges": ["0"] <-- Commit队列
},
{
"queue_info_id": "2",
"ranges": ["1"], <-- Prepare队列
"delay": "1", <-- 该队列的重试间隔
"count": "20" <-- 该队列的重试次数;默认值 -1 表示无限重试
}
],
"pubs":
[
{
"pub_id": "1", <-- 普通消息pub,事务消息不会用到,忽略
"consumer_group_ids": ["1"],
"queue_info_ids": ["1"],
"sub_ids": ["1"]
},
{
"pub_id": "2", <-- 事务消息pub
"consumer_group_ids": ["1"],
"queue_info_ids": ["1"], <-- 指定 Commit队列
"sub_ids": ["1"], <-- 指定订阅者是谁
"is_transaction": "1", <-- 该pub处理事务消息
"tx_query_sub_id": "2", <-- 指定被反查方是谁(反查方其实也可以作为一个订阅者)
"tx_query_queue_info_ids": ["2"] <-- 指定 Prepare队列
}
],
"consumer_groups":
[
{
"consumer_group_id": "1",
"use_dynamic_scale": "0",
"skip_lock": "1"
}
],
"subs":
[
{
"sub_id": "1", <-- 订阅者标识
"sub_name": "push",
"consumer_group_id": "1",
"route_conf": "etc/push_routeconfig.conf" <-- 订阅者路由
},
{
"sub_id": "2", <-- 被反查方标识
"sub_name": "txquery",
"consumer_group_id": "1",
"route_conf": "etc/txquery_routeconfig.conf" <-- 被反查方路由
}
]
}
{
"general":
{
"conn_timeout_ms": "500",
"uri": "/push" <-- 订阅者接收推送的HTTP POST URI
},
"routes":
[
{
"addr":
{
"ip": "127.0.0.1",
"port": "8081"
},
"scale": "1000"
}
]
}
{
"general":
{
"conn_timeout_ms": "500",
"uri": "/push" <-- 订阅者接收推送的HTTP POST URI
},
"routes":
[
{
"addr":
{
"ip": "127.0.0.1",
"port": "8081"
},
"scale": "1000"
}
]
}
{
"general":
{
"conn_timeout_ms": "500",
"uri": "/txquery" <-- 被反查方接收反查请求的HTTP POST URI
},
"routes":
[
{
"addr":
{
"ip": "127.0.0.1",
"port": "8081"
},
"scale": "1000"
}
]
}
subscriber.py
同时处理/push
和/txquery
,其行为是:前10次/txquery
返回UNCERTAIN
,随即返回Commit
。
$ phxqueue_phxrpc/test/test_producer_main -f prepare
succeeded! func prepare client_id 1539090487_REHlvT3nBa buf wMrSGK5Tv1
$ phxqueue_phxrpc/test/test_producer_main -f commit -c 1539090487_REHlvT3nBa
succeeded! func commit client_id 1539090487_REHlvT3nBa buf wMrSGK5Tv1
Subscriber输出
...
----- Request Start ----->
request_path /txquery <-- 反查请求
topic_id 1000
pub_id 2
client_id 1539090487_REHlvT3nBa
count 3 <-- Subscriber 数次返回 UNCERTAIN,PhxQueue 不断反查
atime 1539090487
buffer REHlvT3nBa
127.0.0.1 - - [09/Oct/2018 13:08:11] "POST /txquery HTTP/1.0" 200 -
<----- Request End -----
----- Request Start ----->
request_path /push <-- 执行 Commit 后,PhxQueue 随即推送消息给 Subscriber
topic_id 1000
pub_id 2
client_id 1539090487_REHlvT3nBa
count 0
atime 1539090487
buffer REHlvT3nBa
127.0.0.1 - - [09/Oct/2018 13:08:13] "POST /push HTTP/1.0" 200 -
<----- Request End -----
$ phxqueue_phxrpc/test/test_producer_main -f prepare
succeeded! func prepare client_id 1538919775_hj8gqrUcq2 buf hj8gqrUcq2
$ phxqueue_phxrpc/test/test_producer_main -f rollback -c 1538919775_hj8gqrUcq2
succeeded! func rollback client_id 1538919775_hj8gqrUcq2 buf egZ7XoYsRw
Subscriber输出
...
----- Request Start ----->
request_path /txquery
topic_id 1000
pub_id 2
client_id 1538966857_MlO6436Qwe
count 5 <-- Subscriber 数次返回 UNCERTAIN,PhxQueue 不断反查
atime 1538966857
buffer MlO6436Qwe
127.0.0.1 - - [08/Oct/2018 02:47:44] "POST /txquery HTTP/1.0" 200 -
<----- Request End -----
<-- 执行Rollback之后,PhxQueue 丢弃消息,不再推送
Subscriber输出
...
----- Request Start ----->
request_path /txquery
topic_id 1000
pub_id 2
client_id 1539132534_AhyzRJRVq4
count 10 <-- Subscriber 数次返回 UNCERTAIN,PhxQueue 不断反查;第10次反查,Subscribe 返回 Commit
atime 1539132535
buffer AhyzRJRVq4
127.0.0.1 - - [10/Oct/2018 00:49:08] "POST /txquery HTTP/1.0" 200 -
<----- Request End -----
----- Request Start ----->
request_path /push <-- PhxQueue 收到 Commit,进行推送
topic_id 1000
pub_id 2
client_id 1539132534_AhyzRJRVq4
count 0
atime 1539132535
buffer AhyzRJRVq4
127.0.0.1 - - [10/Oct/2018 00:49:09] "POST /push HTTP/1.0" 200 -
<----- Request End -----