diff options
Diffstat (limited to 'fs/aio.c')
-rw-r--r-- | fs/aio.c | 174 |
1 files changed, 114 insertions, 60 deletions
@@ -141,6 +141,7 @@ struct kioctx { struct { unsigned tail; + unsigned completed_events; spinlock_t completion_lock; } ____cacheline_aligned_in_smp; @@ -192,7 +193,6 @@ static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages) } file->f_flags = O_RDWR; - file->private_data = ctx; return file; } @@ -202,7 +202,7 @@ static struct dentry *aio_mount(struct file_system_type *fs_type, static const struct dentry_operations ops = { .d_dname = simple_dname, }; - return mount_pseudo(fs_type, "aio:", NULL, &ops, 0xa10a10a1); + return mount_pseudo(fs_type, "aio:", NULL, &ops, AIO_RING_MAGIC); } /* aio_setup @@ -556,8 +556,7 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm) struct aio_ring *ring; spin_lock(&mm->ioctx_lock); - rcu_read_lock(); - table = rcu_dereference(mm->ioctx_table); + table = rcu_dereference_raw(mm->ioctx_table); while (1) { if (table) @@ -565,7 +564,6 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm) if (!table->table[i]) { ctx->id = i; table->table[i] = ctx; - rcu_read_unlock(); spin_unlock(&mm->ioctx_lock); /* While kioctx setup is in progress, @@ -579,8 +577,6 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm) } new_nr = (table ? table->nr : 1) * 4; - - rcu_read_unlock(); spin_unlock(&mm->ioctx_lock); table = kzalloc(sizeof(*table) + sizeof(struct kioctx *) * @@ -591,8 +587,7 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm) table->nr = new_nr; spin_lock(&mm->ioctx_lock); - rcu_read_lock(); - old = rcu_dereference(mm->ioctx_table); + old = rcu_dereference_raw(mm->ioctx_table); if (!old) { rcu_assign_pointer(mm->ioctx_table, table); @@ -739,12 +734,9 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx, spin_lock(&mm->ioctx_lock); - rcu_read_lock(); - table = rcu_dereference(mm->ioctx_table); - + table = rcu_dereference_raw(mm->ioctx_table); WARN_ON(ctx != table->table[ctx->id]); table->table[ctx->id] = NULL; - rcu_read_unlock(); spin_unlock(&mm->ioctx_lock); /* percpu_ref_kill() will do the necessary call_rcu() */ @@ -793,40 +785,35 @@ EXPORT_SYMBOL(wait_on_sync_kiocb); */ void exit_aio(struct mm_struct *mm) { - struct kioctx_table *table; - struct kioctx *ctx; - unsigned i = 0; - - while (1) { - rcu_read_lock(); - table = rcu_dereference(mm->ioctx_table); - - do { - if (!table || i >= table->nr) { - rcu_read_unlock(); - rcu_assign_pointer(mm->ioctx_table, NULL); - if (table) - kfree(table); - return; - } + struct kioctx_table *table = rcu_dereference_raw(mm->ioctx_table); + int i; - ctx = table->table[i++]; - } while (!ctx); + if (!table) + return; - rcu_read_unlock(); + for (i = 0; i < table->nr; ++i) { + struct kioctx *ctx = table->table[i]; + struct completion requests_done = + COMPLETION_INITIALIZER_ONSTACK(requests_done); + if (!ctx) + continue; /* - * We don't need to bother with munmap() here - - * exit_mmap(mm) is coming and it'll unmap everything. - * Since aio_free_ring() uses non-zero ->mmap_size - * as indicator that it needs to unmap the area, - * just set it to 0; aio_free_ring() is the only - * place that uses ->mmap_size, so it's safe. + * We don't need to bother with munmap() here - exit_mmap(mm) + * is coming and it'll unmap everything. And we simply can't, + * this is not necessarily our ->mm. + * Since kill_ioctx() uses non-zero ->mmap_size as indicator + * that it needs to unmap the area, just set it to 0. */ ctx->mmap_size = 0; + kill_ioctx(mm, ctx, &requests_done); - kill_ioctx(mm, ctx, NULL); + /* Wait until all IO for the context are done. */ + wait_for_completion(&requests_done); } + + RCU_INIT_POINTER(mm->ioctx_table, NULL); + kfree(table); } static void put_reqs_available(struct kioctx *ctx, unsigned nr) @@ -834,10 +821,8 @@ static void put_reqs_available(struct kioctx *ctx, unsigned nr) struct kioctx_cpu *kcpu; unsigned long flags; - preempt_disable(); - kcpu = this_cpu_ptr(ctx->cpu); - local_irq_save(flags); + kcpu = this_cpu_ptr(ctx->cpu); kcpu->reqs_available += nr; while (kcpu->reqs_available >= ctx->req_batch * 2) { @@ -846,7 +831,6 @@ static void put_reqs_available(struct kioctx *ctx, unsigned nr) } local_irq_restore(flags); - preempt_enable(); } static bool get_reqs_available(struct kioctx *ctx) @@ -855,10 +839,8 @@ static bool get_reqs_available(struct kioctx *ctx) bool ret = false; unsigned long flags; - preempt_disable(); - kcpu = this_cpu_ptr(ctx->cpu); - local_irq_save(flags); + kcpu = this_cpu_ptr(ctx->cpu); if (!kcpu->reqs_available) { int old, avail = atomic_read(&ctx->reqs_available); @@ -878,10 +860,71 @@ static bool get_reqs_available(struct kioctx *ctx) kcpu->reqs_available--; out: local_irq_restore(flags); - preempt_enable(); return ret; } +/* refill_reqs_available + * Updates the reqs_available reference counts used for tracking the + * number of free slots in the completion ring. This can be called + * from aio_complete() (to optimistically update reqs_available) or + * from aio_get_req() (the we're out of events case). It must be + * called holding ctx->completion_lock. + */ +static void refill_reqs_available(struct kioctx *ctx, unsigned head, + unsigned tail) +{ + unsigned events_in_ring, completed; + + /* Clamp head since userland can write to it. */ + head %= ctx->nr_events; + if (head <= tail) + events_in_ring = tail - head; + else + events_in_ring = ctx->nr_events - (head - tail); + + completed = ctx->completed_events; + if (events_in_ring < completed) + completed -= events_in_ring; + else + completed = 0; + + if (!completed) + return; + + ctx->completed_events -= completed; + put_reqs_available(ctx, completed); +} + +/* user_refill_reqs_available + * Called to refill reqs_available when aio_get_req() encounters an + * out of space in the completion ring. + */ +static void user_refill_reqs_available(struct kioctx *ctx) +{ + spin_lock_irq(&ctx->completion_lock); + if (ctx->completed_events) { + struct aio_ring *ring; + unsigned head; + + /* Access of ring->head may race with aio_read_events_ring() + * here, but that's okay since whether we read the old version + * or the new version, and either will be valid. The important + * part is that head cannot pass tail since we prevent + * aio_complete() from updating tail by holding + * ctx->completion_lock. Even if head is invalid, the check + * against ctx->completed_events below will make sure we do the + * safe/right thing. + */ + ring = kmap_atomic(ctx->ring_pages[0]); + head = ring->head; + kunmap_atomic(ring); + + refill_reqs_available(ctx, head, ctx->tail); + } + + spin_unlock_irq(&ctx->completion_lock); +} + /* aio_get_req * Allocate a slot for an aio request. * Returns NULL if no requests are free. @@ -890,8 +933,11 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) { struct kiocb *req; - if (!get_reqs_available(ctx)) - return NULL; + if (!get_reqs_available(ctx)) { + user_refill_reqs_available(ctx); + if (!get_reqs_available(ctx)) + return NULL; + } req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO); if (unlikely(!req)) @@ -950,8 +996,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2) struct kioctx *ctx = iocb->ki_ctx; struct aio_ring *ring; struct io_event *ev_page, *event; + unsigned tail, pos, head; unsigned long flags; - unsigned tail, pos; /* * Special case handling for sync iocbs: @@ -1012,10 +1058,14 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ctx->tail = tail; ring = kmap_atomic(ctx->ring_pages[0]); + head = ring->head; ring->tail = tail; kunmap_atomic(ring); flush_dcache_page(ctx->ring_pages[0]); + ctx->completed_events++; + if (ctx->completed_events > 1) + refill_reqs_available(ctx, head, tail); spin_unlock_irqrestore(&ctx->completion_lock, flags); pr_debug("added to ring %p at [%u]\n", iocb, tail); @@ -1030,7 +1080,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2) /* everything turned out well, dispose of the aiocb. */ kiocb_free(iocb); - put_reqs_available(ctx, 1); /* * We have to order our ring_info tail store above and test @@ -1047,7 +1096,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2) } EXPORT_SYMBOL(aio_complete); -/* aio_read_events +/* aio_read_events_ring * Pull an event off of the ioctx's event ring. Returns the number of * events fetched */ @@ -1067,6 +1116,12 @@ static long aio_read_events_ring(struct kioctx *ctx, tail = ring->tail; kunmap_atomic(ring); + /* + * Ensure that once we've read the current tail pointer, that + * we also see the events that were stored up to the tail. + */ + smp_rmb(); + pr_debug("h%u t%u m%u\n", head, tail, ctx->nr_events); if (head == tail) @@ -1270,12 +1325,12 @@ static ssize_t aio_setup_vectored_rw(struct kiocb *kiocb, if (compat) ret = compat_rw_copy_check_uvector(rw, (struct compat_iovec __user *)buf, - *nr_segs, 1, *iovec, iovec); + *nr_segs, UIO_FASTIOV, *iovec, iovec); else #endif ret = rw_copy_check_uvector(rw, (struct iovec __user *)buf, - *nr_segs, 1, *iovec, iovec); + *nr_segs, UIO_FASTIOV, *iovec, iovec); if (ret < 0) return ret; @@ -1299,9 +1354,8 @@ static ssize_t aio_setup_single_vector(struct kiocb *kiocb, } /* - * aio_setup_iocb: - * Performs the initial checks and aio retry method - * setup for the kiocb at the time of io submission. + * aio_run_iocb: + * Performs the initial checks and io submission. */ static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode, char __user *buf, bool compat) @@ -1313,7 +1367,7 @@ static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode, fmode_t mode; aio_rw_op *rw_op; rw_iter_op *iter_op; - struct iovec inline_vec, *iovec = &inline_vec; + struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; struct iov_iter iter; switch (opcode) { @@ -1348,7 +1402,7 @@ rw_common: if (!ret) ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes); if (ret < 0) { - if (iovec != &inline_vec) + if (iovec != inline_vecs) kfree(iovec); return ret; } @@ -1395,7 +1449,7 @@ rw_common: return -EINVAL; } - if (iovec != &inline_vec) + if (iovec != inline_vecs) kfree(iovec); if (ret != -EIOCBQUEUED) { |