Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 161 additions & 33 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ pub(crate) enum Error {
#[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)]
InvalidPath { path: String },

#[error("Unable to sync data to file {}: {}", path.display(), source)]
UnableToSyncFile { source: io::Error, path: PathBuf },

#[error("Upload aborted")]
Aborted,
}
Expand Down Expand Up @@ -197,11 +200,13 @@ impl From<Error> for super::Error {
/// [`LocalFileSystem::copy_opts`] is implemented using [`std::fs::hard_link`], and therefore
/// does not support copying across filesystem boundaries.
///
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct LocalFileSystem {
// Shared configuration; holds the root URL this store is anchored at
config: Arc<Config>,
// When true, empty directories are deleted when files are deleted
automatic_cleanup: bool,
// When true, call fsync on files and directories after writes.
// Defaults to false; toggled through `with_fsync`.
fsync: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -229,6 +234,7 @@ impl LocalFileSystem {
root: Url::parse("file:///").unwrap(),
}),
automatic_cleanup: false,
fsync: false,
}
}

Expand All @@ -247,6 +253,7 @@ impl LocalFileSystem {
root: absolute_path_to_url(path)?,
}),
automatic_cleanup: false,
fsync: false,
})
}

Expand All @@ -260,6 +267,20 @@ impl LocalFileSystem {
self.automatic_cleanup = automatic_cleanup;
self
}

/// Enable fsync after writes to ensure durability
///
/// When enabled, [`LocalFileSystem`] syncs data and directory entries before a
/// write operation reports success:
///
/// - [`put_opts`](ObjectStore::put_opts) and multipart upload completion call
///   [`File::sync_all`] on the staged file before it is renamed or linked into place,
///   and then fsync the destination's parent directory.
/// - [`copy_opts`](ObjectStore::copy_opts) and an overwriting
///   [`rename_opts`](ObjectStore::rename_opts) fsync the destination's parent directory;
///   the rename additionally fsyncs the source's parent directory when it differs from
///   the destination's. These operations only make the directory entries durable; they
///   do not call [`File::sync_all`] on the file being copied or renamed.
/// - Parent directories that have to be created on demand are fsynced as well,
///   together with the first pre-existing ancestor.
///
/// If any of these sync steps fails, the operation returns an error.
///
/// This is disabled by default.
pub fn with_fsync(mut self, fsync: bool) -> Self {
self.fsync = fsync;
self
}
}

impl Config {
Expand Down Expand Up @@ -348,8 +369,9 @@ impl ObjectStore for LocalFileSystem {
}

let path = self.path_to_filesystem(location)?;
let fsync = self.fsync;
maybe_spawn_blocking(move || {
let (mut file, staging_path) = new_staged_upload(&path)?;
let (mut file, staging_path) = new_staged_upload(&path, fsync)?;
let mut e_tag = None;

let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
Expand All @@ -361,27 +383,49 @@ impl ObjectStore for LocalFileSystem {
e_tag = Some(get_etag(&metadata));
match opts.mode {
PutMode::Overwrite => {
if fsync {
file.sync_all().map_err(|source| Error::UnableToSyncFile {
source,
path: staging_path.clone(),
})?;
}
// For some fuse types of file systems, the file must be closed first
// to trigger the upload operation, and then renamed, such as Blobfuse
std::mem::drop(file);
match std::fs::rename(&staging_path, &path) {
Ok(_) => None,
Ok(_) => {
if fsync {
fsync_parent_dir(&path)?;
}
None
}
Err(source) => Some(Error::UnableToRenameFile { source }),
}
}
PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
Ok(_) => {
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
None
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
path: path.to_str().unwrap().to_string(),
PutMode::Create => {
if fsync {
file.sync_all().map_err(|source| Error::UnableToSyncFile {
source,
}),
_ => Some(Error::UnableToRenameFile { source }),
},
},
path: staging_path.clone(),
})?;
}
match std::fs::hard_link(&staging_path, &path) {
Ok(_) => {
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
if fsync {
fsync_parent_dir(&path)?;
}
None
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
path: path.to_str().unwrap().to_string(),
source,
}),
_ => Some(Error::UnableToRenameFile { source }),
},
}
}
PutMode::Update(_) => unreachable!(),
}
}
Expand Down Expand Up @@ -414,8 +458,8 @@ impl ObjectStore for LocalFileSystem {
}

