Ben Biddington

Whatever it is, it's not about "coding"

Posts Tagged ‘actor

Scala — Futures

leave a comment »

A future is a placeholder for the return value of an asynchronous operation, it’s left to clients to decide when to block and wait for reply value.

It is an alternative to blocking on receive.

For example, the double-bang on Actor causes operation to return a future:

   /**
   * Sends msg to this actor and immediately
   * returns a future representing the reply value.
   */
  def !!(msg: Any): Future[Any] = {
    val ftch = new Channel[Any](Actor.self)
    send(msg, ftch)
    new Future[Any](ftch) {
      def apply() =
        if (isSet) value.get
        else ch.receive {
          case any => value = Some(any); any
        }
      def isSet = value match {
        case None => ch.receiveWithin(0) {
          case TIMEOUT => false
          case any => value = Some(any); true
        }
        case Some(_) => true
      }
    }
  }

Which can then be used to obtain the reply.

The Future class takes an InputChannel as its ctor argument. This channel is monitored to determine the future’s completion status.

In this instance, the future by !! is configured with the reply channel as supplied to send. In short, the actor has sent itself a message and specified that the future’s channel should be notified when complete. The future then just monitors that channel for the reply.

Note: Actor.send invokes the act method using a Reaction, which spawns threads and runs actors.

Only the actor creating an instance of a Channel may receive from it. This means that the future here must be running on the same thread as the actor that created it.

The send call is instructing the reply to be returned to the channel being monitored by the future.

Why does future block until actor returns value?

This is because it blocks on the channel, waiting for reply:

...
def apply() =
    if (isSet) value.get
    else ch.receive {
        case any => value = Some(any); any
    }
...

and Channel.receive is a ultimately a blocking operation, since it invokes receive on the actor it belongs to:

...
def receive[R](f: PartialFunction[Msg, R]): R = {
    val C = this.asInstanceOf[Channel[Any]]
    recv.receive {
        case C ! msg if (f.isDefinedAt(msg.asInstanceOf[Msg])) => f(msg.asInstanceOf[Msg])
    }
}
...

Note that recv here is the Actor supplied in Channel ctor.

Consider this example:

val aFuture = future[String] {
    currentThreadId
};

Internally, a new actor is created and has its double bang invoked:

def future[T](body: => T): Future[T] = {
    case object Eval
    val a = Actor.actor {
        Actor.react {
            case Eval => Actor.reply(body)
        }
    }
    a !! (Eval, { case any => any.asInstanceOf[T] })
}

And by examining the apply method above, we know this blocks until a message is received from channel.

Advertisements

Written by benbiddington

24 April, 2010 at 13:37

Posted in development

Tagged with , , ,