diff --git a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java index 7eab05a3b..6ced41b52 100644 --- a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java +++ b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java @@ -13,9 +13,7 @@ import org.dromara.common.oss.exception.OssException; import org.dromara.common.oss.properties.OssProperties; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; +import software.amazon.awssdk.core.async.*; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -29,9 +27,12 @@ import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import java.io.*; import java.net.URI; import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.Optional; import java.util.function.Consumer; /** @@ -237,30 +238,61 @@ public class OssClient { * @param key 文件在 Amazon S3 中的对象键 * @param out 输出流 * @param consumer 自定义处理逻辑 - * @return 输出流中写入的字节数(长度) * @throws OssException 如果下载失败,抛出自定义异常 */ public void download(String key, OutputStream out, Consumer consumer) { + try { + this.download(key, consumer).writeTo(out); + } catch (Exception e) { + throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]"); + } + } + + /** + * 下载文件从 Amazon S3 到 输出流 + * + * @param key 文件在 Amazon S3 中的对象键 + * @param contentLengthConsumer 文件大小消费者函数 + * @return 写出订阅器 + * @throws OssException 如果下载失败,抛出自定义异常 + */ + public WriteOutSubscriber download(String key, Consumer contentLengthConsumer) { try { // 构建下载请求 - DownloadRequest> downloadRequest = DownloadRequest.builder() + DownloadRequest> publisherDownloadRequest = DownloadRequest.builder() // 文件对象 .getObjectRequest(y -> y.bucket(properties.getBucketName()) .key(key) .build()) .addTransferListener(LoggingTransferListener.create()) - // 使用订阅转换器 - .responseTransformer(AsyncResponseTransformer.toBlockingInputStream()) + // 使用发布订阅转换器 + .responseTransformer(AsyncResponseTransformer.toPublisher()) .build(); + // 使用 S3TransferManager 下载文件 - Download> responseFuture = transferManager.download(downloadRequest); - // 输出到流中 - try (ResponseInputStream responseStream = responseFuture.completionFuture().join().result()) { // auto-closeable stream - if (consumer != null) { - consumer.accept(responseStream.response().contentLength()); - } - responseStream.transferTo(out); // 阻塞调用线程 blocks the calling thread - } + Download> publisherDownload = transferManager.download(publisherDownloadRequest); + // 获取下载发布订阅转换器 + ResponsePublisher publisher = publisherDownload.completionFuture().join().result(); + // 执行文件大小消费者函数 + Optional.ofNullable(contentLengthConsumer) + .ifPresent(lengthConsumer -> lengthConsumer.accept(publisher.response().contentLength())); + + // 构建写出订阅器对象 + return out -> { + // 注意,此处不需要显式关闭 channel ,channel 会在 out 关闭时自动关闭 + WritableByteChannel channel = Channels.newChannel(out); + + // 订阅数据 + publisher.subscribe(byteBuffer -> { + try { + while (byteBuffer.hasRemaining()) { + channel.write(byteBuffer); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }).join(); + }; } catch (Exception e) { throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]"); } diff --git a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java new file mode 100644 index 000000000..d3a9841a1 --- /dev/null +++ b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java @@ -0,0 +1,15 @@ +package org.dromara.common.oss.core; + +import java.io.IOException; + +/** + * 写出订阅器 + * + * @author 秋辞未寒 + */ +@FunctionalInterface +public interface WriteOutSubscriber { + + void writeTo(T out) throws IOException; + +}