Branch:分岐演算子

概要図
JobFlowクラス実装
HogePrice hogePrice
   = operator.hogePrice(this.hoge);

JoinedMoge joinedMoge
   = operator.joinedMoge
      (this.moge, hogePrice.CHEEP);
Operatorクラス実装
@Branch
public Status hogePrice(Hoge hoge) {
  int price = hoge.getPrice();
  if (price < 0) {
    return Status.ERROR;
  }
  if (price >= 1000000) {
    return Status.EXPENSIVE;
  }
  return Status.CHEAP;
}

public enum Status {
  EXPENSIVE, CHEAP, ERROR
}

Confluent:合流演算子

概要図
JobFlowクラス実装
In in1, in2, in3;
Out out;

@Override
protected void describe() {
  CoreOperatorFactory core
    = new CoreOperatorFactory();
  Confluent<Hoge> op
    = core.confluent(in1,in2,in3);
  out.add(op);
}
              
Operatorクラス実装
// 実装不要

Update:更新演算子

概要図
JobFlowクラス実装
UpdatedHoge updatedHoge
 = operator.updatedHoge(this.hoge);
out.add(updatedHoge.out);
              
Operatorクラス実装
@Update
public void updatedHoge(Hoge hoge){
  hoge.setMoge("foo");
}
              

Convert:変換演算子

概要図
JobFlowクラス実装
ConvertedHoge convertedHoge
 = operator.convertedHoge(this.hoge);
out.add(convertedHoge.out);
              
Operatorクラス実装
@Convert
public Moge convertedHoge(Hoge hoge){
  Moge moge = new Moge();
  moge.setMoge(hoge.getHoge());
  return moge;
}
              

Extend:拡張演算子

概要図
JobFlowクラス実装
In in;
Out out;

@Override
protected void describe() {
 CoreOperatorFactory core
  = new CoreOperatorFactory();
 Extend<Foo> op
  = core.extend(in, Foo.class);
 out.add(op);
}                
              
Operatorクラス実装
// 実装不要

Project:射影演算子

概要図
JobFlowクラス実装
In in;
Out out;

@Override
protected void describe() {
 CoreOperatorFactory core
  = new CoreOperatorFactory();
 Project<Hoge> op
  = core.project(in, Hoge.class);
 out.add(op);
}
              
Operatorクラス実装
// 実装不要

Restructure:再構築演算子

概要図
JobFlowクラス実装
In in;
Out out;

@Override
protected void describe() {
 CoreOperatorFactory core
  = new CoreOperatorFactory();
 Restructure<Hoge> op
  = core.restructure(in, Hoge.class);
 out.add(op);
}
              
Operatorクラス実装
// 実装不要

Extract:抽出演算子

概要図
JobFlowクラス実装
ExtractFields ef
 = operator.extractFields(hoge, a, b);
this.out.add(ef.aResult);
              
Operatorクラス実装
private A a = new A();
private B b = new B();

@Extract
public void extractFields(
 Hoge hoge, 
 Result<A> aResult, 
 Result<B> bResult){
  a.setValue(hoge.getA());
  aResult.add(a);
  b.setValue(hoge.getB0());
  bResult.add(b);
  b.setValue(hoge.getB1());
  bResult.add(b);
}
              

MasterCheck:マスタ確認演算子

概要図
JobFlowクラス実装
Exists exists
 = mainOperator.exists(master, tx);
this.out.add(exists.found);
              
Operatorクラス実装
@MasterCheck
public abstract boolean exists(
 @Key(group = "id") HogeMst master,
 @Key(group = "masterId") HogeTrn tx);
              

MasterJoin:マスタ結合演算子

概要図
JobFlowクラス実装
Join join
 = operator.join(master, tx);
this.join.add(join.out);
core.stop(join.missed);
              
Operatorクラス実装
@MasterJoin
public abstract Hoge join(
 HogeMst master, 
 HogeTrn tx);
              

MasterBranch:マスタ分岐演算子

概要図
JobFlowクラス実装
BranchWithJoin branchWithJoin
   = operator.branchWithJoin
      (this.itemMaster, this.tx);
this.out.add(
  branchWithJoin.expensive);
              
Operatorクラス実装
@MasterBranch
public Status branchWithJoin(
 @Key(group = "id") ItemMst master,
 @Key(group = "itemId") HogeTrn tx) {
 if (master == null) {
  return Status.ERROR;
 }
 int price = master.getPrice();
 if (price < 0) {
  return Status.ERROR;
 }
 if (price >= 1000000) {
  return Status.EXPENSIVE;
 }
 return Status.CHEAP;
}

public enum Status {
 EXPENSIVE,CHEAP,ERROR
}
              

MasterJoinUpdate:マスタ付き更新演算子

概要図
JobFlowクラス実装
UpdateWithMaster uwm = operator.updateWithMaster(master, tx);
this.out.add(uwm.updated);
core.stop(uwm.missed);
              
Operatorクラス実装
@MasterJoinUpdate
public void updateWithMaster(
 @Key(group = "id") ItemMst master,
 @Key(group = "itemId") HogeTrn tx) {
 tx.setPrice(master.getPrice());
}
              

CoGroup:グループ結合演算子

概要図
JobFlowクラス実装
CheckUp checkup
 = operator.checkUp(this.hoge, this.foo);
this.hoge.add(checkUp.hogeResult);
this.foo.add(checkUp.fooResult);
this.hogeErr.add(checkUp.hogeError);
this.fooErr.add(checkUp.fooError);
              
Operatorクラス実装
@CoGroup
public void checkUp(
 @Key(group="id") List hogeList,
 @Key(group="hogeId") List fooList,
 Result<Hoge> hogeResult,
 Result<Foo> fooResult,
 Result<Hoge> hogeError,
 Result<Foo> fooError) {
 // いずれも存在+重複なしで突合成功
 if (hogeList.size() == 1 
  && fooList.size() == 1) {
  hogeResult.add(hogeList.get(0));
  fooResult.add(fooList.get(0));
 }
 // それ以外はエラー
 else {
  for (Hoge hoge : hogeList) {
   hogeError.add(hoge);
  }
  for (Foo foo : fooList) {
    fooError.add(foo);
  }
 }
}
              

Split:分割演算子

概要図
JobFlowクラス実装
Split split
 = operator.split(joined);
this.hogeOut.add(split.hoge);
this.fooOut.add(split.foo);
              
Operatorクラス実装
@Split
public abstract void split(
 HogeFoo joined,
 Result<Hoge> hoge,
 Result<Foo> foo);
              

Summarize:単純集計演算子

概要図
JobFlowクラス実装
Summarize sum
 = operator.summarize(this.hoge);
this.out.add(sum.out);
              
Operatorクラス実装
@Summarize
public abstract HogeTotal summarize(Hoge hoge);
              

Fold:畳み込み演算子

概要図
JobFlowクラス実装
Fold fold
 = mainOperator.fold(this.hoge);
this.out.add(fold.out);
              
Operatorクラス実装
@Fold
public void fold(
 @Key(group = "name") Hoge left, 
 Hoge right) {
 // @Summarizeを手動で行うイメージで、
 // leftに次々とrightを加える
 left.setValue(left.getValue()
  + right.getValue());
}