Functions

Data structures

DistributedData.Dinfo – Type

Dinfo

The basic structure for working with loaded data distributed amongst workers. In full, it represents a dataset as follows:

  • val is the "value name" under which the data are saved in processes. E.g. val=:foo means that there is a variable foo on each process holding a part of the matrix.
  • workers is a list of workers (in correct order!) that hold the data (similar to DArray.pids)
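
For illustration, a minimal sketch of constructing a Dinfo by hand (functions such as scatter_array below build one for you):

using Distributed, DistributedData
addprocs(2)
@everywhere using DistributedData

# store a worker-local piece of data under :foo on each worker
for w in workers()
    save_at(w, :foo, rand(5))
end

di = Dinfo(:foo, workers())   # describes the distributed data
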
source

Base functions

DistributedData.dexec – Method

dexec(val, fn, workers)

Execute a function fn on workers, taking val as a parameter. Results are not collected; this is useful for side-effecting computations that are not easily expressed with dtransform.
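
For example, a small sketch (assuming :myData was scattered to the workers beforehand):

# print the size of each worker-local part; results are discarded
dexec(:myData, d -> println(size(d)), workers())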

source
DistributedData.dmap – Method
dmap(arr::Vector, fn, workers)

Call a function fn on workers, with a single parameter arriving from the corresponding position in arr.
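
A minimal sketch, with one element of arr for each worker:

# worker k receives the k-th element of the vector
dmap(collect(1:nworkers()), x -> x * 10, workers())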

source
DistributedData.dmapreduce – Method

dmapreduce(val, map, fold, workers)

A distributed work-alike of the standard mapreduce: take a function map (a non-modifying transform on the data) and fold (a 2-to-1 reduction of the results of map), systematically run them on the data described by val distributed on workers, and return the final reduced result.

The fold operation is assumed to be associative, but not necessarily commutative (as in semigroups). If there are no workers, the operation returns nothing (we don't have a monoid to magically conjure zero elements :[ ).

In the current version, the reduce step is a sequential left fold, executed in the main process.

Example

# compute the mean of all distributed data
total, len = dmapreduce(:myData,
    d -> (sum(d), length(d)),
    ((s1, l1), (s2, l2)) -> (s1 + s2, l1 + l2),
    workers())
println(total / len)

Processing multiple arguments (a.k.a. "zipWith")

The val here does not necessarily need to refer to a single symbol: you can easily pass in a quoted tuple, which will be unquoted into the function parameter. For example, distributed values :a and :b can be joined as such:

dmapreduce(:((a,b)),
    ((a,b)::Tuple) -> [a b],
    vcat,
    workers())
source
DistributedData.dmapreduce – Method

dmapreduce(dInfo1::Dinfo, dInfo2::Dinfo, map, fold)

Variant of dmapreduce that works with two Dinfos at once. The data must be distributed on the same set of workers, in the same order.

source
DistributedData.dmapreduce – Method

dmapreduce(vals::Vector, map, fold, workers)

Variant of dmapreduce that works with several distributed variables at once.

source
DistributedData.dmapreduce – Method
dmapreduce(dInfo::Dinfo, map, fold)

Distributed map/reduce (just as the other overload of dmapreduce) that works with Dinfo.
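
For instance, summing a scattered array (along the lines of the package README):

di = scatter_array(:myData, randn(1000, 3), workers())
dmapreduce(di, sum, +)   # total sum of all distributed elements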

source
DistributedData.dpmap – Method
dpmap(fn, args...; mod = Main, kwargs...)

"Distributed pool map."

A wrapper for pmap from Distributed package that executes the code in the correct module, so that it can access the distributed variables at remote workers. All arguments other than the first function fn are passed to pmap.

The function fn should return an expression that is going to get evaluated.

Example

using Distributed
dpmap(x -> :(computeSomething(someData, $x)), WorkerPool(workers()), Vector(1:10))
di = distributeSomeData()
dpmap(x -> :(computeSomething($(di.val), $x)), CachingPool(di.workers), Vector(1:10))
source
DistributedData.dtransform – Function

dtransform(val, fn, workers, tgt::Symbol=val)

Transform the worker-local distributed data available as val on workers in place, using the function fn. The result is stored as tgt (which defaults to val).

Example

# multiply all saved data by 2
dtransform(:myData, (d)->(2*d), workers())
source
DistributedData.gather_array – Function
gather_array(val::Symbol, workers, dim=1; free=false)

Collect the arrays distributed on workers under value val into an array. The individual arrays are pasted in the dimension specified by dim, i.e. dim=1 is roughly equivalent to using vcat, and dim=2 to hcat.

val must be an Array-based type; the function will otherwise fail.

If free is true, the val is unscattered after being gathered.

This preallocates the array for results, and is thus more efficient than e.g. using dmapreduce with vcat for folding.
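
A short scatter/gather round trip for illustration:

di = scatter_array(:mtx, randn(100, 4), workers())
mtx = gather_array(di.val, di.workers)   # parts pasted back along dim=1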

source
DistributedData.get_from – Method
get_from(worker,val)

Get a value val from a remote worker; quoting of val works just as with save_at. Returns a future with the requested value.
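
For example:

save_at(2, :x, 123)
fetch(get_from(2, :x))   # 123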

source
DistributedData.save_at – Method

save_at(worker, sym, val)

Saves the value val to symbol sym at worker. sym should be quoted (or contain a symbol). val gets unquoted in the processing and evaluated at the worker; quote it if you want to pass an exact command to the worker.

This is loosely based on the package ParallelDataTransfers, but made slightly more flexible by omitting/delaying the explicit fetches etc. In particular, save_at is roughly the same as ParallelDataTransfers.sendto, and get_val_from works very much like ParallelDataTransfers.getfrom.

Return value

A future with Nothing that can be fetched to see that the operation has finished.

Examples

addprocs(1)
save_at(2,:x,123)       # saves 123
save_at(2,:x,myid())    # saves 1
save_at(2,:x,:(myid())) # saves 2
save_at(2,:x,:(:x))     # saves the symbol :x
                        # (just :x won't work because of unquoting)

Note: Symbol scope

The symbols are saved in Main module on the corresponding worker. For example, save_at(1, :x, nothing) will erase your local x variable. Beware of name collisions.

source
DistributedData.scatter_array – Method

scatter_array(sym, x::Array, pids; dim=1)::Dinfo

Distribute roughly equal parts of the array x, split along the dimension dim, among pids into a worker-local variable sym.

Returns the Dinfo structure for the distributed data.
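
For example:

# split 10_000 rows roughly equally among the workers
di = scatter_array(:myDataset, randn(10_000, 5), workers())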

source
DistributedData.tmp_symbol – Method
tmp_symbol(s::Symbol; prefix="", suffix="_tmp")

Decorate a symbol s with prefix and suffix, to create a good name for a related temporary value.
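
For example:

tmp_symbol(:myData)                      # :myData_tmp
tmp_symbol(:myData, prefix="shadow_")    # :shadow_myData_tmp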

source
DistributedData.@remote – Macro
@remote expr

In a function that will get evaluated on a remote worker, this ensures the evaluation scope of the expression expr (usually a variable) is taken on the remote side, preventing namespace clash with the local session.

This is mainly useful for making the functions from Distributed package (such as pmap and remotecall) work with the data stored by DistributedData package.

Internally, this is handled by wrapping in eval.

Example

julia> save_at(2, :x, 321)
Future(2, 1, 162, nothing)

julia> let x=123
         remotecall_fetch(() -> x + (@remote x), 2)
       end
444
source

Higher-level array operations

DistributedData.catmapbuckets – Method

catmapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1)

Same as mapbuckets, except that the results are concatenated in the bucketing dimension, producing a slightly neater matrix. slicedims is therefore fixed to bucketdim.

source
DistributedData.dapply_cols – Method
dapply_cols(dInfo::Dinfo, fn, columns::Vector{Int})

Apply a function fn over columns of a distributed dataset.

fn gets 2 parameters:

  • a data vector (the whole column saved at one worker)
  • index of the column in the columns array (i.e. a number from 1:length(columns))
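
A hedged sketch, assuming the returned vector replaces the original column contents (the scaling factors are made up for illustration):

factors = [10.0, 1000.0]
dapply_cols(di, (v, i) -> v ./ factors[i], [1, 3])
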
source
DistributedData.dapply_rows – Method
dapply_rows(dInfo::Dinfo, fn)

Apply a function fn over rows of a distributed dataset.

fn gets a single vector parameter for each row to transform.
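
A similar hedged sketch, assuming the returned vector replaces each row:

# normalize each row to sum to 1
dapply_rows(di, r -> r ./ sum(r))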

source
DistributedData.dcopy – Method
dcopy(dInfo::Dinfo, newName::Symbol)

Clone the dataset and store it under a new distributed name newName.

source
DistributedData.dcount – Method

dcount(ncats::Int, dInfo::Dinfo)::Vector{Int}

Count the occurrences of the integer values in the vector stored in dInfo, assuming the values lie in the range 1:ncats.

source
DistributedData.dcount_buckets – Method
dcount_buckets(ncats::Int, dInfo::Dinfo, nbuckets::Int, buckets::Dinfo)::Matrix{Int}

Same as dcount, but counts the items in dInfo bucketed by buckets to produce a matrix of counts, with ncats rows and nbuckets columns.

Useful with distributeFCSFileVector to determine cluster distribution within files.

source
DistributedData.dmedian – Method

dmedian(dInfo::Dinfo, columns::Vector{Int}; iters=20)

Compute the median in a distributed fashion, avoiding the data transfer and memory requirements of the classical approach of sorting all data in one place. All data must be finite and defined. If the median falls just between 2 values, the lower one is chosen.

The algorithm is approximate, searching for a good median by interval halving and counting how many values fall below the threshold. iters can be increased to improve precision; each iteration adds roughly 1 bit of precision. The default of 20 iterations corresponds to a precision of roughly 10^-6 of the data range.
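
A single-process sketch of the interval-halving idea (not the actual implementation; note that only the counting step touches the data, so in the distributed version each iteration folds per-worker counts with +):

function median_bisect(v; iters = 20)
    lo, hi = extrema(v)
    target = length(v) / 2
    for _ in 1:iters
        mid = (lo + hi) / 2
        # keep the half-interval that still contains the median
        if count(x -> x < mid, v) < target
            lo = mid
        else
            hi = mid
        end
    end
    return lo
end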

source
DistributedData.dmedian_buckets – Method
dmedian_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int}; iters=20)

A version of dmedian that works with the bucketing information (i.e. clusters) from nbuckets and buckets.

source
DistributedData.dscale – Method
dscale(dInfo::Dinfo, columns::Vector{Int})

Scale the columns in the dataset to have mean 0 and sdev 1.

Prevents creation of NaNs by avoiding division by zero sdevs.

source
DistributedData.dselect – Function

dselect(dInfo::Dinfo, currentColnames::Vector{String}, selectColnames::Vector{String}; tgt=dInfo.val)::Dinfo

Convenience overload of dselect that works with column names.

source
DistributedData.dselect – Function

dselect(dInfo::Dinfo, columns::Vector{Int}; tgt=dInfo.val)

Reduce the dataset to the selected columns, optionally saving the result under a different name tgt.
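
For example:

dselect(di, [1, 3, 5])   # keep only columns 1, 3 and 5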

source
DistributedData.dstat – Method

dstat(dInfo::Dinfo, columns::Vector{Int})::Tuple{Vector{Float64}, Vector{Float64}}

Compute the mean and standard deviation of the selected columns in the dataset. Returns a tuple containing the vector of column means and the vector of the corresponding standard deviations.
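
For example:

means, sdevs = dstat(di, [1, 2, 3])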

source
DistributedData.dstat_buckets – Method
dstat_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int})::Tuple{Matrix{Float64}, Matrix{Float64}}

A version of dstat that works with bucketing information (e.g. clusters); returns a tuple of matrices.

source
DistributedData.mapbuckets – Method

mapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1, slicedims=bucketdim)

Apply the function fn over the array a so that it processes the data by buckets defined by buckets (which contains integers in the range 1:nbuckets).

The buckets are sliced out in the dimension specified by bucketdim.
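
To illustrate just the bucketing itself, a plain-Julia sketch (not the mapbuckets call):

a = [1.0 2.0; 3.0 4.0; 5.0 6.0; 7.0 8.0]
buckets = [1, 2, 1, 2]   # bucket assignment for each row
nbuckets = 2

# slicing along bucketdim=1 yields one sub-array per bucket
slices = [a[buckets .== b, :] for b in 1:nbuckets]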

source
DistributedData.reduce_extrema – Method

reduce_extrema(ex1, ex2)

Helper for gathering the minima and maxima of the data. ex1 and ex2 are arrays of (min, max) pairs; this function combines the two arrays element-wise, finding the combined minima and maxima.

source
DistributedData.update_extrema – Method

update_extrema(counts, target, lim, mid)

Helper for the distributed median computation – returns the updated extrema lim, depending on whether the counts of values below mid are lower or higher than target.

source

Input/Output

DistributedData.dload – Function
dload(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))

Overloaded functionality for Dinfo.

source
DistributedData.dload – Function

dload(sym::Symbol, pids, files=defaultFiles(sym,pids))

Import the content of symbol sym on each worker specified by pids from the corresponding filename in files.

source
DistributedData.dstore – Function
dstore(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))

Overloaded functionality for Dinfo.

source
DistributedData.dstore – Function

dstore(sym::Symbol, pids, files=defaultFiles(sym,pids))

Export the content of symbol sym on each worker specified by pids to the corresponding filename in files.

source
DistributedData.dunlink – Function
dunlink(sym::Symbol, pids, files=defaultFiles(sym,pids))

Remove the files created by dstore with the same parameters.
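
A hedged sketch of a store/load/cleanup round trip, using the default file names:

di = scatter_array(:myData, randn(1000, 4), workers())
dstore(di)                    # each worker saves its part to a file
dload(di)                     # each worker loads its part back
dunlink(di.val, di.workers)   # remove the files again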

source