diff --git a/disruptor_nmc/pom.xml b/disruptor_nmc/pom.xml index 83b3940..db875c4 100644 --- a/disruptor_nmc/pom.xml +++ b/disruptor_nmc/pom.xml @@ -162,9 +162,26 @@ util 2022.1.0 + + com.alibaba + druid + 1.1.9 + - - + + + + repository + http://47.242.184.139:8081/repository/maven-public/ + + true + + + true + + + diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/controller/JdbcDemoController.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/controller/JdbcDemoController.java new file mode 100644 index 0000000..d2c0e2f --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/controller/JdbcDemoController.java @@ -0,0 +1,180 @@ +package com.rehome.disruptor_nmc.controller; + + + +import com.google.gson.Gson; +import com.liuhuiyu.spring_util.SpringUtil; +import com.rehome.disruptor_nmc.datasource.DataSource; +import com.rehome.disruptor_nmc.dto.ResponseDto; +import com.rehome.disruptor_nmc.dto.ResponseNmcNowWeatherDto; +import com.rehome.disruptor_nmc.dto.NmcNowWeatherDto; +import com.rehome.disruptor_nmc.entity.NmcNowWeather; +import com.rehome.disruptor_nmc.entity.Temperature; +import com.rehome.disruptor_nmc.service.NmcCityService; +import com.rehome.disruptor_nmc.service.NmcWeatherService; +import com.rehome.disruptor_nmc.service.TemperatureService; +import com.rehome.disruptor_nmc.utils.JdbcUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +//NmcNowWeatherDto +//ResponseNmcNowWeatherDto + +/** + * 背景 + * 现在的数据层的开发,大多会使用如MyBatis或JPA之类的开发工具。这些开发工具给我们的开发过程中带来了极大的便利。 + * 但是在一些极端的场景下往往原生的jdbc方式操作数据库更灵活,性能更高。由于部分场景下MyBatis或JPA之类无法满足我的需求,所以我打算自己封装一套查数据库的工具类。 + *

