
Job System

This guide walks you through creating jobs in Spacedrive's job system, which is designed to handle long-running tasks with features like progress tracking, pause/resume, and error handling.

Job System Overview

The job system consists of several key components:

  • Job: The high-level operation (e.g., file deletion, copying)
  • Tasks: Individual units of work within a job
  • Behaviors: Implementations of specific operations (e.g., different deletion strategies)
  • Progress Tracking: System for monitoring and reporting job progress
  • State Management: Handles job persistence and recovery
  • Undo Support: Optional capability for jobs to define reversible operations
  • Location Handling: Support for both location-bound and location-independent operations

The Job Trait

The Job trait is the core abstraction in Spacedrive's job system. It defines the interface that all jobs must implement:

pub trait Job: Send + Sync + Hash + 'static {
    const TYPE_NAME: JobName;
    fn name(&self) -> JobName;
    fn resume_tasks<OuterCtx: OuterContext>(...) -> impl Future<...>;
    fn run<OuterCtx: OuterContext>(...) -> impl Future<...>;
}

Key Features

  1. Asynchronous Execution: Jobs are async by design, suitable for IO-intensive operations

  2. Progress Reporting: Jobs can report their progress through ProgressUpdate:

    pub enum ProgressUpdate {
        TaskCount(u64),          // Total number of tasks
        CompletedTaskCount(u64), // Number of completed tasks
        Message(String),         // Informative message
        Phase(String),           // Current phase of the job
    }
    
  3. State Management: Jobs can be serialized and deserialized for persistence

  4. Task Dispatching: Jobs use JobTaskDispatcher to manage individual tasks

  5. Context Access: Jobs have access to:

    • Database client
    • Sync manager
    • Query invalidation
    • Event reporting
    • Data directory
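
For example, a job can combine task dispatching with progress reporting through its context. A minimal sketch using the ProgressUpdate variants above (the counts are placeholders, and the exact ctx.progress signature may differ slightly):

// Announce the total amount of work and the current phase up front...
ctx.progress(ProgressUpdate::TaskCount(total_tasks)).await;
ctx.progress(ProgressUpdate::Phase("processing".to_string())).await;

// ...then keep the completed count up to date as tasks finish.
ctx.progress(ProgressUpdate::CompletedTaskCount(completed_tasks)).await;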

Return Status

Jobs can complete with different statuses:

pub enum ReturnStatus {
    Completed(JobReturn),
    Shutdown(Result<Option<Vec<u8>>, EncodeError>),
    Canceled(JobReturn)
}
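
For example, a job that runs to completion returns Completed, while one interrupted by an application shutdown hands back its serialized state so it can be resumed later. A sketch, where encode_state and JobReturn::default() are illustrative, not the real API:

// Normal completion:
return Ok(ReturnStatus::Completed(JobReturn::default()));

// Shutdown while work is still pending: hand back the serialized state
// (encode_state is a hypothetical helper returning Result<Vec<u8>, EncodeError>).
return Ok(ReturnStatus::Shutdown(encode_state(&state).map(Some)));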

Built-in Job Types

The system includes several built-in job types:

pub enum JobName {
    Indexer,        // File system indexing
    FileIdentifier, // File type identification
    MediaProcessor, // Media processing
    Copy,           // File copying
    Move,           // File moving
    Delete,         // File deletion
    Erase,          // Secure erasure
    FileValidator,  // File validation
}

Creating a New Job

Let's walk through creating a job using the file deletion system as an example. The complete implementation can be found in core/crates/file-actions/src/deleter/.

0. Register the Job

Before implementing the job itself, you need to register it in two places:

  1. Add it to the JobName enum in job_system/job.rs:
pub enum JobName {
    Indexer,
    FileIdentifier,
    MediaProcessor,
    Copy,
    Move,
    Delete,  // <-- Your new job name here
    // ...
}
  2. Add your job's metadata type to ReportInputMetadata in job_system/report.rs:
pub enum ReportInputMetadata {
    // ... other jobs ...
    Deleter {
        location_id: location::id::Type,
        file_path_ids: Vec<file_path::id::Type>,
    },
    // Add your job's metadata here with the fields needed to track its progress
}
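
When the deleter job is enqueued, this variant is constructed from the same parameters the job itself receives, along the lines of this sketch:

let metadata = ReportInputMetadata::Deleter {
    location_id,
    file_path_ids: file_path_ids.clone(),
};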

This registration is crucial for:

  • Job identification in the system
  • Progress tracking and reporting
  • Job persistence and recovery
  • UI display and management

1. Define Job Structure

First, create your job structure that will hold the necessary state:

#[derive(Debug)]
pub struct DeleterJob<B> {
    // Required parameters
    location_id: Option<location::id::Type>,
    file_path_ids: Vec<file_path::id::Type>,

    // Task management state
    pending_tasks: Option<Vec<TaskHandle<Error>>>,
    shutdown_tasks: Option<Vec<Box<dyn Task<Error>>>>,
    accumulative_errors: Option<Vec<Error>>,

    // Generic behavior type
    behavior: PhantomData<fn(B) -> B>,
}
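
A constructor typically just captures the parameters and leaves the task-management fields empty until the job runs or is resumed. A minimal sketch based on the fields above:

impl<B: DeleteBehavior> DeleterJob<B> {
    pub fn new(
        location_id: Option<location::id::Type>,
        file_path_ids: Vec<file_path::id::Type>,
    ) -> Self {
        Self {
            location_id,
            file_path_ids,
            pending_tasks: None,
            shutdown_tasks: None,
            accumulative_errors: None,
            behavior: PhantomData,
        }
    }
}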

2. Define Job State

Create a serializable state structure for job persistence:

#[derive(Debug, Serialize, Deserialize)]
pub struct DeleterState {
    location_id: Option<location::id::Type>,
    file_path_ids: Vec<file_path::id::Type>,
    shutdown_tasks: Option<SerializedTasks>,
    accumulative_errors: Option<Vec<NonCriticalError>>,
}
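
On shutdown, the job's in-memory fields map onto this state so it can be persisted. A minimal sketch of that mapping, assuming the still-pending tasks have already been serialized into SerializedTasks by the task system and error conversion is elided:

impl<B> DeleterJob<B> {
    fn into_state(self, shutdown_tasks: Option<SerializedTasks>) -> DeleterState {
        DeleterState {
            location_id: self.location_id,
            file_path_ids: self.file_path_ids,
            shutdown_tasks,
            // Converting the job's accumulated errors into NonCriticalError
            // is elided in this sketch.
            accumulative_errors: None,
        }
    }
}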

3. Implement Job Trait

Implement the Job trait for your job structure:

impl<B: DeleteBehavior + Send + Sync + 'static> Job for DeleterJob<B> {
    const TYPE_NAME: JobName = JobName::Delete;

    fn name(&self) -> JobName {
        Self::TYPE_NAME
    }

    async fn run<OuterCtx: OuterContext>(
        self,
        dispatcher: JobTaskDispatcher,
        ctx: impl JobContext<OuterCtx>,
    ) -> Result<ReturnStatus, Error> {
        // Your job implementation here
    }

    async fn resume_tasks<OuterCtx: OuterContext>(
        &mut self,
        dispatcher: &JobTaskDispatcher,
        ctx: &impl JobContext<OuterCtx>,
        serialized_tasks: SerializedTasks,
    ) -> Result<(), Error> {
        // Task resumption logic here
    }
}
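
A hedged sketch of what the body of run could look like for the deleter: resolve the file data, report the task count, dispatch one task per batch, and return a completed status. Names such as fetch_file_data, BATCH_SIZE, and JobReturn::default() are illustrative rather than the real API, and the way task handles are awaited is simplified:

// Illustrative only — not the actual deleter implementation.
async fn run<OuterCtx: OuterContext>(
    self,
    dispatcher: JobTaskDispatcher,
    ctx: impl JobContext<OuterCtx>,
) -> Result<ReturnStatus, Error> {
    // Resolve the file path IDs into the data the tasks need (hypothetical helper).
    let files = fetch_file_data(&ctx, self.location_id, &self.file_path_ids).await?;

    // Tell the UI how much work there is in total.
    ctx.progress(ProgressUpdate::TaskCount(files.len() as u64)).await;

    // One shared counter lets every task contribute to the same progress total.
    let counter = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();

    // Dispatch one RemoveTask (defined in step 6 below) per batch of files.
    for chunk in files.chunks(BATCH_SIZE) {
        let task = RemoveTask::<B>::new(chunk.to_vec(), counter.clone());
        handles.push(dispatcher.dispatch(task)?);
    }

    // Wait for the tasks, reporting completed counts as they finish.
    for handle in handles {
        handle.await?;
        ctx.progress(ProgressUpdate::CompletedTaskCount(
            counter.load(Ordering::Acquire),
        ))
        .await;
    }

    Ok(ReturnStatus::Completed(JobReturn::default()))
}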

4. Define Behavior Trait

If your job supports multiple implementations (like delete vs move to trash), create a behavior trait:

pub trait DeleteBehavior {
    fn delete(file: FileData) -> impl Future<Output = Result<ExecStatus, ()>> + Send;

    fn delete_all<I>(
        files: I,
        interrupter: Option<&Interrupter>,
    ) -> impl Future<Output = Result<ExecStatus, ()>> + Send
    where
        I: IntoIterator<Item = FileData> + Send + 'static,
        I::IntoIter: Send;
}
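
In the implementations below, each behavior overrides only one of these methods, which suggests the trait provides a default for the other. A hedged sketch of what a default delete_all built on delete might look like:

fn delete_all<I>(
    files: I,
    interrupter: Option<&Interrupter>,
) -> impl Future<Output = Result<ExecStatus, ()>> + Send
where
    I: IntoIterator<Item = FileData> + Send + 'static,
    I::IntoIter: Send,
{
    async move {
        for file in files {
            // Give the task system a chance to pause or cancel between files.
            if let Some(interrupter) = interrupter {
                check_interruption!(interrupter);
            }

            match Self::delete(file).await? {
                ExecStatus::Done(_) => {}
                // Bubble up pauses or cancellations from the single-file path.
                other => return Ok(other),
            }
        }

        Ok(ExecStatus::Done(TaskOutput::Empty))
    }
}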

5. Implement Behaviors

Create concrete implementations of your behavior:

// Permanent deletion behavior
pub struct RemoveBehavior;

impl DeleteBehavior for RemoveBehavior {
    async fn delete(file_data: FileData) -> Result<ExecStatus, ()> {
        if file_data.full_path.is_dir() {
            tokio::fs::remove_dir_all(&file_data.full_path).await
        } else {
            tokio::fs::remove_file(&file_data.full_path).await
        }
        .map_err(|_| ())?;

        Ok(ExecStatus::Done(TaskOutput::Empty))
    }
}

// Move to trash behavior
pub struct MoveToTrashBehavior;

impl DeleteBehavior for MoveToTrashBehavior {
    async fn delete_all<I>(files: I, interrupter: Option<&Interrupter>) -> Result<ExecStatus, ()>
    where
        I: IntoIterator<Item = FileData> + Send + 'static,
        I::IntoIter: Send + 'static,
    {
        if let Some(interrupter) = interrupter {
            check_interruption!(interrupter);
        }

        task::spawn_blocking(move || trash::delete_all(files.into_iter().map(|x| x.full_path)))
            .await
            .map_err(|_| ())? // the blocking task panicked or was aborted
            .map_err(|_| ())?; // the trash operation itself failed

        Ok(ExecStatus::Done(().into()))
    }
}

6. Create Task Implementation

Define the task that will perform the actual work:

pub struct RemoveTask<B> {
    id: TaskId,
    files: Vec<FileData>,
    counter: Arc<AtomicU64>,
    behavior: PhantomData<fn(B) -> B>,
}

#[async_trait::async_trait]
impl<B: DeleteBehavior + Send + 'static> Task<Error> for RemoveTask<B> {
    fn id(&self) -> TaskId {
        self.id
    }

    async fn run(&mut self, interrupter: &Interrupter) -> Result<ExecStatus, Error> {
        let size = self.files.len();

        match B::delete_all(self.files.clone(), Some(interrupter)).await {
            Ok(ExecStatus::Done(_)) => {
                self.counter.fetch_add(size as _, Ordering::AcqRel);
                Ok(ExecStatus::Done(().into()))
            }
            Ok(status) => Ok(status),
            Err(_) => Err(Error::Deleter("Task failed".into()))
        }
    }
}

Best Practices

  1. Error Handling

    • Use accumulative errors for non-critical failures
    • Include context in error messages
    • Handle cleanup on errors
  2. Progress Reporting

    • Report progress regularly using ctx.progress()
    • Include meaningful progress messages
    • Use atomic counters for thread-safe progress tracking
  3. Resource Management

    • Clean up resources on cancellation
    • Use Arc for shared resources
    • Implement proper task cleanup
  4. State Management

    • Keep serialized state minimal
    • Handle state restoration properly
    • Use Option for non-persistent state

Best Practices for Progress Reporting

Use Structured Data

Always send structured data instead of formatted strings. This allows the frontend to handle localization and formatting:

❌ Don't do this:

ctx.progress(ProgressUpdate::Message(format!(
    "Processed {} of {} items",
    completed,
    total
))).await;

✅ Do this instead:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobProgress {
    Processing {
        completed: u64,
        total: u64,
        bytes_processed: u64,
        current_item: Option<String>,
    },
    Complete {
        total_processed: u64,
        total_bytes: u64,
        duration: Duration,
    }
}

// In your task:
ctx.progress(JobProgress::Processing {
    completed: files_processed,
    total: total_files,
    bytes_processed,
    current_item: Some(current_file.name.clone()),
}).await;

Include All Necessary Data

Progress updates should include enough information for the UI to show:

  • Overall progress (e.g., files processed / total files)
  • Current operation details (e.g., current file name, size)
  • Performance metrics (e.g., speed, estimated time remaining)
  • Operation-specific details (e.g., copy source/target)

Progress Granularity

Report progress at appropriate intervals:

  • Major state changes (Started, Complete)
  • Per-item updates for user feedback
  • Periodic updates for long-running operations

Example of a well-structured progress enum:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CopyProgress {
    Started {
        total_files: u64,
        total_bytes: u64,
    },
    File {
        name: String,
        current_file: u64,
        total_files: u64,
        bytes: u64,
        source: PathBuf,
        target: PathBuf,
    },
    BytesWritten {
        bytes: u64,
        speed: f64,
        eta_seconds: u32,
    },
    Complete {
        files_copied: u64,
        total_bytes: u64,
        duration: Duration,
    },
}

This approach:

  • Enables proper i18n/l10n in the UI
  • Provides flexibility in how data is displayed
  • Makes it easier to modify the UI without backend changes
  • Ensures consistent progress reporting across different locales
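
For the periodic updates mentioned under Progress Granularity, it also helps to throttle how often high-frequency updates such as BytesWritten are emitted. A minimal sketch, assuming the surrounding copy loop tracks bytes_written, bytes_per_second, and remaining_seconds (all illustrative names) and using an arbitrary 100 ms interval:

use std::time::{Duration, Instant};

// Set up the throttle before entering the copy loop.
let mut last_update = Instant::now();
let update_interval = Duration::from_millis(100); // arbitrary throttle

// Inside the loop, after writing each chunk:
if last_update.elapsed() >= update_interval {
    ctx.progress(CopyProgress::BytesWritten {
        bytes: bytes_written,
        speed: bytes_per_second,
        eta_seconds: remaining_seconds,
    })
    .await;
    last_update = Instant::now();
}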

Common Patterns

  1. Batch Processing
let batch_size = 100;
for chunk in items.chunks(batch_size) {
    let task = MyTask::new(chunk.to_vec(), progress_counter.clone());
    tasks.push(dispatcher.dispatch(task)?);
}
  2. Error Accumulation
if let Err(e) = result {
    self.accumulative_errors.get_or_insert(Vec::new()).push(e);
}

Debugging Tips

  1. Use tracing for better debugging:
tracing::debug!(task_id = %self.id, "Starting task execution");
  2. Monitor task states:
while let Some(status) = task_handle.status().await {
    match status {
        TaskStatus::Running => continue,
        TaskStatus::Paused => handle_pause(),
        TaskStatus::Completed => break,
        TaskStatus::Failed(e) => handle_error(e),
    }
}
  3. Add proper error context:
Error::TaskFailed(format!("Failed to process item {}: {}", item_id, e))

Together, these patterns and practices help ensure your job:

  • Handles large operations efficiently through batching
  • Resolves conflicts reliably
  • Adapts to different scenarios
  • Provides detailed progress feedback

Directory Structure

When implementing a new job, follow this recommended directory structure based on the file-actions crate:

src/
├── your_job/
│   ├── mod.rs              # Public exports and job registration
│   ├── job.rs              # Job implementation and state management
│   ├── progress.rs         # Progress tracking types
│   └── tasks/
│       ├── mod.rs          # Task exports and shared types
│       ├── behaviors/      # Different implementation strategies
│       │   ├── mod.rs      # Behavior trait and utilities
│       │   ├── fast.rs     # Fast implementation (e.g., for small files)
│       │   └── stream.rs   # Streaming implementation (e.g., for large files)
│       ├── batch.rs        # Batch processing logic
│       ├── conflict.rs     # Conflict resolution utilities
│       └── task_name.rs    # Individual task implementations

Key Components

  1. mod.rs

    • Public exports for your job module
    • Re-exports of commonly used types
    mod job;
    mod progress;
    mod tasks;
    
    pub use job::*;
    pub use progress::*;
    pub use tasks::*;
    
  2. job.rs

    • Job struct and implementation
    • State management
    • Task coordination
    pub struct YourJob<C> {
        // Job parameters
        params: JobParams,
        // Task management
        pending_tasks: Option<Vec<TaskHandle>>,
        // Progress tracking
        progress: Arc<AtomicU64>,
    }
    
  3. progress.rs

    • Progress tracking enums/structs
    • Progress calculation utilities
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub enum JobProgress {
        Started { total: u64 },
        Progress { current: u64, total: u64 },
    }
    
  4. tasks/behaviors/

    • Trait defining behavior interface
    • Different implementation strategies
    #[async_trait]
    pub trait TaskBehavior {
        async fn execute(&self, params: TaskParams) -> Result<(), Error>;
    }
    
  5. tasks/batch.rs

    • Batch processing logic
    • Performance optimizations
    pub struct BatchedOperation {
        items: Vec<Item>,
        batch_size: usize,
    }
    
    impl BatchedOperation {
        pub fn process(
            &self,
            dispatcher: &JobTaskDispatcher,
            progress_counter: &Arc<AtomicU64>,
        ) -> Result<Vec<TaskHandle>, Error> {
            let mut tasks = Vec::new();
            for chunk in self.items.chunks(self.batch_size) {
                let task = MyTask::new(chunk.to_vec(), Arc::clone(progress_counter));
                tasks.push(dispatcher.dispatch(task)?);
            }
            Ok(tasks)
        }
    }
    

This structure provides:

  • Clear separation of concerns
  • Easy navigation of code
  • Consistent organization across jobs
  • Reusable components
  • Maintainable implementations