Skip to content

Commit

Permalink
LIU-444: Add new Bash shell parsing approach.
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed Feb 3, 2025
1 parent ec42719 commit bcc408a
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 102 deletions.
33 changes: 11 additions & 22 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
logger.debug("Parameters found: %s", json.dumps(self.parameters))
logger.debug("Bash Inputs: %s; Bash Outputs: %s", inputs, outputs)
# we only support passing a path for bash apps
# no longer true
fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)}
fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)}
dataURLInputs = {
Expand All @@ -220,6 +221,10 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
outport_names = (
self.parameters["outputs"] if "outputs" in self.parameters else []
)

cmd = self.command.strip()
cmd = droputils.replace_placeholders(cmd, fsInputs, fsOutputs)

reader = get_port_reader_function(self.input_parser)
keyargs, pargs = replace_named_ports(
inputs.items(),
Expand All @@ -231,27 +236,16 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
separator=self._paramValueSeparator,
parser=reader,
)
argumentString = (
f"{' '.join(map(str,pargs + keyargs))}" # add kwargs to end of pargs
)
# complete command including all additional parameters and optional redirects
if len(argumentString.strip()) > 0:
# the _cmdLineArgs would very likely make the command line invalid
cmd = f"{self.command} {argumentString} "
else:
cmd = f"{self.command} {argumentString} {self._cmdLineArgs} "
if self._outputRedirect:
cmd = f"{cmd} > {self._outputRedirect}"
if self._inputRedirect:
cmd = f"cat {self._inputRedirect} > {cmd}"
cmd = cmd.strip()

for key, value in keyargs.items():
cmd = cmd.replace(f"%{key}%", str(value))
for key, value in pargs.items():
cmd = cmd.replace(f"%{key}%", str(value))


app_uid = self.uid

# Replace inputs/outputs in command line with paths or data URLs
cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs)

cmd = droputils.replace_dataurl_placeholders(cmd, dataURLInputs, dataURLOutputs)

