当前位置:首页 > 文章列表 > 文章 > java教程 > Java多线程怎么实现FTP批量上传文件

Java多线程怎么实现FTP批量上传文件

来源:亿速云 2024-04-25 11:18:19 0浏览 收藏

一分耕耘,一分收获!既然都打开这篇《Java多线程怎么实现FTP批量上传文件》,就坚持看下去,学下去吧!本文主要会给大家讲到等等知识点,如果大家对本文有好的建议或者看到有不足之处,非常欢迎大家积极提出!在后续文章我会继续更新文章相关的内容,希望对大家都有所帮助!

1、构建FTP客户端

package cn.com.pingtech.common.ftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

import java.io.*;
import java.net.UnknownHostException;

@Slf4j
public class  FtpConnection {

    private FTPClient ftp = new FTPClient();

    private boolean is_connected = false;

    /**
     * 构造函数
     */
    public FtpConnection() {
        is_connected = false;
        ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000);
        ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000);
        ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000);
        try {
            initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 初始化连接
     *
     * @param host
     * @param port
     * @param user
     * @param password
     * @throws IOException
     */
    private void initConnect(String host, int port, String user, String password) throws IOException {
        try {
            ftp.connect(host, port);
        } catch (UnknownHostException ex) {
            throw new IOException("Can't find FTP server '" + host + "'");
        }
        int reply = ftp.getReplyCode();//220 连接成功
        if (!FTPReply.isPositiveCompletion(reply)) {
            disconnect();
            throw new IOException("Can't connect to server '" + host + "'");

        }
        if (!ftp.login(user, password)) {
            is_connected = false;
            disconnect();
            throw new IOException("Can't login to server '" + host + "'");
        } else {
            is_connected = true;
        }
    }

    /**
     * 上传文件
     *
     * @param path
     * @param ftpFileName
     * @param localFile
     * @throws IOException
     */
    public boolean upload(String path, String ftpFileName, File localFile) throws IOException {
        boolean is  = false;
        //检查本地文件是否存在
        if (!localFile.exists()) {
            throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist.");
        }
        //设置工作路径
        setWorkingDirectory(path);
        //上传
        InputStream in = null;
        try {
            //被动模式
            ftp.enterLocalPassiveMode();
            in = new BufferedInputStream(new FileInputStream(localFile));
            //保存文件
            is = ftp.storeFile(ftpFileName, in);
        }catch (Exception e){
            e.printStackTrace();
        }
        finally {
            try {
                in.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        return is;
    }

    /**
     * 关闭连接
     *
     * @throws IOException
     */
    public void disconnect() throws IOException {
        if (ftp.isConnected()) {
            try {
                ftp.logout();
                ftp.disconnect();
                is_connected = false;
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    /**
     * 设置工作路径
     *
     * @param dir
     * @return
     */
    private boolean setWorkingDirectory(String dir) {
        if (!is_connected) {
            return false;
        }
        //如果目录不存在创建目录
        try {
            if (createDirecroty(dir)) {
                return ftp.changeWorkingDirectory(dir);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;

    }

    /**
     * 是否连接
     *
     * @return
     */
    public boolean isConnected() {
        return is_connected;
    }

    /**
     * 创建目录
     *
     * @param remote
     * @return
     * @throws IOException
     */
    private boolean createDirecroty(String remote) throws IOException {
        boolean success = true;
        String directory = remote.substring(0, remote.lastIndexOf("/") + 1);
        // 如果远程目录不存在,则递归创建远程服务器目录
        if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) {
            int start = 0;
            int end = 0;
            if (directory.startsWith("/")) {
                start = 1;
            } else {
                start = 0;
            }
            end = directory.indexOf("/", start);
            while (true) {
                String subDirectory = new String(remote.substring(start, end));
                if (!ftp.changeWorkingDirectory(subDirectory)) {
                    if (ftp.makeDirectory(subDirectory)) {
                        ftp.changeWorkingDirectory(subDirectory);
                    } else {
                        log.error("mack directory error :/" + subDirectory);
                        return false;
                    }
                }
                start = end + 1;
                end = directory.indexOf("/", start);
                // 检查所有目录是否创建完毕
                if (end <= start) {
                    break;
                }
            }
        }
        return success;
    }

}

2、FTP连接工厂

package cn.com.pingtech.common.ftp;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;


/**
 * 连接工厂

 */
@Slf4j
public class FtpFactory {

    //有界队列
    private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize);


    protected FtpFactory(){
        log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize);
        for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){
            //表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false
            arrayBlockingQueue.offer(new FtpConnection());
        }
    }

    /**
     * 获取连接
     *
     * @return
     */

    public FtpConnection getFtp() {
        FtpConnection poll = null;
        try {
            //取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止
            poll = arrayBlockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return poll;
    }

    /**
     * 释放连接
     * @param ftp
     * @return
     */
    public boolean relase(FtpConnection ftp){
        return arrayBlockingQueue.offer(ftp);
    }

    /**
     * 删除连接
     *
     * @param ftp
     */

    public void remove(FtpConnection ftp) {
        arrayBlockingQueue.remove(ftp);
    }

    /**
     * 关闭连接
     */
    public void close() {
        for (FtpConnection connection : arrayBlockingQueue) {
            try {
                connection.disconnect();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


}

3、FTP配置

package cn.com.pingtech.common.ftp;

/**
 * ftp 配置类
 */

public class FtpConfig {

    public static int defaultTimeoutSecond = 10;
    public static int connectTimeoutSecond = 10;
    public static int dataTimeoutSecond = 10;
    public static String host = "127.0.0.1";
    public static int port =9999;
    public static String user = "Administrator";
    public static String password ="Yp886611";
    public static int threadPoolSize = 1;
    public static int ftpConnectionSize = 1;
    
}

4、构建多线程FTP上传任务

package cn.com.pingtech.common.ftp;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Callable;


/**
 * 上传任务
 */
public class UploadTask implements Callable{
    private File file;

    private FtpConnection ftp;

    private String path;

    private String fileName;

    private FtpFactory factory;

    public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){

        this.factory = factory;

        this.ftp = ftp;

        this.file = file;

        this.path = path;

        this.fileName = fileName;

    }

    @Override
    public UploadResult call() throws Exception {
        UploadResult result = null;
        try {
            if (ftp == null) {
                result = new UploadResult(file.getAbsolutePath(), false);
                return result;
            }
            //如果连接未开启 重新获取连接
            if (!ftp.isConnected()) {
                factory.remove(ftp);
                ftp = new FtpConnection();
            }

            //开始上传
            result = new UploadResult(file.getName(), ftp.upload(path, fileName, file));
        } catch (IOException ex) {
            result = new UploadResult(file.getName(), false);
            ex.printStackTrace();
        } finally {
            factory.relase(ftp);//释放连接
        }
        return result;

    }
}
package cn.com.pingtech.common.ftp;
/**
 * 上传结果
 */
public class UploadResult {
    private String fileName; //文件名称
    private boolean result; //是否上传成功

    public UploadResult(String fileName, boolean result) {
        this.fileName = fileName;
        this.result = result;
    }

    public String getFileName() {
        return fileName;

    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public boolean isResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }

    public String toString() {
        return "[fileName=" + fileName + " , result=" + result + "]";
    }
}

注意:实现Callable接口的任务线程能返回执行结果
Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞线程直到获取“将来”的结果,当不调用此方法时,主线程不会阻塞

5、FTP上传工具类

package cn.com.pingtech.common.ftp;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


/**
 * ftp上传工具包
 */

public class FtpUtil {

    /**
     * 上传文件
     *
     * @param ftpPath
     * @param listFiles
     * @return
     */

    public static synchronized List upload(String ftpPath, File[] listFiles) {
        //构建线程池
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize);
        List<Future> results = new ArrayList<>();
        //创建n个ftp链接
        FtpFactory factory = new FtpFactory();
        for (File file : listFiles) {
            FtpConnection ftp = factory.getFtp();//获取ftp con
            UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName());
            Future submit = newFixedThreadPool.submit(upload);
            results.add(submit);
        }

        List listResults = new ArrayList<>();
        for (Future result : results) {
            try {
                //获取线程结果
                UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES);
                listResults.add(uploadResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        factory.close();
        newFixedThreadPool.shutdown();
        return listResults;
    }

}

6、测试上传

package cn.com.pingtech.common.ftp


class Client {
    public static void main(String[] args) throws IOException {
        String loalPath = "C:\\Users\\Administrator\\Desktop\\test\\0";
        String ftpPath = "/data/jcz/";
        File parentFile = new File(loalPath);
        List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles());
        for(UploadResult vo:list){
            System.out.println(vo);
        }
        
    }
}

注意:FTP协议里面,规定文件名编码为iso-8859-1,所以目录名或文件名需要转码

今天关于《Java多线程怎么实现FTP批量上传文件》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于java,FTP的内容请关注golang学习网公众号!

版本声明
本文转载于:亿速云 如有侵犯,请联系study_golang@163.com删除
java内存泄漏怎么检查java内存泄漏怎么检查
上一篇
java内存泄漏怎么检查
使用 GORM 连接到 SQLServer
下一篇
使用 GORM 连接到 SQLServer
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    543次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    516次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    500次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    485次学习
查看更多
AI推荐
  • ChatExcel酷表:告别Excel难题,北大团队AI助手助您轻松处理数据
    ChatExcel酷表
    ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
    3180次使用
  • Any绘本:开源免费AI绘本创作工具深度解析
    Any绘本
    探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
    3391次使用
  • 可赞AI:AI驱动办公可视化智能工具,一键高效生成文档图表脑图
    可赞AI
    可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
    3420次使用
  • 星月写作:AI网文创作神器,助力爆款小说速成
    星月写作
    星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
    4526次使用
  • MagicLight.ai:叙事驱动AI动画视频创作平台 | 高效生成专业级故事动画
    MagicLight
    MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
    3800次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码