Update Postgres Using Flink


Update Postgres Using Flink



My dataset named dbData contains set of data. I want to update postgres with that data regularly where the dbData changes regularly on everyday purpose.


dbData.map(new Write())
.output(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(Utils.properties_fetch("drivername"))
.setDBUrl(Utils.properties_fetch("dbURL"))
.setUsername(Utils.properties_fetch("username"))
.setPassword(Utils.properties_fetch("password"))
.setQuery( Write.updatequery)
.finish());



My "Write" class looks like the following:


public class Write implements MapFunction<Tuple7<String, String, String, String, String, String, String>, Row>
{
static String updatequery ;
private static final long serialVersionUID = 1L;
public Row map( Tuple7<String, String, String, String, String, String, String> value)throws Exception
{
Row obj = new Row(7);
obj.setField(0, value.f0);
obj.setField(1, value.f1);
obj.setField(2, value.f2);
obj.setField(3, value.f3);
obj.setField(4, value.f4);
obj.setField(5, value.f5);
obj.setField(6, value.f6);
Write.updatequery=putdatainDb(obj);
return obj;
}
public String putdatainDb(Row obj)
{


String updateQuery="UPDATE dashboard SET metric_result
='"+obj.getField(2)+"' ,metric_executed_on ='"+obj.getField(5)+"' ::date
where metric_orgid ='"+obj.getField(6)+"' and date(metric_from)
='"+obj.getField(3)+"'::date and date(metric_to) ='"+obj.getField(4)+"' ::date and metric_topic ='"+obj.getField(1)+"' ;";
return updateQuery;


}
}



In set query I want the query to change every time with the new row so that I can update my database regularly. Suggest some ways to achieve this.





May I know why negative is given to this question. I was unable to find the proper solution to this question so I asked as this was one of my important requirement.
– Vijaya Seetharaman
Jul 2 at 3:29






Any help regarding this?
– Vijaya Seetharaman
Jul 2 at 8:51





People easily give down vote but wont explain why they have done it. I am sorry to say this but this is what happening. If there is a related question then send me the link. I have went across many answers and I dint find them suiting my doubt so I asked.
– Vijaya Seetharaman
Jul 2 at 11:16





What you're doing seems fine, what's the actual problem? Also, please note you're not using Flink's latest stable version.
– gcandal
Jul 2 at 16:30






In dbData you see there is .setQuery there the query is not coming properly and coming as null which i am fetching from the "Write" class. The issue I think is the query is being formed first and then the "obj" object is being created. I am using flink-1.4 version.
– Vijaya Seetharaman
Jul 3 at 4:41










By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

api-platform.com Unable to generate an IRI for the item of type

How to set up datasource with Spring for HikariCP?

Display dokan vendor name on Woocommerce single product pages