add db sync

master
hwf453 6 months ago
parent 50dfb7ad27
commit 7a534200a5

@ -162,9 +162,26 @@
<artifactId>util</artifactId>
<version>2022.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.9</version>
</dependency>
</dependencies>
<!--在项目中使用pom.xml进行下载依赖配置的话可以单独使用
注意项目中使用的maven如果已经在settings.xml中配置过后就无需在配置此项-->
<repositories>
<repository>
<id>repository</id>
<url>http://47.242.184.139:8081/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<!-- <finalName>${project.artifactId}</finalName>-->
<plugins>

@ -0,0 +1,180 @@
package com.rehome.disruptor_nmc.controller;
import com.google.gson.Gson;
import com.liuhuiyu.spring_util.SpringUtil;
import com.rehome.disruptor_nmc.datasource.DataSource;
import com.rehome.disruptor_nmc.dto.ResponseDto;
import com.rehome.disruptor_nmc.dto.ResponseNmcNowWeatherDto;
import com.rehome.disruptor_nmc.dto.NmcNowWeatherDto;
import com.rehome.disruptor_nmc.entity.NmcNowWeather;
import com.rehome.disruptor_nmc.entity.Temperature;
import com.rehome.disruptor_nmc.service.NmcCityService;
import com.rehome.disruptor_nmc.service.NmcWeatherService;
import com.rehome.disruptor_nmc.service.TemperatureService;
import com.rehome.disruptor_nmc.utils.JdbcUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
//NmcNowWeatherDto
//ResponseNmcNowWeatherDto
/**
*
* 使MyBatisJPA便
* jdbcMyBatisJPA
* <p>
* fastjson,druid,mysqlpom.xml
*
* <dependency>
* <groupId>com.alibaba</groupId>
* <artifactId>fastjson</artifactId>
* <version>1.2.62</version>
* </dependency>
* <dependency>
* <groupId>com.alibaba</groupId>
* <artifactId>druid</artifactId>
* <version>1.1.9</version>
* </dependency>
* <!-- Mysql -->
* <dependency>
* <groupId>mysql</groupId>
* <artifactId>mysql-connector-java</artifactId>
* </dependency>
*/
/**
*
* https://download.csdn.net/download/lxyoucan/85094574
* <p>
*
* https://github.com/freakchick/DBApi
* <p>
* SpringBootjdbc
* https://blog.csdn.net/lxyoucan/article/details/124042295
*/
@Slf4j
@RestController
public class JdbcDemoController {
@Resource
private NmcWeatherService nmcWeatherService;
public static DataSource ds = new DataSource();
static {
//配置数据源
ds.setId("1");
ds.setName("mysql");
ds.setUrl("jdbc:mysql://localhost:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true");
ds.setUsername("root");
ds.setPassword("Skyinno251,");
ds.setDriver("com.mysql.cj.jdbc.Driver");
// ds.setId("2");
// ds.setName("oracle");
// ds.setUrl("jdbc:oracle:thin:@192.168.1.9:1521/orcl");
// ds.setUrl("jdbc:oracle:thin:@192.168.3.7:1521/orcl");
// ds.setUsername("appserver");
// ds.setPassword("appserver");
// ds.setDriver("oracle.jdbc.driver.OracleDriver");
}
/**
*
*
* @return
*/
@RequestMapping("/api/list")
public ResponseDto queryList() {
// 自定义一个线程池,内部包含8个线程
ExecutorService customPool = Executors.newFixedThreadPool(8);
// 自定义一个线程池,内部包含10个线程
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String sql = "select * from nmc_now_weather where id = ?";
List<Object> jdbcParamValues = new ArrayList<>();
for (int i = 8070824; i < 10639564; i++) {
jdbcParamValues.add(i + 1);
Gson gson = new Gson();
ResponseDto responseDto = JdbcUtil.executeSql(ds, sql, jdbcParamValues);
String dbQueryResult = gson.toJson(responseDto);
log.info(dbQueryResult);
jdbcParamValues.clear();
ResponseNmcNowWeatherDto responseNmcNowWeatherDto = gson.fromJson(dbQueryResult, ResponseNmcNowWeatherDto.class);
if (responseNmcNowWeatherDto.isSuccess() && responseNmcNowWeatherDto.getData() != null && responseNmcNowWeatherDto.getData().size() > 0) {
log.info(gson.toJson(responseNmcNowWeatherDto.getData().get(0)));
NmcNowWeatherDto dto = responseNmcNowWeatherDto.getData().get(0);
NmcNowWeather nmcNowWeather = new NmcNowWeather();
nmcNowWeather.setCreateDate(dto.getCreateDate());
nmcNowWeather.setLastUpdateDate(dto.getLastUpdateDate());
nmcNowWeather.setWeather(dto.getWeather());
nmcNowWeather.setWeatherDate(dto.getWeatherDate());
nmcNowWeather.setCode(dto.getCode());
nmcWeatherService.saveNowWeather(nmcNowWeather);
}
}
return "数据库同步成功";
}, customPool);
future.thenApply(result -> {
System.out.println("Result: " + result);
return result;
});
return ResponseDto.successWithMsg("数据库正在同步...");
}
public void completableFutureExample() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重置中断状态
}
return 123;
});
// 非阻塞等待结果但不返回结果如果要处理结果可以使用thenApply等
future.thenAccept(result -> System.out.println("Result: " + result)).join();
}
/**
*
*
* @return
*/
//@RequestMapping("/api/getResult")
public String getResult() {
// 自定义一个线程池,内部包含4个线程
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
System.out.println("异步处理完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重置中断状态
}
return "数据库同步成功";
}, executorService);
//注意如果需要异步返回结果再做后续操作需要加入join()方法等待异步计算结果后回调,不然异步没有处理完直接主线程结束
future.thenApply(result -> {
System.out.println("Result: " + result);
return result;
}).join();
System.out.println("数据库正在同步...");
return "数据库正在同步...";
}
}