+ * 我们会用到fastjson,druid,mysql所以pom.xml增加依赖如下: + * + * + * com.alibaba + * fastjson + * 1.2.62 + * + * + * com.alibaba + * druid + * 1.1.9 + * + * + * + * mysql + * mysql-connector-java + * + */ + +/** + * 源码下载 + * https://download.csdn.net/download/lxyoucan/85094574 + *

+ * 参考 + * https://github.com/freakchick/DBApi + *

+ * SpringBoot中封装jdbc工具类 + * https://blog.csdn.net/lxyoucan/article/details/124042295 + */ + +@Slf4j +@RestController +public class JdbcDemoController { + + @Resource + private NmcWeatherService nmcWeatherService; + + + public static DataSource ds = new DataSource(); + + static { + //配置数据源 + ds.setId("1"); + ds.setName("mysql"); + ds.setUrl("jdbc:mysql://localhost:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true"); + ds.setUsername("root"); + ds.setPassword("Skyinno251,"); + ds.setDriver("com.mysql.cj.jdbc.Driver"); + +// ds.setId("2"); +// ds.setName("oracle"); +// ds.setUrl("jdbc:oracle:thin:@192.168.1.9:1521/orcl"); +// ds.setUrl("jdbc:oracle:thin:@192.168.3.7:1521/orcl"); +// ds.setUsername("appserver"); +// ds.setPassword("appserver"); +// ds.setDriver("oracle.jdbc.driver.OracleDriver"); + } + + /** + * 查询测试 + * + * @return + */ + @RequestMapping("/api/list") + public ResponseDto queryList() { + // 自定义一个线程池,内部包含8个线程 + ExecutorService customPool = Executors.newFixedThreadPool(8); + // 自定义一个线程池,内部包含10个线程 + ExecutorService executorService = Executors.newFixedThreadPool(10); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + String sql = "select * from nmc_now_weather where id = ?"; + List jdbcParamValues = new ArrayList<>(); + for (int i = 8070824; i < 10639564; i++) { + jdbcParamValues.add(i + 1); + Gson gson = new Gson(); + ResponseDto responseDto = JdbcUtil.executeSql(ds, sql, jdbcParamValues); + String dbQueryResult = gson.toJson(responseDto); + log.info(dbQueryResult); + jdbcParamValues.clear(); + + ResponseNmcNowWeatherDto responseNmcNowWeatherDto = gson.fromJson(dbQueryResult, ResponseNmcNowWeatherDto.class); + if (responseNmcNowWeatherDto.isSuccess() && responseNmcNowWeatherDto.getData() != null && responseNmcNowWeatherDto.getData().size() > 0) { + log.info(gson.toJson(responseNmcNowWeatherDto.getData().get(0))); + NmcNowWeatherDto dto = responseNmcNowWeatherDto.getData().get(0); + NmcNowWeather nmcNowWeather = new NmcNowWeather(); + nmcNowWeather.setCreateDate(dto.getCreateDate()); + nmcNowWeather.setLastUpdateDate(dto.getLastUpdateDate()); + nmcNowWeather.setWeather(dto.getWeather()); + nmcNowWeather.setWeatherDate(dto.getWeatherDate()); + nmcNowWeather.setCode(dto.getCode()); + nmcWeatherService.saveNowWeather(nmcNowWeather); + } + } + return "数据库同步成功"; + }, customPool); + + future.thenApply(result -> { + System.out.println("Result: " + result); + return result; + }); + + return ResponseDto.successWithMsg("数据库正在同步..."); + } + + public void completableFutureExample() { + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + // 模拟耗时操作 + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 重置中断状态 + } + return 123; + }); + // 非阻塞等待结果,但不返回结果,如果要处理结果,可以使用thenApply等 + future.thenAccept(result -> System.out.println("Result: " + result)).join(); + } + + /** + * 查询测试 + * + * @return + */ + //@RequestMapping("/api/getResult") + public String getResult() { + // 自定义一个线程池,内部包含4个线程 + ExecutorService executorService = Executors.newFixedThreadPool(4); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + // 模拟耗时操作 + try { + Thread.sleep(1000); + System.out.println("异步处理完成"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 重置中断状态 + } + return "数据库同步成功"; + }, executorService); + //注意,如果需要异步返回结果,再做后续操作,需要加入join()方法等待异步计算结果后回调,不然异步没有处理完直接主线程结束 + future.thenApply(result -> { + System.out.println("Result: " + result); + return result; + }).join(); + System.out.println("数据库正在同步..."); + return "数据库正在同步..."; + } +} + diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/datasource/DataSource.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/datasource/DataSource.java new file mode 100644 index 0000000..eace52d --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/datasource/DataSource.java @@ -0,0 +1,17 @@ +package com.rehome.disruptor_nmc.datasource; + +import lombok.Data; + +/** + * 数据源实体类 + * + */ +@Data +public class DataSource { + String id; + String name; + String url; + String username; + String password; + String driver; +} diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/NmcNowWeatherDto.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/NmcNowWeatherDto.java new file mode 100644 index 0000000..bcec9fd --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/NmcNowWeatherDto.java @@ -0,0 +1,24 @@ +package com.rehome.disruptor_nmc.dto; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import java.util.Date; + +@Data +public class NmcNowWeatherDto { + + private Long id; + + private String weather; + + @SerializedName("weather_date") + private String weatherDate; + + private String code; + + @SerializedName("create_date") + private Date createDate; + + @SerializedName("last_update_date") + private Date lastUpdateDate; +} diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/ResponseDto.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/ResponseDto.java new file mode 100644 index 0000000..7704c88 --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/ResponseDto.java @@ -0,0 +1,44 @@ +package com.rehome.disruptor_nmc.dto; + + +import lombok.Data; + +/** + * 返回值包装类 + */ +@Data +public class ResponseDto { + String msg; + Object data; + boolean success; + public static ResponseDto apiSuccess(Object data) { + ResponseDto dto = new ResponseDto(); + dto.setData(data); + dto.setSuccess(true); + dto.setMsg("接口访问成功"); + return dto; + } + + public static ResponseDto successWithMsg(String msg) { + ResponseDto dto = new ResponseDto(); + dto.setData(null); + dto.setSuccess(true); + dto.setMsg(msg); + return dto; + } + + public static ResponseDto successWithData(Object data) { + ResponseDto dto = new ResponseDto(); + dto.setData(data); + dto.setSuccess(true); + return dto; + } + + public static ResponseDto fail(String msg) { + ResponseDto dto = new ResponseDto(); + dto.setSuccess(false); + dto.setMsg(msg); + return dto; + } +} + diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/ResponseNmcNowWeatherDto.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/ResponseNmcNowWeatherDto.java new file mode 100644 index 0000000..ceb637f --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/dto/ResponseNmcNowWeatherDto.java @@ -0,0 +1,16 @@ +package com.rehome.disruptor_nmc.dto; + + +import com.rehome.disruptor_nmc.entity.NmcNowWeather; +import lombok.Data; +import java.util.List; + +/** + * 返回值包装类 + */ +@Data +public class ResponseNmcNowWeatherDto { + String msg; + List data; + boolean success; +} \ No newline at end of file diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/service/ScheduledService.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/service/ScheduledService.java index d6d810c..0aa4297 100644 --- a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/service/ScheduledService.java +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/service/ScheduledService.java @@ -38,7 +38,7 @@ public class ScheduledService { * @description: 从中央气象台获取省份列表 * @Param: null */ - @Scheduled(cron = "0 14 * * * *") + //@Scheduled(cron = "0 14 * * * *") public void getNmcWeatherProvince() { System.out.println("scheduledGetWeather"); System.out.println("=====>>>>>使用cron:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/utils/JdbcUtil.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/utils/JdbcUtil.java new file mode 100644 index 0000000..e2db63c --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/utils/JdbcUtil.java @@ -0,0 +1,100 @@ +package com.rehome.disruptor_nmc.utils; + + +import com.alibaba.druid.pool.DruidPooledConnection; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.rehome.disruptor_nmc.datasource.DataSource; +import com.rehome.disruptor_nmc.dto.ResponseDto; +import lombok.extern.slf4j.Slf4j; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +@Slf4j +public class JdbcUtil { + /** + * 执行sql并返回结果 + * + * @param datasource 数据源连接 + * @param sql 语句 + */ + public static ResponseDto executeSql(DataSource datasource, String sql) { + return executeSql(datasource,sql,new ArrayList()); + } + + /** + * 执行sql并返回结果 + * + * @param datasource 数据源连接 + * @param sql 语句 + * @param jdbcParamValues + */ + public static ResponseDto executeSql(DataSource datasource, String sql, List jdbcParamValues) { + log.info(sql); + log.info(JSON.toJSONString(jdbcParamValues)); + DruidPooledConnection connection = null; + try { + connection = PoolManager.getPooledConnection(datasource); + PreparedStatement statement = connection.prepareStatement(sql); + for (int i = 1; i <= jdbcParamValues.size(); i++) { + statement.setObject(i, jdbcParamValues.get(i - 1)); + } + boolean hasResultSet = statement.execute(); + if (hasResultSet) { + ResultSet rs = statement.getResultSet(); + int columnCount = rs.getMetaData().getColumnCount(); + List columns = new ArrayList<>(); + for (int i = 1; i <= columnCount; i++) { + String columnName = rs.getMetaData().getColumnLabel(i); + columns.add(columnName); + } + List list = new ArrayList<>(); + while (rs.next()) { + JSONObject jo = new JSONObject(); + columns.stream().forEach(t -> { + try { + if(t.equals("create_date")){ + Timestamp timestamp = rs.getTimestamp("create_date"); + Date date = new Date(timestamp.getTime()); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + jo.put(t, sdf.format(date)); + }else if(t.equals("last_update_date")){ + Timestamp timestamp = rs.getTimestamp("last_update_date"); + Date date = new Date(timestamp.getTime()); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + jo.put(t, sdf.format(date)); + }else{ + Object value = rs.getObject(t); + String key = t; + String keyLow = key.toLowerCase(Locale.ROOT); + jo.put(keyLow, value); + } + } catch (SQLException e) { + e.printStackTrace(); + } + }); + list.add(jo); + } + return ResponseDto.apiSuccess(list); + } else { + int updateCount = statement.getUpdateCount(); + return ResponseDto.apiSuccess("sql修改数据行数:" + updateCount); + } + } catch (Exception e) { + e.printStackTrace(); + return ResponseDto.fail(e.getMessage()); + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + } +} diff --git a/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/utils/PoolManager.java b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/utils/PoolManager.java new file mode 100644 index 0000000..743bcfa --- /dev/null +++ b/disruptor_nmc/src/main/java/com/rehome/disruptor_nmc/utils/PoolManager.java @@ -0,0 +1,81 @@ +package com.rehome.disruptor_nmc.utils; + + +import com.alibaba.druid.pool.DruidDataSource; +import com.alibaba.druid.pool.DruidPooledConnection; +import com.rehome.disruptor_nmc.datasource.DataSource; +import lombok.extern.slf4j.Slf4j; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * 数据库连接池工具类 + */ +@Slf4j +public class PoolManager { + + private static Lock lock = new ReentrantLock(); + + private static Lock deleteLock = new ReentrantLock(); + + //所有数据源的连接池存在map里 + static Map map = new HashMap<>(); + + public static DruidDataSource getJdbcConnectionPool(DataSource ds) { + if (map.containsKey(ds.getId())) { + return map.get(ds.getId()); + } else { + lock.lock(); + try { + log.info(Thread.currentThread().getName() + "获取锁"); + if (!map.containsKey(ds.getId())) { + DruidDataSource druidDataSource = new DruidDataSource(); + druidDataSource.setName(ds.getName()); + druidDataSource.setUrl(ds.getUrl()); + druidDataSource.setUsername(ds.getUsername()); + druidDataSource.setPassword(ds.getPassword()); + druidDataSource.setDriverClassName(ds.getDriver()); + druidDataSource.setConnectionErrorRetryAttempts(3); //失败后重连次数 + druidDataSource.setBreakAfterAcquireFailure(true); + + map.put(ds.getId(), druidDataSource); + log.info("创建Druid连接池成功:{}", ds.getName()); + } + return map.get(ds.getId()); + } catch (Exception e) { + return null; + } finally { + lock.unlock(); + } + } + } + + //删除数据库连接池 + public static void removeJdbcConnectionPool(String id) { + deleteLock.lock(); + try { + DruidDataSource druidDataSource = map.get(id); + if (druidDataSource != null) { + druidDataSource.close(); + map.remove(id); + } + } catch (Exception e) { + log.error(e.toString()); + } finally { + deleteLock.unlock(); + } + + } + + public static DruidPooledConnection getPooledConnection(DataSource ds) throws SQLException { + DruidDataSource pool = PoolManager.getJdbcConnectionPool(ds); + DruidPooledConnection connection = pool.getConnection(); +// log.info("获取连接成功"); + return connection; + } +} diff --git a/disruptor_nmc/src/main/resources/application.yml b/disruptor_nmc/src/main/resources/application.yml index 677bd73..da73e03 100644 --- a/disruptor_nmc/src/main/resources/application.yml +++ b/disruptor_nmc/src/main/resources/application.yml @@ -9,7 +9,8 @@ spring: # url: jdbc:mysql://127.0.0.1:3306/head_office_data_center?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true&allowMultiQueries=true # url: jdbc:mysql://localhost:3306/head_office_data_center?useUnicode=true&characterEncoding=utf-8&useSSL=true&nullCatalogMeansCurrent=true&serverTimezone=UTC # url: jdbc:mysql://192.168.2.18:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true - url: jdbc:mysql://192.168.3.7:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true + url: jdbc:mysql://43.139.89.198:33060/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true + #url: jdbc:mysql://47.242.184.139:33061/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true #url: jdbc:mysql://127.0.0.1:3306/disruptor_nmc?useUnicode=true&characterEncoding=utf-8&useSSL=false&autoReconnect=true username: root password: Skyinno251, @@ -19,9 +20,9 @@ spring: # password: huangwenfei hikari: #最小空闲连接,默认值10,小于0或大于maximum-pool-size,都会重置为maximum-pool-size - minimum-idle: 10 + minimum-idle: 2 #最大连接数,小于等于0会被重置为默认值10;大于零小于1会被重置为minimum-idle的值 - maximum-pool-size: 20 + maximum-pool-size: 30 #空闲连接超时时间,默认值600000(10分钟),大于等于max-lifetime且max-lifetime>0,会被重置为0;不等于0且小于10秒,会被重置为10秒 idle-timeout: 600000 #连接最大存活时间,不等于0且小于30秒,会被重置为默认值30分钟.设置应该比mysql设置的超时时间短 @@ -32,7 +33,7 @@ spring: # 配置 DBMS 类型 database: mysql # 配置是否将执行的 SQL 输出到日志 - show-sql: false + show-sql: true open-in-view: true hibernate: ddl-auto: update # 第一次建表create 后面用update,要不然每次重启都会新建表