/*
* Copyright (c) Likewise Software. All rights Reserved.
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the license, or (at
* your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
* General Public License for more details. You should have received a copy
* of the GNU Lesser General Public License along with this program. If
* not, see .
*
* LIKEWISE SOFTWARE MAKES THIS SOFTWARE AVAILABLE UNDER OTHER LICENSING
* TERMS AS WELL. IF YOU HAVE ENTERED INTO A SEPARATE LICENSE AGREEMENT
* WITH LIKEWISE SOFTWARE, THEN YOU MAY ELECT TO USE THE SOFTWARE UNDER THE
* TERMS OF THAT SOFTWARE LICENSE AGREEMENT INSTEAD OF THE TERMS OF THE GNU
* LESSER GENERAL PUBLIC LICENSE, NOTWITHSTANDING THE ABOVE NOTICE. IF YOU
* HAVE QUESTIONS, OR WISH TO REQUEST A COPY OF THE ALTERNATE LICENSING
* TERMS OFFERED BY LIKEWISE SOFTWARE, PLEASE CONTACT LIKEWISE SOFTWARE AT
* license@likewisesoftware.com
*/
/*
* Module Name:
*
* peer-call.c
*
* Abstract:
*
* Peer call abstraction
*
* Authors: Brian Koropoff (bkoropoff@likewisesoftware.com)
*
*/
#include
#include "peer-private.h"
#include "call-private.h"
#include "util-private.h"
static
LWMsgStatus
lwmsg_peer_call_complete_incoming(
LWMsgCall* call,
LWMsgStatus call_status
);
static
void
lwmsg_peer_call_thunk(
void* data
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* call = (PeerCall*) data;
LWMsgMessage incoming_message = LWMSG_MESSAGE_INITIALIZER;
LWMsgMessage outgoing_message = LWMSG_MESSAGE_INITIALIZER;
switch (call->params.incoming.spec->type)
{
case LWMSG_DISPATCH_TYPE_OLD:
incoming_message.tag = call->params.incoming.in.tag;
incoming_message.data = call->params.incoming.in.data;
status = ((LWMsgAssocDispatchFunction) call->params.incoming.spec->data) (
call->task->assoc,
&incoming_message,
&outgoing_message,
call->params.incoming.dispatch_data);
call->params.incoming.out.tag = outgoing_message.tag;
call->params.incoming.out.data = outgoing_message.data;
break;
case LWMSG_DISPATCH_TYPE_BLOCK:
case LWMSG_DISPATCH_TYPE_NONBLOCK:
status = ((LWMsgPeerCallFunction) call->params.incoming.spec->data) (
LWMSG_CALL(call),
&call->params.incoming.in,
&call->params.incoming.out,
call->params.incoming.dispatch_data);
pthread_mutex_lock(&call->task->call_lock);
call->state |= PEER_CALL_DISPATCHED;
if (call->state & PEER_CALL_COMPLETED)
{
/* The call was already completed */
incoming_message.tag = call->params.incoming.in.tag;
incoming_message.data = call->params.incoming.in.data;
lwmsg_assoc_destroy_message(call->task->assoc, &incoming_message);
lwmsg_task_wake(call->task->event_task);
}
else if ((call->state & PEER_CALL_CANCELLED) &&
(call->state & PEER_CALL_PENDED))
{
/* The call was already cancelled */
call->params.incoming.cancel(LWMSG_CALL(call), call->params.incoming.cancel_data);
}
pthread_mutex_unlock(&call->task->call_lock);
break;
default:
status = LWMSG_STATUS_INTERNAL;
break;
}
switch (status)
{
case LWMSG_STATUS_PENDING:
/* Callee will asynchronously complete for us if it hasn't already */
break;
default:
/* Manually invoke complete to wake up IO task */
lwmsg_peer_call_complete_incoming(LWMSG_CALL(call), status);
break;
}
}
LWMsgStatus
lwmsg_peer_call_dispatch_incoming(
PeerCall* call,
LWMsgDispatchSpec* spec,
void* dispatch_data,
LWMsgMessage* incoming_message
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
call->params.incoming.spec = spec;
call->params.incoming.dispatch_data = dispatch_data;
call->state = PEER_CALL_NONE;
call->cookie = incoming_message->cookie;
if (!spec->data)
{
BAIL_ON_ERROR(status = LWMSG_STATUS_UNIMPLEMENTED);
}
switch (call->params.incoming.spec->type)
{
case LWMSG_DISPATCH_TYPE_OLD:
BAIL_ON_ERROR(status = lwmsg_task_dispatch_work_item(
call->task->peer->task_manager,
lwmsg_peer_call_thunk,
call));
status = LWMSG_STATUS_PENDING;
break;
case LWMSG_DISPATCH_TYPE_BLOCK:
case LWMSG_DISPATCH_TYPE_NONBLOCK:
call->params.incoming.in.tag = incoming_message->tag;
call->params.incoming.in.data = incoming_message->data;
call->params.incoming.out.tag = LWMSG_TAG_INVALID;
call->params.incoming.out.data = NULL;
if (call->params.incoming.spec->type == LWMSG_DISPATCH_TYPE_BLOCK)
{
BAIL_ON_ERROR(status = lwmsg_task_dispatch_work_item(
call->task->peer->task_manager,
lwmsg_peer_call_thunk,
call));
status = LWMSG_STATUS_PENDING;
}
else
{
status = ((LWMsgPeerCallFunction) call->params.incoming.spec->data) (
LWMSG_CALL(call),
&call->params.incoming.in,
&call->params.incoming.out,
call->params.incoming.dispatch_data);
call->state |= PEER_CALL_DISPATCHED;
switch (status)
{
case LWMSG_STATUS_PENDING:
if (call->state & PEER_CALL_COMPLETED)
{
/* The call was completed before we even returned from
the dispatch function */
status = call->status;
}
break;
default:
call->status = status;
call->state |= PEER_CALL_COMPLETED;
break;
}
}
break;
default:
status = LWMSG_STATUS_INTERNAL;
break;
}
error:
return status;
}
static
LWMsgStatus
lwmsg_peer_call_dispatch_outgoing(
LWMsgCall* call,
const LWMsgParams* input,
LWMsgParams* output,
LWMsgCompleteFunction complete,
void* data
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* pcall = PEER_CALL(call);
pthread_mutex_lock(&pcall->task->call_lock);
BAIL_ON_ERROR(status = pcall->task->status);
pcall->state = PEER_CALL_NONE;
pcall->is_outgoing = LWMSG_TRUE;
pcall->params.outgoing.complete = complete;
pcall->params.outgoing.complete_data = data;
pcall->params.outgoing.in = input;
pcall->params.outgoing.out = output;
pcall->cookie = pcall->task->next_cookie++;
pcall->status = LWMSG_STATUS_SUCCESS;
lwmsg_ring_enqueue(&pcall->task->calls, &pcall->ring);
lwmsg_task_wake(pcall->task->event_task);
if (!complete)
{
while (!(pcall->state & PEER_CALL_COMPLETED))
{
pthread_cond_wait(&pcall->task->call_event, &pcall->task->call_lock);
}
status = pcall->status;
}
else
{
status = LWMSG_STATUS_PENDING;
}
error:
pthread_mutex_unlock(&pcall->task->call_lock);
return status;
}
static
LWMsgStatus
lwmsg_peer_call_pend_incoming(
LWMsgCall* call,
LWMsgCancelFunction cancel,
void* data
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* scall = PEER_CALL(call);
pthread_mutex_lock(&scall->task->call_lock);
scall->state |= PEER_CALL_PENDED;
scall->params.incoming.cancel = cancel;
scall->params.incoming.cancel_data = data;
pthread_mutex_unlock(&scall->task->call_lock);
return status;
}
static
LWMsgStatus
lwmsg_peer_call_complete_incoming(
LWMsgCall* call,
LWMsgStatus call_status
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* pcall = PEER_CALL(call);
LWMsgMessage message = LWMSG_MESSAGE_INITIALIZER;
pthread_mutex_lock(&pcall->task->call_lock);
pcall->status = call_status;
pcall->state |= PEER_CALL_COMPLETED;
/* Only free data and wake up the task
if the dispatch function has finished running.
Otherwise, these steps will be performed once
the dispatch function is finished */
if (pcall->state & PEER_CALL_DISPATCHED)
{
/* Destroy parameters to call */
message.tag = pcall->params.incoming.in.tag;
message.data = pcall->params.incoming.in.data;
lwmsg_assoc_destroy_message(pcall->task->assoc, &message);
lwmsg_task_wake(pcall->task->event_task);
}
pthread_mutex_unlock(&pcall->task->call_lock);
return status;
}
LWMsgStatus
lwmsg_peer_call_complete_outgoing(
PeerCall* call,
LWMsgMessage* incoming_message
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
if (call->state & PEER_CALL_RELEASED)
{
lwmsg_assoc_destroy_message(call->task->assoc, incoming_message);
lwmsg_peer_call_delete(call);
}
else
{
call->status = incoming_message->status;
call->params.outgoing.out->tag = incoming_message->tag;
call->params.outgoing.out->data = incoming_message->data;
call->state |= PEER_CALL_COMPLETED;
lwmsg_message_init(incoming_message);
if (call->params.outgoing.complete)
{
call->params.outgoing.complete(
LWMSG_CALL(call),
call->status,
call->params.outgoing.complete_data);
}
else
{
pthread_cond_broadcast(&call->task->call_event);
}
}
return status;
}
LWMsgStatus
lwmsg_peer_call_cancel_incoming(
PeerCall* call
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
if (!(call->state & PEER_CALL_CANCELLED))
{
call->state |= PEER_CALL_CANCELLED;
/* If the dispatch function is not finished running,
we don't call the cancel callback right now --
lwmsg_peer_call_dispatch_incoming() will handle it.
If the call is already completed, we silently
ignore the cancel request and return success */
if ((call->state & PEER_CALL_DISPATCHED) &&
!(call->state & PEER_CALL_COMPLETED) &&
call->params.incoming.cancel != NULL)
{
call->params.incoming.cancel(LWMSG_CALL(call), call->params.incoming.cancel_data);
}
}
return status;
}
static
LWMsgStatus
lwmsg_peer_call_cancel_outgoing(
LWMsgCall* call
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* pcall = PEER_CALL(call);
pthread_mutex_lock(&pcall->task->call_lock);
if (!(pcall->state & PEER_CALL_CANCELLED))
{
pcall->state |= PEER_CALL_CANCELLED;
if (pcall->state & PEER_CALL_DISPATCHED &&
!(pcall->state & PEER_CALL_COMPLETED))
{
lwmsg_task_wake(pcall->task->event_task);
}
else
{
lwmsg_ring_remove(&pcall->ring);
}
}
pthread_mutex_unlock(&pcall->task->call_lock);
return status;
}
static
void
lwmsg_peer_call_release_outgoing(
LWMsgCall* call
)
{
PeerCall* pcall = PEER_CALL(call);
PeerAssocTask* task = pcall->task;
LWMsgBool delete = LWMSG_FALSE;
pthread_mutex_lock(&task->call_lock);
if (pcall->state & PEER_CALL_DISPATCHED &&
!(pcall->state & PEER_CALL_COMPLETED))
{
pcall->state |= PEER_CALL_RELEASED;
}
else
{
delete = LWMSG_TRUE;
lwmsg_ring_remove(&pcall->ring);
}
pthread_mutex_unlock(&task->call_lock);
if (delete)
{
lwmsg_peer_call_delete(pcall);
}
return;
}
static
LWMsgSession*
lwmsg_peer_call_get_session(
LWMsgCall* call
)
{
PeerCall* my_call = PEER_CALL(call);
return my_call->task->session;
}
static
LWMsgStatus
lwmsg_peer_call_destroy_params(
LWMsgCall* call,
LWMsgParams* params
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
LWMsgMessage message = LWMSG_MESSAGE_INITIALIZER;
PeerCall* pcall = PEER_CALL(call);
message.tag = params->tag;
message.data = params->data;
BAIL_ON_ERROR(status = lwmsg_assoc_destroy_message(pcall->task->assoc, &message));
params->tag = LWMSG_TAG_INVALID;
params->data = NULL;
error:
return status;
}
static
LWMsgStatus
lwmsg_peer_call_acquire_callback(
LWMsgCall* call,
LWMsgCall** callback
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* pcall = PEER_CALL(call);
PeerCall* my_callback = NULL;
if (pcall->is_outgoing)
{
BAIL_ON_ERROR(status = LWMSG_STATUS_INVALID_PARAMETER);
}
BAIL_ON_ERROR(status = lwmsg_peer_call_new(pcall->task, &my_callback));
*callback = LWMSG_CALL(my_callback);
error:
return status;
}
static LWMsgCallClass peer_call_class =
{
.release = lwmsg_peer_call_release_outgoing,
.dispatch = lwmsg_peer_call_dispatch_outgoing,
.pend = lwmsg_peer_call_pend_incoming,
.complete = lwmsg_peer_call_complete_incoming,
.cancel = lwmsg_peer_call_cancel_outgoing,
.get_session = lwmsg_peer_call_get_session,
.destroy_params = lwmsg_peer_call_destroy_params,
.acquire_callback = lwmsg_peer_call_acquire_callback
};
LWMsgStatus
lwmsg_peer_call_new(
PeerAssocTask* task,
PeerCall** call
)
{
LWMsgStatus status = LWMSG_STATUS_SUCCESS;
PeerCall* my_call = NULL;
BAIL_ON_ERROR(status = LWMSG_ALLOC(&my_call));
lwmsg_ring_init(&my_call->ring);
my_call->base.vtbl = &peer_call_class;
my_call->task = task;
*call = my_call;
done:
return status;
error:
if (my_call)
{
free(my_call);
}
goto done;
}
void
lwmsg_peer_call_delete(
PeerCall* call
)
{
lwmsg_ring_remove(&call->ring);
if (call->task)
{
lwmsg_peer_task_unref(call->task);
}
free(call);
}