@ -0,0 +1,17 @@
package com.rehome.disruptor_nmc.datasource;
import lombok.Data;
/**
*
*
*/
@Data
public class DataSource {
String id;
String name;
String url;
String username;
String password;
String driver;
}

@ -0,0 +1,24 @@
package com.rehome.disruptor_nmc.dto;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.util.Date;
@Data
public class NmcNowWeatherDto {
private Long id;
private String weather;
@SerializedName("weather_date")
private String weatherDate;
private String code;
@SerializedName("create_date")
private Date createDate;
@SerializedName("last_update_date")
private Date lastUpdateDate;
}

@ -0,0 +1,44 @@
package com.rehome.disruptor_nmc.dto;
import lombok.Data;
/**
*
*/
@Data
public class ResponseDto {
String msg;
Object data;
boolean success;
public static ResponseDto apiSuccess(Object data) {
ResponseDto dto = new ResponseDto();
dto.setData(data);
dto.setSuccess(true);
dto.setMsg("接口访问成功");
return dto;
}
public static ResponseDto successWithMsg(String msg) {
ResponseDto dto = new ResponseDto();
dto.setData(null);
dto.setSuccess(true);
dto.setMsg(msg);
return dto;
}
public static ResponseDto successWithData(Object data) {
ResponseDto dto = new ResponseDto();
dto.setData(data);
dto.setSuccess(true);
return dto;
}
public static ResponseDto fail(String msg) {
ResponseDto dto = new ResponseDto();
dto.setSuccess(false);
dto.setMsg(msg);
return dto;
}
}

@ -0,0 +1,16 @@
package com.rehome.disruptor_nmc.dto;
import com.rehome.disruptor_nmc.entity.NmcNowWeather;
import lombok.Data;
import java.util.List;
/**
*
*/
@Data
public class ResponseNmcNowWeatherDto {
String msg;
List<NmcNowWeatherDto> data;
boolean success;
}