let dest = self.path_to_filesystem(location)?;
let (file, src) = new_staged_upload(&dest)?;
Ok(Box::new(LocalUpload::new(src, dest, file)))
let (file, src) = new_staged_upload(&dest, self.fsync)?;
Ok(Box::new(LocalUpload::new(src, dest, file, self.fsync)))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -549,6 +593,7 @@ impl ObjectStore for LocalFileSystem {

let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
let fsync = self.fsync;

match mode {
CopyMode::Overwrite => {
Expand All @@ -564,15 +609,25 @@ impl ObjectStore for LocalFileSystem {
let staged = staged_upload_path(&to, &id.to_string());
match std::fs::hard_link(&from, &staged) {
Ok(_) => {
return std::fs::rename(&staged, &to).map_err(|source| {
let _ = std::fs::remove_file(&staged); // Attempt to clean up
Error::UnableToCopyFile { from, to, source }.into()
});
match std::fs::rename(&staged, &to) {
Ok(_) => {
if fsync {
fsync_parent_dir(&to)?;
}
return Ok(());
}
Err(source) => {
let _ = std::fs::remove_file(&staged); // Attempt to clean up
return Err(
Error::UnableToCopyFile { from, to, source }.into()
);
}
}
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
ErrorKind::NotFound => match from.exists() {
true => create_parent_dirs(&to, source)?,
true => create_parent_dirs(&to, source, fsync)?,
false => {
return Err(Error::NotFound { path: from, source }.into());
}
Expand All @@ -590,7 +645,12 @@ impl ObjectStore for LocalFileSystem {
maybe_spawn_blocking(move || {
loop {
match std::fs::hard_link(&from, &to) {
Ok(_) => return Ok(()),
Ok(_) => {
if fsync {
fsync_parent_dir(&to)?;
}
return Ok(());
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => {
return Err(Error::AlreadyExists {
Expand All @@ -600,7 +660,7 @@ impl ObjectStore for LocalFileSystem {
.into());
}
ErrorKind::NotFound => match from.exists() {
true => create_parent_dirs(&to, source)?,
true => create_parent_dirs(&to, source, fsync)?,
false => {
return Err(Error::NotFound { path: from, source }.into());
}
Expand Down Expand Up @@ -628,13 +688,22 @@ impl ObjectStore for LocalFileSystem {
RenameTargetMode::Overwrite => {
let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
let fsync = self.fsync;
maybe_spawn_blocking(move || {
loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Ok(_) => {
if fsync {
fsync_parent_dir(&to)?;
if from.parent() != to.parent() {
fsync_parent_dir(&from)?;
}
}
return Ok(());
}
Err(source) => match source.kind() {
ErrorKind::NotFound => match from.exists() {
true => create_parent_dirs(&to, source)?,
true => create_parent_dirs(&to, source, fsync)?,
false => {
return Err(Error::NotFound { path: from, source }.into());
}
Expand Down Expand Up @@ -786,23 +855,69 @@ impl LocalFileSystem {
}

/// Creates the parent directories of `path` or returns an error based on `source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
///
/// When `fsync` is true, fsyncs each newly created directory and the first pre-existing
/// ancestor to ensure the new directory entries are durable.
fn create_parent_dirs(path: &std::path::Path, source: io::Error, fsync: bool) -> Result<()> {
let parent = path.parent().ok_or_else(|| {
let path = path.to_path_buf();
Error::UnableToCreateFile { path, source }
})?;

std::fs::create_dir_all(parent).map_err(|source| {
let path = parent.into();
Error::UnableToCreateDir { source, path }
if fsync {
let mut first_existing = parent;
while !first_existing.exists() {
first_existing = first_existing.parent().unwrap_or(first_existing);
}

std::fs::create_dir_all(parent).map_err(|source| {
let path = parent.into();
Error::UnableToCreateDir { source, path }
})?;
Comment on lines +873 to +876
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you pull this `create_dir_all` before the `if fsync`, then it's the same code for fsync and no-fsync. That makes it easier to reason about, since it's really just the fsync block that differs, not the "actual" I/O operation.


let mut dir = parent;
loop {
fsync_dir(dir)?;
if dir == first_existing {
break;
}
dir = match dir.parent() {
Some(p) => p,
None => break,
};
}
} else {
std::fs::create_dir_all(parent).map_err(|source| {
let path = parent.into();
Error::UnableToCreateDir { source, path }
})?;
}
Ok(())
}

/// Fsyncs a directory to ensure its entries are durable
///
/// On every platform except Windows the directory at `dir_path` is opened and
/// [`File::sync_all`] is called on the resulting handle.
///
/// On Windows this is a no-op: [`File::open`] cannot open a directory there (the
/// underlying `CreateFile` call requires `FILE_FLAG_BACKUP_SEMANTICS`, which std does
/// not pass), so attempting it would make every fsync-enabled write fail.
///
/// # Errors
///
/// Returns [`Error::UnableToOpenFile`] if the directory cannot be opened, and
/// [`Error::UnableToSyncFile`] if syncing the open handle fails.
fn fsync_dir(dir_path: &std::path::Path) -> Result<()> {
    #[cfg(not(windows))]
    {
        let dir = File::open(dir_path).map_err(|source| Error::UnableToOpenFile {
            source,
            path: dir_path.into(),
        })?;
        dir.sync_all().map_err(|source| Error::UnableToSyncFile {
            source,
            path: dir_path.into(),
        })?;
    }

    // Nothing to sync on Windows (see above); keep the parameter "used" so the
    // function compiles warning-free on that target.
    #[cfg(windows)]
    let _ = dir_path;

    Ok(())
}

/// Makes the directory entry for `path` durable by fsyncing the directory that holds it
///
/// When `path` has no parent component, `path` itself is fsynced instead.
fn fsync_parent_dir(path: &std::path::Path) -> Result<()> {
    match path.parent() {
        Some(containing_dir) => fsync_dir(containing_dir),
        None => fsync_dir(path),
    }
}

/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path`
///
/// Creates any directories if necessary
fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
fn new_staged_upload(base: &std::path::Path, fsync: bool) -> Result<(File, PathBuf)> {
let mut multipart_id = 1;
loop {
let suffix = multipart_id.to_string();
Expand All @@ -812,7 +927,7 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
Ok(f) => return Ok((f, path)),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
ErrorKind::NotFound => create_parent_dirs(&path, source, fsync)?,
_ => return Err(Error::UnableToOpenFile { source, path }.into()),
},
}
Expand All @@ -835,6 +950,8 @@ struct LocalUpload {
src: Option<PathBuf>,
/// The next offset to write into the file
offset: u64,
/// Whether to fsync on complete
fsync: bool,
}

#[derive(Debug)]
Expand All @@ -844,14 +961,15 @@ struct UploadState {
}

impl LocalUpload {
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
/// Builds an upload that writes into `file`, the already-opened staging file at `src`
///
/// `dest` is the final location the staged file is renamed to on completion, and
/// `fsync` records whether completion should fsync. Writing starts at offset 0.
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, fsync: bool) -> Self {
    let file = Mutex::new(file);
    let state = Arc::new(UploadState { dest, file });
    Self {
        state,
        src: Some(src),
        offset: 0,
        fsync,
    }
}
}
Expand Down Expand Up @@ -882,11 +1000,21 @@ impl MultipartUpload for LocalUpload {
async fn complete(&mut self) -> Result<PutResult> {
let src = self.src.take().ok_or(Error::Aborted)?;
let s = Arc::clone(&self.state);
let fsync = self.fsync;
maybe_spawn_blocking(move || {
// Ensure no inflight writes
let file = s.file.lock();
if fsync {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there an fsync before this rename but not before the one in `rename_opts`?

file.sync_all().map_err(|source| Error::UnableToSyncFile {
source,
path: src.clone(),
})?;
}
std::fs::rename(&src, &s.dest)
.map_err(|source| Error::UnableToRenameFile { source })?;
if fsync {
fsync_parent_dir(&s.dest)?;
}
let metadata = file.metadata().map_err(|e| Error::Metadata {
source: e.into(),
path: src.to_string_lossy().to_string(),
Expand Down