Logo Search packages:      
Sourcecode: wine-unstable version File versions  Download package

async.c

/*
 * Server-side async I/O support
 *
 * Copyright (C) 2007 Alexandre Julliard
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>

#include "ntstatus.h"
#define WIN32_NO_STATUS
#include "windef.h"
#include "winternl.h"

#include "object.h"
#include "file.h"
#include "request.h"

/* one asynchronous I/O request, linked into the list of an async_queue */
struct async
{
    struct object        obj;             /* object header */
    struct thread       *thread;          /* owning thread */
    struct list          queue_entry;     /* entry in async queue list */
    struct async_queue  *queue;           /* queue containing this async */
    unsigned int         status;          /* current status */
    struct timeout_user *timeout;         /* timeout registered by async_set_timeout(), NULL if none */
    unsigned int         timeout_status;  /* status to report upon timeout */
    struct event        *event;           /* optional event, set when a final result is stored */
    struct completion   *completion;      /* completion obtained from the queue's fd, NULL if none */
    apc_param_t          comp_key;        /* key passed to add_completion() along with the result */
    async_data_t         data;            /* data for async I/O call */
};

/* object-specific callbacks referenced by async_ops below */
static void async_dump( struct object *obj, int verbose );
static void async_destroy( struct object *obj );

/* operations table for struct async objects; the initializer is positional
 * and each slot is named by its trailing comment. Only the dump and destroy
 * slots use functions defined in this file. */
static const struct object_ops async_ops =
{
    sizeof(struct async),      /* size */
    async_dump,                /* dump */
    no_get_type,               /* get_type */
    no_add_queue,              /* add_queue */
    NULL,                      /* remove_queue */
    NULL,                      /* signaled */
    NULL,                      /* satisfied */
    no_signal,                 /* signal */
    no_get_fd,                 /* get_fd */
    no_map_access,             /* map_access */
    default_get_sd,            /* get_sd */
    default_set_sd,            /* set_sd */
    no_lookup_name,            /* lookup_name */
    no_open_file,              /* open_file */
    no_close_handle,           /* close_handle */
    async_destroy              /* destroy */
};


/* list of async requests attached to a file descriptor */
struct async_queue
{
    struct object        obj;             /* object header */
    struct fd           *fd;              /* file descriptor owning this queue (reset to NULL by free_async_queue) */
    struct list          queue;           /* queue of async objects */
};

/* object-specific callback referenced by async_queue_ops below */
static void async_queue_dump( struct object *obj, int verbose );

/* operations table for struct async_queue objects; the initializer is
 * positional and each slot is named by its trailing comment. Only the dump
 * slot uses a function defined in this file; destroy is no_destroy. */
static const struct object_ops async_queue_ops =
{
    sizeof(struct async_queue),      /* size */
    async_queue_dump,                /* dump */
    no_get_type,                     /* get_type */
    no_add_queue,                    /* add_queue */
    NULL,                            /* remove_queue */
    NULL,                            /* signaled */
    NULL,                            /* satisfied */
    no_signal,                       /* signal */
    no_get_fd,                       /* get_fd */
    no_map_access,                   /* map_access */
    default_get_sd,                  /* get_sd */
    default_set_sd,                  /* set_sd */
    no_lookup_name,                  /* lookup_name */
    no_open_file,                    /* open_file */
    no_close_handle,                 /* close_handle */
    no_destroy                       /* destroy */
};


static inline void async_reselect( struct async *async )
{
    if (async->queue->fd) fd_reselect_async( async->queue->fd, async->queue );
}

/* dump callback of async_ops: print the owning thread pointer to stderr */
static void async_dump( struct object *obj, int verbose )
{
    struct async *self;

    assert( obj->ops == &async_ops );
    self = (struct async *)obj;
    fprintf( stderr, "Async thread=%p\n", self->thread );
}

/* destroy callback of async_ops: unlink the async from its queue and
 * release everything it holds */
