-
Notifications
You must be signed in to change notification settings - Fork 24
/
mmapfactory.go
137 lines (108 loc) · 2.38 KB
/
mmapfactory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package bigqueue
import (
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
)
// DBFactory is used to manupilate mulitple data files by index number
type DBFactory struct {
lockMap map[int64]*sync.Mutex
// DB mapping with file index no
dbMap map[int64]*DB
filePrefix string
fileSuffix string
lock sync.Mutex
filePath string
InitialMmapSize int
}
func (f *DBFactory) acquireDB(index int64) (*DB, error) {
// add map lock
f.lock.Lock()
db := f.dbMap[index]
if db != nil {
f.lock.Unlock()
return db, nil
}
lock := f.lockMap[index]
if lock == nil {
lock = &(sync.Mutex{})
f.lockMap[index] = lock
}
defer func() {
delete(f.lockMap, index)
}()
f.lock.Unlock()
// lock by index
lock.Lock()
defer lock.Unlock()
db = &DB{
path: f.getFilePath(index),
InitialMmapSize: f.InitialMmapSize,
opened: true,
}
err := db.Open(defaultFileMode)
if err != nil {
return nil, err
}
f.dbMap[index] = db
return db, nil
}
func (f *DBFactory) getFilePath(index int64) string {
return f.filePath + "/" + GetFileName(f.filePrefix, f.fileSuffix, index)
}
// Close all data files
func (f *DBFactory) Close() error {
if f.dbMap != nil {
for k, v := range f.dbMap {
err := v.Close()
if err != nil {
log.Println("Close DB from map failed. ", k, err)
}
}
}
// set to the emtpy map
f.dbMap = make(map[int64]*DB)
f.lockMap = make(map[int64]*sync.Mutex)
return nil
}
func (f *DBFactory) removeBeforeIndex(index int64) error {
f.lock.Lock()
defer f.lock.Unlock()
for idx, db := range f.dbMap {
if int64(idx) < index {
log.Println("Do delete index db file by gc. no=", idx)
db.Close()
os.Remove(f.getFilePath(idx))
delete(f.dbMap, idx)
}
}
// double check delete file
files, err := GetFiles(f.filePath)
if err != nil {
return err
}
for i := files.Front(); i != nil; i = i.Next() {
fn := fmt.Sprintf("%v", i.Value)
if strings.HasSuffix(fn, f.fileSuffix) {
fin := f.getFileIndex(fn)
if fin >= 0 && int64(fin) < index {
log.Println("Do delete index db file by gc. no=", fin)
os.Remove(f.getFilePath(fin))
}
}
}
return nil
}
func (f *DBFactory) getFileIndex(fn string) int64 {
beginIndex := strings.LastIndex(fn, "-")
beginIndex = beginIndex + 1
endIndex := strings.LastIndex(fn, f.fileSuffix)
sIndex, err := strconv.Atoi(fn[beginIndex:endIndex])
if err != nil {
return -1
}
return int64(sIndex)
}