/* 
 * communications.ql - High level interface to ICM/IPC facilities.
 *
// ##Qu-Prolog Copyright.abst##
 *
 * $Id: communications.ql,v 1.5 2000/06/22 21:37:42 qp6 Exp $
 */

%%
%% Some higher-level message processing predicates
%%
%% Operator declarations for the message syntax defined in this file.
%% These directives must be processed before the clauses below are read,
%% because the clause heads are written with these operators.
%%
%% ->> and +>> are the send operators, <<- and <<= the receive operators.
%% reply_to (251) has a lower priority number than the send/receive
%% operators (252), so "Msg ->> Addr reply_to Rt" is read as
%% ->>(Msg, reply_to(Addr, Rt)).
?- op(252, xfx, ->>).
?- op(252, xfx, +>>).
?- op(252, xfx, <<-).
?- op(252, xfx, <<=).
?- op(251, xfy, reply_to).
%% message_choice is a prefix operator, so it can be written without
%% parentheses around its argument.
?- op(500,fy, message_choice).

%% Operators used inside symbolic addresses and message_choice branches.
%% ':' (50) has a lower priority number than '@' (100), so Thread:Process@Machine
%% is read as (Thread:Process)@Machine.
%% '::' separates a message pattern from its test goal in build_msg_choice/2.
?- op(50, xfx, ':').
?- op(1024, xfx, '::').
?- op(100, xfx, '@').

/*
 * Msg ->> Address
 * Msg ->> Address reply_to RtAddress
 *
 * Send Msg to Address.  In the reply_to form the reply-to handle of the
 * message is the handle obtained from RtAddress; in the plain form it is
 * the handle of the calling thread (icm_thread_handle/1).
 *
 * Address and RtAddress may be any address accepted as the first
 * argument of symbolic_address_to_icm_handle/2.
 *
 * mode @term ->> @compound.
 */
Msg ->> Target :-
    (   Target = (Address reply_to RtAddress)
    ->  % Explicit reply-to address: translate both addresses.
        symbolic_address_to_icm_handle(Address, Handle),
        symbolic_address_to_icm_handle(RtAddress, ReplyHandle)
    ;   % No reply-to address given: replies come back to this thread.
        symbolic_address_to_icm_handle(Target, Handle),
        icm_thread_handle(ReplyHandle)
    ),
    ipc_send(Msg, Handle, ReplyHandle, [remember_names(false)]).

/*
 * Msg +>> Address
 * Msg +>> Address reply_to RtAddress
 *
 * Raw-mode variant of the send operator: the message is passed to
 * ipc_send/4 with the option encode(false).  Msg must be an atom.
 *
 * As for the encoded send, the reply-to handle is taken from RtAddress
 * when the reply_to form is used and is the calling thread's handle
 * otherwise.
 */
Msg +>> Target :-
    (   Target = (Address reply_to RtAddress)
    ->  % Explicit reply-to address: translate both addresses.
        symbolic_address_to_icm_handle(Address, Handle),
        symbolic_address_to_icm_handle(RtAddress, ReplyHandle)
    ;   % No reply-to address given: replies come back to this thread.
        symbolic_address_to_icm_handle(Target, Handle),
        icm_thread_handle(ReplyHandle)
    ),
    ipc_send(Msg, Handle, ReplyHandle, [encode(false)]).

/*
 * symbolic_address_to_icm_handle(SymbolicAddress, Handle)
 *
 * Handle is the ICM handle denoted by SymbolicAddress.  The clauses are
 * tried in this order, each committing once its guard succeeds:
 *
 *   self                       - the calling thread
 *   creator                    - the thread returned by thread_parent/1,
 *                                in this process on this machine
 *   ThreadID                   - (any other atomic term) that thread in
 *                                this process on this machine
 *   ThreadID:ProcessID         - that thread in the named process on
 *                                this machine
 *   ThreadID:ProcessID@Machine - that thread in the named process on
 *                                the named machine
 *   anything else              - returned unchanged; it is taken to be
 *                                an ICM handle already
 *
 * The components that are not named in the symbolic address are copied
 * from the calling thread's own handle.
 *
 * mode symbolic_address_to_icm_handle(@compound, ?compound).
 */
