本文共 5288 字,大约阅读时间需要 17 分钟。
最近拿DBN(Deep Belief Network)做实验, 由于数据维数太大, 程序跑了6天也没训练完成, 由于长时间没关机, 电脑变的特别卡. 正好实验室有两台比较空闲的服务器, 所以计划将实验程序放到服务器上运行, 如果将程序直接放到服务器上运行也比较方便快捷. 但是想想要是弄一个任务分发系统, 虽然现在麻烦点, 以后就会方便很多.一开始想自己写一个功能简单一点的, 但是觉得自己写的话通用性健壮性都不好, 要是有一个现成的岂不更好. 最先想到的当然是Hadoop,但是Hadoop比较重量级, 而且在Windows下不好配置. 后来就找到了, 觉得很适合当前需求. 这里使用的是Java版的, 一个托管在上,一个在上.本文使用的是Github版.简单来讲Gearman是一个轻量级的任务分发系统,Gearman由三部分组成. Job Server, Worker, Client. 其中Client当然是提交任务, Worker是苦逼的工人,负责完成任务, Job Server负责将Client提交的任务分配给Worker做.为了提高系统的稳定性, Job Server可以有多个, 只要有一个Job Server没有宕机系统就能正常运行.三者关系如图下所示.
Gearman中Job Server都是独立运行的,并不知道其他的Job Server的存在, 所以Client需要指定连接哪个或者哪几个Job Server, 同样Worker也需要指定.Client根据对Job Server的指定顺序进行选择, 前面的Job Server失效才会选择后面的, 所以在大负荷的情况下尽量随机排列Job Server, 以起到Job Server负载均衡的效果. Job Server对Worker的选择有自己一套方法,具体如何分配有待分析具体源码.
下面给出简单使用的例子:
Gearman依赖slf4j, 所以需要slf4j-api-1.6.4.jar和slf4j-simple-1.6.4.jar(版本随意)
1. 启动Job Server. 直接运行Gearman的Jar包, (可以添加启动参数来修改监听IP和端口等)
2. 创建Worker (这里部分代码源自网络, 具体出处不记得了)
public class MyGearmanWorker{ public static final String ECHO_HOST = "localhost"; // gearman server地址 public static final int ECHO_PORT = 4730; // gearman server 端口 public static void main(String[] args) { Gearman gearman = Gearman.createGearman(); // 创建gearman对象,无论是client,worker都是 //由这个对象产生的 GearmanServer server = gearman.createGearmanServer(MyGearmanWorker.ECHO_HOST, MyGearmanWorker.ECHO_PORT); // 创建Server GearmanWorker worker = gearman.createGearmanWorker(); // 创建worker。 worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间 worker.setMaximumConcurrency(5); // 最大并发数 worker.addFunction(SimpleGearmanWorker.WORKER_NAME, new SimpleGearmanWorker()); // 添加function方法 worker.addServer(server); // 将work添加到server中 }}
public class SimpleGearmanWorker implements GearmanFunction { public static final String WORKER_NAME = "CUSTOM_TASK_DEFAULT"; //任务名, Job Server只将这个名称的任务 //分配给该Worker(需要想Job Server注册) @Override public byte[] work(String function, byte[] data, //data为Client发送过来的数据 GearmanFunctionCallback callback) throws Exception { //任务处理代码 //任务处理过程中可以使用callback.sendData(xxx)向Client发送GEARMAN_JOB_DATA消息, //并将xxx发送给Client (可以用于传输调试信息) return XXX;//这里向Client发送GEARMAN_JOB_SUCCESS信息, 并将XXX作为数据发送给Client }}
3 创建Client (这里部分代码源自网络, 具体出处不记得了)
public class MyGearmanClient { //同步提交任务 public static final String WORKER_NAME = "CUSTOM_TASK_DEFAULT"; //任务名 public static final String ECHO_HOST = "localhost"; //gearman server 地址 public static final int ECHO_PORT = 4730; public static void main(String[] args) throws InterruptedException, IOException { Gearman gearman = Gearman.createGearman(); //创建gearman对象,client,worker都有这个对象产生 GearmanClient client = gearman.createGearmanClient(); GearmanServer server = gearman.createGearmanServer(ECHO_HOST, ECHO_PORT); //创建server对象 client.addServer(server); for (int i = 0; i < 1; i++) { GearmanJobReturn jobReturn = client.submitJob(WORKER_NAME,data)); while (!jobReturn.isEOF()) { //根据返回值 是否结束,来判断各种gearman事件状态 GearmanJobEvent event = jobReturn.poll(); switch (event.getEventType()) { case GEARMAN_JOB_SUCCESS: System.out.println(">>>> " + new String(event.getData())); //获取worker的返回值 break; case GEARMAN_SUBMIT_FAIL: System.out.println("### submit fail"); break; case GEARMAN_JOB_FAIL: System.err.println(event.getEventType() + ": " + new String(event.getData())); break; case GEARMAN_JOB_DATA: System.out.println(event.getEventType() + ": " + new String(event.getData())); break; default: } } Thread.sleep(20); //如果不休眠,循环提交任务,worker会认为受到攻击,会将任务pending } gearman.shutdown(); //使用完毕后,一定要将gearman对象进行关闭 }
public class SynMyGearmanClient implements GearmanJobEventCallback{ //异步提交任务 public static final String ECHO_FUNCTION_NAME = "CUSTOM_TASK_DEFAULT"; public static final String ECHO_HOST = "localhost"; public static final int ECHO_PORT = 4730; public static void main(String[] args) throws InterruptedException { Gearman gearman = Gearman.createGearman(); GearmanClient client = gearman.createGearmanClient(); GearmanServer server = gearman.createGearmanServer(ECHO_HOST, ECHO_PORT); client.addServer(server); for (int i = 0; i < 500; i++) { GearmanJoin join = client.submitJob(ECHO_FUNCTION_NAME, ("hi,zhaoyang" + i).getBytes(), ECHO_FUNCTION_NAME, new SynMyGearmanClient()); join.join(); Thread.sleep(20); } gearman.shutdown(); } @Override public void onEvent(String s, GearmanJobEvent event) { switch (event.getEventType()) { case GEARMAN_JOB_SUCCESS: ...... } }}
到这里为止,整个系统构建完成. 细心的你可能会发现一个问题, 如果系统的任务是比较固定的,这种方式是完全符合要求的, 但是任务经常变化(Worker做的工作进行经常变), 系统岂不是需要经常修改? 这样的话该系统还有什么意义?仔细一想, 这个问题还是比较好解决. Client可以将任务程序和任务数据打成一个压缩包发给Job Server, 然后Job Server又将压缩包发给Worker, 这时Worker解压出程序和数据, 直接运行解压出来的程序即可. 这样就可以做到不变应万变了.
转载地址:http://ygqai.baihongyu.com/