Ben Biddington

Whatever it is, it's not about "coding"

Posts Tagged ‘async

Async operations and exceptions

leave a comment »

We have had the case where we’re creating a class that allows clients to block while internally it reads an entire stream asynchronously. This class encapsulates the state required to perform such a task.

While attempting to write unit tests for exceptions, we found that an exception thrown during the asynchronous operation would not be thrown to client. Debugging showed that the exception was being thrown, but no notification was being sent to the parent thread.

No such thing as unhandled exceptions on managed threads

[MSDN] [since .NET Framework v2.0] There is no such thing as an unhandled exception on a thread pool [or finalizer] thread. When a task throws an exception that it does not handle, the runtime prints the exception stack trace to the console and then returns the thread to the thread pool.

Errors raised on a child thread are essentially lost when the thread exits. This means there is some work required to propagate these exceptions.

This requires a blocking wait on the part of the client, and a mechanism for storing the exception so the parent thread can read it.

As an example, we have implemented an AsyncStreamReader which contains a blocking ReadAll method. If an asynchronous read fails with an exception, that exception is exposed internally as a field, and the waiting thread is then signalled. Once the waiting thread wakes up it checks the exception field and throws it if required.

We have blocking Read operation that waits for an async read to complete. The notification mechanism is a ManualResetEvent (WaitHandle).

  1. T1: Invoke ReadAll.
  2. T1: Start async operation (spawns T2).
  3. T1: Wait.
    1. T2: Async operation encounters exception.
    2. T2: Store exception in _error field.
    3. T2: Signals T1.
    4. T2: Returns without triggering any subsequent reads.
    5. T2: Thread exits
  4. T1: Parent thread resumes (still inside ReadAll).
  5. T1: Checks _error field. If it is not null, throw it, otherwise return.
  6. T1: Exception is now propagated

References

Written by benbiddington

27 April, 2010 at 13:37

Scala — Futures

leave a comment »

A future is a placeholder for the return value of an asynchronous operation, it’s left to clients to decide when to block and wait for reply value.

It is an alternative to blocking on receive.

For example, the double-bang on Actor causes operation to return a future:

   /**
   * Sends msg to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    val ftch = new Channel[Any](Actor.self)
    send(msg, ftch)
    new Future[Any](ftch) {
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case any => value = Some(any); any
        }
      def isSet = value match {
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case any => value = Some(any); true
        }
        case Some(_) => true
      }
    }
  }

Which can then be used to obtain the reply.

The Future class takes an InputChannel as its ctor argument. This channel is monitored to determine the future’s completion status.

In this instance, the future by !! is configured with the reply channel as supplied to send. In short, the actor has sent itself a message and specified that the future’s channel should be notified when complete. The future then just monitors that channel for the reply.

Note: Actor.send invokes the act method using a Reaction, which spawns threads and runs actors.

Only the actor creating an instance of a Channel may receive from it. This means that the future here must be running on the same thread as the actor that created it.

The send call is instructing the reply to be returned to the channel being monitored by the future.

Why does future block until actor returns value?

This is because it blocks on the channel, waiting for reply:

...
def apply() =
    if (isSet) value.get
    else ch.receive {
        case any => value = Some(any); any
    }
...

and Channel.receive is a ultimately a blocking operation, since it invokes receive on the actor it belongs to:

...
def receive[R](f: PartialFunction[Msg, R]): R = {
    val C = this.asInstanceOf[Channel[Any]]
    recv.receive {
        case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg])
    }
}
...

Note that recv here is the Actor supplied in Channel ctor.

Consider this example:

val aFuture = future[String] {
    currentThreadId
};

Internally, a new actor is created and has its double bang invoked:

def future[T](body: => T): Future[T] = {
    case object Eval
    val a = Actor.actor {
        Actor.react {
            case Eval => Actor.reply(body)
        }
    }
    a !! (Eval, { case any => any.asInstanceOf[T] })
}

And by examining the apply method above, we know this blocks until a message is received from channel.

Written by benbiddington

24 April, 2010 at 13:37

Posted in development

Tagged with , , ,

.NET Process — avoid deadlock with async reads

leave a comment »

If you are working with a child process that writes large amounts of data to its redirected stdout (or stderr), it is advisable to read from it asynchronously.

Why read stdout asynchronously?

A pipe is a connection between two processes in which one process writes data to the pipe and the other reads from the pipe. System.Diagnostics.Process.StandardOutput is an example of a pipe.

A child process may block while it waits for the client end to read from its stdout (or stderr).

When redirected, a process’s stdout may reach its limit, it will then wait for its parent to read some data before it will continue. If the parent process is waiting for all the bytes to be written before it reads anything (synchronous read), then it will wait indefinitely.

The point is: redirected streams have a limited buffer, keep them clear to allow process to complete.

So you may encounter deadlock:

[Deadlock] Pipes have a fixed size (often 4096 bytes) and if a process tries to write to a pipe which is full, the write will block until a process reads some data from the pipe.

If your child process is going to write more data than its buffer can contain, you’ll need to read it asynchronously. This stops a process blocking by ensuring there is space to emit data.

Tips

Example: piping a file to lame stdin (Windows)

Use the type command:

$ type file.mp3 | lame --mp3output 64 - "path/to/output.mp3"

Type reads the source file an emits it to its stdout, we’re then piping that directly to lame. In the preceeding example, lame has been instructed to read from stdin and write directly to a file.

To pipe stdout to another process, use something like:

$ type file.mp3 | lame --mp3output 64 - - | another_process

Or redirect to a file:

$ type file.mp3 | lame --mp3output 64 - - > "path/to/output.mp3"

Get a list of running processes (Windows)

Use the query process command.

References

Written by benbiddington

8 September, 2009 at 09:56

.NET Process — working with binary output

with one comment

Lately we discovered an issue while encoding Mp3 files with LameOur client reported encoded files we garbled; playable but watery — and full of pops and clicks.

We found this was due to interpreting the binary output from Lame as text — we had mistakenly employed Process.BeginOutputReadLine and its companion event OutputDataReceived.

Process.OutputDataReceived

By observing a Process using its OutputDataReceived event, clients can make asynchronous reads on a process’s StandardOutput.

Process.StandardOutput is a TextReader: it represents a reader that can read a sequential series of characters, i.e., it interprets its underlying stream as text.

When StandardOutput is being read asynchronously, the Process class monitors it, collecting characters into a string. Once it encounters a line ending, it notifies observers (handlers of its OutputDataReceived event), with the line of text it’s been collecting.

In short, the Process‘s underlying byte stream is converted to lines of text, and clients are notified one line at a time.

In doing so, some bytes are discarded: any bytes that (in the current encoding) represent line endings.

As a result of these missing bytes, our output Mp3s were playable, but sounded terrible.

Solution

Bypass StandardOutput. Use its underlying Stream instead.

Written by benbiddington

7 September, 2009 at 08:00