-
Notifications
You must be signed in to change notification settings - Fork 2
/
1password_deduplicator.py
205 lines (168 loc) · 6.01 KB
/
1password_deduplicator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import subprocess
import tldextract
import argparse
import shlex
import json
def run_command(cmd):
    """Run an external command and return what it wrote to standard output.

    Args:
        cmd: Either a command-line string, which is tokenised with
            ``shlex.split``, or an already-split sequence of arguments, which
            is used as-is. Passing a sequence lets callers hand over values
            containing spaces or quotes without having to escape them.

    Returns:
        The captured standard output, decoded as text.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``). The exception is left to propagate so
            callers can decide how to react.
    """
    argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    completed = subprocess.run(argv, check=True, capture_output=True, text=True)
    return completed.stdout
def domain_parts(item: dict):
    """Return the parsed URL components for every URL stored on *item*.

    The ``"href"`` of each entry in ``item["urls"]`` is passed through
    ``tldextract.extract``. The resulting list is cached on the item under the
    ``"domain_parts"`` key, so later calls return the cached list instead of
    parsing again.

    Args:
        item: Item dictionary. It is mutated: a ``"domain_parts"`` key is
            added on the first call.

    Returns:
        A list with one ``tldextract`` result per URL. The list is empty when
        the item has no ``"urls"`` entry (or it is ``None``/empty).
    """
    if "domain_parts" not in item:
        # An item without URLs simply has no domains; don't raise KeyError.
        urls = item.get("urls") or []
        item["domain_parts"] = [tldextract.extract(url["href"]) for url in urls]
    return item["domain_parts"]
def domains(item):
    """Return the set of host names that the item's URLs point at.

    Each name is built from the sub-domain, domain and suffix reported by
    ``domain_parts``, joined with dots and skipping empty parts. A sub-domain
    that is exactly ``"www"`` is dropped, so ``www.example.com`` and
    ``example.com`` produce the same entry.

    Args:
        item: Item dictionary accepted by ``domain_parts``.

    Returns:
        A set of dotted host-name strings.
    """
    names = set()
    for parts in domain_parts(item):
        subdomain = "" if parts.subdomain == "www" else parts.subdomain
        # Read the three attributes explicitly instead of iterating the
        # result object, so this does not depend on it being iterable as
        # exactly (subdomain, domain, suffix).
        pieces = (subdomain, parts.domain, parts.suffix)
        names.add(".".join(piece for piece in pieces if piece))
    return names
def root_domains(item) -> set:
    """Collect the domain-plus-suffix names of the item's URLs.

    Sub-domains are ignored: only the ``domain`` and ``suffix`` attributes of
    each entry returned by ``domain_parts`` are joined with a dot, skipping
    whichever of the two is empty.
    """
    roots = set()
    for parts in domain_parts(item):
        pieces = [piece for piece in (parts.domain, parts.suffix) if piece]
        roots.add(".".join(pieces))
    return roots
def username(item):
    """Return the item's ``additional_information`` value, or None if absent.

    This script uses that field as the login's username when grouping items.
    """
    try:
        return item["additional_information"]
    except KeyError:
        return None
def password(item):
    """Read the item's password through ``op read``.

    The secret reference is built from the item's vault name and item id.

    Args:
        item: Item dictionary providing ``item["vault"]["name"]``,
            ``item["id"]`` and ``item["title"]``.

    Returns:
        The raw standard output of ``op read``, or ``None`` when the command
        exits with an error (a message naming the item is printed in that
        case).
    """
    reference = f"op://{item['vault']['name']}/{item['id']}/password"
    # run_command tokenises its argument with shlex.split, so the reference
    # must be quoted or a vault name containing whitespace would be split
    # into several arguments.
    cmd = f"op read {shlex.quote(reference)}"
    try:
        return run_command(cmd)
    except subprocess.CalledProcessError:
        print(f"Exception while getting password for {item['title']}")
        return None
