aboutsummaryrefslogtreecommitdiff
path: root/fs/ceph/file.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/file.c')
-rw-r--r--fs/ceph/file.c97
1 files changed, 83 insertions, 14 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9f8e3572040e..ce74b394b49d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
return 0;
}
+enum {
+ CHECK_EOF = 1,
+ READ_INLINE = 2,
+};
+
/*
* Read a range of bytes striped over one or more objects. Iterate over
* objects we stripe over. (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@ more:
ret = read;
/* did we bounce off eof? */
if (pos + left > inode->i_size)
- *checkeof = 1;
+ *checkeof = CHECK_EOF;
}
dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
- vino, pos, &len,
+ vino, pos, &len, 0,
2,/*include a 'startsync' command*/
CEPH_OSD_OP_WRITE, flags, snapc,
ci->i_truncate_seq,
@@ -609,6 +614,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
break;
}
+ osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
n = iov_iter_get_pages_alloc(from, &pages, len, &start);
if (unlikely(n < 0)) {
ret = n;
@@ -713,7 +720,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
- vino, pos, &len, 1,
+ vino, pos, &len, 0, 1,
CEPH_OSD_OP_WRITE, flags, snapc,
ci->i_truncate_seq,
ci->i_truncate_size,
@@ -803,9 +810,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
size_t len = iocb->ki_nbytes;
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
+ struct page *pinned_page = NULL;
ssize_t ret;
int want, got = 0;
- int checkeof = 0, read = 0;
+ int retry_op = 0, read = 0;
again:
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@ again:
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else
want = CEPH_CAP_FILE_CACHE;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
if (ret < 0)
return ret;
@@ -827,8 +835,12 @@ again:
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got));
- /* hmm, this isn't really async... */
- ret = ceph_sync_read(iocb, to, &checkeof);
+ if (ci->i_inline_version == CEPH_INLINE_NONE) {
+ /* hmm, this isn't really async... */
+ ret = ceph_sync_read(iocb, to, &retry_op);
+ } else {
+ retry_op = READ_INLINE;
+ }
} else {
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,55 @@ again:
}
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
+ if (pinned_page) {
+ page_cache_release(pinned_page);
+ pinned_page = NULL;
+ }
ceph_put_cap_refs(ci, got);
+ if (retry_op && ret >= 0) {
+ int statret;
+ struct page *page = NULL;
+ loff_t i_size;
+ if (retry_op == READ_INLINE) {
+ page = __page_cache_alloc(GFP_NOFS);
+ if (!page)
+ return -ENOMEM;
+ }
- if (checkeof && ret >= 0) {
- int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
+ statret = __ceph_do_getattr(inode, page,
+ CEPH_STAT_CAP_INLINE_DATA, !!page);
+ if (statret < 0) {
+ __free_page(page);
+ if (statret == -ENODATA) {
+ BUG_ON(retry_op != READ_INLINE);
+ goto again;
+ }
+ return statret;
+ }
+
+ i_size = i_size_read(inode);
+ if (retry_op == READ_INLINE) {
+ /* does not support inline data > PAGE_SIZE */
+ if (i_size > PAGE_CACHE_SIZE) {
+ ret = -EIO;
+ } else if (iocb->ki_pos < i_size) {
+ loff_t end = min_t(loff_t, i_size,
+ iocb->ki_pos + len);
+ if (statret < end)
+ zero_user_segment(page, statret, end);
+ ret = copy_page_to_iter(page,
+ iocb->ki_pos & ~PAGE_MASK,
+ end - iocb->ki_pos, to);
+ iocb->ki_pos += ret;
+ } else {
+ ret = 0;
+ }
+ __free_pages(page, 0);
+ return ret;
+ }
/* hit EOF or hole? */
- if (statret == 0 && iocb->ki_pos < inode->i_size &&
+ if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@ again:
read += ret;
len -= ret;
- checkeof = 0;
+ retry_op = 0;
goto again;
}
}
@@ -909,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
if (err)
goto out;
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ err = ceph_uninline_data(file, NULL);
+ if (err < 0)
+ goto out;
+ }
+
retry_snap:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
@@ -922,7 +982,8 @@ retry_snap:
else
want = CEPH_CAP_FILE_BUFFER;
got = 0;
- err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
+ err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+ &got, NULL);
if (err < 0)
goto out;
@@ -969,6 +1030,7 @@ retry_snap:
if (written >= 0) {
int dirty;
spin_lock(&ci->i_ceph_lock);
+ ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
@@ -1111,7 +1173,7 @@ static int ceph_zero_partial_object(struct inode *inode,
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
ceph_vino(inode),
offset, length,
- 1, op,
+ 0, 1, op,
CEPH_OSD_FLAG_WRITE |
CEPH_OSD_FLAG_ONDISK,
NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}
+ if (ci->i_inline_version != CEPH_INLINE_NONE) {
+ ret = ceph_uninline_data(file, NULL);
+ if (ret < 0)
+ goto unlock;
+ }
+
size = i_size_read(inode);
if (!(mode & FALLOC_FL_KEEP_SIZE))
endoff = offset + length;
@@ -1223,7 +1291,7 @@ static long ceph_fallocate(struct file *file, int mode,
else
want = CEPH_CAP_FILE_BUFFER;
- ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
if (ret < 0)
goto unlock;
@@ -1240,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode,
if (!ret) {
spin_lock(&ci->i_ceph_lock);
+ ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
spin_unlock(&ci->i_ceph_lock);
if (dirty)