Flink连接MySQL报错,求解决方法

代码项目助手 毕业设计 1

使用flink测试连接flink时报错:处理流运算符时出错, 不知道是不是程序写错了还是mysql有问题

package flink_scala.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import flink_scala.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    // 定义流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 0.读取数据
    val inputStream = env.readTextFile("C:\\Users\\86182\\IdeaProjects\\flink_scala\\src\\main\\resources\\sensor.txt")


    // 1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream
      .map( data =>{
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      } )

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("Jdbc sink test")

  }
}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.10.24:3306/test?useSSL=false", "root", "Password123$")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

回复

共2条回复 我来回复
  • 毕设货栈
    这个人很懒,什么都没有留下~
    评论

    09:48:47.793 [Split Reader: Custom File Source -> Map -> Sink: Unnamed (8/16)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator. java.lang.NullPointerException 这是报错信息

    0条评论
  • 代码客栈
    这个人很懒,什么都没有留下~
    评论

    这是空指针异常,发生在读取数据那里,仔细检查一下数据是否有问题,估计是数据格式出现了问题或者路径输出了问题导致没有读取到自己想要的数据,无法被获取到,所以报了空异常

    0条评论

发表回复

登录后才能评论