static void async_destroy( struct object *obj )
{
    struct async *self = (struct async *)obj;

    assert( obj->ops == &async_ops );

    /* unlink first, so that the reselect below sees the queue without us */
    list_remove( &self->queue_entry );
    async_reselect( self );

    /* optional resources */
    if (self->timeout != NULL)
        remove_timeout_user( self->timeout );
    if (self->event != NULL)
        release_object( self->event );
    if (self->completion != NULL)
        release_object( self->completion );

    /* references grabbed in create_async() */
    release_object( self->queue );
    release_object( self->thread );
}

/* dump callback of async_queue_ops: print the queue's fd pointer to stderr */
static void async_queue_dump( struct object *obj, int verbose )
{
    struct async_queue *self;

    assert( obj->ops == &async_queue_ops );
    self = (struct async_queue *)obj;
    fprintf( stderr, "Async queue fd=%p\n", self->fd );
}

/* report a new (non-pending) status for an async request.
 * A still-pending async gets an APC_ASYNC_IO call queued to its thread and
 * loses one reference; an async that was already terminated only has its
 * stored status replaced. */
void async_terminate( struct async *async, unsigned int status )
{
    apc_call_t apc;

    assert( status != STATUS_PENDING );

    if (async->status == STATUS_PENDING)
    {
        /* build the notification for the client thread */
        memset( &apc, 0, sizeof(apc) );
        apc.type            = APC_ASYNC_IO;
        apc.async_io.func   = async->data.callback;
        apc.async_io.user   = async->data.arg;
        apc.async_io.sb     = async->data.iosb;
        apc.async_io.status = status;
        thread_queue_apc( async->thread, &async->obj, &apc );

        async->status = status;
        async_reselect( async );

        /* so that it gets destroyed when the async is done;
         * the async must not be touched after this point */
        release_object( async );
        return;
    }

    /* already terminated, just update status */
    async->status = status;
}

/* timeout callback registered by async_set_timeout(): forget the expired
 * timeout and terminate the async with its configured timeout status */
static void async_timeout( void *private )
{
    struct async *expired = private;
    unsigned int status = expired->timeout_status;

    expired->timeout = NULL;
    async_terminate( expired, status );
}

/* allocate an empty async queue attached to the given fd;
 * returns NULL if the allocation fails */
struct async_queue *create_async_queue( struct fd *fd )
{
    struct async_queue *new_queue = alloc_object( &async_queue_ops );

    if (!new_queue) return NULL;

    list_init( &new_queue->queue );
    new_queue->fd = fd;
    return new_queue;
}

/* detach a queue from its fd, wake up its asyncs with
 * STATUS_HANDLES_CLOSED and drop one reference; a NULL queue is ignored */
void free_async_queue( struct async_queue *queue )
{
    if (queue)
    {
        /* clear the fd first so that nothing below goes through it */
        queue->fd = NULL;
        async_wake_up( queue, STATUS_HANDLES_CLOSED );
        release_object( queue );
    }
}

/* create a pending async for the given thread and append it to the queue.
 * Returns NULL if data->event is set but cannot be resolved to an event
 * object, or if the allocation fails. */
struct async *create_async( struct thread *thread, struct async_queue *queue, const async_data_t *data )
{
    struct async *async;
    struct event *event = NULL;

    if (data->event)
    {
        event = get_event_obj( thread->process, data->event, EVENT_MODIFY_STATE );
        if (!event) return NULL;
    }

    async = alloc_object( &async_ops );
    if (!async)
    {
        if (event) release_object( event );
        return NULL;
    }

    async->thread     = (struct thread *)grab_object( thread );
    async->queue      = (struct async_queue *)grab_object( queue );
    async->data       = *data;
    async->status     = STATUS_PENDING;
    async->event      = event;
    async->timeout    = NULL;
    /* a completion can only come from the fd, if the queue still has one */
    async->completion = queue->fd ? fd_get_completion( queue->fd, &async->comp_key ) : NULL;

    list_add_tail( &queue->queue, &async->queue_entry );
    /* extra reference on top of the one returned to the caller
     * (presumably the one async_terminate() drops) */
    grab_object( async );

    if (queue->fd) set_fd_signaled( queue->fd, 0 );
    if (event) reset_event( event );
    return async;
}

