Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

flow-to-all and short-circuit flow conditions can't be used at the same time #574

Closed
lenaschoenburg opened this issue Apr 21, 2016 · 7 comments
Assignees
Labels

Comments

@lenaschoenburg
Copy link
Contributor

This:

[{:flow/from      :all
  :flow/to        :all
  :flow/predicate :our.code.flow/always}
 {:flow/from              :all
  :flow/to                [:error-task]
  :flow/short-circuit?    true
  :flow/thrown-exception? true
  :flow/post-transform    :our.code/post-transform
  :flow/predicate         :our.code.flow/always}
 {:flow/from           :task-21
  :flow/to             [:error-task]
  :flow/short-circuit? true
  :flow/predicate      [:not :our.code/uninteresting?]}]

throws ExceptionInfo :flow/short-circuit entries must proceed all entries that aren't :flow/short-circuit clojure.core/ex-info (core.clj:4617)

and this:

 [{:flow/from              :all
   :flow/to                [:error-task]
   :flow/short-circuit?    true
   :flow/thrown-exception? true
   :flow/post-transform    :our.code/post-transform
   :flow/predicate         :our.code.flow/always}
  {:flow/from           :task-21
   :flow/to             [:error-task]
   :flow/short-circuit? true
   :flow/predicate      [:not :our.code/uninteresting?]}
  {:flow/from      :all
   :flow/to        :all
   :flow/predicate :our.code.flow/always}]

throws ExceptionInfo :flow/to mapped to :all value must appear first flow ordering clojure.core/ex-info (core.clj:4617)

@MichaelDrogalis
Copy link
Contributor

@dignati I think you can evade this problem by adding flow/short-circuit? true to the first entry of your first example. By definition of going from :all -> :all, that covers all segments to all tasks - hence there would be no more work to do if the predicate matches.

Does that make sense? Happy to keep kicking this one around. :all -> :all is a little brain bending to think about.

@lenaschoenburg
Copy link
Contributor Author

I don't think that works. That would allow all segments to flow to the error task when I only want segments that threw an exception or are matched by a custom predicate.

@lenaschoenburg
Copy link
Contributor Author

When trying the following flow conditions:

[{:flow/from           :all
  :flow/to             :all
  :flow/predicate      :our.code/supported?
  :flow/short-circuit? true}
 {:flow/from              :all
  :flow/to                [:error-task]
  :flow/short-circuit?    true
  :flow/thrown-exception? true
  :flow/predicate         :our.code/always
  :flow/post-transform    ::handle-processing-ex}
 {:flow/from      :all
  :flow/to        [:error-task]
  :flow/predicate [:not :our.code/supported?]}]

I can start the job but the first segments throws:

                                                java.lang.Thread.run              Thread.java:  745
                  java.util.concurrent.ThreadPoolExecutor$Worker.run  ThreadPoolExecutor.java:  617
                   java.util.concurrent.ThreadPoolExecutor.runWorker  ThreadPoolExecutor.java: 1142
                                                                 ...                               
                                   clojure.core.async/thread-call/fn                async.clj:  434
                           onyx.peer.task-lifecycle.TaskLifeCycle/fn       task_lifecycle.clj:  512
            onyx.peer.task-lifecycle/run-task-lifecycle/invokeStatic       task_lifecycle.clj:  350
            onyx.peer.task-lifecycle/build-new-segments/invokeStatic       task_lifecycle.clj:  156
                                    clojure.core/reduce/invokeStatic                 core.clj: 6545
                                         clojure.core.protocols/fn/G            protocols.clj:   13
                                           clojure.core.protocols/fn            protocols.clj:   75
                              clojure.core.protocols/fn/invokeStatic            protocols.clj:   75
                      clojure.core.protocols/seq-reduce/invokeStatic            protocols.clj:   31
                                         clojure.core.protocols/fn/G            protocols.clj:   19
                                           clojure.core.protocols/fn            protocols.clj:  124
                              clojure.core.protocols/fn/invokeStatic            protocols.clj:  136
                                                                 ...                               
                      onyx.peer.task-lifecycle/build-new-segments/fn       task_lifecycle.clj:  151
                         onyx.peer.task-lifecycle/fn/add-from-leaves       task_lifecycle.clj:  125
                      onyx.peer.task-lifecycle/fn/add-from-leaves/fn       task_lifecycle.clj:  136
                                    clojure.core/reduce/invokeStatic                 core.clj: 6545
                                         clojure.core.protocols/fn/G            protocols.clj:   13
                                           clojure.core.protocols/fn            protocols.clj:   75
                              clojure.core.protocols/fn/invokeStatic            protocols.clj:   75
                      clojure.core.protocols/seq-reduce/invokeStatic            protocols.clj:   31
                                         clojure.core.protocols/fn/G            protocols.clj:   19
                                           clojure.core.protocols/fn            protocols.clj:  124
                              clojure.core.protocols/fn/invokeStatic            protocols.clj:  136
                                                                 ...                               
                   onyx.peer.task-lifecycle/fn/add-from-leaves/fn/fn       task_lifecycle.clj:  131