symbolic_address_to_icm_handle(Address, Handle) :-
    Address == self,
    !,
    icm_thread_handle(Handle).
symbolic_address_to_icm_handle(Address, CreatorHandle) :-
    Address == creator,
    !,
    thread_parent(Creator),
    icm_thread_handle(OwnHandle),
    icm_handle_to_components(_, OwnProcess, OwnMachine, OwnExtra, OwnHandle),
    icm_handle_from_components(Creator, OwnProcess, OwnMachine, OwnExtra,
                               CreatorHandle).
symbolic_address_to_icm_handle(Thread, Handle) :-
    atomic(Thread),
    !,
    icm_thread_handle(OwnHandle),
    icm_handle_to_components(_, OwnProcess, OwnMachine, OwnExtra, OwnHandle),
    icm_handle_from_components(Thread, OwnProcess, OwnMachine, OwnExtra,
                               Handle).
symbolic_address_to_icm_handle(Thread:Process, Handle) :-
    atomic(Thread),
    atom(Process),
    !,
    icm_thread_handle(OwnHandle),
    icm_handle_to_components(_, _, OwnMachine, OwnExtra, OwnHandle),
    icm_handle_from_components(Thread, Process, OwnMachine, OwnExtra, Handle).
symbolic_address_to_icm_handle(Thread:Process@Machine, Handle) :-
    atomic(Thread),
    atom(Process),
    atom(Machine),
    !,
    % The machine name is not used verbatim: localhost is replaced by the
    % machine component of our own handle, and any other name is mapped to
    % an IP address and back to a host name.
    (   Machine = localhost
    ->  icm_thread_handle(OwnHandle),
        icm_handle_to_components(_, _, FullMachine, _, OwnHandle)
    ;   tcp_host_to_ip_address(Machine, IPAddress),
        tcp_host_from_ip_address(FullMachine, IPAddress)
    ),
    % NOTE(review): this is the 4-argument form of icm_handle_from_components,
    % whereas the clauses above use the 5-argument form - confirm that the
    % 4-argument predicate exists and supplies the missing component.
    icm_handle_from_components(Thread, Process, FullMachine, Handle).
symbolic_address_to_icm_handle(Handle, Handle).

/*
 * icm_handle_to_symbolic_address(Handle, SymbolicAddress)
 *
 * True if SymbolicAddress is a symbolic address corresponding to the
 * ICM handle Handle.
 *
 *  - If SymbolicAddress is the atom self, Handle is unified with the
 *    calling thread's handle.
 *  - If SymbolicAddress is the atom creator, the call fails in the
 *    initial thread; otherwise Handle is unified with the creator's
 *    handle.
 *  - Otherwise the thread, process and machine components of Handle are
 *    compared with SymbolicAddress.  The alternatives are tried in order
 *    and the first one that succeeds is kept:
 *      1. Thread:Process@Host where Host names the handle's machine
 *         (same_host/2);
 *      2. the handle is on this machine and the address is
 *         Thread:Process, Thread:Process@localhost or
 *         Thread:Process@'LOCALHOST';
 *      3. the handle is in this process on this machine and the address
 *         names the same thread (same_local_thread/2).
 *
 * If SymbolicAddress is unbound it is bound to the full
 * Thread:Process@Machine form (first alternative).
 *
 * mode icm_handle_to_symbolic_address(@compound, ?compound).
 */
icm_handle_to_symbolic_address(Handle, SymbolicAddress) :-
    SymbolicAddress == self,
    !,
    icm_thread_handle(OwnHandle),
    Handle = OwnHandle.
