Class state gets lost between function calls in Flink
I have this class:
case class IDADiscretizer(
    nAttrs: Int,
    nBins: Int = 5,
    s: Int = 5) extends Serializable {

  private[this] val log = LoggerFactory.getLogger(this.getClass)
  private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
  private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)

  def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
    val attrs = v.vector.map(_._2)
    val label = v.label
    // TODO: Check for missing values
    attrs
      .zipWithIndex
      .foreach {
        case (attr, i) =>
          if (V(i).getNbSamples < s) {
            V(i) insertValue attr // insert
          } else {
            if (randomReservoir(0) <= s / (i + 1)) {
              //val randVal = Random nextInt s
              //V(i) replace (randVal, attr)
              V(i) insertValue attr
            }
          }
      }
    V
  }

  /**
   * Return the cutpoints for the discretization
   *
   */
  def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)

  def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints
    (r, c)
  }
}
Using Flink, I would like to get the cut points after calling discretize, but it seems the information stored in V gets lost. Do I have to use a broadcast variable, like in this question? Is there a better way to access the state of the class?
I've tried to call cutPoints in two ways. The first one is:
def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))
Then, called from outside:
val a = IDADiscretizer(nAttrs = 4)
val r = a.discretize(dataSet)
r.print
val cuts = a.cutPoints
Here, cuts is empty, so I tried to compute both the discretization and the cut points inside discretize:
def discretize(data: DataSet[LabeledVector]) = {
  val r = data map (x => updateSamples(x))
  val c = cutPoints
  (r, c)
}
And use it like this:
val a = IDADiscretizer(nAttrs = 4)
val (d, c) = a.discretize(dataSet)
c foreach println
But the same thing happens.
Finally, I've also tried to make V completely public:
val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
The result is still empty.
What am I doing wrong?
Thanks to @TillRohrmann, this is what I finally did:
private[this] def computeCutPoints(x: LabeledVector) = {
  val attrs = x.vector.map(_._2)
  val label = x.label
  attrs
    .zipWithIndex
    .foldLeft(V) {
      case (iv, (v, i)) =>
        iv(i) insertValue v
        iv
    }
}

/**
 * Return the cutpoints for the discretization
 *
 */
def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
  data.map(computeCutPoints _)
    .collect
    .last.map(_.getBoundaries.toVector)

def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
  data.map(updateSamples _)
And then I use it like this:
val a = IDADiscretizer(nAttrs = 4)
val d = a.discretize(dataSet)
val cuts = a.cutPoints(dataSet)
d.print
cuts foreach println
I do not know if it is the best way, but at least it is working now.
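The collect-then-last approach relies on every record having passed through the same IDADiscretizer instance. The plain-Scala simulation below involves no Flink at all; it assumes that each parallel subtask works on its own copy of the state and that every emitted element is a snapshot of that copy at emission time. `Samples`, `LastDemo` and `runSubtask` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the mutable state V inside IDADiscretizer.
final class Samples {
  val seen: ArrayBuffer[Double] = ArrayBuffer.empty[Double]
  def insert(x: Double): Samples = { seen += x; this }
}

object LastDemo {
  // Simulates one parallel subtask: it owns a fresh copy of the state (assumption),
  // and each emitted element is a snapshot of that copy when the record is processed.
  def runSubtask(records: Seq[Double]): Seq[Vector[Double]] = {
    val state = new Samples
    records.map(r => state.insert(r).seen.toVector)
  }

  def main(args: Array[String]): Unit = {
    // Two simulated partitions; the "collected" result concatenates their outputs.
    val partitions = Seq(Seq(1.0, 2.0, 3.0), Seq(4.0, 5.0))
    val collected = partitions.flatMap(runSubtask)
    println(s"last = ${collected.last}") // prints "last = Vector(4.0, 5.0)"
  }
}
```

With a single partition (parallelism 1), `last` sees every record; with two partitions it only sees the samples of whichever partition's element ends up last, and that ordering after collect is generally not guaranteed.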
1 Answer
The way Flink works is that the user defines operators/user-defined functions which operate on input data coming from a source function. To execute a program, the user code is sent to the Flink cluster, where it is executed. The results of the computation have to be written to some storage system via a sink function.
Because of this, it is not possible to easily mix local and distributed computations as you are trying to do in your solution. What discretize does is define a map operator which transforms the input DataSet data. This operation will be executed once you call ExecutionEnvironment#execute or DataSet#print, for example. At that point, the user code and the definition of IDADiscretizer are sent to the cluster, where they are instantiated. Flink will update the values in an instance of IDADiscretizer which is not the same instance as the one you have on the client.
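A minimal way to reproduce the effect described here without a cluster: Java serialization stands in for shipping user code to a worker (an assumption for illustration; Flink's real deployment involves more steps), and `Counter` and `ShippingDemo` are hypothetical stand-ins for IDADiscretizer and the job submission. The copy that does the work is mutated, while the client-side object stays untouched.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for IDADiscretizer: it only counts the samples it has seen.
class Counter extends Serializable {
  private var seen = 0
  def update(x: Int): Int = { seen += 1; x }
  def count: Int = seen
}

object ShippingDemo {
  // Serialize and deserialize an object, producing an independent copy,
  // roughly like user code being shipped to and instantiated on a worker.
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val client = new Counter
    val worker = roundTrip(client)   // the "cluster-side" instance
    List(1, 2, 3).foreach(worker.update)
    println(s"worker=${worker.count} client=${client.count}") // prints "worker=3 client=0"
  }
}
```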
What you should do is perform the cutPoints calculation on the cluster and write the result to a sink or retrieve it via collect. If you have all the information for V available after running discretize, then you should directly call cutPoints in the same function. If you need to aggregate the results of discretize, then you have to add a grouping operation and then apply cutPoints in the grouping function. – Till Rohrmann
Jul 3 at 9:55
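The grouping advice can be sketched as: each partition summarizes its own data, the partial summaries are merged with an associative function, and the cut points are computed once from the merged result. This sketch uses plain Scala collections and swaps IDA's interval heaps for a hypothetical `Summary` that keeps every value sorted (exact equal-frequency cut points), which is only practical for small data. In Flink's DataSet API the same shape would map roughly onto `mapPartition`, then `reduce`, then `collect`; `Summary`, `MergeDemo`, `summarize` and `cutPoints` here are illustrative names, not the asker's code.

```scala
// Hypothetical per-attribute summary: all values seen by one partition, kept sorted.
final case class Summary(values: Vector[Double]) {
  // Associative merge, so partial summaries can be combined in any grouping.
  def merge(other: Summary): Summary = Summary((values ++ other.values).sorted)
}

object MergeDemo {
  // Local summary of one partition (plays the role of a mapPartition step).
  def summarize(partition: Seq[Double]): Summary = Summary(partition.toVector.sorted)

  // Equal-frequency cut points: nBins - 1 boundaries taken from the merged, sorted values.
  def cutPoints(s: Summary, nBins: Int): Vector[Double] = {
    val n = s.values.size
    require(n > 0 && nBins > 1, "need data and at least two bins")
    (1 until nBins).map(k => s.values((k * n) / nBins)).toVector
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1.0, 7.0, 3.0), Seq(9.0, 5.0), Seq(2.0, 8.0, 4.0, 6.0, 10.0))
    val merged = partitions.map(summarize).reduce(_ merge _) // the "reduce" step
    println(cutPoints(merged, nBins = 5)) // prints "Vector(3.0, 5.0, 7.0, 9.0)"
  }
}
```

Because the cut points are computed from the merged summary rather than from any single instance's state, the result does not depend on how many partitions there are.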
Thank you very much, I'll try it.
– elbaulp
Jul 3 at 10:08
By "If you have all the information for V available after running discretize, then you should directly call cutPoints in the same function", do you mean this?
val d = data.map(updateSamples(_)); val c = cutPoints
This way I still get an empty result. – elbaulp
Jul 3 at 10:36
It should happen within the map function on the DataSet, and the result should then be a DataSet<CutPointsResult>. – Till Rohrmann
Jul 3 at 10:51
Thanks for your reply. What is the best approach, then? Should I use broadcast variables, or try to chain the transformations?
– elbaulp
Jul 3 at 8:22