-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.py
125 lines (107 loc) · 3.33 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from fastapi import Depends, FastAPI, HTTPException
from . import collection as collection_loader
from . import config, dependencies, schemas, services
app = FastAPI(
root_path=config.settings.root_path,
settings=config.settings,
)
@app.get(
"/ingestions", response_model=schemas.ListIngestionResponse, tags=["Ingestion"]
)
async def list_ingestions(
list_request: schemas.ListIngestionRequest = Depends(),
db: services.Database = Depends(dependencies.get_db),
):
return db.fetch_many(
status=list_request.status, next=list_request.next, limit=list_request.limit
)
@app.post(
"/ingestions",
response_model=schemas.Ingestion,
tags=["Ingestion"],
status_code=201,
)
async def create_ingestion(
item: schemas.AccessibleItem,
username: str = Depends(dependencies.get_username),
db: services.Database = Depends(dependencies.get_db),
) -> schemas.Ingestion:
return schemas.Ingestion(
id=item.id,
created_by=username,
item=item,
status=schemas.Status.queued,
).enqueue(db)
@app.get(
"/ingestions/{ingestion_id}",
response_model=schemas.Ingestion,
tags=["Ingestion"],
)
def get_ingestion(
ingestion: schemas.Ingestion = Depends(dependencies.fetch_ingestion),
) -> schemas.Ingestion:
return ingestion
@app.patch(
"/ingestions/{ingestion_id}",
response_model=schemas.Ingestion,
tags=["Ingestion"],
)
def update_ingestion(
update: schemas.UpdateIngestionRequest,
ingestion: schemas.Ingestion = Depends(dependencies.fetch_ingestion),
db: services.Database = Depends(dependencies.get_db),
):
updated_item = ingestion.copy(update=update.dict(exclude_unset=True))
return updated_item.save(db)
@app.delete(
"/ingestions/{ingestion_id}",
response_model=schemas.Ingestion,
tags=["Ingestion"],
)
def cancel_ingestion(
ingestion: schemas.Ingestion = Depends(dependencies.fetch_ingestion),
db: services.Database = Depends(dependencies.get_db),
) -> schemas.Ingestion:
if ingestion.status != schemas.Status.queued:
raise HTTPException(
status_code=400,
detail=(
"Unable to delete ingestion if status is not "
f"{schemas.Status.queued}"
),
)
return ingestion.cancel(db)
@app.post(
"/collections",
tags=["Collection"],
status_code=201,
dependencies=[Depends(dependencies.get_username)],
)
def publish_collection(collection: schemas.StacCollection):
# pgstac create collection
try:
collection_loader.ingest(collection)
return {f"Successfully published: {collection.id}"}
except Exception as e:
raise HTTPException(
status_code=400,
detail=(f"Unable to publish collection: {e}"),
)
@app.delete(
"/collections/{collection_id}",
tags=["Collection"],
dependencies=[Depends(dependencies.get_username)],
)
def delete_collection(collection_id: str):
try:
collection_loader.delete(collection_id=collection_id)
return {f"Successfully deleted: {collection_id}"}
except Exception as e:
print(e)
raise HTTPException(status_code=400, detail=(f"{e}"))
@app.get("/auth/me")
def who_am_i(username=Depends(dependencies.get_username)):
"""
Return username for the provided request
"""
return {"username": username}