icm_handle_to_symbolic_address(Handle, SymbolicAddress) :-
    SymbolicAddress == creator,
    !,
    \+ thread_is_initial_thread,
    symbolic_address_to_icm_handle(creator, CreatorHandle),
    Handle = CreatorHandle.
icm_handle_to_symbolic_address(Handle, SymbolicAddress) :-
    icm_handle_to_components(Thread, Process, Machine, _, Handle),
    icm_thread_handle(OwnHandle),
    icm_handle_to_components(_, OwnProcess, OwnMachine, _, OwnHandle),
    (   SymbolicAddress = Thread:Process@Host,
        same_host(Host, Machine)
    ;   Machine = OwnMachine,
        (   SymbolicAddress = Thread:Process
        ;   SymbolicAddress = Thread:Process@localhost
        ;   SymbolicAddress = Thread:Process@'LOCALHOST'
        )
    ;   Machine = OwnMachine,
        Process = OwnProcess,
        same_local_thread(SymbolicAddress, Thread)
    ),
    !.


/*
 * synonym_ids(Id1, Id2)
 *
 * True if the two thread identifiers unify, or if both are atomic and
 * one is an integer that thread_symbol/2 relates to the other.  When the
 * first identifier is an integer only thread_symbol(Id1, Id2) is tried.
 */
synonym_ids(Id, Id) :- !.
synonym_ids(Id1, Id2) :-
    atomic(Id1),
    atomic(Id2),
    (   integer(Id1)
    ->  thread_symbol(Id1, Id2)
    ;   integer(Id2),
        thread_symbol(Id2, Id1)
    ).

/*
 * same_local_thread(Address1, Address2)
 *
 * True if the two addresses name the same thread.  Each address is a
 * bare thread identifier, Thread:Process, or Thread:Process@Machine, and
 * both must have the same shape.  Thread identifiers are compared with
 * synonym_ids/2, process names must unify, and machine names are
 * compared with same_host/2.
 */
same_local_thread(Left, Right) :-
    synonym_ids(Left, Right),
    !.
same_local_thread(LeftThread:Process, RightThread:Process) :-
    synonym_ids(LeftThread, RightThread),
    !.
same_local_thread(LeftThread:Process@LeftHost, RightThread:Process@RightHost) :-
    synonym_ids(LeftThread, RightThread),
    same_host(LeftHost, RightHost).

/*
 * same_host(Host1, Host2)
 *
 * True if the two host names denote the same machine.  Tried in order:
 *   1. the names unify;
 *   2. both names map to the same address via tcp_host_to_ip_address/2;
 *   3. one of the names is localhost or 'LOCALHOST' and the other names
 *      the machine of the calling thread's own ICM handle.
 */
same_host(Host, Host) :- !.
same_host(Host1, Host2) :-
    tcp_host_to_ip_address(Host1, IPAddress),
    tcp_host_to_ip_address(Host2, IPAddress),
    !.
same_host(Alias, Host) :-
    ( Alias == localhost ; Alias == 'LOCALHOST' ),
    !,
    same_host_as_self(Host).
same_host(Host, Alias) :-
    ( Alias == localhost ; Alias == 'LOCALHOST' ),
    same_host_as_self(Host).

/*
 * same_host_as_self(Host)
 *
 * True if Host names the machine component of the calling thread's own
 * ICM handle.
 *
 * If that machine component is itself localhost or 'LOCALHOST', calling
 * same_host/2 on it would re-enter the localhost clauses with the same
 * arguments and never terminate.  In that case only the direct checks
 * (unification, then IP address comparison) are made.
 */
same_host_as_self(Host) :-
    icm_thread_handle(OwnHandle),
    icm_handle_to_components(_, _, OwnHost, _, OwnHandle),
    (   ( OwnHost == localhost ; OwnHost == 'LOCALHOST' )
    ->  (   OwnHost = Host
        ->  true
        ;   tcp_host_to_ip_address(OwnHost, IPAddress),
            tcp_host_to_ip_address(Host, IPAddress)
        ->  true
        )
    ;   same_host(OwnHost, Host)
    ).




	
       