def otp(item):
    """Fetch the item's current one-time password via ``op item get --otp``.

    Returns:
        The command output converted to ``int`` after stripping newlines, or
        ``None`` when the command exits with an error (presumably because the
        item has no one-time password configured).
    """
    try:
        raw = run_command(f"op item get {item['id']} --otp")
        code = int(raw.strip("\n"))
    except subprocess.CalledProcessError:
        return None
    else:
        return code
def updated_at(item):
    """Return the item's ``updated_at`` value, defaulting to 0.

    When the key is missing it is stored on the item as ``0`` first, so the
    item is mutated on that path.
    """
    return item.setdefault("updated_at", 0)
def details(item):
    """Return the item's username/password/OTP fields, fetching them once.

    The JSON printed by the ``op`` command is parsed and cached on the item
    under the ``"details"`` key; later calls return the cached value.
    """
    if "details" in item:
        return item["details"]

    # NOTE(review): this uses the word order "op get item", while the other
    # commands in this file use "op item ..." - confirm it works with the
    # installed CLI version.
    raw = run_command(
        f'op get item {item["id"]} --fields "username,password,one-time password"'
    )
    item["details"] = json.loads(raw)
    return item["details"]
def delete(item):
    """Delete or archive a duplicate item in 1Password.

    Behaviour is controlled by three module-level flags:

    * ``dry_run`` - only print what would be removed; ``op item delete`` is
      not run.
    * ``prompt`` - ask for confirmation first. Only an answer of ``y``/``Y``
      proceeds; anything else, including just pressing Enter, cancels.
    * ``archive`` - pass ``--archive`` to ``op item delete``.

    After a dry run or a completed removal the item is flagged with
    ``item["trashed"] = "Y"``. A declined prompt leaves the item unflagged.

    Note that the dry-run message and the confirmation prompt both include
    the item's password, which is fetched with ``password(item)``.

    Args:
        item: Item dictionary with ``"id"``, ``"title"`` and ``"vault"``.

    Returns:
        None.

    Raises:
        subprocess.CalledProcessError: If ``op item delete`` fails.
    """
    title = item["title"]
    user = username(item)
    # Compute the site list once; it appears in every message below.
    sites = domains(item)
    site_text = f'site{"s" if len(sites) > 1 else ""} {", ".join(sites)}'

    if dry_run:
        print(
            f"To delete duplicate item {title}, username {user}, "
            f'with password {password(item)} in vault {item["vault"]["name"]} '
            f"for {site_text}, last updated_at on {updated_at(item)}, "
            "run again without the dry run flag."
        )
        item["trashed"] = "Y"
        return None

    if prompt:
        # The hint reads "(y/N)" because an empty answer cancels: for a
        # destructive action the default must be "no", and the prompt should
        # say so.
        answer = input(
            f"Are you sure you want to delete duplicate item {title}, "
            f'username {user}, password {password(item)} in vault {item["vault"]["name"]} '
            f"for {site_text}, last updated_at on {updated_at(item)}? (y/N): "
        )
        if answer.strip().upper() != "Y":
            return None

    if archive:
        run_command(f'op item delete {item["id"]} --archive')
        verb = "Archived"
    else:
        run_command(f'op item delete {item["id"]}')
        verb = "Deleted"
    print(f"{verb} duplicate item {title}, username {user} for {site_text}")
    item["trashed"] = "Y"
    return None
def _prefer_new(new_item, existing_item):
    """Decide which of two duplicate items should be kept.

    Returns:
        ``True`` to keep *new_item*, ``False`` to keep *existing_item*, or
        ``None`` when the decision would depend on a password that could not
        be read.
    """
    # Compare against None rather than truthiness: otp() returns an int, and
    # a code of 0 is still a configured one-time password.
    new_has_otp = otp(new_item) is not None
    existing_has_otp = otp(existing_item) is not None
    if new_has_otp != existing_has_otp:
        # Keep whichever item carries the one-time password, in either
        # direction, so it is never lost to a later tie-breaker.
        return new_has_otp

    new_stamp = updated_at(new_item)
    existing_stamp = updated_at(existing_item)
    if new_stamp and existing_stamp and new_stamp > existing_stamp:
        return True

    new_secret = password(new_item)
    existing_secret = password(existing_item)
    if new_secret is None or existing_secret is None:
        # password() returns None on failure; len(None) would raise.
        return None
    return len(new_secret) > len(existing_secret)


