当前位置: 首页 > news >正文

做ic哪些网站好做广州百度推广优化

做ic哪些网站好做,广州百度推广优化,扬中网站建设价位,邢台网约车新政策文章目录 Strctured Streaming简单应用 一、Output Modes输出模式 二、Streaming Table API 三、​​​​​​​​​​​​​​Triggers 1、​​​​​​​unspecified(默认模式) 2、​​​​​​​​​​​​​​Fixed interval micro-batches&am…

文章目录

Strctured Streaming简单应用

一、Output Modes输出模式

二、Streaming Table API

三、​​​​​​​​​​​​​​Triggers

1、​​​​​​​unspecified(默认模式)

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)


Strctured Streaming简单应用

一、Output Modes输出模式

Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:

  • Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
  • Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
  • Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。

二、​​​​​​​​​​​​​​Streaming Table API

在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。

案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。

代码示例如下:

package com.lanson.structuredStreamingimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTableAPI {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("StreamTableAPI").config("spark.sql.shuffle.partitions", 1).config("spark.sql.warehouse.dir", "./my-spark-warehouse").getOrCreate()spark.sparkContext.setLogLevel("Error");import spark.implicits._//2.读取socket数据,注册流表val df: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.对df进行转换val personinfo: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")//4.将以上personinfo 写入到流表中personinfo.writeStream.option("checkpointLocation","./checkpoint/dir1").toTable("mytbl")import org.apache.spark.sql.functions._//5.读取mytbl 流表中的数据val query: StreamingQuery = spark.readStream.table("mytbl").withColumn("new_age", col("age").plus(6)).select("id", "name", "age", "new_age").writeStream.format("console").start()query.awaitTermination()}
}

以上代码编写完成后启动,向控制台输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22

结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。

三、​​​​​​​​​​​​​​Triggers

Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:

实时处理,以下是支持的Triggers:

Trigger Type

描述

Unspecified(默认)

  • 代码使用:Trigger.ProcessingTime(0L)。
  • 代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

  • 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)
  • 查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定
  1. 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理
  2. 如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。
  3. 如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

  • 代码使用:Trigger.Once()
  • 只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

  • 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)
  • 以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

下面以读取Socket数据为例,Scala代码演示各个模式

1、​​​​​​​unspecified(默认模式)

代码如下:

//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream.format("console").start()query.awaitTermination()

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

代码如下:

//3.用户指定固定间隔批次触发查询val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds"))
//      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS).start()query.awaitTermination()

注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

代码如下:

//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.Once()).start()
query.awaitTermination()

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)

Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。

代码如下:

//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream.format("console")//每10ms 记录一次状态,而不是执行一次.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)).option("checkpointLocation","./checkpint/dir4").start()
query.awaitTermination()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
http://www.jinmujx.cn/news/115861.html

相关文章:

  • 毕业设计 网站开发登封seo公司
  • wordpress电商主题下载网站seo外链
  • 建设网站视频素材找网站公司制作网站
  • 金溪做网站小型培训机构管理系统
  • 织梦做招聘网站百度竞价推广是什么工作
  • 企业网站模板价格百度浏览器
  • 做牛排的网站快速优化seo软件推广方法
  • 怎么做网站安全性测试提高百度快速排名
  • 那些网站做批发关键字排名优化公司
  • 苏州 网站建设 app谷歌搜索引擎网页版入口
  • 网站建设类的计入什么科目怎么制作网站详细流程
  • 广西两学一做网站百度推广上班怎么样
  • 企业自助建站模板找代写文章写手
  • WordPress 主题选项框架优化营商环境心得体会2023
  • 怎么寻找做有意做网站的客户整站优化报价
  • 晚上必看的正能量网站网络优化工程师前景
  • 做虚拟主机网站沈阳seo推广
  • 创卫网站 建设 方案安徽网络关键词优化
  • 重庆市卫生厅网站 查询前置审批关键词怎么找出来
  • 网站怎么申请微信支付最简短的培训心得
  • 网站开发的流程商丘网站推广公司
  • 企业营销网站建设规划网络营销工具体系
  • 门户类网站注重什么任何东西都能搜出来的软件
  • 风水公司网站建设刚刚传来最新消息
  • 做网站的大小保健品的营销及推广方案
  • 功能型网站公司网络推广的作用
  • wordpress升级4.1seo排名优化怎么样
  • 网站建设石家庄快优网络营销中的seo是指
  • 做哪种类型的网站赚钱呢软文代写文案
  • 建设培训网站360推广官网