Dataflow


Dataflow is a data processing library that provides composable primitives to build flexible, fast and statically typed data pipelines. The pipeline is a directed acyclic dataflow graph, which a dataloader can run on a separate thread to feed data-hungry applications.

Usage

To build a pipeline, first start with a loader Node:

use dataflow::prelude::*;

fn main() {
  let pipeline = FileLoader::from_directory("my_data_directory");
}

The FileLoader loads the files from the directory in a random order. Next add a transformation to it with the map() function:

let pipeline = FileLoader::from_directory("my_data_directory")
      .map(|(_, text)| format!("Hello {}", text)); // Add hello to each file

map() takes a Node that processes a single sample at a time. For batch processing, use chain(), which takes a Node that processes a whole batch at a time.

Important note: All functions and closures are also Nodes! This means that whenever we want to add a stateless transformation, we could just use a function. In this case, the closure takes in a single datapoint and outputs a single datapoint.
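To make the "closures are Nodes" idea concrete, here is a minimal, self-contained sketch of how a blanket implementation could provide it. It redeclares a cut-down Node trait locally so that it compiles on its own; the blanket impl and the `map_batch` helper are assumptions for illustration and are not taken from the dataflow source.

```rust
/// Local, cut-down copy of the Node trait from the Custom Nodes section (illustration only).
pub trait Node<Input> {
    type Output;
    fn process(&mut self, input: Input) -> Self::Output;
}

/// Assumed blanket impl: anything callable as FnMut(I) -> O is a Node<I>.
impl<I, O, F: FnMut(I) -> O> Node<I> for F {
    type Output = O;
    fn process(&mut self, input: I) -> O {
        self(input)
    }
}

/// Hypothetical helper in the spirit of map(): runs a per-sample node over a batch.
pub fn map_batch<I, N: Node<I>>(node: &mut N, batch: Vec<I>) -> Vec<N::Output> {
    batch.into_iter().map(|item| node.process(item)).collect()
}

/// A plain function is a stateless node under the blanket impl above.
pub fn greet(text: String) -> String {
    format!("Hello {}", text)
}
```

With this in place, both `greet` and a closure such as `|n: i32| n * 2` can be passed to `map_batch` without any wrapper type.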

Now that we've added "Hello " to every file, let's use a tokenizer from dataflow_nlp in our pipeline:

// Our tokenizer
let tokenizer = dataflow_nlp::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = FileLoader::from_directory("my_data_directory")
      .map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
      .chain(tokenizer); // Tokenize the files

Great! Now our data gets efficiently tokenized in batches. Right now, we will get single tokenized sentences out of the pipeline one at a time. But what if we wanted to get batches out? Let's use a Batch node:

// Our tokenizer
let tokenizer = dataflow_nlp::tokenization::WordpieceTokenizer::load();

// Our pipeline
let pipeline = FileLoader::from_directory("my_data_directory")
      .map(|(_, text)| format!("Hello {}", text)) // Add hello to each file
      .chain(tokenizer) // Tokenize the files
      .chain(Batch::new(64)); // Create batches of 64

That's it! We'll now get batches of 64 tokenized sentences.
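For intuition, a batching node can be sketched in a few lines. The `BatchSketch` type below is a hypothetical stand-in, not the library's Batch; in particular, keeping a final partial batch is an assumption of this sketch, and the real node may behave differently.

```rust
/// Local, cut-down copy of the Node trait (illustration only).
pub trait Node<Input> {
    type Output;
    fn process(&mut self, input: Input) -> Self::Output;
}

/// Hypothetical stand-in for a batching node.
pub struct BatchSketch {
    size: usize,
}

impl BatchSketch {
    pub fn new(size: usize) -> Self {
        assert!(size > 0, "batch size must be positive");
        BatchSketch { size }
    }
}

impl<T> Node<Vec<T>> for BatchSketch {
    type Output = Vec<Vec<T>>;

    /// Groups the incoming samples into chunks of at most `size` items.
    fn process(&mut self, input: Vec<T>) -> Vec<Vec<T>> {
        let mut batches: Vec<Vec<T>> = Vec::new();
        let mut items = input.into_iter().peekable();
        while items.peek().is_some() {
            // Take up to `size` items without consuming the rest of the iterator.
            batches.push(items.by_ref().take(self.size).collect());
        }
        batches // Assumption: a trailing partial batch is kept, not dropped
    }
}
```

Feeding 128 samples through `BatchSketch::new(64)` yields two batches of 64, matching the behaviour described above.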

Loader Nodes

As discussed before, everything in the pipeline implements the Node trait. RandomLoader is also a Node! So the question arises: since data originates from it, and since Nodes need an input and an output, what does it take as input? Simple: it takes Vec<()>, which is what the pipeline starts with, and produces data (a Vec of samples) to send through the pipeline. This pattern is the same across all Nodes where data originates.
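The Vec<()> pattern is easier to see in code. Below is a minimal sketch of a loader-style node, using a local copy of the Node trait; `VecLoader` is a hypothetical in-memory loader invented for this example, and unlike the loaders described above it yields items in order rather than randomly.

```rust
/// Local copy of the Node trait from the Custom Nodes section (illustration only).
pub trait Node<Input> {
    type Output;
    fn process(&mut self, input: Input) -> Self::Output;
    fn reset(&mut self) {}
    fn data_remaining(&self, before: usize) -> usize {
        before
    }
}

/// Hypothetical loader that serves items from memory, in order.
pub struct VecLoader<T> {
    items: Vec<T>,
    cursor: usize,
}

impl<T: Clone> VecLoader<T> {
    pub fn new(items: Vec<T>) -> Self {
        VecLoader { items, cursor: 0 }
    }
}

impl<T: Clone> Node<Vec<()>> for VecLoader<T> {
    type Output = Vec<T>;

    /// Each `()` in the input requests one sample; fewer are returned if data runs out.
    fn process(&mut self, input: Vec<()>) -> Vec<T> {
        let end = (self.cursor + input.len()).min(self.items.len());
        let out = self.items[self.cursor..end].to_vec();
        self.cursor = end;
        out
    }

    /// Start over from the first item.
    fn reset(&mut self) {
        self.cursor = 0;
    }

    /// A loader is the source of truth, so it ignores `before`.
    fn data_remaining(&self, _before: usize) -> usize {
        self.items.len() - self.cursor
    }
}
```

The length of the Vec<()> acts purely as a request size: the unit values carry no data, they only tell the loader how many samples to emit.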

Custom Nodes

In fact, you can implement your own Nodes as well, by implementing the Node trait!

pub trait Node<Input> {
    type Output;

    /// Process a batch of data
    fn process(&mut self, input: Input) -> Self::Output;
    /// Reset signal propagates through pipeline
    fn reset(&mut self) {}
    /// Get number of examples left
    fn data_remaining(&self, before: usize) -> usize {
        before // Defaults to same as previous remaining data
    }
}

Your custom nodes can then be inserted directly into the pipeline!
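As an illustration, here is a small stateful custom node. It repeats the trait definition so the snippet compiles on its own; `Indexer` is a hypothetical node made up for this example, not part of the library. It tags each sample with a running index and clears its counter when reset() is called.

```rust
/// Local copy of the Node trait shown above (illustration only).
pub trait Node<Input> {
    type Output;
    fn process(&mut self, input: Input) -> Self::Output;
    fn reset(&mut self) {}
    fn data_remaining(&self, before: usize) -> usize {
        before
    }
}

/// Hypothetical node: pairs every sample with its position in the epoch.
#[derive(Default)]
pub struct Indexer {
    next_index: usize,
}

impl Node<Vec<String>> for Indexer {
    type Output = Vec<(usize, String)>;

    fn process(&mut self, input: Vec<String>) -> Self::Output {
        let start = self.next_index;
        self.next_index += input.len(); // Remember where the next batch starts
        input
            .into_iter()
            .enumerate()
            .map(|(offset, sample)| (start + offset, sample))
            .collect()
    }

    /// Indices start from zero again after a reset.
    fn reset(&mut self) {
        self.next_index = 0;
    }
    // data_remaining keeps its default: this node neither adds nor drops samples.
}
```

Because the node holds state, it needs a struct rather than a closure; stateless transformations can stay as plain functions.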

Dataloader

Since we built this cool pipeline, what can we do with it? Well for starters, we could simply call process() and feed in some data:

// The RandomLoader takes in a () for each sample, so we pass in a batch as Vec<()>
let output: Vec<Vec<Vec<String>>> = pipeline.process(vec![(); 128]);

// Output should now contain 2 batches of 64 tokenized sentences from our files with "Hello" prepended.

Let's do something cooler. Let's put it in a Dataloader and use it in an ML training loop:

// Make the dataloader
let mut dataloader = Dataloader(pipeline);

// Training loop
for example in &mut dataloader {
   // Now example is a vector of tokenized strings!
   // Do with them what you please...
}
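To show the general idea of feeding a training loop from a separate thread, here is a sketch built only on the standard library. It is not how Dataloader is implemented; `spawn_loader`, its `produce` callback and the `capacity` parameter are all assumptions made for this example.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical background loader: calls `produce` on a worker thread until it
/// returns None, buffering at most `capacity` examples ahead of the consumer.
pub fn spawn_loader<T, F>(mut produce: F, capacity: usize) -> Receiver<T>
where
    T: Send + 'static,
    F: FnMut() -> Option<T> + Send + 'static,
{
    let (sender, receiver) = sync_channel(capacity);
    thread::spawn(move || {
        while let Some(example) = produce() {
            // send() blocks while the buffer is full, which gives backpressure.
            if sender.send(example).is_err() {
                break; // The consumer was dropped, so stop producing
            }
        }
        // Dropping `sender` here closes the channel and ends the consumer's loop.
    });
    receiver
}
```

The consumer can then write `for example in spawn_loader(produce, 8) { ... }`; the loop ends once the worker runs out of data.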

To Do:

  • Make dataloader use a multiqueue instead of draining all examples into buffer on main thread
  • Make auto-parallel pipeline Node using rayon
  • Add async ability and remote sources. (blocked by stable async traits)