你的浏览器不支持canvas

高并发服务器学习笔记之十二:分布式架构

Date: Author: zhangn

本文章采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 进行许可。转载请注明来自zhangn的云笔记

###

  • 简介
* 流量分流
* 业务分流
* 集群和分布式的区别与联系   * 示例程序
* RPC协议
* 客户端
* 加法服务器
* 乘法服务器   * 小结

简介

在使用多台设备处理海量数据的时候,主要有两种方案:流量分流和业务分流。

流量分流

流量分流也就是我们常说的集群架构,这里不细说,可以看一下我之前的文档:高并发服务器学习笔记之十一:集群架构

业务分流

业务分流也就是我们常说的分布式架构,是今天的主要内容。
一个完成的系统,是由多个业务组成,业务之间相互协调来完成工作。而分布式架构区别于传统的集中式架构,就是将不同的业务分别架设在不同的机器上,彼此之间使用RPC协议进行协调工作。

集群和分布式的区别与联系

两种架构都是使用多台机器来处理海量数据的解决方案,而且两者经常混在一起使用,并统称为“分布式集群”,因此经常有人将两种概念搞混。集群是流量分流,当大量数据到来,会根据每个节点的闲置情况进行分流处理;分布式是先将业务进行拆分,分别放在不同的节点上,根据处理业务的不同去分配相应业务的节点。
举个栗子,现在想做这么个事,设计一个集视频播放与直播一体的系统。
如果采用集群架构,那么就将所有代码都写在一个程序中,分别部署在多台机器上,并用一个发现服务器进行最优节点分配。
如果采用分布式架构,那么就将直播的代码和视频分别写在两个程序中,然后分别部署,客户端在需要时去连接相应的节点。
而所谓的“分布式集群”,就是先将业务拆分,采用分布式架构,然后每个业务节点后面又跟着一个集群,是一种综合应用。

示例程序

这次我们来做一个简单的数学运算系统,只支持加法和乘法,采用分布式架构,将加法和乘法分别部署在两个节点上,并用RPC协议进行通信。实际开发中,有时候为了一些安全问题,不会将业务分发直接写在客户端,而是用一个业务分发服务器去处理,由于我们只是练习的示例代码,业务分发服务器就不加了,直接将业务分发写在客户端。关于业务节点列表,应给写在一个配置文件里,程序在运行的时候实时读取,我们这里为了简单,直接写死在代码里。
完整代码戳这里

RPC协议

简单说就是各个业务节点之间的通信协议,这个协议可以自己定一个私有协议,也可以用一些公开协议,比如JsonRPC,我们这里简单一点,定义一个简单的私有协议,一个结构体,包含两个运算数和一个结果

typedef struct __Messages
{
	int arg1;
	int arg2;
	int result;
}Message;

客户端

客户端比较简单,首先向加法服务器发送一个加法运算申请,并等待结果返回,然后再向乘法服务器发送一个乘法运算申请,等待结果返回

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <unistd.h>
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h> /* superset of previous */
#include <arpa/inet.h>

#include "public_head.h"
#include "fileio.h"

#define LISTEN_BACKLOG	50
#define ADD_IP		"127.0.0.1"
#define ADD_PORT	10086
#define MUL_IP		ADD_IP
#define MUL_PORT	10010