# Pass down daliuge-specific information to the subprocesses as environment variables
env = os.environ.copy()
Expand Down Expand Up @@ -372,11 +366,6 @@ def execute(self, data):
# @param category BashShellApp
# @param tag template
# @param command /String/ComponentParameter/NoPort/ReadWrite//False/False/The command to be executed
# @param input_redirection /String/ComponentParameter/NoPort/ReadWrite//False/False/The command line argument that specifies the input into this application
# @param output_redirection /String/ComponentParameter/NoPort/ReadWrite//False/False/The command line argument that specifies the output from this application
# @param command_line_arguments /String/ComponentParameter/NoPort/ReadWrite//False/False/Additional command line arguments to be added to the command line to be executed
# @param paramValueSeparator " "/String/ComponentParameter/NoPort/ReadWrite//False/False/Separator character(s) between parameters on the command line
# @param argumentPrefix "--"/String/ComponentParameter/NoPort/ReadWrite//False/False/Prefix to each keyed argument on the command line
# @param dropclass dlg.apps.bash_shell_app.BashShellApp/String/ComponentParameter/NoPort/ReadWrite//False/False/Drop class
# @param base_name bash_shell_app/String/ComponentParameter/NoPort/ReadOnly//False/False/Base name of application class
# @param execution_time 5/Float/ConstraintParameter/NoPort/ReadOnly//False/False/Estimated execution time
Expand Down
10 changes: 2 additions & 8 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,9 @@ def run(self):
if isinstance(addEnv, dict): # if it is a dict populate directly
# but replace placeholders first
for key in addEnv:
value = droputils.replace_path_placeholders(
value = droputils.replace_placeholders(
addEnv[key], dockerInputs, dockerOutputs
)
value = droputils.replace_dataurl_placeholders(
value, dataURLInputs, dataURLOutputs
)
addEnv[key] = value
env.update(addEnv)
elif isinstance(
Expand Down Expand Up @@ -530,12 +527,9 @@ def run(self):
# complete command including all additional parameters and optional redirects
cmd = f"{self._command} {argumentString} {self._cmdLineArgs} "
if cmd:
cmd = droputils.replace_path_placeholders(
cmd = droputils.replace_placeholders(
cmd, dockerInputs, dockerOutputs
)
cmd = droputils.replace_dataurl_placeholders(
cmd, dataURLInputs, dataURLOutputs
)
# if "output_redirection" in self._applicationArgs:
# logger.debug(">>>> outport_names: %s", outport_names)
# out_name = outport_names["output_redirection"]
Expand Down
4 changes: 1 addition & 3 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,7 @@ def run(self):
app_uid = self.uid

# Replace inputs/outputs in command line with paths or data URLs
cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs)

cmd = droputils.replace_dataurl_placeholders(cmd, dataURLInputs, dataURLOutputs)
cmd = droputils.replace_placeholders(cmd, fsInputs, fsOutputs)

# Pass down daliuge-specific information to the subprocesses as environment variables
env = os.environ.copy()
Expand Down
30 changes: 12 additions & 18 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ def has_path(x):
return False


def replace_path_placeholders(cmd, inputs, outputs):
def replace_placeholders(cmd, inputs, outputs):
"""
Replaces any placeholder found in ``cmd`` with the path of the respective
input or output Drop from ``inputs`` or ``outputs``.
Expand All @@ -397,24 +397,18 @@ def replace_path_placeholders(cmd, inputs, outputs):
inputs.keys(),
outputs.keys(),
)
replacements = {**inputs, **outputs}
for attr, value in replacements.items():
try:
cmd = cmd.replace(f"%{attr}%", value.path)
except AttributeError:
logger.debug("Input %s does not have 'path' attr", attr)

try:
cmd = cmd.replace(f"%{attr}%", value.dataUrl)
except AttributeError:
logger.debug("Input %s does not have 'dataUrl' attr", attr)

for x, i in enumerate(inputs.values()):
pathRef = "%%i%d" % (x,)
if pathRef in cmd:
cmd = cmd.replace(pathRef, i.path)
for x, o in enumerate(outputs.values()):
pathRef = "%%o%d" % (x)
if pathRef in cmd:
cmd = cmd.replace(pathRef, o.path)

for uid, i in inputs.items():
pathRef = "%%i[%s]" % (uid,)
if pathRef in cmd:
cmd = cmd.replace(pathRef, i.path)
for uid, o in outputs.items():
pathRef = "%%o[%s]" % (uid,)
if pathRef in cmd:
cmd = cmd.replace(pathRef, o.path)

logger.debug("Command after path placeholder replacement is: %s", cmd)

Expand Down
47 changes: 24 additions & 23 deletions daliuge-engine/dlg/named_port_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ class DropParser(Enum):


def serialize_kwargs(keyargs, prefix="--", separator=" "):
kwargs = []
kwargs = {}
for name, value in iter(keyargs.items()):
if prefix == "--" and len(name) == 1:
kwargs += [f"-{name} {value}"]
else:
kwargs += [f"{prefix.strip()}{name.strip()}{separator}{str(value).strip()}"]
logger.debug("kwargs after serialization: %s", kwargs)
kwargs[name] = value
# if prefix == "--" and len(name) == 1:
# kwargs += [f"-{name} {value}"]
# else:
# kwargs += [f"{prefix.strip()}{name.strip()}{separator}{str(value).strip()}"]
# logger.debug("kwargs after serialization: %s", kwargs)
return kwargs


Expand Down Expand Up @@ -80,9 +81,8 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):
pargs.append(str(value).strip())
else:
kwargs.update({name: value})
skwargs = serialize_kwargs(kwargs, prefix=prefix, separator=separator)
logger.info("Constructed command line arguments: %s %s", pargs, kwargs)
return (pargs, skwargs)
return (pargs, kwargs)


def identify_named_ports(
Expand Down Expand Up @@ -304,27 +304,28 @@ def replace_named_ports(
keywordArgs = {arg: subdict['value'] for arg, subdict in keywordArgs.items()}
keywordPortArgs = {arg: subdict['value'] for arg, subdict in keywordPortArgs.items()}

# Construct the final keywordArguments and positionalPortArguments
# # Construct the final keywordArguments and positionalPortArguments
for k, v in keywordPortArgs.items():
if v not in [None, ""]:
keywordArgs.update({k: v})
for k, v in positionalPortArgs.items():
logger.debug("port posarg %s has value %s", k, v)
if k == "input_redirection":
v = f"cat {v} > "
if k == "output_redirection":
v = f"> {v}"
# logger.debug("port posarg %s has value %s", k, v)
# if k == "input_redirection":
# v = f"cat {v} > "
# if k == "output_redirection":
# v = f"> {v}"
if v not in [None, ""]:
positionalArgs.update({k: v})

keywordArgs = (
serialize_kwargs(keywordArgs, prefix=argumentPrefix, separator=separator)
if len(keywordArgs) > 0
else [""]
)
pargs = list(positionalArgs.values())
if not pargs or None in pargs:
pargs = [""]
# keywordArgs = (
# serialize_kwargs(keywordArgs, prefix=argumentPrefix, separator=separator)
# if len(keywordArgs) > 0
# else [""]
# )
keywordArgs = serialize_kwargs(keywordArgs)
pargs = positionalArgs
# if not pargs or None in pargs:
# pargs = [""]

logger.debug("After port replacement: pargs: %s; keyargs: %s", pargs, keywordArgs)
return keywordArgs, pargs
Expand Down Expand Up @@ -409,7 +410,7 @@ def optionalEval(x):
elif input_parser is DropParser.PATH:
reader = lambda x: x.path
elif input_parser is DropParser.DATAURL:
reader = lambda x: x.dataurl
reader = lambda x: x.dataURL
elif input_parser is DropParser.DILL:
reader = drop_loaders.load_dill
elif input_parser is DropParser.BINARY:
Expand Down
24 changes: 24 additions & 0 deletions daliuge-engine/test/TestData.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# DALiuGE Engine: Test Data

Test data for daliuge-engine/test are stored in the EAGLE_test_data repository (installed
as `daliuge_tests`).


## How to access
```python
from importlib.resources import files
import daliuge_tests.engine.graphs as test_graphs


files(test_graphs)
```
The engine is concerned with pre-translated graphs, so the "test_graph.graph" file will
typically be a Phyiscal Graph Template, not a Logical Graph.

## How to add

Store in the following path:

```bash
EAGLE_test_repo/eagle_test_graphs/daliuge_tests/engine/graphs
```
12 changes: 6 additions & 6 deletions daliuge-engine/test/apps/test_bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def tearDown(self):

def test_echo(self):
a = FileDROP("a", "a")
b = BashShellApp("b", "b", command="cp %i0 %o0")
b = BashShellApp("b", "b", command="cp %a% %c%")
c = FileDROP("c", "c")

b.addInput(a)
Expand Down Expand Up @@ -81,9 +81,9 @@ def assert_message_is_correct(message, command):
self.assertEqual(message.encode("utf8"), droputils.allDropContents(b))

msg = "This is a message with a single quote: '"
assert_message_is_correct(msg, 'echo -n "{0}" > %o0'.format(msg))
assert_message_is_correct(msg, 'echo -n "{0}" > %b%'.format(msg))
msg = 'This is a message with a double quotes: "'
assert_message_is_correct(msg, "echo -n '{0}' > %o0".format(msg))
assert_message_is_correct(msg, "echo -n '{0}' > %b%".format(msg))

def test_envvars(self):
"""Checks that the DLG_* environment variables are available to bash programs"""
Expand All @@ -96,7 +96,7 @@ class dummy(object):
pass

def assert_envvar_is_there(varname, value):
command = "echo -n $%s > %%o0" % (varname)
command = f"echo -n ${varname} > %b%"
a = BashShellApp(
app_uid, app_uid, dlg_session_id=session_id, command=command
)
Expand Down Expand Up @@ -174,7 +174,7 @@ def test_single_pipe(self):

a = StreamingOutputBashApp("a", "a", command=r"echo -en '5\n4\n3\n2\n1'")
b = InMemoryDROP("b", "b")
c = StreamingInputBashApp("c", "c", command="cat > %o0")
c = StreamingInputBashApp("c", "c", command="cat > %d%")
d = FileDROP("d", "d", filepath=output_fname)

a.addOutput(b)
Expand Down Expand Up @@ -225,7 +225,7 @@ def test_two_simultaneous_pipes(self):
b = InMemoryDROP("b", "b")
c = StreamingInputOutputBashApp("c", "c", command="cat")
d = InMemoryDROP("d", "d")
e = StreamingInputBashApp("e", "e", command="sort -n > %o0")
e = StreamingInputBashApp("e", "e", command="sort -n > %f%")
f = FileDROP("f", "f", filepath=output_fname)

a.addOutput(b)
Expand Down
48 changes: 26 additions & 22 deletions daliuge-engine/test/graphs/test_graphExecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from importlib.resources import files

from dlg.data.drops.file import FileDROP
from dlg.data.drops.memory import InMemoryDROP
from dlg import droputils
from dlg.manager.composite_manager import DataIslandManager
Expand Down Expand Up @@ -69,28 +70,31 @@ def test_ddGraph(self):
to test that the separatorString parameter is working correctly.
"""
sessionId = "lalo"
ddGraph = "ddTest.graph"
with (files(test_graphs) /ddGraph).open() as f: # @UndefinedVariable
logger.debug(f"Loading graph: {f}")
graphSpec = json.load(f)
self.createSessionAndAddGraph(sessionId, graphSpec=graphSpec)

# Deploy now and get OIDs
bs = graphSpec[0]["applicationArgs"]["bs"]["value"]
count = graphSpec[0]["applicationArgs"]["count"]["value"]
self.dim.deploySession(sessionId)
a, c = [
self.dm._sessions[sessionId].drops[x]
for x in ("2022-02-11T08:05:47_-5_0", "2022-02-11T08:05:47_-3_0")
]

data = os.urandom(bs * count)
logger.debug(f"Length of data produced: {len(data)}")
with droputils.DROPWaiterCtx(self, c, 3):
a.write(data)
a.setCompleted()

self.assertEqual(data, droputils.allDropContents(c))
bs = 10
count = 2048
ddGraphs = ["ddExamplePG.graph","ddExample_mixedPortsPG.graph"]
for ddGraph in ddGraphs:
with (files(test_graphs) /ddGraph).open() as f: # @UndefinedVariable
logger.debug(f"Loading graph: {f}")
graphSpec = json.load(f)
self.createSessionAndAddGraph(ddGraph, graphSpec=graphSpec)
self.dim.deploySession(ddGraph)
a, c = [
drop for drop in self.dm._sessions[ddGraph].drops.values()
if isinstance(drop, FileDROP)
]
x = [ drop for drop in self.dm._sessions[ddGraph].drops.values()
if isinstance(drop, InMemoryDROP)
]

data = os.urandom(bs * count)
logger.debug(f"Length of data produced: {len(data)}")
with droputils.DROPWaiterCtx(self, c, 300):
a.setCompleted()
for d in x:
d.setCompleted()

self.assertEqual(len(data), len(droputils.allDropContents(c)))

def test_namedPorts_funcs(self):
"""
Expand Down
24 changes: 24 additions & 0 deletions daliuge-translator/test/TestData.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# DALiuGE Engine: Test Data

Test data for daliuge-engine/test are stored in the EAGLE_test_data repository (installed
as `daliuge_tests`).


## How to access
```python
from importlib.resources import files
import daliuge_tests.dropmake as test_graphs

files(test_graphs)
```


## How to add

Store in the following path:

```bash
EAGLE_test_repo/eagle_test_graphs/daliuge_tests/dropmake
```


0 comments on commit bcc408a

Please sign in to comment.