@ -38,7 +38,7 @@ public class ScheduledService {
* @description:
* @Param: null
*/
@Scheduled(cron = "0 14 * * * *")
//@Scheduled(cron = "0 14 * * * *")
public void getNmcWeatherProvince() {
System.out.println("scheduledGetWeather");
System.out.println("=====>>>>>使用cron:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

@ -0,0 +1,100 @@
package com.rehome.disruptor_nmc.utils;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rehome.disruptor_nmc.datasource.DataSource;
import com.rehome.disruptor_nmc.dto.ResponseDto;
import lombok.extern.slf4j.Slf4j;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@Slf4j
public class JdbcUtil {
/**
* sql
*
* @param datasource
* @param sql
*/
public static ResponseDto executeSql(DataSource datasource, String sql) {
return executeSql(datasource,sql,new ArrayList<Object>());
}
/**
* sql
*
* @param datasource
* @param sql
* @param jdbcParamValues
*/
public static ResponseDto executeSql(DataSource datasource, String sql, List<Object> jdbcParamValues) {
log.info(sql);
log.info(JSON.toJSONString(jdbcParamValues));
DruidPooledConnection connection = null;
try {
connection = PoolManager.getPooledConnection(datasource);
PreparedStatement statement = connection.prepareStatement(sql);
for (int i = 1; i <= jdbcParamValues.size(); i++) {
statement.setObject(i, jdbcParamValues.get(i - 1));
}
boolean hasResultSet = statement.execute();
if (hasResultSet) {
ResultSet rs = statement.getResultSet();
int columnCount = rs.getMetaData().getColumnCount();
List<String> columns = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = rs.getMetaData().getColumnLabel(i);
columns.add(columnName);
}
List<JSONObject> list = new ArrayList<>();
while (rs.next()) {
JSONObject jo = new JSONObject();
columns.stream().forEach(t -> {
try {
if(t.equals("create_date")){
Timestamp timestamp = rs.getTimestamp("create_date");
Date date = new Date(timestamp.getTime());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
jo.put(t, sdf.format(date));
}else if(t.equals("last_update_date")){
Timestamp timestamp = rs.getTimestamp("last_update_date");
Date date = new Date(timestamp.getTime());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
jo.put(t, sdf.format(date));
}else{
Object value = rs.getObject(t);
String key = t;
String keyLow = key.toLowerCase(Locale.ROOT);
jo.put(keyLow, value);
}
} catch (SQLException e) {
e.printStackTrace();
}
});
list.add(jo);
}
return ResponseDto.apiSuccess(list);
} else {
int updateCount = statement.getUpdateCount();
return ResponseDto.apiSuccess("sql修改数据行数" + updateCount);
}
} catch (Exception e) {
e.printStackTrace();
return ResponseDto.fail(e.getMessage());
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}

@ -0,0 +1,81 @@
package com.rehome.disruptor_nmc.utils;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.rehome.disruptor_nmc.datasource.DataSource;
import lombok.extern.slf4j.Slf4j;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
*
*/
@Slf4j
public class PoolManager {
private static Lock lock = new ReentrantLock();
private static Lock deleteLock = new ReentrantLock();
//所有数据源的连接池存在map里
static Map<String, DruidDataSource> map = new HashMap<>();
public static DruidDataSource getJdbcConnectionPool(DataSource ds) {
if (map.containsKey(ds.getId())) {
return map.get(ds.getId());
} else {
lock.lock();
try {
log.info(Thread.currentThread().getName() + "获取锁");
if (!map.containsKey(ds.getId())) {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setName(ds.getName());
druidDataSource.setUrl(ds.getUrl());
druidDataSource.setUsername(ds.getUsername());
druidDataSource.setPassword(ds.getPassword());
druidDataSource.setDriverClassName(ds.getDriver());
druidDataSource.setConnectionErrorRetryAttempts(3); //失败后重连次数
druidDataSource.setBreakAfterAcquireFailure(true);
map.put(ds.getId(), druidDataSource);
log.info("创建Druid连接池成功{}", ds.getName());
}
return map.get(ds.getId());
} catch (Exception e) {
return null;
} finally {
lock.unlock();
}
}
}
//删除数据库连接池
public static void removeJdbcConnectionPool(String id) {
deleteLock.lock();
try {
DruidDataSource druidDataSource = map.get(id);
if (druidDataSource != null) {
druidDataSource.close();
map.remove(id);
}
} catch (Exception e) {
log.error(e.toString());
} finally {
deleteLock.unlock();
}
}
public static DruidPooledConnection getPooledConnection(DataSource ds) throws SQLException {
DruidDataSource pool = PoolManager.getJdbcConnectionPool(ds);
DruidPooledConnection connection = pool.getConnection();
// log.info("获取连接成功");
return connection;
}
}

@ -9,7 +9,8 @@ spring:
# url: jdbc:mysql://127.0.0.1:3306/head_office_data_center?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&allowMultiQueries=true
# url: jdbc:mysql://localhost:3306/head_office_data_center?useUnicode=true&characterEncoding=utf-8&useSSL=true&nullCatalogMeansCurrent=true&serverTimezone=UTC
# url: jdbc:mysql://192.168.2.18:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true
url: jdbc:mysql://192.168.3.7:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true
url: jdbc:mysql://43.139.89.198:33060/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true
#url: jdbc:mysql://47.242.184.139:33061/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true
#url: jdbc:mysql://127.0.0.1:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true
username: root
password: Skyinno251,
@ -19,9 +20,9 @@ spring:
# password: huangwenfei
hikari:
#最小空闲连接默认值10小于0或大于maximum-pool-size都会重置为maximum-pool-size
minimum-idle: 10
minimum-idle: 2
#最大连接数小于等于0会被重置为默认值10大于零小于1会被重置为minimum-idle的值
maximum-pool-size: 20
maximum-pool-size: 30
#空闲连接超时时间默认值60000010分钟大于等于max-lifetime且max-lifetime>0会被重置为0不等于0且小于10秒会被重置为10秒
idle-timeout: 600000
#连接最大存活时间不等于0且小于30秒会被重置为默认值30分钟.设置应该比mysql设置的超时时间短
@ -32,7 +33,7 @@ spring:
# 配置 DBMS 类型
database: mysql
# 配置是否将执行的 SQL 输出到日志
show-sql: false
show-sql: true
open-in-view: true
hibernate:
ddl-auto: update # 第一次建表create 后面用update要不然每次重启都会新建表

Loading…
Cancel
Save