onyx.lifecycles.lifecycle-invoke/invoke-flow-conditions/invokeStatic     lifecycle_invoke.clj:   95
onyx.lifecycles.lifecycle-invoke/restartable-invocation/invokeStatic     lifecycle_invoke.clj:   21
                                     clojure.core/apply/invokeStatic                 core.clj:  646
                                                                 ...                               
                              onyx.peer.task-lifecycle/add-from-leaf       task_lifecycle.clj:  113
                 onyx.peer.task-lifecycle/add-from-leaf/invokeStatic       task_lifecycle.clj:  113
             onyx.flow-conditions.fc-routing/route-data/invokeStatic           fc_routing.clj:   49
                                       clojure.core/seq/invokeStatic                 core.clj:  137
                                                                 ...                               
                                                 clojure.core/map/fn                 core.clj: 2637
                                       clojure.core/seq/invokeStatic                 core.clj:  137
                                                                 ...                               
                                              clojure.core/filter/fn                 core.clj: 2700
                                       clojure.core/seq/invokeStatic                 core.clj:  137
                                                                 ...                               
                                              clojure.core/filter/fn                 core.clj: 2708
           onyx.flow-conditions.fc-compile/only-relevant-branches/fn           fc_compile.clj:   40
                                      clojure.core/into/invokeStatic                 core.clj: 6610
                                    clojure.core/reduce/invokeStatic                 core.clj: 6545
                                         clojure.core.protocols/fn/G            protocols.clj:   13
                                           clojure.core.protocols/fn            protocols.clj:   75
                              clojure.core.protocols/fn/invokeStatic            protocols.clj:   75
                      clojure.core.protocols/seq-reduce/invokeStatic            protocols.clj:   24
                                       clojure.core/seq/invokeStatic                 core.clj:  137
                                                                 ...                               
java.lang.IllegalArgumentException: Don't know how to create ISeq from: clojure.lang.Keyword

It seems that only-relevant-branches does not handle flow to all correctly

@MichaelDrogalis
Copy link
Contributor

Okay, this definitely requires a code change. Thanks for checking that out. I'll keep you posted on this one!

@lenaschoenburg
Copy link
Contributor Author

lenaschoenburg commented Jul 6, 2016

It turns out that none of my examples made sense for my use case. All of them implied that every segment would always flow to the error-task when I only wanted ones that threw a exception or didn't match a predicate. I'm not sure if there is some other legitimate use case that would require a code change.

only-relevant-branches as is does not correctly handle flow conditions with flow-to-all. I believe it should be:

(defn only-relevant-branches [flow-conditions workflow task]
  (filter #(or (= (:flow/from %) task)
               (and (= (:flow/from %) :all)
                    (or (= (:flow/to %) :all)
                        (subset? (into #{} (:flow/to %))
                                 (into #{} (egress-tasks workflow task))))))
          flow-conditions))

but i"m not sure.

@MichaelDrogalis
Copy link
Contributor

Okay. I'll have to look at it closer, if you want to open a PR with that change I can look it over and bring it in.

@lbradstreet
Copy link
Member

Fixed in #617

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

3 participants