Functions
Data structures
DistributedData.Dinfo — Type

The basic structure for working with loaded data distributed among workers. It represents a dataset as follows:

- `val` is the "value name" under which the data are saved on the processes. E.g. `val=:foo` means that each process holds a part of the matrix in a variable `foo`.
- `workers` is a list of workers (in the correct order!) that hold the data (similar to `DArray.pids`).
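A minimal sketch of how a `Dinfo` typically comes into existence (assuming a fresh Julia session; `scatter_array` is documented below):

```julia
using Distributed, DistributedData
addprocs(2)
@everywhere using DistributedData

# distribute a matrix; the returned Dinfo describes where the parts live
di = scatter_array(:foo, randn(100, 4), workers())
di.val      # :foo — the variable name on each worker
di.workers  # the worker PIDs that hold parts of :foo
```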
Base functions
DistributedData.dexec — Method

```julia
dexec(val, fn, workers)
```

Execute a function on the workers, taking `val` as a parameter. Results are not collected. This is optimal for various side-effect-causing computations that are not easily expressible with `dtransform`.
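For instance, a side-effecting use might look like this (a sketch, assuming the workers already hold data under `:myData`, e.g. from `scatter_array`):

```julia
# report the local chunk size on each worker; nothing is collected
dexec(:myData, d -> println(size(d)), workers())
```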
DistributedData.dexec — Method

```julia
dexec(dInfo::Dinfo, fn)
```

Variant of `dexec` that works with `Dinfo`.
DistributedData.dmap — Method

```julia
dmap(arr::Vector, fn, workers)
```

Call a function `fn` on `workers`, with a single parameter arriving from the corresponding position in `arr`.
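For illustration (a sketch; it is assumed here that `dmap` returns the workers' results, one per element of `arr`):

```julia
# worker i receives the i-th element of the vector
dmap(collect(1:nworkers()), x -> x^2, workers())
```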
DistributedData.dmapreduce — Method

```julia
dmapreduce(val, map, fold, workers; prefetch = :all)
```

A distributed work-alike of the standard `mapreduce`: take a function `map` (a non-modifying transform of the data) and `fold` (a 2-to-1 reduction of the results of `map`), systematically run them on the data described by `val` distributed on `workers`, and return the final reduced result.

It is assumed that the fold operation is associative, but not commutative (as in semigroups). If there are no workers, the operation returns `nothing` (we don't have a monoid to magically conjure zero elements :[ ).

In the current version, the reduce step is a sequential left fold executed in the main process. The parameter `prefetch` says how many futures should be `fetch`ed in advance; increasing `prefetch` improves throughput but increases memory usage when the results of `map` are big.

Example

```julia
# compute the mean of all distributed data
# (the results are bound to `s` and `len`, not `sum`,
#  to avoid shadowing Base.sum inside the map function)
s, len = dmapreduce(:myData,
    d -> (sum(d), length(d)),
    ((s1, l1), (s2, l2)) -> (s1 + s2, l1 + l2),
    workers())
println(s / len)
```
Processing multiple arguments (a.k.a. "zipWith")

The `val` here does not necessarily need to refer to a single symbol; you can easily pass in a quoted tuple, which will be unquoted in the function parameter. For example, distributed values `:a` and `:b` can be joined as such:

```julia
dmapreduce(:((a, b)),
    ((a, b)::Tuple) -> [a b],
    vcat,
    workers())
```
DistributedData.dmapreduce — Method

```julia
dmapreduce(dInfo1::Dinfo, dInfo2::Dinfo, map, fold)
```

Variant of `dmapreduce` that works with multiple `Dinfo`s at once. The data must be distributed on the same set of workers, in the same order.
DistributedData.dmapreduce — Method

```julia
dmapreduce(vals::Vector, map, fold, workers)
```

Variant of `dmapreduce` that works with multiple distributed variables at once.
DistributedData.dmapreduce — Method

```julia
dmapreduce(dInfo::Dinfo, map, fold)
```

Distributed map/reduce (just as the other overload of `dmapreduce`) that works with `Dinfo`.
DistributedData.dpmap — Method

```julia
dpmap(fn, args...; mod = Main, kwargs...)
```

"Distributed pool map."

A wrapper for `pmap` from the `Distributed` package that executes the code in the correct module, so that it can access the distributed variables at the remote workers. All arguments other than the first function `fn` are passed to `pmap`.

The function `fn` should return an expression that is going to get evaluated.

Example

```julia
using Distributed
dpmap(x -> :(computeSomething(someData, $x)), WorkerPool(workers()), Vector(1:10))
di = distributeSomeData()
dpmap(x -> :(computeSomething($(di.val), $x)), CachingPool(di.workers), Vector(1:10))
```
DistributedData.dtransform — Function

```julia
dtransform(dInfo::Dinfo, fn, tgt::Symbol=dInfo.val)::Dinfo
```

Same as `dtransform`, but specialized for `Dinfo`.
DistributedData.dtransform — Function

```julia
dtransform(val, fn, workers, tgt::Symbol=val)
```

Transform the worker-local distributed data available as `val` on `workers` in place, by a function `fn`. Store the result as `tgt` (default `val`).

Example

```julia
# multiply all saved data by 2
dtransform(:myData, d -> 2 * d, workers())
```
DistributedData.gather_array — Function

```julia
gather_array(dInfo::Dinfo, dim=1; free=false)
```

Distributed `gather_array` (just as the other overload) that works with `Dinfo`.
DistributedData.gather_array — Function

```julia
gather_array(val::Symbol, workers, dim=1; free=false)
```

Collect the arrays distributed on `workers` under the value `val` into a single array. The individual arrays are pasted along the dimension specified by `dim`, i.e. `dim=1` is roughly equivalent to using `vcat`, and `dim=2` to `hcat`.

`val` must be an Array-based type; the function will otherwise fail.

If `free` is true, `val` is `unscatter`ed after being gathered.

This preallocates the array for the results, and is thus more efficient than e.g. using `dmapreduce` with `vcat` for folding.
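A sketch of the round trip, assuming `:mtx` was distributed with `scatter_array`:

```julia
di = scatter_array(:mtx, randn(1000, 5), workers())
m = gather_array(di)                              # vcat-like reassembly along dim 1
m = gather_array(:mtx, workers(), 1; free=true)   # same, but also unscatters :mtx
```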
DistributedData.get_from — Method

```julia
get_from(worker, val)
```

Get a value `val` from a remote `worker`; quoting of `val` works just as with `save_at`. Returns a future with the requested value.
DistributedData.get_val_from — Method

```julia
get_val_from(worker, val)
```

Shortcut for instantly fetching the future from `get_from`.
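A short sketch of the two access styles (a worker with PID 2 is assumed to exist):

```julia
save_at(2, :x, 40)
f = get_from(2, :x)   # a Future
fetch(f)              # 40
get_val_from(2, :x)   # fetches immediately, also 40
```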
DistributedData.remove_from — Method

```julia
remove_from(worker, sym)
```

Sets the symbol `sym` on `worker` to `nothing`, effectively freeing the data.
DistributedData.save_at — Method

```julia
save_at(worker, sym, val)
```

Saves the value `val` to the symbol `sym` at `worker`. `sym` should be quoted (or contain a symbol). `val` gets unquoted in the processing and evaluated at the worker; quote it if you want to pass an exact command to the worker.

This is loosely based on the package ParallelDataTransfers, but made slightly more flexible by omitting/delaying the explicit fetches etc. In particular, `save_at` is roughly the same as `ParallelDataTransfers.sendto`, and `get_val_from` works very much like `ParallelDataTransfers.getfrom`.

Return value

A future with `Nothing` that can be fetched to check that the operation has finished.

Examples

```julia
addprocs(1)
save_at(2, :x, 123)       # saves 123
save_at(2, :x, myid())    # saves 1
save_at(2, :x, :(myid())) # saves 2
save_at(2, :x, :(:x))     # saves the symbol :x
                          # (just :x won't work because of unquoting)
```
Note: Symbol scope

The symbols are saved in the `Main` module on the corresponding worker. For example, `save_at(1, :x, nothing)` will erase your local `x` variable. Beware of name collisions.
DistributedData.scatter_array — Method

```julia
scatter_array(sym, x::Array, workers; dim=1)::Dinfo
```

Distribute roughly equal parts of the array `x`, split along the dimension `dim`, among `workers` into a worker-local variable `sym`.

Returns the `Dinfo` structure for the distributed data.
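For example (a sketch; at least one worker process is assumed):

```julia
x = randn(1_000, 10)
di = scatter_array(:x, x, workers())  # each worker stores roughly 1000/nworkers() rows
# the distributed parts can now be processed in place:
dtransform(di, d -> d .- 1)
```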
DistributedData.tmp_symbol — Method

```julia
tmp_symbol(dInfo::Dinfo; prefix="", suffix="_tmp")
```

Decorate the symbol from `dInfo` with prefix and suffix.
DistributedData.tmp_symbol — Method

```julia
tmp_symbol(s::Symbol; prefix="", suffix="_tmp")
```

Decorate a symbol `s` with prefix and suffix, to create a good name for a related temporary value.
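Assuming the defaults shown above, the decoration works like this:

```julia
tmp_symbol(:myData)                          # :myData_tmp
tmp_symbol(:myData; prefix="t_", suffix="")  # :t_myData
```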
DistributedData.unscatter — Method

```julia
unscatter(dInfo::Dinfo)
```

Remove the loaded data described by `dInfo` from the corresponding workers.
DistributedData.unscatter — Method

```julia
unscatter(sym, workers)
```

Remove the loaded data from the workers.
DistributedData.@remote — Macro

```julia
@remote module expr
```

A version of `@remote` that adds an additional choice of the module used for the scope.
DistributedData.@remote — Macro

```julia
@remote expr
```

In a function that will get evaluated on a remote worker, this ensures that the evaluation scope of the expression `expr` (usually a variable) is taken on the remote side, preventing a namespace clash with the local session.

This is mainly useful for making the functions from the `Distributed` package (such as `pmap` and `remotecall`) work with the data stored by the `DistributedData` package.

Internally, this is handled by wrapping in `eval`.
Example

```julia
julia> save_at(2, :x, 321)
Future(2, 1, 162, nothing)

julia> let x = 123
           remotecall_fetch(() -> x + (@remote x), 2)
       end
444
```
Higher-level array operations
DistributedData.catmapbuckets — Method

```julia
catmapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1)
```

Same as `mapbuckets`, except that it concatenates the bucketing results in the bucketing dimension, thus creating a slightly neater matrix. `slicedims` is therefore fixed to `bucketdim`.
DistributedData.combine_stats — Method

```julia
combine_stats((s1, sqs1, n1), (s2, sqs2, n2))
```

Helper for `dstat`-style functions that just adds up the elements in triplets of vectors.
DistributedData.dapply_cols — Method

```julia
dapply_cols(dInfo::Dinfo, fn, columns::Vector{Int})
```

Apply a function `fn` over the columns of a distributed dataset.

`fn` gets 2 parameters:

- a data vector (the whole column saved at one worker)
- the index of the column in the `columns` array (i.e. a number from `1:length(columns)`)
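A sketch of the two-parameter interface, assuming a distributed matrix `di` and that the function's return value replaces the column (the offsets here are made up for illustration):

```julia
offsets = [10.0, 20.0]
# shift column 1 by 10 and column 3 by 20 on the workers
dapply_cols(di, (col, i) -> col .+ offsets[i], [1, 3])
```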
DistributedData.dapply_rows — Method

```julia
dapply_rows(dInfo::Dinfo, fn)
```

Apply a function `fn` over the rows of a distributed dataset.

`fn` gets a single vector parameter for each row to transform.
DistributedData.dcopy — Method

```julia
dcopy(dInfo::Dinfo, newName::Symbol)
```

Clone the dataset and store it under a new distributed name `newName`.
DistributedData.dcount — Method

```julia
dcount(ncats::Int, dInfo::Dinfo)::Vector{Int}
```

Count the occurrences of the integer values in the vector stored in `dInfo`, assuming the values are in the range `1:ncats`.
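For example (a sketch; values in `1:5` assumed):

```julia
di = scatter_array(:clusters, rand(1:5, 10_000), workers())
dcount(5, di)   # a 5-element Vector{Int}, summing to 10_000
```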
DistributedData.dcount_buckets — Method

```julia
dcount_buckets(ncats::Int, dInfo::Dinfo, nbuckets::Int, buckets::Dinfo)::Matrix{Int}
```

Same as `dcount`, but counts the items in `dInfo` bucketed by `buckets` to produce a matrix of counts, with `ncats` rows and `nbuckets` columns.

Useful with `distributeFCSFileVector` to determine the cluster distribution within files.
DistributedData.dmedian — Method

```julia
dmedian(dInfo::Dinfo, columns::Vector{Int})
```

Compute the median in a distributed fashion, avoiding the data transfer and memory capacity that would be required to compute the median in the classical way by sorting. All data must be finite and defined. If the median lies just between 2 values, the lower one is chosen.

The algorithm is approximate, searching for a good median by interval halving and counting how many values fall below the threshold. `iters` can be increased to improve precision; each iteration adds roughly 1 bit to the precision. The default value is 20, which corresponds to a precision of roughly 1e-6 times the data range.
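A hedged sketch:

```julia
di = scatter_array(:d, rand(10_000, 2), workers())
dmedian(di, [1, 2])   # approximate medians of columns 1 and 2 (near 0.5 here)
```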
DistributedData.dmedian_buckets — Method

```julia
dmedian_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int}; iters=20)
```

A version of `dmedian` that works with the bucketing information (i.e. clusters) from `nbuckets` and `buckets`.
DistributedData.dscale — Method

```julia
dscale(dInfo::Dinfo, columns::Vector{Int})
```

Scale the columns in the dataset to have mean 0 and sdev 1.

Prevents creation of NaNs by avoiding division by zero sdevs.
DistributedData.dselect — Function

```julia
dselect(dInfo::Dinfo,
    currentColnames::Vector{String}, selectColnames::Vector{String};
    tgt=dInfo.val)::Dinfo
```

Convenience overload of `dselect` that works with column names.
DistributedData.dselect — Function

```julia
dselect(dInfo::Dinfo, columns::Vector{Int}; tgt=dInfo.val)
```

Reduce the dataset to the selected columns, optionally saving it under a different name.
DistributedData.dstat — Method

```julia
dstat(dInfo::Dinfo, columns::Vector{Int})::Tuple{Vector{Float64}, Vector{Float64}}
```

Compute the mean and standard deviation of the columns in the dataset. Returns a tuple with a vector of means of `columns`, and a vector of the corresponding sdevs.
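For example (a sketch):

```julia
di = scatter_array(:d, randn(10_000, 3), workers())
means, sdevs = dstat(di, [1, 2, 3])  # both are 3-element vectors
```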
DistributedData.dstat_buckets — Method

```julia
dstat_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int})::Tuple{Matrix{Float64}, Matrix{Float64}}
```

A version of `dstat` that works with bucketing information (e.g. clusters); returns a tuple of matrices.
DistributedData.mapbuckets — Method

```julia
mapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1, slicedims=bucketdim)
```

Apply the function `fn` over the array `a` so that it processes the data by buckets defined by `buckets` (which contains integers in the range `1:nbuckets`).

The buckets are sliced out in the dimension specified by `bucketdim`.
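A sketch of the idea (the exact way `fn` receives the bucket slices is an assumption here):

```julia
a = [1 2; 3 4; 5 6; 7 8]
buckets = [1, 2, 1, 2]   # rows 1,3 form bucket 1; rows 2,4 form bucket 2
# e.g. summing each bucket's rows column-wise:
mapbuckets(b -> sum(b, dims=1), a, 2, buckets)
```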
DistributedData.reduce_extrema — Method

```julia
reduce_extrema(ex1, ex2)
```

Helper for gathering the minima and maxima of the data. `ex1`, `ex2` are arrays of pairs `(min, max)`; this function combines the arrays element-wise and finds the combined minima and maxima.
DistributedData.update_extrema — Method

```julia
update_extrema(counts, target, lim, mid)
```

Helper for the distributed median computation; returns the updated extrema `lim`, depending on whether the counts in `counts` of values less than `mid` are below or above `target`.
Input/Output
DistributedData.defaultFiles — Method

```julia
defaultFiles(s, pids)
```

Make a good set of filenames for saving a dataset.
DistributedData.dload — Function

```julia
dload(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))
```

Overloaded functionality for `Dinfo`.
DistributedData.dload — Function

```julia
dload(sym::Symbol, pids, files=defaultFiles(sym, pids))
```

Import the content of the symbol `sym` on each worker specified by `pids` from the corresponding filename in `files`.
DistributedData.dstore — Function

```julia
dstore(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))
```

Overloaded functionality for `Dinfo`.
DistributedData.dstore — Function

```julia
dstore(sym::Symbol, pids, files=defaultFiles(sym, pids))
```

Export the content of the symbol `sym` from each worker specified by `pids` to the corresponding filename in `files`.
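A round-trip sketch using the defaults:

```julia
di = scatter_array(:d, randn(100, 4), workers())
dstore(di)      # each worker writes its part to a default filename
unscatter(di)   # drop the in-memory copies
dload(di)       # workers re-load their parts from the files
dunlink(di)     # remove the files again
```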
DistributedData.dunlink — Function

```julia
dunlink(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))
```

Overloaded functionality for `Dinfo`.
DistributedData.dunlink — Function

```julia
dunlink(sym::Symbol, pids, files=defaultFiles(sym, pids))
```

Remove the files created by `dstore` with the same parameters.