Skip to content

FEATURE: Add CompletableFuture Pipeline API#1035

Closed
oliviarla wants to merge 1 commit intonaver:developfrom
oliviarla:pipenewapi
Closed

FEATURE: Add CompletableFuture Pipeline API#1035
oliviarla wants to merge 1 commit intonaver:developfrom
oliviarla:pipenewapi

Conversation

@oliviarla
Copy link
Collaborator

@oliviarla oliviarla commented Dec 23, 2025

🔗 Related Issue

  • Pipeline API를 추가합니다.

  • 사용 방법

    • Pipeline 객체를 생성하여 하나의 파이프라인으로 포함할 명령들을 조합한다.
    • 파이프라인에 포함할 수 있는 명령들을 제한할 수 있다. (명령어의 종류, 개수)
    • 명령어의 개수는 최대 500개로 제한한다. 이를 통해 한 노드에는 최대 하나의 파이프라인이 보내진다.
    public class Pipeline<V> {
    
        // 다양한 명령을 포함할 수 있는 PipelineOperationImpl 타입을 value에 저장한다.
        private Map<MemcachedNode, Operation> ops;
        
        public Pipeline() {
          // ...
        }
        
        public Pipeline<V> lopInsert(String key, int index, V value) {
          // ...
          return this;
        }
        
        public Pipeline<V> mopInsert(String key, String mkey, V value) {
          // ...
          return this;
        }
        
        // ...
        
    }
    • Pipeline 객체를 execute 메서드에 넘겨야 파이프라인이 수행되며, List<Boolean> 에 각 명령에 대한 응답이 담긴다.
      ArcusFuture<List<Boolean>> execute(Pipeline<T> pipeline)
    • 서로 다른 여러 키를 파이프라인에 포함하는 경우, 각 노드 별로 파이프라인이 생성되어 노드에 전달된다.
    • 명령의 조합 순서대로 파이프라인이 만들어지지만, 서버에서 처리 도중 다른 연산이 끼어들 수 있다.
    • 만약 처리 도중 실패 응답이나 예외가 발생한 경우 PipelineOperationException 혹은 CompositeException이 발생할 수 있다.
    • 예외가 발생했더라도 ArcusMultiFuture#getResultsWithFailures 메서드로 나머지 결과를 조회할 수 있다.
      • 실패/예외가 발생했거나 예외로 인해 아예 실행되지 않은 명령의 인덱스 값은 null이다.
      • 응답이 온 명령의 인덱스 값은 True/False이다.

⌨️ What I did

  • switchover 처리
    • responseIndex를 사용하여 아직 응답이 오지 않은 operation들만 새로운 master 노드에서 initialize할 때 포함되도록 한다.
  • migration 처리
    • gotStatus 메서드의 인자로 Operation을 받아, 여러 Operation으로 분리되더라도 원본 콜백에서 원본 index에 값을 추가할 수 있도록 한다.
  • 개별 노드에서의 응답 처리
    • 일반 응답 처리
      • 성공 응답이거나, ERR_NOT_FOUND / ERR_NOT_FOUND_ELEMENT / ERR_ELEMENT_EXISTS / ERR_NOTHING_TO_UPDATE 실패 응답인 경우 Boolean으로 처리한다.
    • 실패 응답 처리
      • TYPE_MISMATCH / BKEY_MISMATCH / EFLAG_MISMATCH / OVERFLOWED / OUT_OF_RANGE / UNREADABLE / NOT_SUPPORTED 의 경우 PipelineOperationException을 발생시킨다.
    • 에러 응답 처리
      • IO Thead에서 에러 상황을 처리하기 위해 내부적으로 OperationException을 던진다.
      • ArcusMultiFuture를 통해 각 개별노드로부터의 결과를 합치기 위해, 어떤 index에서 에러 응답이 발생했는지 알 수 있는 PipelineOperationException을 발생시킨다.
    • 실패 응답, 에러 응답으로 인해 예외가 하나라도 생긴 경우, PipelineCompositeException로 감싸서 반환한다. 이 때 현재까지의 응답을 담은 Result가 함께 담긴다.
  • 전체 노드의 응답 처리
    • 특정 노드의 응답을 처리하던 중 예외가 발생한 Future가 있는 경우 PipelineCompositeException 내부의 예외들을 모두 꺼내 통합한 뒤, CompositeException 또는 단일 PipelineOperationException으로 반환한다.
    • PipelineCompositeException을 통합하는 과정에서 각 노드로부터 받은 현재까지의 응답 Result 역시 통합한다.

