java8异步处理学习

oracle
wenfei 7 months ago
parent 87f31cee18
commit 8cb997feb8

@ -21,6 +21,11 @@
<option name="name" value="maven-public" />
<option name="url" value="http://192.168.1.24:8081/repository/maven-public/" />
</remote-repository>
<remote-repository>
<option name="id" value="repository" />
<option name="name" value="repository" />
<option name="url" value="http://47.242.184.139:8081/repository/maven-public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />

@ -5,15 +5,12 @@
</component>
<component name="ChangeListManager">
<list default="true" id="eacd3dd8-1e95-428c-a6af-1aadb9921989" name="Default Changelist" comment="">
<change afterPath="$PROJECT_DIR$/CompletableFuture.txt" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/jarRepositories.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/jarRepositories.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/README.md" beforeDir="false" afterPath="$PROJECT_DIR$/README.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/AdminClientTemperatureApplication.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/AdminClientTemperatureApplication.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/controller/JdbcDemoController.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/controller/JdbcDemoController.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/entity/Temperature.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/entity/Temperature.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/entity/WebServiceProvince.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/entity/WebServiceProvince.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/entity/WebServiceWeatherInfo.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/entity/WebServiceWeatherInfo.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/service/impl/TemperatureServiceImpl.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/rehome/mqttclienttemperature/service/impl/TemperatureServiceImpl.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/resources/application.yml" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@ -177,6 +174,7 @@
<workItem from="1747540902720" duration="2539000" />
<workItem from="1747553672823" duration="9032000" />
<workItem from="1747564622457" duration="10803000" />
<workItem from="1747644155888" duration="5369000" />
</task>
<servers />
</component>

@ -0,0 +1,119 @@
https://blog.51cto.com/u_16297326/13571760
总结:
CompletableFuture 是 Java 中实现异步编程的强大工具,它提供了丰富的 API 来创建、组合和处理异步任务,
能够帮助开发者构建高性能、高响应性的应用程序。通过本文的详细介绍和代码示例,
读者应该对 CompletableFuture 的基本使用、链式调用、组合多个任务、异常处理以及实际应用等方面有了深入的了解。
在实际开发中,灵活运用 CompletableFuture 的特性,结合性能优化和最佳实践,能够有效提升应用程序的性能和用户体验。
随着 Java 技术的不断发展和应用场景的日益复杂,掌握 CompletableFuture 等异步编程技术对于 Java 开发者来说显得尤为重要。
七、实际应用案例:构建复杂的异步工作流
为了更好地理解 CompletableFuture 的实际应用,我们来看一个构建复杂异步工作流的案例。
假设我们需要开发一个电商订单处理系统,其中包含多个异步任务,如验证用户信息、检查库存、计算运费、处理支付等。
这些任务之间存在一定的依赖关系,需要合理地组合和处理。
// 验证用户信息
CompletableFuture<User> validateUserAsync(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Validating user...");
// 模拟网络延迟
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回用户对象
return new User(userId);
});
}
// 检查库存
CompletableFuture<Boolean> checkInventoryAsync(Product product, int quantity) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Checking inventory for product: " + product.getId());
// 模拟网络延迟
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回是否有足够库存
return product.getStock() >= quantity;
});
}
// 计算运费
CompletableFuture<Double> calculateShippingCostAsync(User user, double totalWeight) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Calculating shipping cost...");
// 模拟网络延迟
try {
Thread.sleep(1200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 根据用户地址和总重量计算运费
return totalWeight * 0.5;
});
}
// 处理支付
CompletableFuture<Boolean> processPaymentAsync(User user, double amount) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Processing payment...");
// 模拟网络延迟
try {
Thread.sleep(1800);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回支付是否成功
return true;
});
}
// 构建订单处理工作流
public void processOrder(String userId, Product product, int quantity) {
// 验证用户信息
CompletableFuture<User> userFuture = validateUserAsync(userId);
// 检查库存
CompletableFuture<Boolean> inventoryFuture = userFuture
.thenCompose(user -> checkInventoryAsync(product, quantity));
// 计算运费
CompletableFuture<Double> shippingCostFuture = inventoryFuture
.thenCompose(hasInventory -> {
if (hasInventory) {
return calculateShippingCostAsync(userFuture.join(), product.getWeight() * quantity);
} else {
return CompletableFuture.completedFuture(0.0);
}
});
// 处理支付
CompletableFuture<Boolean> paymentFuture = shippingCostFuture
.thenCompose(shippingCost -> {
double totalAmount = product.getPrice() * quantity + shippingCost;
return processPaymentAsync(userFuture.join(), totalAmount);
});
// 最终处理
paymentFuture
.thenAccept(paymentSuccess -> {
if (paymentSuccess) {
System.out.println("Order processed successfully.");
} else {
System.out.println("Payment failed, order not processed.");
}
})
.exceptionally(ex -> {
System.out.println("Error occurred during order processing: " + ex.getMessage());
return null;
})
.join();
}