def run(items: list):
    """Find duplicate logins in *items* and delete the less preferred one.

    Items are grouped by ``(root domain, username)``. The first item seen for
    a key is remembered; each later item with the same key is compared with
    it. The two are treated as duplicates when their ``domains()`` sets are
    equal, or when they differ but both passwords are readable and equal.
    ``_prefer_new`` then picks the item to keep and the other one is handed
    to ``delete``.

    Items are skipped entirely when they already carry a ``"trashed"`` key,
    have no ``"urls"`` key, or are favourites while the module-level
    ``ignore_favorites`` flag is set.

    Args:
        items: List of item dictionaries, e.g. parsed ``op item list`` output.
    """
    uniq = {}
    for new_item in items:
        if "trashed" in new_item:
            continue
        if "urls" not in new_item:
            continue
        if ignore_favorites and new_item.get("favorite", False):
            continue

        user = username(new_item)
        for domain_name in root_domains(new_item):
            # delete() flags removed items. Once new_item itself has been
            # removed for one domain it must not be registered, or preferred
            # over a surviving item, for its remaining domains.
            if new_item.get("trashed") == "Y":
                break

            key = (domain_name, user)
            existing_item = uniq.get(key)
            if existing_item is None or existing_item.get("trashed") == "Y":
                # Empty slot, or the slot's item is already gone: the live
                # item takes it so later duplicates are still detected.
                uniq[key] = new_item
                continue

            if domains(new_item) != domains(existing_item):
                new_secret = password(new_item)
                existing_secret = password(existing_item)
                # Different sites only count as duplicates when both
                # passwords could be read and are identical; two failed
                # reads (None == None) must not look like a match.
                if (
                    new_secret is None
                    or existing_secret is None
                    or new_secret != existing_secret
                ):
                    continue

            keep_new = _prefer_new(new_item, existing_item)
            if keep_new is None:
                # Undecidable: leave both items alone rather than guess.
                continue
            if keep_new:
                uniq[key] = new_item
                delete(existing_item)
            else:
                delete(new_item)
if __name__ == "__main__":
    # Command-line entry point: parse the options, list the Login items with
    # the `op` CLI and hand them to run().
    parser = argparse.ArgumentParser(
        description="Remove duplicate logins from your 1Password vault"
    )
    parser.add_argument(
        "-d",
        "--dry-run",
        action="store_true",
        help="Output the items to be removed without actually removing them",
    )
    parser.add_argument(
        "-y", "--yes", action="store_true", help="Don't prompt for delete confirmation"
    )
    parser.add_argument(
        "--vault",
        metavar="[vault id]",
        help="Only search for duplicates in the specified vault",
    )
    parser.add_argument(
        "--tag",
        metavar="[tag id]",
        action="append",
        help="Only search for duplicates with the specified tags",
    )
    parser.add_argument(
        "--archive",
        action="store_true",
        help="Archive duplicates instead of deleting them",
    )
    parser.add_argument(
        "--ignore-favorites",
        action="store_true",
        help="Ignore favorites",
    )
    args = parser.parse_args()

    # delete() and run() read these names as module-level globals.
    dry_run = args.dry_run
    prompt = not args.yes
    archive = args.archive
    ignore_favorites = args.ignore_favorites

    cmd = "op item list --categories Login --format=json"
    # run_command() tokenises the string with shlex.split, so user-supplied
    # values are quoted to survive as a single argument even when they
    # contain spaces or quotes.
    if args.vault:
        cmd += f" --vault {shlex.quote(args.vault)}"
    if args.tag:
        cmd += f' --tags {shlex.quote(",".join(args.tag))}'

    items = json.loads(run_command(cmd))
    run(items)
    print("Finished.")