Aggregator的API有哪些?

Aggregator的API有哪些?

请先 登录 后评论

1 个回答

江帅帅 | 奈学教育 - 架构师
擅长:Java,Python

Aggregator共提供了5个API供您实现。API的调用时机及常规用途如下: createStartupValue(context) 该API在所有Worker上执行一次,调用时机是所有超步开始之前,通常用于初始化AggregatorValue。在第0轮超步中,调用WorkerContext.getLastAggregatedValue()或ComputeContext.getLastAggregatedValue()可以获取该API初始化的AggregatorValue对象。

createInitialValue(context) 该API在所有Worker上每轮超步开始时调用一次,用于初始化本轮迭代所用的AggregatorValue。通常操作是通过 WorkerContext.getLastAggregatedValue()得到上一轮迭代的结果,然后执行部分初始化操作。

aggregate(value, item) 该API同样在所有Worker上执行,与上述API不同的是,该API由用户显示调用ComputeContext#aggregate(item)来触发,而上述两个API由框架自动调用。该API用于执行局部聚合操作,其中第一个参数value是本Worker在该轮超步已经聚合的结果(初始值是createInitialValue返回的对象),第二个参数是您的代码调用ComputeContext#aggregate(item)传入的参数。该API中通常用item来更新value实现聚合。所有aggregate执行完后,得到的value就是该Worker的局部聚合结果,然后由框架发送给Aggregator Owner所在的Worker。

merge(value, partial) 该API执行于Aggregator Owner所在Worker,用于合并各Worker局部聚合的结果,达到全局聚合对象。与aggregate类似,value是已经聚合的结果,而partial待聚合的对象,同样用partial更新value。

假设有3个Worker,分别是w0、w1、w2,其局部聚合结果是p0、p1、p2。例如,发送到Aggregator Owner所在Worker的顺序为p1、p0、p2,则merge执行次序为:

首先执行merge(p1, p0),这样p1和p0就聚合为p1。 然后执行merge(p1, p2),p1和p2聚合为p1,而p1即为本轮超步全局聚合的结果。 由上述示例可见,当只有一个Worker时,不需要执行merge方法,即merge()不会被调用。

terminate(context, value) 当Aggregator Owner所在Worker执行完merge()后,框架会调用terminate(context, value)执行最后的处理。其中第二个参数value,即为merge()最后得到全局聚合,在该方法中可以对全局聚合继续修改。执行完terminate()后,框架会将全局聚合对象分发给所有Worker,供下一轮超步使用。terminate()方法的一个特殊之处在于,如果返回True,则整个作业就结束迭代,否则继续执行。在机器学习场景中,通常判断收敛后返回True以结束作业。

请先 登录 后评论
  • 1 关注
  • 0 收藏,150 浏览
  • NX小编 提出于 2020-07-28 12:02