@ -3,6 +3,14 @@
java CompletableFuture异步回调
https://blog.csdn.net/n0430/article/details/147880286
https://juejin.cn/post/7487071171194945547
https://blog.csdn.net/Linging_24/article/details/144832725
https://blog.51cto.com/u_16297326/13571760
# Spring-axis

@ -268,10 +268,12 @@
</dependency>
<!--webservice 第三方库axis2 maven引用 Spring-axis end-->
</dependencies>
<!--在项目中使用pom.xml进行下载依赖配置的话可以单独使用
注意项目中使用的maven如果已经在settings.xml中配置过后就无需在配置此项-->
<repositories>
<repository>
<id>maven-public</id>
<url>http://192.168.1.24:8081/repository/maven-public/</url>
<id>repository</id>
<url>http://47.242.184.139:8081/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>

@ -10,6 +10,8 @@ import com.rehome.mqttclienttemperature.entity.Temperature;
import com.rehome.mqttclienttemperature.service.TemperatureService;
import com.rehome.mqttclienttemperature.utils.JdbcUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.util.TextUtils;
import org.attoparser.util.TextUtil;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@ -90,6 +92,10 @@ public class JdbcDemoController {
@RequestMapping("/api/list")
public ResponseDto queryList() {
// 自定义一个线程池,内部包含8个线程
ExecutorService customPool = Executors.newFixedThreadPool(8);
// 自定义一个线程池,内部包含10个线程
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String sql = "select * from temperature where id = ?";
List<Object> jdbcParamValues = new ArrayList<>();
@ -125,7 +131,7 @@ public class JdbcDemoController {
}
}
return "数据库同步成功";
});
},customPool);
future.thenApply(result -> {
System.out.println("Result: " + result);
@ -155,6 +161,8 @@ public class JdbcDemoController {
*/
@RequestMapping("/api/getResult")
public String getResult() {
// 自定义一个线程池,内部包含4个线程
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
@ -164,7 +172,7 @@ public class JdbcDemoController {
Thread.currentThread().interrupt(); // 重置中断状态
}
return "数据库同步成功";
});
},executorService);
//注意如果需要异步返回结果再做后续操作需要加入join()方法等待异步计算结果后回调,不然异步没有处理完直接主线程结束
future.thenApply(result -> {
System.out.println("Result: " + result);
@ -173,5 +181,32 @@ public class JdbcDemoController {
System.out.println("数据库正在同步...");
return "数据库正在同步...";
}
/**
*
* @return
*/
@RequestMapping("/api/getResult0")
public String getResult0() {
// 自定义一个线程池,内部包含4个线程
ExecutorService executorService = Executors.newFixedThreadPool(4);
String result = CompletableFuture.supplyAsync(() -> {
System.out.println("Step 1: Generate number");
return 42;
},executorService).thenApply(num -> {
System.out.println("Step 2: Convert number to string");
return "Number is: " + num;
}).exceptionally(exception -> {
System.out.println(exception.getMessage());
return null;
}).join();
if(TextUtils.isEmpty(result)){
return "数据库同步异常...";
}
System.out.println("数据库正在同步...");
System.out.println(result);
return result;
}
}

Loading…
Cancel
Save