-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontentsmanager.py
586 lines (497 loc) · 23.5 KB
/
contentsmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
"""ContentsManager that allows to open Rmd, py, R and ipynb files as notebooks
"""
import os
from datetime import timedelta
import nbformat
try:
import unittest.mock as mock
except ImportError:
import mock
from tornado.web import HTTPError
from traitlets import Unicode, Float, Bool, Enum
from traitlets.config import Configurable
# import notebook.transutils before notebook.services.contents.filemanager #75
try:
import notebook.transutils # noqa
except ImportError:
pass
from .jupytext import reads, writes
from .jupytext import create_prefix_dir as create_prefix_dir_from_path
from .combine import combine_inputs_with_outputs
from .formats import rearrange_jupytext_metadata, check_file_version
from .formats import (
NOTEBOOK_EXTENSIONS,
long_form_one_format,
long_form_multiple_formats,
)
from .formats import short_form_one_format, short_form_multiple_formats
from .paired_paths import (
paired_paths,
find_base_path_and_format,
base_path,
full_path,
InconsistentPath,
)
from .kernels import set_kernelspec_from_language
def preferred_format(incomplete_format, preferred_formats):
    """Complete a format description using the user's preferred formats.

    ``incomplete_format`` is first expanded with ``long_form_one_format``.
    If it already carries a ``format_name`` it is returned unchanged.
    Otherwise the first entry of ``preferred_formats`` that is compatible
    with it (same extension, or an ``.auto`` entry for a script extension,
    and no conflicting ``suffix``/``prefix``) is updated with the keys of
    ``incomplete_format`` and returned. When nothing matches, the expanded
    ``incomplete_format`` is returned.
    """
    incomplete_format = long_form_one_format(incomplete_format)
    if "format_name" in incomplete_format:
        return incomplete_format

    for candidate in long_form_multiple_formats(preferred_formats):
        extension = incomplete_format["extension"]
        candidate_extension = candidate["extension"]

        if extension != candidate_extension:
            # An ".auto" preference applies to script extensions only,
            # i.e. neither to Markdown/R Markdown nor to ipynb.
            if candidate_extension != ".auto":
                continue
            if extension in [".md", ".markdown", ".Rmd", ".ipynb"]:
                continue

        # A suffix/prefix missing from the preference matches anything
        suffix = incomplete_format.get("suffix")
        if suffix != candidate.get("suffix", suffix):
            continue

        prefix = incomplete_format.get("prefix")
        if prefix != candidate.get("prefix", prefix):
            continue

        candidate.update(incomplete_format)
        return candidate

    return incomplete_format
def _jupytext_writes(fmt):
    """Return a function with the signature used for ``nbformat.writes``
    in this module, that delegates to ``writes`` with the format ``fmt``."""

    def _writes(nbk, version=nbformat.NO_CONVERT, **kwargs):
        # The format is bound by the closure; everything else is forwarded
        return writes(nbk, fmt, version=version, **kwargs)

    return _writes
def _jupytext_reads(fmt):
    """Return a function with the signature used for ``nbformat.reads``
    in this module, that delegates to ``reads`` with the format ``fmt``."""

    def _reads(text, as_version, **kwargs):
        # The format is bound by the closure; everything else is forwarded
        return reads(text, fmt, as_version=as_version, **kwargs)

    return _reads
def build_jupytext_contents_manager_class(base_contents_manager_class):
    """Derives a TextFileContentsManager class from the given base class

    :param base_contents_manager_class: the contents manager class to extend
    :return: a ``JupytextContentsManager`` class that inherits from both
        ``base_contents_manager_class`` and ``Configurable``
    """

    class JupytextContentsManager(base_contents_manager_class, Configurable):
        """
        A FileContentsManager Class that reads and stores notebooks to classical
        Jupyter notebooks (.ipynb), R Markdown notebooks (.Rmd), Julia (.jl),
        Python (.py) or R scripts (.R)
        """

        # Dictionary: notebook path => (fmt, formats) where fmt is the current format, and formats the paired formats.
        # Kept at class level for backward compatibility only: each instance
        # gets its own dictionary in __init__, so that pairing information is
        # not shared between contents managers.
        paired_notebooks = dict()

        def __init__(self, *args, **kwargs):
            """Create the manager with its own, empty, register of paired notebooks"""
            self.paired_notebooks = dict()
            super(JupytextContentsManager, self).__init__(*args, **kwargs)

        def all_nb_extensions(self):
            """All extensions that should be classified as notebooks"""
            return [
                ext if ext.startswith(".") else "." + ext
                for ext in self.notebook_extensions.split(",")
            ]

        default_jupytext_formats = Unicode(
            u"",
            help="Save notebooks to these file extensions. "
            "Can be any of ipynb,Rmd,md,jl,py,R,nb.jl,nb.py,nb.R "
            "comma separated. If you want another format than the "
            "default one, append the format name to the extension, "
            "e.g. ipynb,py:percent to save the notebook to "
            "hydrogen/spyder/vscode compatible scripts",
            config=True,
        )

        preferred_jupytext_formats_save = Unicode(
            u"",
            help="Preferred format when saving notebooks as text, per extension. "
            'Use "jl:percent,py:percent,R:percent" if you want to save '
            "Julia, Python and R scripts in the double percent format and "
            'only write "jupytext_formats": "py" in the notebook metadata.',
            config=True,
        )

        preferred_jupytext_formats_read = Unicode(
            u"",
            help="Preferred format when reading notebooks from text, per "
            'extension. Use "py:sphinx" if you want to read all python '
            "scripts as Sphinx gallery scripts.",
            config=True,
        )

        default_notebook_metadata_filter = Unicode(
            u"",
            help="Notebook metadata that should be saved in the text representations. "
            "Examples: 'all', '-all', 'widgets,nteract', 'kernelspec,jupytext-all'",
            config=True,
        )

        default_cell_metadata_filter = Unicode(
            u"",
            help="Cell metadata that should be saved in the text representations. "
            "Examples: 'all', 'hide_input,hide_output'",
            config=True,
        )

        comment_magics = Enum(
            values=[True, False],
            allow_none=True,
            help="Should Jupyter magic commands be commented out in the text representation?",
            config=True,
        )

        split_at_heading = Bool(
            False,
            help="Split markdown cells on headings (Markdown and R Markdown formats only)",
            config=True,
        )

        sphinx_convert_rst2md = Bool(
            False,
            help="When opening a Sphinx Gallery script, convert the reStructuredText to markdown",
            config=True,
        )

        outdated_text_notebook_margin = Float(
            1.0,
            help="Refuse to overwrite inputs of a ipynb notebooks with those of a "
            "text notebook when the text notebook plus margin is older than "
            "the ipynb notebook",
            config=True,
        )

        default_cell_markers = Unicode(
            u"",
            help='Start and end cell markers for the light format, comma separated. Use "{{{,}}}" to mark cells '
            'as foldable regions in Vim, and "region,endregion" to mark cells as Vscode/PyCharm regions',
            config=True,
        )

        notebook_extensions = Unicode(
            u",".join(NOTEBOOK_EXTENSIONS),
            help="A comma separated list of notebook extensions",
            config=True,
        )

        def drop_paired_notebook(self, path):
            """Remove the current notebook from the list of paired notebooks"""
            if path not in self.paired_notebooks:
                return

            # The entry is popped before recursing, so the recursion ends
            # once every path of the group has been removed
            fmt, formats = self.paired_notebooks.pop(path)
            prev_paired_paths = paired_paths(path, fmt, formats)
            for alt_path, _ in prev_paired_paths:
                if alt_path in self.paired_notebooks:
                    self.drop_paired_notebook(alt_path)

        def update_paired_notebooks(self, path, fmt, formats):
            """Update the list of paired notebooks to include/update the current pair"""
            if not formats:
                self.drop_paired_notebook(path)
                return

            # Forget any previous pairing that involves one of the new paths
            new_paired_paths = paired_paths(path, fmt, formats)
            for alt_path, _ in new_paired_paths:
                self.drop_paired_notebook(alt_path)

            # A single format that gives nothing but an extension is not
            # registered as a pairing
            long_formats = long_form_multiple_formats(formats)
            if len(long_formats) == 1 and set(long_formats[0]) <= {"extension"}:
                return

            short_formats = short_form_multiple_formats(formats)
            for alt_path, alt_fmt in new_paired_paths:
                self.paired_notebooks[alt_path] = (
                    short_form_one_format(alt_fmt),
                    short_formats,
                )

        def set_default_format_options(self, format_options, read=False):
            """Set default format option

            The options configured on the manager are added to the
            ``format_options`` dictionary, in place, unless they are already
            set there. ``cell_markers`` is only set when writing
            (``read=False``) and ``rst2md`` only when reading (``read=True``).
            """
            if self.default_notebook_metadata_filter:
                format_options.setdefault(
                    "notebook_metadata_filter", self.default_notebook_metadata_filter
                )
            if self.default_cell_metadata_filter:
                format_options.setdefault(
                    "cell_metadata_filter", self.default_cell_metadata_filter
                )
            if self.comment_magics is not None:
                format_options.setdefault("comment_magics", self.comment_magics)
            if self.split_at_heading:
                format_options.setdefault("split_at_heading", self.split_at_heading)
            if not read and self.default_cell_markers:
                format_options.setdefault("cell_markers", self.default_cell_markers)
            if read and self.sphinx_convert_rst2md:
                format_options.setdefault("rst2md", self.sphinx_convert_rst2md)

        def default_formats(self, path):
            """Return the default formats, if they apply to the current path #157"""
            formats = long_form_multiple_formats(self.default_jupytext_formats)
            for fmt in formats:
                try:
                    # base_path raises InconsistentPath when the path does
                    # not match the format
                    base_path(path, fmt)
                    return self.default_jupytext_formats
                except InconsistentPath:
                    continue

            return None

        def create_prefix_dir(self, path, fmt):
            """Create the prefix dir, if missing"""
            create_prefix_dir_from_path(self._get_os_path(path.strip("/")), fmt)

        def save(self, model, path=""):
            """Save the file model, in every paired format when it is a notebook

            Models that are not notebooks are passed to the base class
            unchanged. Notebooks are saved to the ipynb representation(s)
            first, then to the other formats. The value returned is the one
            the base class returned for ``path``, with ``last_modified`` set
            to that of the last file written.

            :raises HTTPError: an ``HTTPError`` raised while saving is
                propagated as is; any other exception is reported as a 400
            """
            if model["type"] != "notebook":
                return super(JupytextContentsManager, self).save(model, path)

            path = path.strip("/")
            nbk = model["content"]
            try:
                metadata = nbk.get("metadata")
                rearrange_jupytext_metadata(metadata)
                jupytext_metadata = metadata.setdefault("jupytext", {})
                jupytext_formats = jupytext_metadata.get(
                    "formats"
                ) or self.default_formats(path)

                if not jupytext_formats:
                    # Not a paired notebook: the single format is derived
                    # from the file extension and, when it matches, from the
                    # text representation found in the metadata
                    text_representation = jupytext_metadata.get(
                        "text_representation", {}
                    )
                    ext = os.path.splitext(path)[1]
                    fmt = {"extension": ext}

                    if ext == text_representation.get(
                        "extension"
                    ) and text_representation.get("format_name"):
                        fmt["format_name"] = text_representation.get("format_name")

                    jupytext_formats = [fmt]

                jupytext_formats = long_form_multiple_formats(
                    jupytext_formats, metadata, auto_ext_requires_language_info=False
                )

                # Set preferred formats if not format name is given yet
                jupytext_formats = [
                    preferred_format(f, self.preferred_jupytext_formats_save)
                    for f in jupytext_formats
                ]

                base, fmt = find_base_path_and_format(path, jupytext_formats)
                self.update_paired_notebooks(path, fmt, jupytext_formats)
                self.set_default_format_options(jupytext_metadata)

                # Do not leave an empty "jupytext" section in the metadata
                if not jupytext_metadata:
                    metadata.pop("jupytext")

                # Save as ipynb first
                return_value = None
                value = None
                for fmt in jupytext_formats[::-1]:
                    if fmt["extension"] != ".ipynb":
                        continue

                    alt_path = full_path(base, fmt)
                    self.create_prefix_dir(alt_path, fmt)
                    self.log.info("Saving %s", os.path.basename(alt_path))
                    value = super(JupytextContentsManager, self).save(model, alt_path)
                    if alt_path == path:
                        return_value = value

                # And then to the other formats, in reverse order so that
                # the first format is the most recent
                for fmt in jupytext_formats[::-1]:
                    if fmt["extension"] == ".ipynb":
                        continue

                    alt_path = full_path(base, fmt)
                    self.create_prefix_dir(alt_path, fmt)
                    if "format_name" in fmt and fmt["extension"] not in [
                        ".md",
                        ".markdown",
                        ".Rmd",
                    ]:
                        self.log.info(
                            "Saving %s in format %s:%s",
                            os.path.basename(alt_path),
                            fmt["extension"][1:],
                            fmt["format_name"],
                        )
                    else:
                        self.log.info("Saving %s", os.path.basename(alt_path))

                    # nbformat.writes is patched while the base class saves
                    # the model, so that the text representation is written
                    with mock.patch("nbformat.writes", _jupytext_writes(fmt)):
                        value = super(JupytextContentsManager, self).save(
                            model, alt_path
                        )
                        if alt_path == path:
                            return_value = value

                # Update modified timestamp to match that of the pair #207
                return_value["last_modified"] = value["last_modified"]
                return return_value

            except HTTPError:
                # Keep the status code and message chosen by the base class
                raise
            except Exception as err:
                raise HTTPError(400, str(err))

        def get(
            self,
            path,
            content=True,
            type=None,
            format=None,
            load_alternative_format=True,
        ):
            """Takes a path for an entity and returns its model

            Paths that are not notebooks are passed to the base class. When a
            notebook is paired and ``load_alternative_format`` is true, the
            inputs are read from the most recent text representation and the
            outputs from the ipynb file, when these files exist.

            :raises HTTPError: (400) when the file version check fails, or
                when the file that provides the outputs is more recent than
                the one that provides the inputs, by more than
                ``outdated_text_notebook_margin`` seconds
            """
            path = path.strip("/")
            os_path = self._get_os_path(path)
            ext = os.path.splitext(path)[1]

            # Not a notebook?
            if (
                not self.exists(path)
                or os.path.isdir(os_path)
                or (type != "notebook" if type else ext not in self.all_nb_extensions())
            ):
                return super(JupytextContentsManager, self).get(
                    path, content, type, format
                )

            fmt = preferred_format(ext, self.preferred_jupytext_formats_read)
            if ext == ".ipynb":
                model = self._notebook_model(path, content=content)
            else:
                self.set_default_format_options(fmt, read=True)
                # nbformat.reads is patched while the model is built, so that
                # the text file is parsed as a notebook
                with mock.patch("nbformat.reads", _jupytext_reads(fmt)):
                    model = self._notebook_model(path, content=content)

            if not load_alternative_format:
                return model

            if not content:
                # Modification time of a paired notebook, in this context - Jupyter is checking timestamp
                # before saving - is the most recent among all representations #118
                if path not in self.paired_notebooks:
                    return model

                fmt, formats = self.paired_notebooks.get(path)
                for alt_path, _ in paired_paths(path, fmt, formats):
                    if alt_path != path and self.exists(alt_path):
                        alt_model = self._notebook_model(alt_path, content=False)
                        if alt_model["last_modified"] > model["last_modified"]:
                            model["last_modified"] = alt_model["last_modified"]

                return model

            # We will now read a second file if this is a paired notebooks.
            nbk = model["content"]
            jupytext_formats = nbk.metadata.get("jupytext", {}).get(
                "formats"
            ) or self.default_formats(path)
            jupytext_formats = long_form_multiple_formats(
                jupytext_formats, nbk.metadata, auto_ext_requires_language_info=False
            )

            # Compute paired notebooks from formats
            alt_paths = [(path, fmt)]
            if jupytext_formats:
                try:
                    _, fmt = find_base_path_and_format(path, jupytext_formats)
                    alt_paths = paired_paths(path, fmt, jupytext_formats)
                    self.update_paired_notebooks(path, fmt, jupytext_formats)
                except InconsistentPath as err:
                    self.log.info("Unable to read paired notebook: %s", str(err))
            else:
                if path in self.paired_notebooks:
                    fmt, formats = self.paired_notebooks.get(path)
                    alt_paths = paired_paths(path, fmt, formats)

            if len(alt_paths) > 1 and ext == ".ipynb":
                # Apply default options (like saving and reloading would do)
                jupytext_metadata = model["content"]["metadata"].get("jupytext", {})
                self.set_default_format_options(jupytext_metadata, read=True)
                if jupytext_metadata:
                    model["content"]["metadata"]["jupytext"] = jupytext_metadata

            org_model = model
            fmt_inputs = fmt
            path_inputs = path_outputs = path
            model_outputs = None

            # Source format is the most recent non ipynb format found on disk
            if path.endswith(".ipynb"):
                # alt_path => (last modified, format). The format is stored
                # with the timestamp so that the format of the selected
                # source file is the one used to combine inputs and outputs.
                source_timestamps = {}
                for alt_path, alt_fmt in alt_paths:
                    if not alt_path.endswith(".ipynb") and self.exists(alt_path):
                        alt_ts = self._notebook_model(alt_path, content=False)[
                            "last_modified"
                        ]
                        source_timestamps[alt_path] = (alt_ts, alt_fmt)

                most_recent_timestamp = None
                for alt_path, (alt_ts, alt_fmt) in source_timestamps.items():
                    if len(source_timestamps) > 1:
                        self.log.info(
                            u"File {} was last modified at {}".format(alt_path, alt_ts)
                        )
                    if most_recent_timestamp is None or alt_ts > most_recent_timestamp:
                        most_recent_timestamp = alt_ts
                        model_outputs = model
                        path_inputs = alt_path
                        fmt_inputs = alt_fmt

                if most_recent_timestamp is not None:
                    self.log.info(u"Reading SOURCE from {}".format(path_inputs))
                    model = self.get(
                        path_inputs,
                        content=content,
                        type="notebook",
                        format=format,
                        load_alternative_format=False,
                    )

            # Outputs taken from ipynb if in group, if file exists
            else:
                for alt_path, _ in alt_paths:
                    if alt_path.endswith(".ipynb") and self.exists(alt_path):
                        self.log.info(u"Reading OUTPUTS from {}".format(alt_path))
                        path_outputs = alt_path
                        model_outputs = self.get(
                            alt_path,
                            content=content,
                            type="notebook",
                            format=format,
                            load_alternative_format=False,
                        )
                        break

            try:
                check_file_version(model["content"], path_inputs, path_outputs)
            except Exception as err:
                raise HTTPError(400, str(err))

            # Before we combine the two files, we make sure we're not overwriting ipynb cells
            # with an outdated text file
            try:
                if model_outputs and model_outputs["last_modified"] > model[
                    "last_modified"
                ] + timedelta(seconds=self.outdated_text_notebook_margin):
                    raise HTTPError(
                        400,
                        (
                            "{out} (last modified {out_last})\n"
                            "seems more recent than {src} (last modified {src_last})\n"
                            "Please either:\n"
                            "- open {src} in a text editor, make sure it is up to date, and save it,\n"
                            "- or delete {src} if not up to date,\n"
                            "- or increase check margin by adding, say,\n"
                            'c.ContentsManager.outdated_text_notebook_margin = 5 # in seconds # or float("inf")\n'
                            "to your .jupyter/jupyter_notebook_config.py file\n"
                        ).format(
                            src=path_inputs,
                            src_last=model["last_modified"],
                            out=path_outputs,
                            out_last=model_outputs["last_modified"],
                        ),
                    )
            except OverflowError:
                # timedelta overflows when the margin is infinite: in that
                # case the timestamp check is skipped
                pass

            if model_outputs:
                combine_inputs_with_outputs(
                    model["content"], model_outputs["content"], fmt_inputs
                )
            elif not path.endswith(".ipynb"):
                set_kernelspec_from_language(model["content"])

            # Trust code cells when they have no output
            for cell in model["content"].cells:
                if (
                    cell.cell_type == "code"
                    and not cell.outputs
                    and cell.metadata.get("trusted") is False
                ):
                    cell.metadata["trusted"] = True

            # Path and name of the notebook is the one of the original path
            model["path"] = org_model["path"]
            model["name"] = org_model["name"]

            return model

        def trust_notebook(self, path):
            """Trust the current notebook"""
            if path.endswith(".ipynb") or path not in self.paired_notebooks:
                super(JupytextContentsManager, self).trust_notebook(path)
                return

            # For a paired text notebook, trust its ipynb representation(s)
            fmt, formats = self.paired_notebooks[path]
            for alt_path, alt_fmt in paired_paths(path, fmt, formats):
                if alt_fmt["extension"] == ".ipynb":
                    super(JupytextContentsManager, self).trust_notebook(alt_path)

        def rename_file(self, old_path, new_path):
            """Rename the current notebook, as well as its alternative representations

            :raises HTTPError: (400) when ``new_path`` is not consistent with
                the format of the notebook being renamed
            """
            if old_path not in self.paired_notebooks:
                try:
                    # we do not know yet if this is a paired notebook (#190)
                    # -> to get this information we open the notebook
                    self.get(old_path, content=True)
                except Exception as err:
                    # Best effort: when the file cannot be opened it is
                    # renamed as an ordinary, non paired, file below
                    self.log.debug(
                        "Could not open %s to look for paired files: %s",
                        old_path,
                        err,
                    )

            if old_path not in self.paired_notebooks:
                super(JupytextContentsManager, self).rename_file(old_path, new_path)
                return

            fmt, formats = self.paired_notebooks.get(old_path)
            old_alt_paths = paired_paths(old_path, fmt, formats)

            # Is the new file name consistent with suffix?
            try:
                new_base = base_path(new_path, fmt)
            except Exception as err:
                raise HTTPError(400, str(err))

            for old_alt_path, alt_fmt in old_alt_paths:
                new_alt_path = full_path(new_base, alt_fmt)
                if self.exists(old_alt_path):
                    super(JupytextContentsManager, self).rename_file(
                        old_alt_path, new_alt_path
                    )

            self.drop_paired_notebook(old_path)
            self.update_paired_notebooks(new_path, fmt, formats)

    return JupytextContentsManager
# Build the contents manager exposed by this module: TextFileContentsManager
# derives from LargeFileManager when that class can be imported, and from
# FileContentsManager otherwise.
try:
    from notebook.services.contents.largefilemanager import LargeFileManager

    TextFileContentsManager = build_jupytext_contents_manager_class(LargeFileManager)
except ImportError:
    # Older versions of notebook do not have the LargeFileManager #217
    from notebook.services.contents.filemanager import FileContentsManager

    TextFileContentsManager = build_jupytext_contents_manager_class(FileContentsManager)