/* replace the timeout of an async operation; TIMEOUT_INFINITE means no
 * timeout. 'status' is what gets reported if the timeout fires. */
void async_set_timeout( struct async *async, timeout_t timeout, unsigned int status )
{
    /* drop any previously registered timeout */
    if (async->timeout) remove_timeout_user( async->timeout );

    async->timeout = (timeout == TIMEOUT_INFINITE)
                     ? NULL
                     : add_timeout_user( timeout, async_timeout, async );
    async->timeout_status = status;
}

/* record what the client-side async callback returned.
 * A STATUS_PENDING result restarts the async; any other status is stored as
 * the final one and is reported through the completion, an optional user
 * APC, and the async's event or the fd's signaled state. */
void async_set_result( struct object *obj, unsigned int status, unsigned int total, client_ptr_t apc )
{
    struct async *async = (struct async *)obj;

    /* in case the client messed up the APC results */
    if (obj->ops != &async_ops) return;

    /* it must have been woken up if we get a result */
    assert( async->status != STATUS_PENDING );

    if (status == STATUS_PENDING)
    {
        /* restart it */
        const unsigned int previous = async->status;

        async->status = STATUS_PENDING;
        grab_object( async );

        if (previous == STATUS_ALERTED)
            async_reselect( async );
        else  /* it was terminated in the meantime */
            async_terminate( async, previous );
        return;
    }

    /* final result: the timeout is no longer needed */
    if (async->timeout) remove_timeout_user( async->timeout );
    async->timeout = NULL;
    async->status = status;

    if (async->completion && async->data.cvalue)
        add_completion( async->completion, async->comp_key, async->data.cvalue, status, total );

    if (apc)
    {
        apc_call_t call;

        memset( &call, 0, sizeof(call) );
        call.type         = APC_USER;
        call.user.func    = apc;
        call.user.args[0] = async->data.arg;
        call.user.args[1] = async->data.iosb;
        call.user.args[2] = 0;
        thread_queue_apc( async->thread, NULL, &call );
    }

    /* the fd is only marked signaled when the async has no event */
    if (async->event) set_event( async->event );
    else if (async->queue->fd) set_fd_signaled( async->queue->fd, 1 );
}

/* return non-zero if the first async of the queue is still pending;
 * a NULL or empty queue yields 0 */
int async_waiting( struct async_queue *queue )
{
    struct list *head;
    struct async *first;

    if (!queue) return 0;

    head = list_head( &queue->queue );
    if (!head) return 0;

    first = LIST_ENTRY( head, struct async, queue_entry );
    return first->status == STATUS_PENDING;
}

/* terminate, with the given status, every async on the queue that matches
 * all of the supplied filters (process, thread, iosb); a zero/NULL filter
 * matches anything. Returns the number of asyncs passed to
 * async_terminate(); 0 if the queue is NULL or no filter is given. */
int async_wake_up_by( struct async_queue *queue, struct process *process,
                      struct thread *thread, client_ptr_t iosb, unsigned int status )
{
    struct list *cursor, *following;
    int count = 0;

    if (!queue) return 0;
    if (!process && !thread && !iosb) return 0;

    LIST_FOR_EACH_SAFE( cursor, following, &queue->queue )
    {
        struct async *async = LIST_ENTRY( cursor, struct async, queue_entry );

        /* skip entries rejected by any active filter */
        if (process && async->thread->process != process) continue;
        if (thread && async->thread != thread) continue;
        if (iosb && async->data.iosb != iosb) continue;

        async_terminate( async, status );
        count++;
    }
    return count;
}

/* wake up async operations on the queue */
void async_wake_up( struct async_queue *queue, unsigned int status )
{
    struct list *ptr, *next;

    if (!queue) return;

    LIST_FOR_EACH_SAFE( ptr, next, &queue->queue )
    {
        struct async *async = LIST_ENTRY( ptr, struct async, queue_entry );
        async_terminate( async, status );
        if (status == STATUS_ALERTED) break;  /* only wake up the first one */
    }
}

Generated by  Doxygen 1.6.0   Back to index