A Quick Hands-On Guide to a Real-Time Processing Architecture Based on Kafka + Spark + SequoiaDB
01 Background
02 Architecture Design
03 Environment Setup
3.1 Deployment Environment
Server layout
Software configuration
3.2 ZooKeeper Cluster Installation
1. Extract the downloaded ZooKeeper package into the /opt/soft directory
tar -zxvf zookeeper-3.4.8.tar.gz -C /opt/soft
2. Enter ZooKeeper's conf directory
cd /opt/soft/zookeeper-3.4.8/conf
3. Copy zoo_sample.cfg to zoo.cfg
cp zoo_sample.cfg zoo.cfg
4. Create the data directory and edit zoo.cfg
mkdir /opt/soft/zookeeper-3.4.8/data
vi zoo.cfg
Modify:
dataDir=/opt/soft/zookeeper-3.4.8/data
Add:
server.0=server1:2888:3888
server.1=server2:2888:3888
server.2=server3:2888:3888
5. Write this node's id into the myid file under the data directory (0 on server1)
echo '0' > /opt/soft/zookeeper-3.4.8/data/myid
6. Copy the ZooKeeper directory to the other machines, then change myid to 1 on server2 and to 2 on server3
scp -r zookeeper-3.4.8 root@server2:/opt/soft/
scp -r zookeeper-3.4.8 root@server3:/opt/soft/
7. Start and verify the cluster
Run zkServer.sh start on each of the three machines.
Check the status with zkServer.sh status: there should be one leader and two followers.
Run jps and confirm that the QuorumPeerMain process is present on all three nodes.
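As an optional extra check, the ensemble can also be verified from client code. The following is a minimal Scala sketch (an addition, not one of the original steps) using the ZooKeeper Java client shipped as zookeeper-3.4.8.jar; it simply lists the children of the root znode against the three servers configured above.
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import scala.collection.JavaConverters._

object ZkSmokeTest {
  def main(args: Array[String]): Unit = {
    // Connect to the three-node ensemble built above
    val zk = new ZooKeeper("server1:2181,server2:2181,server3:2181", 5000,
      new Watcher { override def process(event: WatchedEvent): Unit = () })
    // A healthy ensemble answers immediately with at least the /zookeeper znode
    val children = zk.getChildren("/", false).asScala
    println("znodes under /: " + children.mkString(", "))
    zk.close()
  }
}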
3.3 Kafka Cluster Installation
1. Extract the downloaded Kafka package into the /opt/soft directory
tar -zxvf kafka_2.11-0.10.2.1.tgz -C /opt/soft
2. Enter Kafka's libs directory
cd /opt/soft/kafka_2.11-0.10.2.1/libs
3. Copy zookeeper-3.4.8.jar from the ZooKeeper installation
cp /opt/soft/zookeeper-3.4.8/zookeeper-3.4.8.jar ./
4. Remove the bundled zookeeper-3.4.9.jar
rm zookeeper-3.4.9.jar
5. Enter the config directory
cd /opt/soft/kafka_2.11-0.10.2.1/config
6. Create the kafka-logs directory
mkdir /opt/soft/kafka_2.11-0.10.2.1/kafka-logs
7. Edit server.properties
vi server.properties
broker.id: an integer that increments per machine (0, 1, 2); change it for every broker in the cluster
log.dirs=/opt/soft/kafka_2.11-0.10.2.1/kafka-logs
zookeeper.connect=server1:2181,server2:2181,server3:2181
Add (using the current machine's IP):
listeners=PLAINTEXT://192.168.106.189:9092
advertised.listeners=PLAINTEXT://192.168.106.189:9092
8. Copy the Kafka directory to the other machines with scp and adjust server.properties on each according to the rules above
scp -r /opt/soft/kafka_2.11-0.10.2.1/ server2:/opt/soft/
scp -r /opt/soft/kafka_2.11-0.10.2.1/ server3:/opt/soft/
9. Verify the cluster
Start Kafka on each machine:
bin/kafka-server-start.sh -daemon config/server.properties
Create a topic:
bin/kafka-topics.sh --zookeeper server2:2181,server3:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create
Create a producer:
bin/kafka-console-producer.sh --broker-list server1:9092,server2:9092,server3:9092 --topic TestTopic
Create a consumer:
bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic TestTopic --from-beginning
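The console scripts above are usually sufficient, but if you also want to verify the brokers from application code, here is a minimal Scala sketch (not part of the original steps) using the new kafka-clients 0.10.2.1 producer API, the same client library the Spark integration below depends on; it writes one record to the TestTopic created above.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaSmokeTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Brokers of the cluster configured above
    props.put("bootstrap.servers", "server1:9092,server2:9092,server3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Send one message and block until the broker acknowledges it
    val meta = producer.send(new ProducerRecord[String, String]("TestTopic", "key", "hello")).get()
    println("written to partition " + meta.partition() + " at offset " + meta.offset())
    producer.close()
  }
}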
3.4 Spark Cluster Installation
tar -zxvf spark-2.0.1-bin-hadoop2.7.tgz -C /opt/soft
cd /opt/soft/spark-2.0.1-bin-hadoop2.7/conf
cp spark-env.sh.template spark-env.sh
cp slaves.template slaves
vi spark-env.sh
SPARK_WORKER_INSTANCES=2
SPARK_DAEMON_MEMORY=2g
SPARK_MASTER_IP=10.196.9.21
JAVA_HOME=/usr/local/java/jdk1.8.0_121
vi slaves
server1
server2
server3
scp -r /opt/soft/spark-2.0.1-bin-hadoop2.7/ server2:/opt/soft/
scp -r /opt/soft/spark-2.0.1-bin-hadoop2.7/ server3:/opt/soft/
cd ../sbin
./start-all.sh
Open the Spark monitoring page in a browser to confirm the cluster started successfully
http://192.168.106.187:8080
From https://mvnrepository.com/, download the connector matching your Scala, Kafka and Spark versions, spark-streaming-kafka-0-10_2.11-2.0.1.jar, and copy it into the jars directory.
Copy the following jar packages from the Kafka libs directory into Spark's jars directory
cp /opt/soft/kafka/libs/kafka-clients-0.10.2.1.jar /opt/soft/spark/jars
cp /opt/soft/kafka/libs/zkclient-0.10.jar /opt/soft/spark/jars
cp /opt/soft/kafka/libs/metrics-core-2.2.0.jar /opt/soft/spark/jars
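A quick way to confirm that Spark actually picks these jars up is to start spark-shell on one of the nodes and check that the relevant classes resolve; this is only a suggested sanity check, not a step from the original text.
// run inside spark-shell; each call throws ClassNotFoundException if the jar is missing
Class.forName("org.apache.spark.streaming.kafka010.KafkaUtils")
Class.forName("org.apache.kafka.clients.producer.KafkaProducer")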
3.5 Installing SequoiaDB
tar -zxvf sequoiadb-3.2.3-linux_x86_64-enterprise-installer.tar.gz
./sequoiadb-3.2.3-linux_x86_64-enterprise-installer.run --mode text --installer-language en --prefix /opt/sequoiadb --upgrade false --force false --username sdbadmin --groupname sdbadmin_group --userpasswd sdbadmin --port 11790 --processAutoStart false --SMS true
Please choose an option [1] : 1
Do you want to continue? [Y/n]: Y
su - sdbadmin
cd /opt/sequoiadb/bin
Check whether the nodes have started
./sdblist -t all
./sdblist -l
Set the ownership of the opt directory
chown -R sdbadmin:sdbadmin_group /opt
Configure passwordless SSH for sdbadmin between all machines (including the local machine)
ssh-keygen -t rsa
ssh-copy-id <ip>
Launch the sdb shell (./sdb) and create a temporary coordination node
var oma = new Oma("localhost", 11790)
oma.createCoord(18800, "/opt/sequoiadb/database/coord/18800")
oma.startNode(18800)
Create the catalog node group
var db = new Sdb("localhost", 18800)
db.createCataRG("server1", 11800, "/opt/sequoiadb/database/cata/11800", {logfilesz:1024, sortbuf:512, sharingbreak:180000})
var cataRG = db.getRG("SYSCatalogGroup");
var node1 = cataRG.createNode("server2", 11800, "/opt/sequoiadb/database/cata/11800", {logfilesz:1024, sortbuf:512, sharingbreak:180000})
node1.start()
var node1 = cataRG.createNode("server3", 11800, "/opt/sequoiadb/database/cata/11800", {logfilesz:1024, sortbuf:512, sharingbreak:180000})
node1.start()
Create data group rg1
var dataRG = db.createRG("rg1")
dataRG.createNode("server1", 11820, "/opt/sequoiadb/database/data/11820", {logfilesz:1024, weight:80, sortbuf:512, sharingbreak:180000})
dataRG.createNode("server2", 11820, "/opt/sequoiadb/database/data/11820", {logfilesz:1024, weight:30, sortbuf:512, sharingbreak:180000})
dataRG.createNode("server3", 11820, "/opt/sequoiadb/database/data/11820", {logfilesz:1024, weight:30, sortbuf:512, sharingbreak:180000})
dataRG.start()
Create data group rg2
var dataRG1 = db.createRG("rg2")
dataRG1.createNode("server1", 11830, "/sdbdata/data02/sequoiadb/database/data/11830", {logfilesz:1024, weight:80, sortbuf:512, sharingbreak:180000})
dataRG1.createNode("server2", 11830, "/sdbdata/data02/sequoiadb/database/data/11830", {logfilesz:1024, weight:30, sortbuf:512, sharingbreak:180000})
dataRG1.createNode("server3", 11830, "/sdbdata/data02/sequoiadb/database/data/11830", {logfilesz:1024, weight:30, sortbuf:512, sharingbreak:180000})
dataRG1.start()
Create data group rg3
var dataRG2 = db.createRG("rg3")
dataRG2.createNode("server1", 11840, "/sdbdata/data03/sequoiadb/database/data/11840", {logfilesz:1024, weight:80, sortbuf:512, sharingbreak:180000})
dataRG2.createNode("server2", 11840, "/sdbdata/data03/sequoiadb/database/data/11840", {logfilesz:1024, weight:30, sortbuf:512, sharingbreak:180000})
dataRG2.createNode("server3", 11840, "/sdbdata/data03/sequoiadb/database/data/11840", {logfilesz:1024, weight:30, sortbuf:512, sharingbreak:180000})
dataRG2.start()
4. Create the coordination node group
var rg = db.createCoordRG()
rg.createNode("server1", 11810, "/opt/sequoiadb/database/coord/11810", {logfilesz:1024, sortbuf:512, sharingbreak:180000})
rg.createNode("server2", 11810, "/opt/sequoiadb/database/coord/11810", {logfilesz:1024, sortbuf:512, sharingbreak:180000})
rg.createNode("server3", 11810, "/opt/sequoiadb/database/coord/11810", {logfilesz:1024, sortbuf:512, sharingbreak:180000})
rg.start()
Remove the temporary coordination node
var oma = new Oma("localhost", 11790)
oma.removeCoord(18800)
1. First, check whether all nodes have started
sdblist -l
sdblist -t all
db.createCS("cstest1");
db.cstest1.createCL("cl", {ShardingKey:{"a":1}, ShardingType:"hash", AutoSplit:false, AutoIndexId:false, Partition:1024, Compressed:true, CompressionType:"lzw", EnsureShardingIndex:false});
for(var i=0; i<10000; i++){ db.cstest1.cl.insert({"a":i, age:i}) }
Check the data
db.cstest1.cl.count()
db.cstest1.cl.findOne()
Drop the collection space
db.dropCS("cstest1")
db.createCS("cstest2");
db.cstest2.createCL("cl", {ShardingKey:{"a":1}, ShardingType:"hash", AutoSplit:true, AutoIndexId:false, Partition:1024, Compressed:true, CompressionType:"lzw", EnsureShardingIndex:false});
for(var i=0; i<10000; i++){ db.cstest2.cl.insert({"a":i, age:i}) }
Check the data
db.cstest2.cl.count()
db.cstest2.cl.findOne()
Drop the collection space
db.dropCS("cstest2")
Connect to a coordination node and trigger a primary re-election in the data group
db = new Sdb("localhost", 11810)
var rg = db.getRG("rg1")
rg.reelect()
Check whether the primary node has been reassigned
sdblist -l
bin/sdblist -p <node port> --detail --expand
Check that weight is 80
Check that logfilesz is 1024
Check that sortbuf is 512
Check that sharingbreak is 180000
04 Real-Time Data Processing: Scenario Analysis and Implementation
4.1 Creating the Collection Space and Collection in SequoiaDB
db.createDomain('mydomain',['rg1','rg2','rg3'],{AutoSplit:true})
db.createCS( "foo", { PageSize: 4096, Domain: "mydomain" } )
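The Spark job in section 4.3 writes to the collection bar inside the collection space foo, so that collection must exist as well; the original text does not show this step. Below is a minimal sketch using the SequoiaDB Java driver (the same com.sequoiadb.base.Sequoiadb API the job uses). The hash-sharding options are an assumption modelled on the cstest examples above, and the coordination node address follows the deployment in section 3.5; the equivalent can also be done directly in the sdb shell.
import com.sequoiadb.base.Sequoiadb
import org.bson.BasicBSONObject

object CreateBarCollection {
  def main(args: Array[String]): Unit = {
    // Connect to a coordination node (port 11810 in the deployment above)
    val db = new Sequoiadb("192.168.106.188:11810", "", "")
    val cs = db.getCollectionSpace("foo")
    // Hash sharding on id, auto-split across the domain's groups -- assumed options
    val shardingKey = new BasicBSONObject()
    shardingKey.put("id", 1)
    val options = new BasicBSONObject()
    options.put("ShardingKey", shardingKey)
    options.put("ShardingType", "hash")
    options.put("AutoSplit", true)
    options.put("Compressed", true)
    options.put("CompressionType", "lzw")
    cs.createCollection("bar", options)
    db.disconnect()
  }
}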
4.2 Generating Data
import java.util.Properties
import java.util.UUID
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import org.codehaus.jettison.json.JSONObject
import scala.util.Random

object KafkaProducer {

  private val random = new Random()
  private var pointer = -1

  def getID(): String = UUID.randomUUID().toString

  def account(): Double = {
    random.nextInt(1000000)
  }

  def main(args: Array[String]): Unit = {
    val topic = "test"
    val brokers = "192.168.106.187:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event.put("id", getID).put("account", account())

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)

      Thread.sleep(200)
    }
  }
}
4.3 Processing the Data in Real Time
package com.simple

import com.sequoiadb.base.Sequoiadb
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._

object AccountAnalytics {

  val db = new Sequoiadb("192.168.106.188", "", "")
  val bar = db.getCollectionSpace("foo").getCollection("bar")
  implicit val formats = DefaultFormats

  def main(args: Array[String]) {
    val log = LoggerFactory.getLogger(this.getClass)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.106.187:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val masterUrl = if (args.length > 0) args(0) else "local[2]"
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCount")
    val streamingContext = new StreamingContext(conf, Seconds(5))

    val topics = Array("test")
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // parse each Kafka message into a Flow event
    val events = kafkaStream.flatMap(line => {
      val data = parse(line.value(), true)
      val event = data.extract[Flow]
      Some(event)
    })
    log.info(s"$events")

    val userClicks = events.map { event => (event.id.getOrElse("null"), event.account) }

    // write each record into the bar collection in SequoiaDB
    userClicks.foreachRDD { rdd =>
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val account = pair._2
          val time = System.currentTimeMillis()
          log.info(s"$uid:$time")
          bar.insert("{'id':'" + uid + "','account':'" + account + "','time':" + time + "}")
        })
      })
    }

    streamingContext.start()
    streamingContext.awaitTermination()
    db.releaseResource()
  }
}

case class Flow(id: Option[String], account: String)
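To package the two programs above into the test.jar submitted below, a build definition along these lines should work; the artifact versions are assumptions matched to the Kafka 0.10.2.1 / Spark 2.0.1 / SequoiaDB 3.2.3 stack used in this article and may need adjusting.
// build.sbt -- a minimal sketch; versions are assumptions matched to this article's stack
name := "account-analytics"
version := "0.1"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1",
  "org.apache.kafka" %% "kafka" % "0.10.2.1",        // old producer API used in 4.2
  "com.sequoiadb" % "sequoiadb-driver" % "3.2.3",    // SequoiaDB Java driver (version assumed)
  "org.json4s" %% "json4s-jackson" % "3.2.11",
  "org.codehaus.jettison" % "jettison" % "1.3.8"
)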
spark-submit --master spark://server1:7077 --class com.simple.AccountAnalytics --deploy-mode client ./test.jar
Summary
Once the data is generated, it is first pushed to the Kafka message queue and handed to Spark Streaming for real-time processing; combined with the high-performance SequoiaDB distributed database, the processed results are then stored in SequoiaDB.