Class state gets lost between function calls in Flink





I have this class:


case class IDADiscretizer(
    nAttrs: Int,
    nBins: Int = 5,
    s: Int = 5) extends Serializable {

  private[this] val log = LoggerFactory.getLogger(this.getClass)
  private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
  private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)

  def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
    val attrs = v.vector.map(_._2)
    val label = v.label
    // TODO: Check for missing values
    attrs
      .zipWithIndex
      .foreach {
        case (attr, i) =>
          if (V(i).getNbSamples < s) {
            V(i) insertValue attr // insert
          } else {
            if (randomReservoir(0) <= s / (i + 1)) {
              //val randVal = Random nextInt s
              //V(i) replace (randVal, attr)
              V(i) insertValue attr
            }
          }
      }
    V
  }

  /**
   * Return the cutpoints for the discretization
   *
   */
  def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)

  def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
    val r = data map (x => updateSamples(x))
    val c = cutPoints

    (r, c)
  }
}



Using Flink, I would like to get the cut points after calling discretize, but it seems the information stored in V gets lost. Do I have to use a broadcast variable, like in this question? Is there a better way to access the state of the class?





I've tried to call cutPoints in two ways. The first one is:




def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))



Then I called it from outside:


val a = IDADiscretizer(nAttrs = 4)
val r = a.discretize(dataSet)
r.print
val cuts = a.cutPoints



Here, cuts is empty, so I tried to compute both the discretization and the cut points inside discretize:




def discretize(data: DataSet[LabeledVector]) = {
  val r = data map (x => updateSamples(x))
  val c = cutPoints

  (r, c)
}



And used it like this:


val a = IDADiscretizer(nAttrs = 4)
val (d, c) = a.discretize(dataSet)
c foreach println



But the same thing happens.



Finally, I've also tried to make V completely public:




val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))



It is still empty.



What am I doing wrong?



Related questions:



Thanks to @TillRohrmann, this is what I finally did:


private[this] def computeCutPoints(x: LabeledVector) = {
  val attrs = x.vector.map(_._2)
  val label = x.label
  attrs
    .zipWithIndex
    .foldLeft(V) {
      case (iv, (v, i)) =>
        iv(i) insertValue v
        iv
    }
}

/**
 * Return the cutpoints for the discretization
 *
 */
def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
  data.map(computeCutPoints _)
    .collect()
    .last.map(_.getBoundaries.toVector)

def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
  data.map(updateSamples _)



And then I use it like this:


val a = IDADiscretizer(nAttrs = 4)
val d = a.discretize(dataSet)
val cuts = a.cutPoints(dataSet)
d.print
cuts foreach println



I do not know if it is the best way, but at least it is working now.
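One caveat about collect().last, assuming the job runs with parallelism greater than 1: each parallel instance of the map works on its own deserialized copy of V, so each collected element is effectively a snapshot of only that instance's share of the data, and the last element comes from whichever instance's results happen to arrive last. The plain-Scala simulation below illustrates this; LastSnapshotDemo is a hypothetical name, and the inner sequences stand in for the parallel instances. With parallelism 1, the last snapshot would cover every record.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simulation (not Flink code) of `data.map(computeCutPoints _).collect().last`
// when the map runs as several parallel instances.
object LastSnapshotDemo {
  def snapshots(partitions: Seq[Seq[Double]]): Seq[Vector[Double]] =
    partitions.flatMap { part =>
      // Each parallel instance starts from its own fresh copy of the state (its own V).
      val localState = ArrayBuffer.empty[Double]
      part.map { x =>
        localState += x
        localState.toVector // the emitted record is a copy of this instance's state
      }
    }
}
```

Here the last snapshot only contains the second partition's values, even though five records were processed in total.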




1 Answer



The way Flink works is that the user defines operators/user-defined functions which operate on input data coming from a source function. In order to execute a program, the user code is sent to the Flink cluster, where it is executed. The results of the computation have to be output to some storage system via a sink function.
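The separation between defining operators and executing them can be modelled in a few lines of plain Scala. This is a toy, not Flink's API: the hypothetical LazyPlan class only records map functions and runs them when collect() is called, which loosely plays the role of DataSet#print or ExecutionEnvironment#execute. It models only the deferral, not the shipping of user code to the cluster.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy, NOT Flink's DataSet: map only records the function,
// and nothing runs until collect() is called.
final class LazyPlan[A](source: () => Seq[A]) {
  def map[B](f: A => B): LazyPlan[B] = new LazyPlan[B](() => source().map(f))

  // Loosely plays the role of DataSet#print / ExecutionEnvironment#execute.
  def collect(): Seq[A] = source()
}

object LazyDemo {
  // Side effect standing in for the samples the map function stores in V.
  val seen: ListBuffer[Int] = ListBuffer.empty[Int]

  def runDemo(): (Int, Int, Seq[Int]) = {
    val plan = new LazyPlan(() => Seq(1, 2, 3)).map { x => seen += x; x * 10 }
    val seenBeforeRun = seen.size // the map is only defined at this point
    val result = plan.collect()   // executing the plan finally runs the function
    (seenBeforeRun, seen.size, result)
  }
}
```

Reading seen before collect() corresponds to computing cutPoints right after data map (...) in the question: the map function has not run yet at that point.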



Because of this, it is not possible to easily mix local and distributed computations the way your solution tries to. What discretize does is define a map operator which transforms the input DataSet data. This operation is only executed once you call ExecutionEnvironment#execute or DataSet#print, for example. At that point, the user code and the definition of IDADiscretizer are sent to the cluster, where they are instantiated. Flink updates the values in an instance of IDADiscretizer which is not the same instance as the one you have on the client.
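The client/cluster instance split can be reproduced locally with plain Java serialization, which is roughly how user functions are shipped to the workers. In this simplified sketch, SampleHolder is a hypothetical stand-in for IDADiscretizer and ShipDemo.roundTrip is a hypothetical stand-in for the shipping step. Mutating the deserialized copy leaves the client's object untouched, which is why the client's V stays empty.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for IDADiscretizer: serializable, with mutable state.
final class SampleHolder extends Serializable {
  val samples: ArrayBuffer[Double] = ArrayBuffer.empty[Double]
  def addSample(x: Double): Unit = samples += x
}

object ShipDemo {
  // Hypothetical helper: serialize on the "client" and deserialize a fresh copy,
  // a simplified version of shipping user code to the cluster.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()

    val loader = obj.getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes through the loader that defined obj, falling back to the default.
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

For example, after val remote = ShipDemo.roundTrip(client) and a few remote.addSample calls, remote.samples is filled while client.samples is still empty.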







Thanks for your reply, what is the best approach then? Should I use broadcasts? or try to chain the transformations?
– elbaulp
Jul 3 at 8:22






What you should do is perform the cutPoints calculation on the cluster and either write the result to a sink or retrieve it via collect. If you have all the information for V available after running discretize then you should directly call cutPoints in the same function. If you need to aggregate the results of discretize, then you have to add a grouping operation and apply cutPoints in the grouping function.
– Till Rohrmann
Jul 3 at 9:55
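The grouping variant can be sketched with ordinary Scala collections standing in for the DataSet: emit (attribute index, value) pairs, group them by attribute, and compute the cut points inside the per-group function so that they come back as data. GroupedCutPoints and equalFrequencyCuts are hypothetical names, and the cut points here are exact equal-frequency boundaries over all values, a simplification of IDA, which approximates them from a bounded sample. In Flink, the same shape would presumably be a flatMap followed by groupBy and reduceGroup, with the result retrieved through collect or a sink.

```scala
// Hypothetical helper object; plain Scala collections stand in for a DataSet.
object GroupedCutPoints {
  // Exact equal-frequency cut points for one attribute: the sorted values at the
  // boundaries between nBins equally sized chunks (a simplification of IDA).
  def equalFrequencyCuts(values: Seq[Double], nBins: Int): Vector[Double] = {
    val sorted = values.sorted.toVector
    (1 until nBins).map(k => sorted(k * sorted.size / nBins)).toVector
  }

  // Local stand-in for flatMap -> groupBy(attribute index) -> reduceGroup:
  // the cut points are computed inside the per-group function and returned as data.
  def cutPoints(records: Seq[Vector[Double]], nBins: Int): Map[Int, Vector[Double]] =
    records
      .flatMap(_.zipWithIndex.map { case (value, attr) => (attr, value) })
      .groupBy { case (attr, _) => attr }
      .map { case (attr, pairs) => attr -> equalFrequencyCuts(pairs.map(_._2), nBins) }
}
```

With records 1 to 10 in attribute 0 and nBins = 5, this yields the boundaries 3.0, 5.0, 7.0 and 9.0 for that attribute.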







Thank you very much, I'll try it.
– elbaulp
Jul 3 at 10:08





By "If you have all the information for V available after running discretize then you should directly call cutPoints in the same function", do you mean this?

val d = data.map(updateSamples(_)); val c = cutPoints

This way I still get an empty result.
– elbaulp
Jul 3 at 10:36







It should happen within the map function on the DataSet, and the result should then be a DataSet<CutPointsResult>.
– Till Rohrmann
Jul 3 at 10:51
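One way to read this is that each parallel instance emits its state as ordinary output records, and the merging happens in a later operator, instead of the client reading a field afterwards. The sketch below simulates that with plain Scala: the outer sequence stands in for parallel instances, Summary is a hypothetical per-attribute partial result, and CutPointsResult is only named after the comment (it is not a Flink class). To keep the merge short, it swaps in equal-width bins computed from a min/max summary, which is a different technique from IDA's equal-frequency intervals. In Flink, this would presumably map to mapPartition followed by groupBy and reduce.

```scala
// Hypothetical result record, named after the comment; not a Flink class.
final case class CutPointsResult(attr: Int, cuts: Vector[Double])

object PartialMergeDemo {
  // Hypothetical per-attribute partial state that one instance emits as a normal record.
  final case class Summary(attr: Int, min: Double, max: Double)

  // What one instance would emit after seeing its whole partition (mapPartition-like).
  def summarize(partition: Seq[Vector[Double]]): Seq[Summary] =
    partition
      .flatMap(_.zipWithIndex)
      .groupBy { case (_, attr) => attr }
      .toSeq
      .map { case (attr, pairs) =>
        val values = pairs.map(_._1)
        Summary(attr, values.min, values.max)
      }

  // Associative merge, so it can run as a reduce per attribute.
  def merge(a: Summary, b: Summary): Summary =
    Summary(a.attr, math.min(a.min, b.min), math.max(a.max, b.max))

  // Equal-width cut points: a swapped-in, simpler technique than IDA's equal-frequency bins.
  def toResult(s: Summary, nBins: Int): CutPointsResult = {
    val width = (s.max - s.min) / nBins
    CutPointsResult(s.attr, (1 until nBins).map(k => s.min + k * width).toVector)
  }

  def run(partitions: Seq[Seq[Vector[Double]]], nBins: Int): Seq[CutPointsResult] =
    partitions
      .flatMap(summarize)                 // every instance emits its partial summaries
      .groupBy(_.attr)                    // group the partials by attribute
      .toSeq
      .sortBy { case (attr, _) => attr }
      .map { case (_, partials) => toResult(partials.reduce(merge(_, _)), nBins) }
}
```

Because the partial summaries are records rather than fields, nothing depends on which instance the client happens to hold.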