int main(int argc, char ** argv)
{
	int sockfd = 0;
	Message addMessage, mulMessage;
	struct sockaddr_in addServer_addr;
	struct sockaddr_in mulServer_addr;

	memset(&addServer_addr, 0, sizeof(addServer_addr));
	memset(&mulServer_addr, 0, sizeof(mulServer_addr));
	memset(&addMessage, 0, sizeof(Message));
	memset(&mulMessage, 0, sizeof(Message));

	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
		handle_error("socket");

	addServer_addr.sin_family = AF_INET;
	addServer_addr.sin_port = htons(ADD_PORT);
	inet_pton(sockfd, ADD_IP, &addServer_addr.sin_addr.s_addr);

	addMessage.arg1 = 3;
	addMessage.arg2 = 5;
	if (connect(sockfd, (struct sockaddr*) & addServer_addr, sizeof(addServer_addr)) < 0)
	{
		close(sockfd);
		handle_error("connect");
	}

	write(sockfd, &addMessage, sizeof(Message));
	read(sockfd, &addMessage, sizeof(Message));
	printf("%d + %d = %d\n", addMessage.arg1, addMessage.arg2, addMessage.result);
	sleep(1);
	close(sockfd);

	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
		handle_error("socket");

	mulServer_addr.sin_family = AF_INET;
	mulServer_addr.sin_port = htons(MUL_PORT);
	inet_pton(sockfd, MUL_IP, &mulServer_addr.sin_addr.s_addr);

	mulMessage.arg1 = 3;
	mulMessage.arg2 = 5;
	if (connect(sockfd, (struct sockaddr*) & mulServer_addr, sizeof(mulServer_addr)) < 0)
	{
		close(sockfd);
		handle_error("connect");
	}

	write(sockfd, &mulMessage, sizeof(Message));
	read(sockfd, &mulMessage, sizeof(Message));
	printf("%d x %d = %d\n", mulMessage.arg1, mulMessage.arg2, mulMessage.result);
	sleep(1);
	close(sockfd);

	return 0;
}

加法服务器

加法服务器也比较简单,接收到客户端的加法运算请求后,用客户端给的参数进行加法运算,再将结果返回

static void handle_request(int acceptfd)
{
    ssize_t readret = 0;

	Message message;
   
	while (1)
	{
		memset(&message, 0, sizeof(message));
		readret = read(acceptfd, &message, sizeof(message));
		if (readret == 0)
			break;

		printf("thread id:%lu, recv operation:%d + %d\n", 
			pthread_self(), message.arg1, message.arg2);
		message.result = message.arg1 + message.arg2;
		write(acceptfd, &message, sizeof(message));
	}

    close(acceptfd);
    return;
}

乘法服务器

乘法服务器可以如同加法服务器一样简单实现,但我们这里为了演示业务服务器之间的协调工作,乘法采用调用加法服务器进行结果累加去实现

static void handle_request(int acceptfd)
{
	ssize_t readret = 0;
	Message message;

	int i = 0;
	int sockfd = 0;
	Message addMessage;
	struct sockaddr_in addServer_addr;

	while (1)
	{
		memset(&message, 0, sizeof(message));
		readret = read(acceptfd, &message, sizeof(message));
		if (readret == 0)
			break;

		printf("thread id:%lu, recv operation:%d x %d\n",
			pthread_self(), message.arg1, message.arg2);

		memset(&addServer_addr, 0, sizeof(addServer_addr));
		memset(&addMessage, 0, sizeof(Message));

		if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
			handle_error("socket");

		addServer_addr.sin_family = AF_INET;
		addServer_addr.sin_port = htons(ADD_PORT);
		inet_pton(sockfd, ADD_IP, &addServer_addr.sin_addr.s_addr);

		if (connect(sockfd, (struct sockaddr*) & addServer_addr, sizeof(addServer_addr)) < 0)
		{
			close(sockfd);
			handle_error("connect");
		}

		for(i = 0; i < message.arg2; ++i)
		{
			addMessage.arg1 = message.arg1;
			addMessage.arg2 = addMessage.result;
			write(sockfd, &addMessage, sizeof(Message));
			readret = read(sockfd, &addMessage, sizeof(Message));
			sleep(1);
		}
		close(sockfd);
		message.result = addMessage.result;

		write(acceptfd, &message, sizeof(message));
	}

	printf("\n");
	close(acceptfd);
	return;
}

小结

通过以上代码我们可以看出,采用分布式架构,不但可以进行业务分流,提高数据处理量,更是简化的服务端的代码。分布式架构的难点不在于代码,而在于业务的拆分和RPC协议的制定。业务怎么拆分才合理,能够让各个业务节点的流量达到最佳平衡点,如果一个业务节点经常闲置,另一个业务节点经常爆满,这显然是不合适的,不过这个问题倒是可以通过集群架构进行缓解。


对于本文内容有问题或建议的小伙伴,欢迎在文章底部留言交流讨论。