@oliviarla oliviarla force-pushed the pipenewapi branch 2 times, most recently from 7a9e90a to 0d98bc8 Compare January 22, 2026 06:42
@oliviarla oliviarla requested a review from uhm0311 January 22, 2026 06:42
@oliviarla
Copy link
Collaborator Author

oliviarla commented Feb 11, 2026

@uhm0311 @jhpark816
코드와 첫 코멘트의 설명을 새롭게 반영하였습니다.
코드가 복잡하고 수정된 파일도 증가하여 이해하기가 어려울 것 같긴 합니다. 필요 시 오프라인 코드 설명 요청해주세요.
사용하는 예시는 테스트 코드를 확인하면 더 이해하기 쉽습니다.

Copy link
Collaborator

@uhm0311 uhm0311 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

현재 코드 방향성은 괜찮아 보이고, 코드 설명 보강이 조금 더 필요해 보입니다.

@oliviarla oliviarla force-pushed the pipenewapi branch 3 times, most recently from 73d7884 to adb10e9 Compare February 13, 2026 07:17
@oliviarla oliviarla requested a review from uhm0311 February 13, 2026 07:17
@oliviarla
Copy link
Collaborator Author

@uhm0311
전체적으로 리뷰 반영 완료했습니다.

@oliviarla oliviarla force-pushed the pipenewapi branch 2 times, most recently from 2d3dee0 to de9ab5e Compare February 20, 2026 12:36
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

일부 리뷰

keys.add(key);
ops.add(opFact.collectionInsert(key, mkey,
mapUpsert, tc.encode(value).getData(), null));
return this;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

위의 mopUpsert()에서 아래와 같이 호출하면 됩니다.

return this.mopUpsert(key, mkey, value, null);

ops.add(opFact.collectionUpdate(key, bkey.toString(), bTreeUpdate,
tc.encode(value).getData(), null));
return this;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bopUpdate() 코드는 bopDelete() 위로 옮깁시다.

}

pipedBuffer.flip();
setBuffer(pipedBuffer);
Copy link
Collaborator

@jhpark816 jhpark816 Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

각 명령에 " pipe" 추가하는 로직은 최적화할 필요가 있어 보입니다. 어떤가요?
이슈라 생각한다면, 이슈로 올려두고 나중에 처리하시죠.

SetInsert<V> setInsert = new SetInsert<>(value, null, attributes);
keys.add(key);
ops.add(opFact.collectionInsert(key, "",
setInsert, tc.encode(value).getData(), null));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래 SetExist와 SetDelete 객체를 생성할 시에 value 외에 tc를 함께 넘겨준다.
반면, SetInsert 객체 생성 시에는 value만 넘겨 생성한다.

이에 따라 value length는 얻는 코드를 포함하여 해당 operation 객체를 다루는 방식이 서로 달라서, 코드 일관성이 떨이집니다.
SetInsert 객체 생성 시에도 value 외에 tc를 함께 넘기는 방식으로 리팩토링하는 것을 검토해 주세요. 이 사항도 진행한다면 별도 이슈로 처리 바랍니다.


OperationStatus status = successAll ? END : FAILED_END;
complete(status);
} else if (line.startsWith("PIPE_ERROR")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

여기서는 needRedirect() 검사하지 않아도 되나요?

private OperationStatus parseStatusLine(String line) {
boolean allDigit = line.chars().allMatch(Character::isDigit);
if (allDigit) {
return new OperationStatus(true, line, StatusCode.SUCCESS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bop incr/decr 연산을 pipelinging에서 제외하는 것은 어떤지 ?

complete(status);
} else if (line.startsWith("RESPONSE ")) {
expectingResponse = true;
responseIndex = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switchover에 의해 아래와 같이 재수행하는 경우를 가정합시다.

  • responseIndex : 5
  • ops.size() : 10

위의 경우에 RESPONSE 응답이 왔을 시에 responseIndex을 0으로 재설정하여도 되는 지 ?

}

// Notify callback for single command
cb.gotStatus(ops.get(0), status);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switchover에 의해 아래와 같이 재수행하는 경우를 가정합시다.

  • responseIndex : 9
  • ops.size() : 10

이 경우, ops.get(0) 호출하는 것도 문제가 되지 않는 지 ?

@oliviarla
Copy link
Collaborator Author

v2에서 pipeline 기능을 제공하는 것을 보류하게 되어 close합니다.

@oliviarla oliviarla closed this Feb 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants