Skip to content

Commit

Permalink
Make data-links behave more like 'symlinks' (#9485)
Browse files Browse the repository at this point in the history
- Closes #9324
  • Loading branch information
radeusgd authored Mar 22, 2024
1 parent 6c1ba64 commit 6665c22
Show file tree
Hide file tree
Showing 49 changed files with 1,021 additions and 340 deletions.
26 changes: 20 additions & 6 deletions distribution/lib/Standard/AWS/0.0.0-dev/src/S3/S3_Data_Link.enso
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from Standard.Base import all
from Standard.Base.Enso_Cloud.Data_Link import parse_format
import Standard.Base.Errors.Illegal_State.Illegal_State
import Standard.Base.System.Input_Stream.Input_Stream
from Standard.Base.Enso_Cloud.Data_Link import Data_Link_With_Input_Stream, parse_format
from Standard.Base.Enso_Cloud.Public_Utils import get_optional_field, get_required_field

import project.AWS_Credential.AWS_Credential
Expand All @@ -9,18 +11,30 @@ from project.Internal.Data_Link_Helpers import decode_aws_credential
## PRIVATE
type S3_Data_Link
## PRIVATE
Value (uri : Text) format (credentials : AWS_Credential)
Value (uri : Text) format_json (credentials : AWS_Credential)

## PRIVATE
parse json -> S3_Data_Link =
uri = get_required_field "uri" json expected_type=Text
auth = decode_aws_credential (get_required_field "auth" json)
format = parse_format (get_optional_field "format" json)
S3_Data_Link.Value uri format auth
format_json = get_optional_field "format" json
S3_Data_Link.Value uri format_json auth

## PRIVATE
as_file self -> S3_File = S3_File.new self.uri self.credentials

## PRIVATE
read self (on_problems : Problem_Behavior) =
self.as_file.read self.format on_problems
default_format self -> Any ! Illegal_State =
parse_format self.format_json

## PRIVATE
read self (format = Auto_Detect) (on_problems : Problem_Behavior) =
effective_format = if format != Auto_Detect then format else self.default_format
self.as_file.read effective_format on_problems

## PRIVATE
with_input_stream self (open_options : Vector) (action : Input_Stream -> Any) -> Any =
self.as_file.with_input_stream open_options action

## PRIVATE
Data_Link_With_Input_Stream.from (that:S3_Data_Link) = Data_Link_With_Input_Stream.Value that
56 changes: 30 additions & 26 deletions distribution/lib/Standard/AWS/0.0.0-dev/src/S3/S3_File.enso
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from Standard.Base import all
import Standard.Base.Enso_Cloud.Data_Link
import Standard.Base.Errors.Common.Syntax_Error
import Standard.Base.Errors.File_Error.File_Error
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Errors.Unimplemented.Unimplemented
import Standard.Base.Runtime.Context
import Standard.Base.System.File.Data_Link_Access.Data_Link_Access
import Standard.Base.System.File.Generic.File_Like.File_Like
import Standard.Base.System.File.Generic.Writable_File.Writable_File
import Standard.Base.System.File_Format_Metadata.File_Format_Metadata
Expand Down Expand Up @@ -89,21 +91,21 @@ type S3_File
with_output_stream : Vector File_Access -> (Output_Stream -> Any ! File_Error) -> Any ! File_Error
with_output_stream self (open_options : Vector) action = if self.is_directory then Error.throw (S3_Error.Error "S3 directory cannot be opened as a stream." self.uri) else
Context.Output.if_enabled disabled_message="Writing to an S3_File is forbidden as the Output context is disabled." panic=False <|
if open_options.contains File_Access.Append then Error.throw (S3_Error.Error "S3 does not support appending to a file. Instead you may read it, modify and then write the new contents." self.uri) else
# The exists check is not atomic, but it is the best we can do with S3
check_exists = open_options.contains File_Access.Create_New
valid_options = [File_Access.Write, File_Access.Create_New, File_Access.Truncate_Existing, File_Access.Create]
invalid_options = open_options.filter (Filter_Condition.Is_In valid_options action=Filter_Action.Remove)
if invalid_options.not_empty then Error.throw (S3_Error.Error "Unsupported S3 stream options: "+invalid_options.to_display_text self.uri) else
if check_exists && self.exists then Error.throw (File_Error.Already_Exists self) else
# Given that the amount of data written may be large and AWS library does not seem to support streaming it directly, we use a temporary file to store the data.
tmp_file = File.create_temporary_file "s3-tmp"
Panic.with_finalizer tmp_file.delete <|
result = tmp_file.with_output_stream [File_Access.Write] action
# Only proceed if the write succeeded
result.if_not_error <|
(translate_file_errors self <| S3.upload_file tmp_file self.s3_path.bucket self.s3_path.key self.credentials) . if_not_error <|
result
open_as_data_link = (open_options.contains Data_Link_Access.No_Follow . not) && (Data_Link.is_data_link self)
if open_as_data_link then Data_Link.write_data_link_as_stream self open_options action else
if open_options.contains File_Access.Append then Error.throw (S3_Error.Error "S3 does not support appending to a file. Instead you may read it, modify and then write the new contents." self.uri) else
File_Access.ensure_only_allowed_options "with_output_stream" [File_Access.Write, File_Access.Create_New, File_Access.Truncate_Existing, File_Access.Create, Data_Link_Access.No_Follow] open_options <|
# The exists check is not atomic, but it is the best we can do with S3
check_exists = open_options.contains File_Access.Create_New
if check_exists && self.exists then Error.throw (File_Error.Already_Exists self) else
# Given that the amount of data written may be large and AWS library does not seem to support streaming it directly, we use a temporary file to store the data.
tmp_file = File.create_temporary_file "s3-tmp"
Panic.with_finalizer tmp_file.delete <|
result = tmp_file.with_output_stream [File_Access.Write] action
# Only proceed if the write succeeded
result.if_not_error <|
(translate_file_errors self <| S3.upload_file tmp_file self.s3_path.bucket self.s3_path.key self.credentials) . if_not_error <|
result


## PRIVATE
Expand All @@ -121,9 +123,11 @@ type S3_File
if it returns exceptionally).
with_input_stream : Vector File_Access -> (Input_Stream -> Any ! File_Error) -> Any ! S3_Error | Illegal_Argument
with_input_stream self (open_options : Vector) action = if self.is_directory then Error.throw (Illegal_Argument.Error "S3 folders cannot be opened as a stream." self.uri) else
    # When this file is a data link and the caller did not pass
    # `Data_Link_Access.No_Follow`, reading is delegated to
    # `Data_Link.read_data_link_as_stream` instead of fetching this object.
    open_as_data_link = (open_options.contains Data_Link_Access.No_Follow . not) && (Data_Link.is_data_link self)
    if open_as_data_link then Data_Link.read_data_link_as_stream self open_options action else
        # Only `Read` (optionally combined with `No_Follow`) is accepted here.
        # The first argument names the operation being validated, so it must
        # be this method's own name, mirroring how `with_output_stream`
        # passes "with_output_stream" for its own check.
        File_Access.ensure_only_allowed_options "with_input_stream" [File_Access.Read, Data_Link_Access.No_Follow] open_options <|
            # Fetch the object and hand its body stream to `action`; errors
            # raised by the fetch go through `translate_file_errors`.
            response_body = translate_file_errors self <| S3.get_object self.s3_path.bucket self.s3_path.key self.credentials delimiter=S3_Path.delimiter
            response_body.with_stream action

## ALIAS load, open
GROUP Standard.Base.Input
Expand All @@ -143,14 +147,14 @@ type S3_File
@format File_Format.default_widget
read : File_Format -> Problem_Behavior -> Any ! S3_Error
read self format=Auto_Detect (on_problems=Problem_Behavior.Report_Warning) =
_ = on_problems
File_Format.handle_format_missing_arguments format <| case format of
Auto_Detect -> if self.is_directory then format.read self on_problems else
response = translate_file_errors self <| S3.get_object self.s3_path.bucket self.s3_path.key self.credentials delimiter=S3_Path.delimiter
response.decode Auto_Detect
_ ->
metadata = File_Format_Metadata.Value path=self.path name=self.name
self.with_input_stream [File_Access.Read] (stream-> format.read_stream stream metadata)
if Data_Link.is_data_link self then Data_Link.read_data_link self format on_problems else
File_Format.handle_format_missing_arguments format <| case format of
Auto_Detect -> if self.is_directory then format.read self on_problems else
response = translate_file_errors self <| S3.get_object self.s3_path.bucket self.s3_path.key self.credentials delimiter=S3_Path.delimiter
response.decode Auto_Detect
_ ->
metadata = File_Format_Metadata.Value path=self.path name=self.name
self.with_input_stream [File_Access.Read] (stream-> format.read_stream stream metadata)

## ALIAS load bytes, open bytes
ICON data_input
Expand Down
53 changes: 33 additions & 20 deletions distribution/lib/Standard/Base/0.0.0-dev/src/Data.enso
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ import project.Data.Pair.Pair
import project.Data.Text.Encoding.Encoding
import project.Data.Text.Text
import project.Data.Vector.Vector
import project.Enso_Cloud.Data_Link
import project.Error.Error
import project.Errors.File_Error.File_Error
import project.Errors.Illegal_Argument.Illegal_Argument
import project.Errors.Problem_Behavior.Problem_Behavior
import project.Internal.Data_Read_Helpers
import project.Meta
import project.Network.HTTP.Header.Header
import project.Network.HTTP.HTTP
import project.Network.HTTP.HTTP_Error.HTTP_Error
import project.Network.HTTP.HTTP_Method.HTTP_Method
import project.Network.HTTP.Request.Request
import project.Network.HTTP.Request_Body.Request_Body
import project.Network.HTTP.Request_Error
import project.Network.URI.URI
Expand All @@ -21,6 +22,7 @@ import project.Runtime.Context
import project.System.File.File
import project.System.File.Generic.Writable_File.Writable_File
from project.Data.Boolean import Boolean, False, True
from project.Metadata.Choice import Option
from project.Metadata.Widget import Text_Input
from project.System.File_Format import Auto_Detect, File_Format

Expand Down Expand Up @@ -65,8 +67,9 @@ from project.System.File_Format import Auto_Detect, File_Format
@format File_Format.default_widget
read : Text | File -> File_Format -> Problem_Behavior -> Any ! File_Error
read path format=Auto_Detect (on_problems=Problem_Behavior.Report_Warning) = case path of
_ : Text -> if (path.starts_with "http://") || (path.starts_with "https://") then fetch path else
_ : Text -> if Data_Read_Helpers.looks_like_uri path then Data_Read_Helpers.fetch_following_data_links path format=format else
read (File.new path) format on_problems
uri : URI -> Data_Read_Helpers.fetch_following_data_links uri format=format
_ -> File.new path . read format on_problems

## ALIAS load text, open text
Expand Down Expand Up @@ -168,8 +171,9 @@ list_directory directory:File (name_filter:(Text | Nothing)=Nothing) recursive:B
`HTTP_Method.Head`, `HTTP_Method.Delete`, `HTTP_Method.Options`.
Defaults to `HTTP_Method.Get`.
- headers: The headers to send with the request. Defaults to an empty vector.
- try_auto_parse_response: If successful should the body be attempted to be
parsed to an Enso native object.
- format: The format to use for interpreting the response.
Defaults to `Auto_Detect`. If `Raw_Response` is selected or if the format
cannot be determined automatically, a raw HTTP `Response` will be returned.

> Example
Read from an HTTP endpoint.
Expand All @@ -184,15 +188,10 @@ list_directory directory:File (name_filter:(Text | Nothing)=Nothing) recursive:B
file = enso_project.data / "spreadsheet.xls"
Data.fetch URL . body . write file
@uri Text_Input
fetch : (URI | Text) -> HTTP_Method -> Vector (Header | Pair Text Text) -> Boolean -> Any ! Request_Error | HTTP_Error
fetch (uri:(URI | Text)) (method:HTTP_Method=HTTP_Method.Get) (headers:(Vector (Header | Pair Text Text))=[]) (try_auto_parse_response:Boolean=True) =
response = HTTP.fetch uri method headers
if try_auto_parse_response.not then response.with_materialized_body else
## We cannot catch decoding errors here and fall-back to the raw response
body, because as soon as decoding is started, at least part of the
input stream may already be consumed, so we cannot easily reconstruct
the whole stream.
response.decode if_unsupported=response.with_materialized_body
@format Data_Read_Helpers.format_widget_with_raw_response
fetch : (URI | Text) -> HTTP_Method -> Vector (Header | Pair Text Text) -> File_Format -> Any ! Request_Error | HTTP_Error
fetch (uri:(URI | Text)) (method:HTTP_Method=HTTP_Method.Get) (headers:(Vector (Header | Pair Text Text))=[]) (format = Auto_Detect) =
Data_Read_Helpers.fetch_following_data_links uri method headers (Data_Read_Helpers.handle_legacy_format "fetch" "format" format)

## ALIAS http post, upload
GROUP Output
Expand All @@ -207,8 +206,9 @@ fetch (uri:(URI | Text)) (method:HTTP_Method=HTTP_Method.Get) (headers:(Vector (
- method: The HTTP method to use. Must be one of `HTTP_Method.Post`,
`HTTP_Method.Put`, `HTTP_Method.Patch`. Defaults to `HTTP_Method.Post`.
- headers: The headers to send with the request. Defaults to an empty vector.
- try_auto_parse: If successful should the body be attempted to be parsed to
an Enso native object.
- response_format: The format to use for interpreting the response.
Defaults to `Auto_Detect`. If `Raw_Response` is selected or if the format
cannot be determined automatically, a raw HTTP `Response` will be returned.

! Supported Body Types

Expand Down Expand Up @@ -314,11 +314,11 @@ fetch (uri:(URI | Text)) (method:HTTP_Method=HTTP_Method.Get) (headers:(Vector (
form_data = Map.from_vector [["key", "val"], ["a_file", test_file]]
response = Data.post url_post (Request_Body.Form_Data form_data url_encoded=True)
@uri Text_Input
post : (URI | Text) -> Request_Body -> HTTP_Method -> Vector (Header | Pair Text Text) -> Boolean -> Any ! Request_Error | HTTP_Error
post (uri:(URI | Text)) (body:Request_Body=Request_Body.Empty) (method:HTTP_Method=HTTP_Method.Post) (headers:(Vector (Header | Pair Text Text))=[]) (try_auto_parse_response:Boolean=True) =
@response_format Data_Read_Helpers.format_widget_with_raw_response
post : (URI | Text) -> Request_Body -> HTTP_Method -> Vector (Header | Pair Text Text) -> File_Format -> Any ! Request_Error | HTTP_Error
post (uri:(URI | Text)) (body:Request_Body=Request_Body.Empty) (method:HTTP_Method=HTTP_Method.Post) (headers:(Vector (Header | Pair Text Text))=[]) (response_format = Auto_Detect) =
response = HTTP.post uri body method headers
if try_auto_parse_response.not then response.with_materialized_body else
response.decode if_unsupported=response.with_materialized_body
Data_Read_Helpers.decode_http_response_following_data_links response (Data_Read_Helpers.handle_legacy_format "post" "response_format" response_format)

## GROUP Input
ICON data_download
Expand All @@ -337,4 +337,17 @@ download : (URI | Text) -> Writable_File -> HTTP_Method -> Vector (Header | Pair
download (uri:(URI | Text)) file:Writable_File (method:HTTP_Method=HTTP_Method.Get) (headers:(Vector (Header | Pair Text Text))=[]) =
Context.Output.if_enabled disabled_message="Downloading to a file is forbidden as the Output context is disabled." panic=False <|
response = HTTP.fetch uri method headers
response.write file
case Data_Link.is_data_link response.body.metadata of
True ->
# If the resource was a data link, we follow it, download the target data and try to write it to a file.
data_link = Data_Link.interpret_json_as_data_link response.decode_as_json
Data_Link.save_data_link_to_file data_link file
False ->
response.write file

## If the `format` is set to `Raw_Response`, a raw HTTP `Response` is returned
that can then be processed further manually.
type Raw_Response
## PRIVATE
get_dropdown_options : Vector Option
get_dropdown_options = [Option "Raw HTTP Response" (Meta.get_qualified_type_name Raw_Response)]
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type Array
sort self (order = Sort_Direction.Ascending) on=Nothing by=Nothing on_incomparable=Problem_Behavior.Ignore =
Array_Like_Helpers.sort self order on by on_incomparable

## ALIAS first, last, sample, slice, top, head, tail, foot, limit
## ALIAS first, last, sample, slice, top, head, tail, limit
GROUP Selections
ICON select_row
Creates a new `Vector` with only the specified range of elements from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ Text.repeat : Integer -> Text
Text.repeat self count=1 =
0.up_to count . fold "" acc-> _-> acc + self

## ALIAS first, last, left, mid, right, slice, substring, top, head, tail, foot, limit
## ALIAS first, last, left, mid, right, slice, substring, top, head, tail, limit
GROUP Selections
Creates a new Text by selecting the specified range of the input.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ type Vector a
slice : Integer -> Integer -> Vector Any
slice self start end = Array_Like_Helpers.slice self start end

## ALIAS first, last, sample, slice, top, head, tail, foot, limit
## ALIAS first, last, sample, slice, top, head, tail, limit
GROUP Selections
ICON select_row
Creates a new `Vector` with only the specified range of elements from the
Expand Down
Loading

0 comments on commit 6665c22

Please sign in to comment.