/*
 * Msg <<- Addr
 * Msg <<- Addr reply_to RTAddr
 *
 * Receive one message with ipc_recv/4 and unify it with Msg.  Addr is
 * matched against the sender's handle and, in the reply_to form, RTAddr
 * against the reply-to handle, both through
 * icm_handle_to_symbolic_address/2.
 *
 * The message is obtained before any of these unifications are made, so
 * the call can fail after a message has been received.
 *
 * mode ?term <<- ?term.
 */
Msg <<- Source :-
    % Plain form: Source is unbound or is not a reply_to term.
    (   var(Source) ; Source \= (_ reply_to _) ),
    !,
    ipc_recv(Received, SenderHandle, _, [remember_names(false)]),
    Msg = Received,
    icm_handle_to_symbolic_address(SenderHandle, Source).
Msg <<- Sender reply_to ReplyTo :-
    ipc_recv(Received, SenderHandle, ReplyHandle, [remember_names(false)]),
    !,
    Msg = Received,
    icm_handle_to_symbolic_address(SenderHandle, Sender),
    icm_handle_to_symbolic_address(ReplyHandle, ReplyTo).

/*
 * Msg <<= Addr
 * Msg <<= Addr reply_to RTAddr
 *
 * Like <<- except that the message buffer is searched for a message
 * matching Msg, Addr and (in the reply_to form) RTAddr.
 *
 * Messages are inspected with ipc_peek/5.  When a unification or an
 * address comparison fails, execution backtracks into ipc_peek/5
 * (presumably yielding the next buffered message - confirm against the
 * ipc_peek documentation).  Once a message matches, the search is
 * committed and ipc_commit/1 is called on that message's reference.
 *
 * mode ?term <<= ?term
 */
Msg <<= Source :-
    % Plain form: Source is unbound or is not a reply_to term.
    (   var(Source) ; Source \= (_ reply_to _) ),
    !,
    ipc_peek(Candidate, Ref, SenderHandle, _, [remember_names(false)]),
    Msg = Candidate,
    icm_handle_to_symbolic_address(SenderHandle, Source),
    !,
    ipc_commit(Ref).
Msg <<= Sender reply_to ReplyTo :-
    ipc_peek(Candidate, Ref, SenderHandle, ReplyHandle,
             [remember_names(false)]),
    Msg = Candidate,
    icm_handle_to_symbolic_address(SenderHandle, Sender),
    icm_handle_to_symbolic_address(ReplyHandle, ReplyTo),
    !,
    ipc_commit(Ref).
	
/*
 * Message choice.
 * Call is of the form
 *
 * message_choice(t1->g1;t2->g2;...;tn->gn;timeout(T)->gt)
 *
 * where the gi are goals and each ti is either a message pattern or of
 * the form msg::g, where msg is a message pattern and g is a test goal.
 * A message pattern has the form Msg <<- Addr or
 * Msg <<- Addr reply_to RtAddr.
 *
 * Messages are inspected with ipc_peek/5.  For each message the choices
 * are tried in order; a choice is selected when its message pattern
 * unifies with the message, its address part corresponds to the handles
 * of the message, and its test goal (true if none was given) succeeds.
 * The selected message is then committed with ipc_commit/1 and the goal
 * of the choice is called.
 *
 * The timeout part is optional.  If present it should be last - any
 * subsequent choices are ignored.  Its goal is called when the search
 * with the timeout option fails before any choice has been selected.
 *
 * mode message_choice(+term).
 */
