spark mapWithState: why does the number of storage entries remain at 20?





I am using Spark mapWithState, but the storage space keeps growing.



[Screenshot of the Spark UI Storage tab showing 20 MapPartitionsRDDs of about 9 GB in memory each]



Question 1.



MapPartitionsRDD shows a size in memory of 9 GB x 20. Can this size be reduced?



Question 2.


In InternalMapWithStateDStream, the storage level is fixed to MEMORY_ONLY.



Because of the size, I want to change it to persist(StorageLevel.MEMORY_ONLY_SER). Is that possible?



Question 3.


private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}



I want to reduce this checkpoint value. Is it possible?





Hi, did you find the solution to this? I am facing the same problem.
– Amanpreet Khurana
Mar 27 at 19:16




2 Answers



MapPartitionsRDD shows a size in memory of 9 GB x 20. Can this size be reduced?



It's 9 GB x 20, but how is it distributed across your cluster? You need to click the RDD's name to see the distributed state. If you want to reduce the in-memory size, you'll need to think of a more efficient way to represent your data, for example as sketched below.
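Here is a minimal sketch of a compact state representation. Everything in it (ChannelState, updateState, chnlStream, the field types) is an assumption for illustration, not taken from the question:

import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical compact state: only two primitive fields are kept per key.
case class ChannelState(count: Long, lastSeenMs: Long)

val updateState = (key: String, value: Option[Long], state: State[ChannelState]) => {
  val prev = state.getOption().getOrElse(ChannelState(0L, 0L))
  val next = ChannelState(prev.count + value.getOrElse(0L), System.currentTimeMillis())
  state.update(next)   // only this small case class is held in memory per key
  (key, next.count)    // mapped value emitted downstream
}

val stateSpec = StateSpec.function(updateState)
// val stateStream = chnlStream.mapWithState(stateSpec)   // chnlStream: DStream[(String, Long)]

Large nested objects, full event payloads, or ever-growing collections kept inside the State are what make each state RDD big; keeping only the fields you actually query keeps the per-key footprint small.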



Because of the size, I want to change it to persist(StorageLevel.MEMORY_ONLY_SER). Is that possible?



No, you can't override the in-memory settings of the MapWithStateDStream.





I want to reduce this checkpoint value. Is it possible?



Yes, you can set the DStream's checkpoint interval:




dStream.mapWithState(spec).checkpoint(Seconds(4))
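A slightly fuller sketch of where that call fits, assuming a 10-second batch interval; the app name, checkpoint directory, mapping function, and pairs stream are placeholders, not taken from the question:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

val conf = new SparkConf().setAppName("mapWithState-checkpoint-example")
val ssc  = new StreamingContext(conf, Seconds(10))   // 10-second batches
ssc.checkpoint("checkpoint-dir")                     // checkpointing is required for mapWithState

val spec = StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
  val sum = state.getOption().getOrElse(0) + value.getOrElse(0)
  state.update(sum)
  (key, sum)
})

// pairs: DStream[(String, Int)], e.g. read from Kafka
// pairs.mapWithState(spec).checkpoint(Seconds(10 * 5))   // checkpoint the state every 50 seconds
//                                                        // instead of the default batchInterval * 10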





Thank you for the answer. 1. The cluster is well distributed, but since only the latest state is needed, why are the remaining 19 storage entries still kept? 2. dStream.mapWithState(spec).checkpoint(Seconds(4)) is not working; it still runs at the default interval of 10. My code: val kafkastateStream = chnlStream.mapWithState(stateSpec); kafkastateStream.checkpoint(Seconds(10 * 5)); val kafkaSnapshotStream = kafkastateStream.stateSnapshots()
– Hyunkeun Lee
Nov 27 '16 at 11:33








@HyunkeunLee "But you only need the last state storage, why do you have the remaining 19 storage?" That entirely depends on how you store the state. Does the incoming state have different keys? Do you store new state on each iteration? Show us the code. Regarding the checkpoint, it should run at the default interval (derived from the batch time) if it's not using your custom interval.
– Yuval Itzchakov
Nov 27 '16 at 13:18





stackoverflow.com/users/4065056/hyunkeun-lee: Hi, did you find the solution? I am facing the same issue.
– Amanpreet Khurana
Mar 27 at 18:51



MapWithState also caches mappedValues, so you can also reduce the batch interval to reduce the size of cached RDDs.
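A minimal sketch of what that means in practice; the batch durations are illustrative only:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("smaller-batch-interval")
// A smaller batch interval means each cached mapped-values RDD covers fewer records:
// val ssc = new StreamingContext(conf, Seconds(30))   // larger RDD cached per batch
val ssc = new StreamingContext(conf, Seconds(10))      // smaller RDD cached per batch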








