Tutorial 3: Distributed data processing and statistics
If you can get the data onto a single machine, various statistics can be computed using standard Julia functions. With large datasets that do not fit on a single computer, things get more complicated. Luckily, many statistics and algorithms possess parallel, map-reduce-style implementations that can be used to address this problem.
For example, the dstat function computes sample statistics without fetching all data to a single node. You can use it in a way similar to the aforementioned dselect and dtransform_asinh. The following code extracts means and standard deviations from the first 3 columns of a dataset distributed as di:
using DistributedData
dstat(di, [1,2,3])
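If you want to work with the two parts of the result separately, they can be unpacked directly. This is only a small sketch, assuming that dstat returns the per-column means and standard deviations as a tuple:
means, sdevs = dstat(di, [1,2,3])   # assumed: the result is a (means, sdevs) tuple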
Manual work with the DistributedData.jl package
We will first show how to use the general framework to compute per-cluster statistics. DistributedData.jl exports the dmapreduce function, which serves as a very effective basic building block for running such computations. For example, you can efficiently compute a distributed mean of all your data as follows:
dmapreduce(di, sum, +) / dmapreduce(di, length, +)
The parameters of dmapreduce are, in order:
- di, the dataset
- sum or length, a unary "map" function – during the computation, each piece of the distributed data is first processed in parallel by this function
- +, a binary "reduction" or "folding" function – the pieces of information produced by the map function are successively joined in pairs using this function, until only a single result is left. This final result is also what dmapreduce returns.
The above example thus reads: sum the data on each worker, add up the intermediate results, and divide the final number by the sum of the data lengths on all workers.
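The same two building blocks suffice for other moment-based statistics. As an illustrative sketch (not part of the package API), a distributed variance can be assembled from the total count, the sum of values, and the sum of squared values:
n = dmapreduce(di, length, +)             # total number of values
s = dmapreduce(di, sum, +)                # sum of all values
sq = dmapreduce(di, x -> sum(x .^ 2), +)  # sum of squared values
variance = sq / n - (s / n)^2             # population variance from the three reductions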
Column-wise means (as produced by dstat) are slightly more useful; we only need to split the computation by columns:
dmapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ dmapreduce(di, x->size(x,1), +)
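The reduction function does not have to be +. As a small sketch of the same pattern, column-wise minima and maxima can be obtained by swapping in an elementwise min or max as the folding function:
colMins = dmapreduce(di, d -> mapslices(minimum, d, dims=1), (a, b) -> min.(a, b))
colMaxs = dmapreduce(di, d -> mapslices(maximum, d, dims=1), (a, b) -> max.(a, b))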
Finally, for the distributed computation of per-cluster means, the clustering information needs to be distributed as well. (Fortunately, that is easy, because the distributed mapToGigaSOM does exactly that.)
First, compute the clustering:
mapping = mapToGigaSOM(som, di)
dtransform(mapping, m -> metaClusters[m])
Now, the distributed computation is run on the 2 scattered datasets. We employ the helper function mapbuckets, which provides bucket-wise execution of a function, in a way very similar to mapslices. (In the example, we actually use catmapbuckets, which concatenates the results into a nice array.) The following code produces a matrix of tuples (sum, count), for separate clusters (in rows) and data columns (in columns):
sumscounts = dmapreduce([di, mapping],
(d, mapping) -> catmapbuckets(
(_,clData) -> (sum(clData), length(clData)),
d, 10, mapping),
(a,b) -> (((as,al),(bs,bl)) -> ((as+bs), (al+bl))).(a,b))
10×4 Array{Tuple{Float64,Int64},2}:
(5949.71, 1228) (-21.9789, 1228) (12231.3, 1228) (12303.1, 1228)
(6379.98, 1246) (12464.3, 1246) (12427.9, 1246) (12479.8, 1246)
(6513.41, 1294) (12968.8, 1294) (12960.7, 1294) (-28.1922, 1294)
(6312.37, 1236) (-26.7392, 1236) (6.74384, 1236) (12401.7, 1236)
(6395.73, 1285) (12867.7, 1285) (-52.653, 1285) (-26.9795, 1285)
(6229.72, 622) (10.7578, 622) (6200.1, 622) (0.882128, 622)
(6141.97, 612) (6078.56, 612) (45.9878, 612) (6079.3, 612)
(51.3709, 616) (23.4306, 616) (6117.53, 616) (1.15342, 616)
(6177.16, 1207) (-50.4624, 1207) (48.8023, 1207) (-5.549, 1207)
(8.56597, 654) (6536.1, 654) (-29.2208, 654) (6539.94, 654)
With a bit of Julia, this can be aggregated to actual per-cluster means:
clusterMeans = [ sum/count for (sum,count) in sumscounts ]
10×4 Array{Float64,2}:
4.84504 -0.0178982 9.96031 10.0188
5.12037 10.0034 9.97428 10.0159
5.03354 10.0223 10.016 -0.0217869
5.10709 -0.0216336 0.00545618 10.0337
4.97722 10.0138 -0.0409751 -0.0209958
10.0156 0.0172955 9.968 0.00141821
10.0359 9.93229 0.0751434 9.9335
0.0833944 0.0380366 9.93105 0.00187243
5.11778 -0.0418081 0.0404327 -0.00459735
0.0130978 9.99403 -0.0446802 9.99991
Since we used the data from the hypercube dataset from the beginning of the tutorial, you should be able to recognize several clusters that perfectly match the hypercube vertices (although not all of them, because k=10 is not enough to capture all 16 of the actual clusters).
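To make that check easier, each per-cluster mean can be labeled as lying near one of the vertex coordinates (0 or 10); a value near the middle betrays an axis along which 2 vertices were merged into one cluster. This is only a hypothetical helper, not part of any package:
vertexCoord(m) = m < 2.5 ? 0 : (m > 7.5 ? 10 : missing)  # missing marks an axis where 2 vertices merged
clusterVertices = vertexCoord.(clusterMeans)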
Finally, we can remove the temporary data from the workers to free memory for other analyses:
unscatter(mapping)
Convenience statistical functions
Notably, several of the most commonly used statistical functions are available in DistributedData.jl in a form that can cope with distributed data. For example, you can run a distributed median computation as follows:
dmedian(di, [1,2,3,4])
In the hypercube dataset, the medians are slightly off-center because there is a lot of empty space between the clusters:
4-element Array{Float64,1}:
6.947097488861494
7.934405685940568
7.069149844215707
2.558892109203585
The dstat function has a bucketed variant that can split the statistics among the different clusters. The following computes the per-cluster standard deviations of the dataset (the [2] picks the standard deviations out of the result):
dstat_buckets(di, 10, mapping, [1,2,3,4])[2]
In the result, we can count 4 "nice" clusters and 6 clusters that each span 2 of the original clusters, giving 4 + 2×6 = 16 in total. (Hypercube validation succeeded!)
10×4 Array{Float64,2}:
5.09089 0.997824 1.01815 0.980758
5.13971 1.02019 0.977637 1.00124
5.13209 0.974332 1.00058 0.99874
5.11529 0.998166 1.01825 1.01885
5.10542 1.01686 0.975993 0.991992
0.991075 0.993312 1.00667 1.05048
0.996443 1.02699 0.938742 0.98831
0.946917 0.989543 1.0056 0.999609
5.09963 1.00131 0.978803 0.984435
1.00892 0.998226 1.05538 0.994829
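Assuming the first element of the dstat_buckets result holds the corresponding per-cluster means (mirroring the [2] used above for the standard deviations), those can be extracted in the same way:
dstat_buckets(di, 10, mapping, [1,2,3,4])[1]   # assumed: [1] selects the per-cluster means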
A similar bucketed version is available for computation of medians:
dmedian_buckets(di, 10, mapping, [1,2,3,4])
Note that the cluster medians are similar to the means, except in the cases where a cluster is formed by 2 actual data aggregations (e.g. the second row), where the medians dodge the empty space in the middle of the data:
10×4 Array{Float64,2}:
1.97831 -0.0120118 9.98967 10.0161
7.99438 10.0263 9.9988 10.0033
3.27907 9.98728 10.0254 0.00444198
7.91739 -0.0623953 -0.0240277 10.0374
2.445 10.0101 -0.0471141 -0.0253346
10.0121 0.00935064 9.94992 0.0459787
10.0512 9.93359 0.0923141 9.91175
0.0675462 -0.0142712 9.93406 0.0343599
8.09972 -0.0217352 0.0575258 -0.010485
-0.0183372 10.0392 -0.115253 10.0101