Ben Biddington

Whatever it is, it's not about "coding"

Scala — Futures

leave a comment »

A future is a placeholder for the return value of an asynchronous operation, it’s left to clients to decide when to block and wait for reply value.

It is an alternative to blocking on receive.

For example, the double-bang on Actor causes operation to return a future:

   /**
   * Sends msg to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    val ftch = new Channel[Any](Actor.self)
    send(msg, ftch)
    new Future[Any](ftch) {
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case any => value = Some(any); any
        }
      def isSet = value match {
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case any => value = Some(any); true
        }
        case Some(_) => true
      }
    }
  }

Which can then be used to obtain the reply.

The Future class takes an InputChannel as its ctor argument. This channel is monitored to determine the future’s completion status.

In this instance, the future by !! is configured with the reply channel as supplied to send. In short, the actor has sent itself a message and specified that the future’s channel should be notified when complete. The future then just monitors that channel for the reply.

Note: Actor.send invokes the act method using a Reaction, which spawns threads and runs actors.

Only the actor creating an instance of a Channel may receive from it. This means that the future here must be running on the same thread as the actor that created it.

The send call is instructing the reply to be returned to the channel being monitored by the future.

Why does future block until actor returns value?

This is because it blocks on the channel, waiting for reply:

...
def apply() =
    if (isSet) value.get
    else ch.receive {
        case any => value = Some(any); any
    }
...

and Channel.receive is a ultimately a blocking operation, since it invokes receive on the actor it belongs to:

...
def receive[R](f: PartialFunction[Msg, R]): R = {
    val C = this.asInstanceOf[Channel[Any]]
    recv.receive {
        case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg])
    }
}
...

Note that recv here is the Actor supplied in Channel ctor.

Consider this example:

val aFuture = future[String] {
    currentThreadId
};

Internally, a new actor is created and has its double bang invoked:

def future[T](body: => T): Future[T] = {
    case object Eval
    val a = Actor.actor {
        Actor.react {
            case Eval => Actor.reply(body)
        }
    }
    a !! (Eval, { case any => any.asInstanceOf[T] })
}

And by examining the apply method above, we know this blocks until a message is received from channel.

Advertisements

Written by benbiddington

24 April, 2010 at 13:37

Posted in development

Tagged with , , ,

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: