Using wye and tee with scalaz-stream

An updated version of this article is available in the scalaz-stream user notes collection at gitbook.

I needed to use scalaz-stream (sstreams) for some processing but had problems understanding wye and tee. Here's a bunch of examples that illustrate what the functions do. You can also read the specs in the tests, which are quite good. I needed a place to stash notes based on my work, hence this article. Hopefully, it helps others learn faster.
In the examples below, remember that a Process is a moment-in-time capture of the state of the stream. When you see a Wye or Tee object, it is used once, and the next process state uses another object instance. You sometimes start wondering, at least I did, where the object that remembers what you are doing lives. Since each state carries enough information to generate the next state, given a suitable driver, you do not need a mutable, long-lived object sticking around to run each process step. It has taken me a while to get used to this abstraction. It feels a little like the difference between the Eulerian and Lagrangian reference frames in fluid dynamics.
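To make the "each state carries the next state" idea concrete, here is a minimal sketch in plain Scala. It is not how scalaz-stream is implemented; Step, Emit, Done, range and runLog are names I made up for illustration. Each state is an immutable value, and the only loop lives in the driver.

```scala
import scala.annotation.tailrec

object StateSketch {
  // Hypothetical model: a state is either finished, or one output value
  // plus a thunk that builds the next state on demand.
  sealed trait Step[+A]
  case object Done extends Step[Nothing]
  final case class Emit[A](value: A, next: () => Step[A]) extends Step[A]

  // The integers in [from, until), described one state at a time.
  def range(from: Int, until: Int): Step[Int] =
    if (from >= until) Done else Emit(from, () => range(from + 1, until))

  // The driver: it walks from state to state and collects the outputs.
  def runLog[A](start: Step[A]): Vector[A] = {
    @tailrec
    def go(cur: Step[A], acc: Vector[A]): Vector[A] = cur match {
      case Done          => acc
      case Emit(v, next) => go(next(), acc :+ v)
    }
    go(start, Vector.empty)
  }
}
```

Calling StateSketch.runLog(StateSketch.range(1, 4)) walks three Emit states and then Done. No object survives from one step to the next except the state value itself.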

Wye

Remember that a wye joins the output of two processes non-deterministically. What does this mean? It means the outputs do not come from just the right side or just the left side, and there is no prescribed rule, such as bouncing between left and right and back again, about which side emits the next value.
I had trouble with the syntax at first because in the scalaz.stream scope, there is an object named wye that holds a variety of ways to create wyes. In addition, WyeOps allows processes to be turned into Wyes. But underneath the hood, WyeOps just calls the methods in the wye object. Wye is just a type that describes a Process. Like any other Process, you have to use run.run or runLog.run to run the stream interpreter and obtain results.
There are a few wye functions that are foundational to creating wye objects. Many of the functions in the wye object take another wye object, so how do you create the first wye to begin with? Since the wye "object" is really just an object with "static" methods for creating wyes, we use a few methods to create a wye object and use that new wye in the other methods in the wye "object." Think of it like this: the two methods below create raw wyes that you use with the combinators in the wye "object" and other Processes. In the end, you always need some type of left and right process to feed a wye, because a wye combines a "left" and a "right" process.
  • wye.merge[I] -> creates a wye that merges the two inputs
  • wye.either[I, I2] -> like wye.merge, but tags each value with its side: -\/ for left, \/- for right
There are a few others as well. These functions create a Wye object that implements the merging strategy. In this case, wye.either merges the two streams, non-deterministically, while there are values from either the left or the right. If the left runs out of values and Halts but the right side has more values, the wye keeps outputting values from the right side. You use combinators in the wye "object" to alter this strategy. For example, you can Halt the wye if either the left or the right Halts, or only when the left Halts, or only when the right Halts.
In the next set of examples, wye.apply() is used to bring the two processes together into the Wye and the last argument is the merging strategy function. Also, your results may vary from mine because of non-deterministic behavior--the whole point of the wye to start with.
Merge two equal size lists
scala> wye(Process.range(1,6), Process.range(6,11))(wye.either).runLog.run
res3: IndexedSeq[scalaz.\/[Int,Int]] = Vector(-\/(1), \/-(6), \/-(7), -\/(2), -\/(3), \/-(8), \/-(9), -\/(4), -\/(5), \/-(10))
We did not need the types on wye.either in this case, but we might sometimes, e.g. wye.either[Int, Int].
Merge two unequal sized lists
scala> wye(Process.range(1,11), Process.range(11,12))(wye.either).runLog.run
res8: IndexedSeq[scalaz.\/[Int,Int]] = Vector(-\/(1), -\/(2), -\/(3), -\/(4), -\/(5), -\/(6), -\/(7), -\/(8), -\/(9), -\/(10), \/-(11))
The left list is longer, but since either keeps going until both sides Halt, all the values are still obtained.
You can also use the operator notation like
scala> (Process.range(1,11) either Process.range(11,12)).runLog.run
res26: IndexedSeq[scalaz.\/[Int,Int]] = Vector(-\/(1), \/-(11), -\/(2), -\/(3), -\/(4), -\/(5), -\/(6), -\/(7), -\/(8), -\/(9), -\/(10))
Don't forget that because wye is non-deterministic, your results may vary from mine, but the semantics should be the same.
Halt when either left or right Halts.
If we use two unequally sized lists, we can force the wye to halt if either list Halts even if the other side still has data. We use almost the exact same expression as the one above except we use mergeHaltBoth.
scala> wye(Process.range(1,11), Process.range(11,12))(wye.mergeHaltBoth).runLog.run
res9: IndexedSeq[Int] = Vector(1, 11)
We only see two outputs because in this case the right side was only going to issue one value and mergeHaltBoth halted after obtaining that value. Internally, the Wye received a Halt state from the right side and halted the wye itself.
Terminate when the left side terminates
This one terminates only when the left side terminates even though the right side still had values to emit.
scala> wye(Process.range(1,3), Process.range(3,100))(wye.mergeHaltL).runLog.run
res0: IndexedSeq[Int] = Vector(1, 3, 4, 2)
The mergeHaltR does the same thing but for the right hand side.
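The halting rules in the examples above can be modelled without the library. The following is only a sketch under my own assumptions: the two inputs are plain Lists, a seeded Random stands in for "whichever side is ready first", and HaltWhen and its cases are hypothetical names, not part of scalaz-stream.

```scala
import scala.annotation.tailrec
import scala.util.Random

object WyeHaltSketch {
  // Hypothetical halting policies, named after what they wait for.
  sealed trait HaltWhen
  case object BothDone   extends HaltWhen // behaves like merge / either
  case object EitherDone extends HaltWhen // behaves like mergeHaltBoth
  case object LeftDone   extends HaltWhen // behaves like mergeHaltL
  case object RightDone  extends HaltWhen // behaves like mergeHaltR

  def merge[A](left: List[A], right: List[A], haltWhen: HaltWhen, rnd: Random): Vector[A] = {
    @tailrec
    def go(l: List[A], r: List[A], acc: Vector[A]): Vector[A] = {
      // Check the halting policy before pulling another value.
      val stop = haltWhen match {
        case BothDone   => l.isEmpty && r.isEmpty
        case EitherDone => l.isEmpty || r.isEmpty
        case LeftDone   => l.isEmpty
        case RightDone  => r.isEmpty
      }
      if (stop) acc
      else (l, r) match {
        // Both sides have a value: pick one at random (the non-determinism).
        case (lh :: lt, rh :: rt) =>
          if (rnd.nextBoolean()) go(lt, r, acc :+ lh) else go(l, rt, acc :+ rh)
        case (lh :: lt, Nil) => go(lt, r, acc :+ lh)
        case (Nil, rh :: rt) => go(l, rt, acc :+ rh)
        case (Nil, Nil)      => acc
      }
    }
    go(left, right, Vector.empty)
  }
}
```

With LeftDone, every left value is always present in the output and the last element is the last left value, however many right values sneak in before it. That is the same shape as the mergeHaltL results above.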
Can use WyeOps on Process
When you use yourProcess.wye(...) on a process, yourProcess becomes the left hand side.
scala> Process.range(1,3).wye(Process.range(3,100))(wye.mergeHaltL).runLog.run
res1: IndexedSeq[Int] = Vector(1, 2)

scala> wye(Process.range(1,3),Process.range(3,100))(wye.mergeHaltL).runLog.run
res4: IndexedSeq[Int] = Vector(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 13, 14)
Here we ran it both with the .wye syntax and the wye "object" syntax. Notice that we obtained different results but the behavior was the same, i.e. mergeHaltL. The results differ because the merging is non-deterministic.

Tee

Tee performs pretty much the same operation as wye, combining two inputs, except it's deterministic. Either you tell it to alternate between left and right, or to use the right first, or some other variation. But unlike what you saw above for Wye, you should always get the same "combining" results when you run it.
Here are a few Tees that create the merging logic:
  • tee.interleave - interleave between left and right until one of the streams Halts
  • tee.passL - only pass inputs from the left
  • tee.passR - only pass inputs from the right
  • tee.until - echoes the right side until the left side becomes true, then halts
  • tee.when - echoes the right side while the left side is true (the example below stops emitting once the left side turns false)
Like wye, there are some zips as well.
At first, it seems like there is no equivalent to wye.apply(), and that is correct. For Tee, you can feed a sequence into a Tee along with a Process and another "merging logic" tee to create the Tee, or you can use the infix notation like we did in the last examples of Wye. We'll show both below. The feed approach is applicable to wyes too, although we did not show an example in that section.
Alternate left then right until one side is exhausted, even with uneven lists
scala> Process.range(1,3).toSource.tee(Process.range(3,10))(tee.interleave[Int]).runLog.run
res1: IndexedSeq[Int] = Vector(1, 3, 2, 4)
We need the toSource in order to generate the Task F[_] environment needed to run the process. We could have assigned parts of the expression to a typed val and the Task would also have been added automatically. Notice that after one of the streams Halts, the tee halts.
You can use the other flavor of interleave which operates directly off the left process
scala> (Process.range(1,10).toSource interleave Process.range(10,20)).runLog.run
res23: IndexedSeq[Int] = Vector(1, 10, 2, 11, 3, 12, 4, 13, 5, 14, 6, 15, 7, 16, 8, 17, 9, 18)
We need toSource on one of the processes to make sure that the environment, Task, is used. Type inference figures out the F[_] for the rest of the expression.
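The interleave results above can be reproduced with a tiny model over plain Lists. This is a sketch of the behavior I observed, not the library's code: it assumes the tee takes one value from the left and one from the right as a pair, and stops as soon as either side cannot supply its half.

```scala
object InterleaveSketch {
  // Emit left, right, left, right, ... and stop when a full pair
  // can no longer be formed.
  def interleave[A](left: List[A], right: List[A]): List[A] =
    (left, right) match {
      case (l :: ls, r :: rs) => l :: r :: interleave(ls, rs)
      case _                  => Nil
    }
}
```

Feeding it the same ranges as the two REPL examples gives the same vectors, and, unlike the wye runs, it gives them every time.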
Use the left side as a boolean signal to control the right side
scala> Process.emitAll(Seq(false, false, false, true)).tee(Process.range(1,10).toSource)(tee.until).runLog.run
res39: IndexedSeq[Int] = Vector(1, 2, 3)
when is very similar
scala> Process.emitAll(Seq(true, true, true, false)).tee(Process.range(1,10).toSource)(tee.when).runLog.run
res40: IndexedSeq[Int] = Vector(1, 2, 3)
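Here is how I picture until, again as a plain-List sketch and not the library's implementation: the left side is a control signal, and a right-side value is echoed for each false seen on the left. The function name and the List-based inputs are my own simplification.

```scala
object UntilSketch {
  // Echo one value per `false` on the signal; stop at the first `true`
  // or as soon as either input runs out.
  def until[A](signal: List[Boolean], values: List[A]): List[A] =
    (signal, values) match {
      case (false :: restSignal, v :: restValues) => v :: until(restSignal, restValues)
      case _                                      => Nil
    }
}
```

UntilSketch.until(List(false, false, false, true), (1 until 10).toList) yields the same three values as the tee.until example above.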
Passing values
passL and passR ignore one side and Halt when the side being passed Halts.
Here the large set of inputs on the left, roughly 100,000 integers, is ignored and only the right side is processed.
scala> Process.range(1,100000).toSource.tee(Process.range(1,11))(tee.passR).runLog.run
res43: IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
passL is the same but with the left side passed.
Feeding into a tee (or a wye directly)
The feedL and feedR let you feed data you already have, such as a sequence of values, directly into a tee or a wye. This is useful if you already have some data not originating from a Process that needs to be combined into the stream. Of course, you could just create a process that emits your sequence of values and layer a tee on top of another process or tee.
Let's first look at a curious example. We will first do a Wye then a Tee.
scala> val w = wye.feedL(List.fill(10)(1))(process1.id)
w: scalaz.stream.Wye[Int,Any,Int] = Append(Emit(Vector(1, 1, 1, 1, 1, 1, 1, 1, 1)),Vector(<function1>))

scala> halt.wye(halt)(w).runLog.run
res13: IndexedSeq[Int] = Vector(1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
The second expression is very strange, and it's an artifact of needing to run the process in the Scala REPL. By itself, you cannot run a wye or a tee. Like the other expressions above, you need to provide it with an environment and another process. If you look at feedL above, you see that you provided a sequence and a process. But wye/tee need two processes as inputs. process1.id is promoted to a wye, and it echoes whatever enters it, left or right. A process1 takes 1 input and creates 1 output, so even when it is promoted to a wye, we still need another input.
The second expression starts with halt.wye(halt), where halt is a process that emits nothing. It creates a wye with a right that is also a halt and layers these halts (left and right) on top of the wye w. The notation leftProcess.wye(rightProcess)(anotherWye) builds on the first wye: it takes two processes and puts them together. Since the halts are processes (Process0[Nothing]), they complete a wye that can run. .runLog.run runs the process. First the original w runs and outputs the filled list, then the other two input processes run and immediately signal they are done, so no values come from the outer function. The default merging strategy is merge, which takes whatever you can get, as much as you can get, when you can get it. The original list comes out first. The right and left sides then finish with a halt. This halts the overall process. The run does not produce an interesting output but illustrates the point.
It is more interesting to have a non-halt input. This merges the process-independent data into the process and combines it with another output stream. Again, if you have the original process, you could just run append on the inputs. In many scenarios you do not have access to it, so with the wye/tee and feedL/R, you can feed in data into a process like below:
scala> Process.range(100,110).wye(halt)(w).runLog.run
res6: IndexedSeq[Int] = Vector(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109)
The 1s come first from the original w then the range of values from the process that is added last to the composition are output. This is deterministic so we should always expect this result.
It's the same with Tee:
scala> val t = tee.feedL(List.fill(10)(1))(process1.id)
t: scalaz.stream.Tee[Int,Any,Int] = Append(Emit(Vector(1, 1, 1, 1, 1, 1, 1, 1, 1)),Vector(<function1>))

scala> Process.range(100,110).tee(halt)(t).runLog.run
<console>:27: error: could not find implicit value for parameter C: scalaz.Catchable[F2]
              Process.range(100,110).tee(halt)(t).runLog.run
This is similar to the issue you see when some implicits are missing or .runLog.run does not seem to work. There are some type inference issues. I have asked the scalaz list about this, but to ensure that the implicit is found, I provided explicit type information:
scala> Process.range(100,110).tee[Task, Int, Int](halt)(t).runLog.run
res25: IndexedSeq[Int] = Vector(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109)
which did the trick
Zipping
I will not cover all the zipping possibilities, but I will mention one that has been quite useful. Let's say we want to apply a function to an argument. That's already pretty straightforward:
scala> Process.range(1,10).map(_ + 1).toSource.runLog.run
res9: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)
Given a process we can easily use map. Now let's assume the function is effectful and that the return value from the function needs to be in an environment. And let's assume that the effectful function is represented by a process as well, a stream of functions each of which is in an environment.
We know we can zip the two streams together with a tee and apply the function to the argument. Here's a quick example. We could have used Task.now as well for this snippet.
scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => x + 1}))(tee.zipApply).runLog.run
res7: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)
In this expression the argument stream is on the left (the process that tee is called on) and the function stream is on the right.
Now let's have the stream of functions return an effectful computation.
In the real world, the function returning an effect, such as a Task, is much like programming with futures where we create the future, say grabbing a remote web page, and the input is the URL itself. Back to scalaz...each effectful function is really a urlString => Task[String]. We might then choose, downstream say, to run that Task asynchronously or synchronously. If we had used a scala Future, the web page grab would have started immediately. Using Task and not calling run on it immediately delays the decision to how much "nondeterminism" to use to later in the program. This makes the application more composable.
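The "nothing happens until you run it" point is easy to see with a stand-in type. This sketch does not use scalaz Task; Deferred, fetch and the fetches counter are hypothetical names I introduce only to show the deferred-evaluation idea with the standard library.

```scala
object LazySketch {
  // Stand-in for a deferred computation: the body runs only when
  // `run` is called, and it runs again on every call.
  final class Deferred[A](thunk: () => A) {
    def run: A = thunk()
  }
  object Deferred {
    def apply[A](a: => A): Deferred[A] = new Deferred(() => a)
  }

  // Counts how many times the pretend remote call really happened.
  var fetches = 0

  // Hypothetical effectful function: urlString => Deferred[String].
  def fetch(url: String): Deferred[String] =
    Deferred { fetches += 1; s"<html>$url</html>" }
}
```

Calling LazySketch.fetch(...) only builds a description; the counter moves when run is called. That gap between building and running is where you get to decide, later in the program, how and where the work executes.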
Here's an example that evaluates _ + 1 in an asynchronous task that also sleeps for 2000 millis. The actual tasks remain unevaluated because the output object from the "function" is a task that when run individually, would complete the calculation:
scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(2000); x + 1}}))(tee.zipApply).runLog.run
res14: IndexedSeq[scalaz.concurrent.Task[Int]] = Vector(scalaz.concurrent.Task@4ffa3d4f, scalaz.concurrent.Task@5d41ef2c, scalaz.concurrent.Task@79057f3e, scalaz.concurrent.Task@36707e79, scalaz.concurrent.Task@28fe2dbd, scalaz.concurrent.Task@7a148f01, scalaz.concurrent.Task@28baf3f2, scalaz.concurrent.Task@1d8cc32a, scalaz.concurrent.Task@444686e3)
So we would need to run the tasks to get the values. We can run them after the process has been run, which is fine for this example. This one-liner takes about 2000*9 millis to complete and return the result vector, because range(1,10) yields 9 tasks and each t.run blocks in turn. The async part only happens when t.run is called.
scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(2000); x + 1}}))(tee.zipApply).runLog.run.collect{ case t => t.run}
res17: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)
But for a long running stream where we need to control the memory, we need the "running" part to be applied to the tasks a bit earlier since in most cases, more processing is needed downstream. Once the Task is run asynchronously, we want to make sure the output values are output in a process so it can be used in downstream processing. Obviously, the order is no longer preserved because each Task may run asynchronously for a different amount of time.
First we can convert each asynch, unstarted Task to a Process using Process.eval:
scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(rand.nextInt(5)*1000); x + 1}}))(tee.zipApply).map(Process.eval)
res27: scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Int]] = Append(Halt(End),Vector(<function1>))
Then we need to merge the results together. We can use merge.mergeN to do this, as indicated in this gist. merge.mergeN merges the output of multiple processes into a single output stream. We add the .runLog.run to the end to get the results. We have also added randomness to the sleep to help illustrate how the resulting output is no longer in order.
scala> val rand= new java.util.Random
rand: java.util.Random = java.util.Random@36edc749

scala> merge.mergeN(4)(Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(rand.nextInt(5)*1000); x + 1}}))(tee.zipApply).map(Process.eval)).runLog.run
res29: IndexedSeq[Int] = Vector(5, 4, 2, 3, 7, 8, 9, 6, 10)
The line is a bit of a mouthful. But we can see how this could be re-used. For example, we could add yet another 1 to the values simply using map:
scala> val rresult = merge.mergeN(4)(Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(rand.nextInt(5)*1000); x + 1}}))(tee.zipApply).map(Process.eval))
rresult: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))

scala> rresult.map(_ + 1).runLog.run
res30: IndexedSeq[Int] = Vector(4, 5, 6, 7, 10, 9, 3, 8, 11)
But we now see that this concept of a function returning an effectful computation given an argument is really just a Channel[F[_], I, O], which is used all over in the io module. The basic idea behind the use of Channel for io is that you have to make a request to some object using an argument, and you get something that you must evaluate to get the returned value. The environment is bubblewrap around the object returned from the external environment. To peek inside, you .run to remove the bubblewrap.
scala> val asyncAdderChannel = io.channel { x:Int => Task { Thread.sleep(rand.nextInt(5)*1000); x+1}}
asyncAdderChannel: scalaz.stream.Channel[scalaz.concurrent.Task,Int,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))
which can be run exactly like before:
scala> val rresult = merge.mergeN(4)(Process.range(1,10).tee(asyncAdderChannel)(tee.zipApply).map(Process.eval)).runLog.run
rresult: IndexedSeq[Int] = Vector(2, 3, 4, 5, 7, 8, 9, 6, 10)
You read this as a process that models a stream of arguments (the range) and evaluates them (the apply) using a stream of effectful functions, then outputs the values, possibly out of order. You can also view this as a super-complicated way to add 1 to some arguments in a separate thread pool and then spew them out as a vector :-) Or you can view this as merging a function stream and the argument stream. Since the function produces a task that adds numbers together, you run it to add a stream of inputs asynchronously.
If you use the approach outlined here, you can do this even more simply. Use this combinator (modified from the here link):
implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
  /**
   * Run process through channel with given level of concurrency
   */
  def concurrently[O2](concurrencyLevel: Int)
                      (f: Channel[Task, O, O2]): Process[Task, O2] = {
    merge.mergeN(concurrencyLevel)(process.tee(f)(tee.zipApply).map(Process.eval))
  }
}
to be able to run:
scala> val result = Process.range(1,10).concurrently(4)(asyncAdderChannel).runLog.run
result: IndexedSeq[Int] = Vector(3, 2, 5, 8, 7, 4, 6, 10, 9)
You might ask yourself why you can't just do
scala> Process.range(1,10).tee(asyncAdderChannel)(tee.zipApply).gather(2).runLog.run
res75: IndexedSeq[Int] = Vector(3, 2, 4, 5, 6, 7, 9, 8, 10)
Instead of .eval we run .gather(2), which runs eval behind the scenes. But first it chunks the input process. So first, 2 input elements, Tasks in this case, are chunked up, then eval is called on the tasks and gather waits until those 2 tasks complete. The results are gathered, out of order, as they complete, and only when all the tasks in that chunk complete does gather release the chunked output. Then the next chunk runs. In contrast, merge will run another Task as soon as one completes, up to the concurrency level you have specified, rather than waiting for a whole chunk, giving you better pipeline parallelism.
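The difference between the two strategies shows up clearly in a back-of-the-envelope simulation. This is only a model built on the description above, with made-up task durations and hypothetical function names; it does not call gather or mergeN.

```scala
object ConcurrencySketch {
  // Chunked (gather-like): start n tasks, wait for the slowest one,
  // then start the next n. Total time is the sum of each chunk's maximum.
  def chunkedTime(durations: List[Int], n: Int): Int =
    durations.grouped(n).map(_.max).sum

  // Sliding (mergeN-like): keep up to n tasks in flight and start the
  // next task as soon as any slot frees up. Total time is the makespan.
  def slidingTime(durations: List[Int], n: Int): Int = {
    // `free` holds the time at which each of the n slots becomes idle.
    val slots = durations.foldLeft(Vector.fill(n)(0)) { (free, d) =>
      val i = free.indexOf(free.min) // earliest available slot
      free.updated(i, free(i) + d)
    }
    slots.max
  }
}
```

For durations List(4, 1, 1, 4) and a concurrency of 2, the chunked version needs 8 time units while the sliding version needs 6, because the second short task does not have to wait for the first long one to finish.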
