-
Notifications
You must be signed in to change notification settings - Fork 0
/
all.sql
72 lines (62 loc) · 2.14 KB
/
all.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/* The MySQL schema and SQL statements used in this project.
They can be used to debug queries and optimise indexes. */
-- database
-- Create the schema with utf8mb4: in MySQL, `utf8` is an alias for the 3-byte
-- utf8mb3 encoding and cannot store characters outside the BMP.
CREATE DATABASE nq CHARACTER SET utf8mb4;

-- Create the account first, then grant privileges to it.
-- `GRANT ... IDENTIFIED BY` is deprecated since MySQL 5.7.6 and rejected by
-- MySQL 8.0; CREATE USER followed by GRANT works on old and new servers.
-- NOTE(review): '123456' is a weak hard-coded password -- presumably meant for
-- local debugging only; change it before using this anywhere else.
CREATE USER 'nq'@'localhost' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON nq.* TO 'nq'@'localhost';
-- No FLUSH PRIVILEGES needed: CREATE USER / GRANT take effect immediately.
-- tables
-- queue1_msg: one row per posted message.
--   id           auto-assigned message id
--   sender       name of the poster
--   created_time insert time, millisecond precision, filled in by default
--   message      binary payload
CREATE TABLE queue1_msg (
    id           INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    sender       VARCHAR(20)  NOT NULL,
    created_time DATETIME(3)  NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    message      MEDIUMBLOB   NOT NULL
) ENGINE = MyISAM;
-- queue1_rst: one row per (message, receiver) pair, holding that receiver's
-- processing state and result for the message.
CREATE TABLE queue1_rst (
    -- id of the message in queue1_msg this row belongs to
    m_id         INT UNSIGNED,
    receiver     VARCHAR(20) NOT NULL,
    created_time DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    -- NULL on insert; set automatically whenever an UPDATE changes the row
    updated_time DATETIME(3) ON UPDATE CURRENT_TIMESTAMP(3),
    status       ENUM('processing', 'finished', 'failed') NOT NULL DEFAULT 'processing',
    -- number of failed attempts so far (0 = never retried)
    fail_count   TINYINT UNSIGNED DEFAULT 0,
    result       VARBINARY(1024),
    PRIMARY KEY (m_id, receiver),
    -- Points at the message table defined in this file (queue1_msg).
    -- MyISAM parses but does not enforce foreign keys, so this clause is
    -- documentation only unless the engine is changed.
    FOREIGN KEY (m_id) REFERENCES queue1_msg (id),
    INDEX receiver_idx (receiver),
    INDEX status_idx (status)
) ENGINE = MyISAM;
-- post messages: enqueue two sample messages from the same sender
INSERT INTO queue1_msg (sender, message)
VALUES
    ('sender1', 'foo'),
    ('sender1', 'bar');
-- get last id
-- highest message id currently in the queue
SELECT MAX(id)
FROM queue1_msg;

-- highest message id already recorded for this receiver
SELECT MAX(m_id)
FROM queue1_rst
WHERE receiver = 'receiver1';
-- count the tasks this receiver still has in 'processing' state
-- this should use index merge: intersect(receiver_idx, status_idx)
SELECT COUNT(*)
FROM queue1_rst
WHERE status = 'processing'
  AND receiver = 'receiver1';
-- pull messages: fetch the next batch of messages starting at a given id
-- ORDER BY id makes "the first 2 rows" well defined; LIMIT without ORDER BY
-- may return any 2 matching rows, so messages could be skipped or reordered.
SELECT
    id,
    sender,
    created_time,
    message
FROM queue1_msg
WHERE id >= 1
ORDER BY id
LIMIT 2;
-- save result: register messages 1 and 2 as taken by receiver1
-- (status, fail_count and created_time fall back to their column defaults)
INSERT INTO queue1_rst (m_id, receiver)
VALUES
    (1, 'receiver1'),
    (2, 'receiver1');
-- update result: mark message 2 as finished for receiver1 and store its output
UPDATE queue1_rst
SET
    status = 'finished',
    result = 'done'
WHERE receiver = 'receiver1'
  AND m_id = 2;
-- retry tasks (2 minutes later)
-- Return the messages whose task for this receiver is still 'processing' and
-- has been idle for more than 120 seconds. Idle time is measured from
-- created_time when the task was never retried (fail_count = 0), otherwise
-- from updated_time.
SELECT
    m.id,
    m.sender,
    m.created_time,
    m.message,
    r.fail_count
FROM queue1_rst AS r
INNER JOIN queue1_msg AS m
    ON m.id = r.m_id
WHERE r.receiver = 'receiver1'
  AND r.status = 'processing'
  AND (
        (r.fail_count = 0  AND r.created_time < DATE_SUB(NOW(), INTERVAL 120 SECOND))
     OR (r.fail_count <> 0 AND r.updated_time < DATE_SUB(NOW(), INTERVAL 120 SECOND))
  );
-- update fail count (retry num): record one failed attempt for message 1
UPDATE queue1_rst
SET fail_count = 1
WHERE receiver = 'receiver1'
  AND m_id = 1;
-- also update status of failed tasks: record the attempt and give up on it
UPDATE queue1_rst
SET
    fail_count = 1,
    status = 'failed'
WHERE receiver = 'receiver1'
  AND m_id = 1;