message_choice(Choices) :-
    collect_msg_choices(Choices, ChoiceList, Timeout, TimeoutGoal),
    (   var(Timeout)
    ->  % No timeout choice was given.
        ipc_peek(Msg, Ref, SenderHandle, ReplyHandle,
                 [remember_names(false)]),
        member(msg_choice('<<-'(Msg, Address), Test, Goal), ChoiceList),
        msg_choice_address(Address, SenderHandle, ReplyHandle),
        call(Test),
        !,
        ipc_commit(Ref),
        call(Goal)
    ;   (   ipc_peek(Msg, Ref, SenderHandle, ReplyHandle,
                     [remember_names(false),timeout(Timeout)]),
            member(msg_choice('<<-'(Msg, Address), Test, Goal), ChoiceList),
            msg_choice_address(Address, SenderHandle, ReplyHandle),
            call(Test),
            !,
            ipc_commit(Ref),
            call(Goal)
        ;   call(TimeoutGoal)
        )
    ).

/*
 * msg_choice_address(Address, SenderHandle, ReplyHandle)
 *
 * True if the address part of a choice pattern corresponds to the
 * handles of a peeked message.  If Address is a reply_to term its two
 * arguments are compared with the sender and reply-to handles;
 * otherwise Address is compared with the sender handle only.
 */
msg_choice_address(Address, SenderHandle, ReplyHandle) :-
    (   nonvar(Address), Address = reply_to(_, _)
    ->  Address = reply_to(Sender, ReplyTo),
        icm_handle_to_symbolic_address(SenderHandle, Sender),
        icm_handle_to_symbolic_address(ReplyHandle, ReplyTo)
    ;   Address = Sender,
        icm_handle_to_symbolic_address(SenderHandle, Sender)
    ).

/*
 * collect_msg_choices(Choices, ChoiceList, Timeout, TimeoutGoal)
 *
 * Walks the ';'-separated choices given to message_choice/1 and builds
 * the list of msg_choice/3 terms (see build_msg_choice/2) that
 * message_choice/1 searches for a matching message.
 *
 * A choice of the form timeout(T)->G ends the walk: Timeout and
 * TimeoutGoal are unified with T and G, and any choices after it are
 * ignored.  If Timeout is still a variable on exit then no timeout
 * choice is present.
 *
 * mode collect_msg_choices(@term, -closed_list(term), -term, -term).
 */
collect_msg_choices(((timeout(Timeout) -> TimeoutGoal) ; _Ignored),
                    ChoiceList, Timeout, TimeoutGoal) :-
    !,
    ChoiceList = [].
collect_msg_choices((First ; Rest), ChoiceList, Timeout, TimeoutGoal) :-
    !,
    build_msg_choice(First, MsgChoice),
    ChoiceList = [MsgChoice|MoreChoices],
    collect_msg_choices(Rest, MoreChoices, Timeout, TimeoutGoal).
collect_msg_choices((timeout(Timeout) -> TimeoutGoal),
                    ChoiceList, Timeout, TimeoutGoal) :-
    % The last choice is the timeout choice.
    !,
    ChoiceList = [].
collect_msg_choices(Last, ChoiceList, _Timeout, _TimeoutGoal) :-
    % The last choice is an ordinary message choice.
    build_msg_choice(Last, MsgChoice),
    ChoiceList = [MsgChoice].

/*
 * build_msg_choice(Choice, MsgChoice)
 *
 * Turns one choice of message_choice/1 into a term
 * msg_choice(Pattern, Test, Goal):
 *
 *   (Pattern :: Test) -> Goal   gives   msg_choice(Pattern, Test, Goal)
 *   Pattern -> Goal             gives   msg_choice(Pattern, true, Goal)
 *
 * mode build_msg_choice(@term, -term).
 */
build_msg_choice((Guard -> Goal), Choice) :-
    (   Guard = (Pattern :: Test)
    ->  Choice = msg_choice(Pattern, Test, Goal)
    ;   Choice = msg_choice(Guard, true, Goal)
    ).

