Posts Apache Spark과 S3 Compatible Object Storage 연동 시 Custom Endpoint 이슈
Post
Cancel

Apache Spark과 S3 Compatible Object Storage 연동 시 Custom Endpoint 이슈

Apache Spark과 S3 Compatible Object Storage 연동 시 Custom Endpoint 이슈

사내에서 개발하는 시스템에서 Apache Spark과 S3 Compatible Object Storage인 Ceph를 연동해야 할 일이 생겼다.

Ceph는 S3 Compatible한 Gateway를 제공하기 때문에 Apache Spark 실행 시 spark.hadoop.fs.s3a.endpoint에 해당 Gateway의 주소를 설정해주고, Access Key와 Secret Key까지 설정해주면 금방 될 거라 생각했다. Databricks 블로그에도 동일 방식으로 설명하고 있었기 때문이다.

이슈 상황

우선 테스트한 환경은 다음과 같다.

  • 네트워크: 인터넷과의 연결이 차단된 폐쇄망 환경
  • Ceph: 버전은 정확히 알지 못함
  • Apache Spark: 3.1.2
  • Apache Hadoop: 2.6.0 & 3.2.0

호기롭게 spark.hadoop.fs.s3a.endpoint에 Ceph Gateway Endpoint를 설정하고 실행을 했는데, Ceph에 Write하는 과정에서 다음과 같은 오류가 발생하였다.

1
Caused by: org.apache.http.conn.ConnectTimeoutException: Connect to s3.amazonaws.com ...

위 오류가 발생했을 때 처음 든 생각은 “spark.hadoop.fs.s3a.endpoint 옵션을 잘못 주었나?” 였다. 그러나 코드를 보기 시작하니 내 잘못이 아니라는 걸 느꼇다.

문제 해결

우선 나는 AWS를 현업에서 사용해본 적이 없다. 그러나 문제 해결에 도움이 되었던 점은 동일 프로젝트의 서버 사이드에서 AWS Java SDK를 사용하였으므로, 그 경험을 기반으로 문제 해결을 시작했다.

만일 이 경험이 없었다면 Hadoop S3 FileSystem 관련 코드를 한줄한줄 파고 들어가야 했을 것이고, 문제를 해결하는데 오래 걸렸을 것 같다.

AWS Java SDK에서 S3 연결 시 Custom Endpoint를 설정하는 법

AWS Java SDK에서 S3 Client를 사용할 때 Custom Endpoint를 설정하기 위해서는 AmazonS3ClientBuilderwithEndpointConfiguration을 호출하여 자체 Endpoint 정보를 넘겨주면 된다.

1
2
3
4
5
// S3 client
final AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, regionName))
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
    .build();

EndpointConfiguration 객체를 생성할 때 endPoint 위치에 Object Storage의 Gateway 주소를 넘겨주어야만 S3 API 호출 시 해당 Endpoint로 Routing한다.

S3AFileSystem에서 AmazonS3Client 초기화 부분 찾아보기

Spark에서 S3에 데이터를 쓸 때 사용하는 FileSystem 클래스는 S3AFileSystem 클래스이다. 우선 Hadoop 2.6.0 에서 수행했기 때문에, https://github.com/apache/hadoop/blob/release-2.6.0/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java 의 코드를 확인하였다.

1
2
3
4
5
6
7
8
9
10
11
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, 
    DEFAULT_MAXIMUM_CONNECTIONS));
awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS, 
    DEFAULT_SECURE_CONNECTIONS) ?  Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, 
    DEFAULT_MAX_ERROR_RETRIES));
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, 
    DEFAULT_SOCKET_TIMEOUT));

s3 = new AmazonS3Client(credentials, awsConf);

핵심적으로 보아야 할 부분은 s3 변수(AmazonS3Client 객체)를 초기화하는 부분인데, 위와 같이 구성되어 있다. 초기화 시 사용자가 전달한 설정들도 반영해야 할 것 같은데, 자체적으로 관리하는 변수들(MAXIMUM_CONNECTIONS, SECURE_CONNECTIONS 등)을 제외하고는 반영하는 부분이 없다.

이 포인트에서 “아, Hadoop 2.6.0의 문제인가? 어차피 Hadoop 3으로 넘어가는 상황이니 Hadoop 3.2.2 버전의 코드를 확인해보자” 라는 생각이 들었다. Hadoop 2.6 버전대와 동일하게 S3AFileSystem을 보면 되는데, s3를 초기화할 때 다음과 같이 Reflection을 사용하고 있다.

1
2
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
        .createS3Client(name, bucket, credentials);

우선 S3 Client Factory를 생성한 뒤, 해당 Factory의 createS3Client를 호출하여 AmazonS3Client 객체를 초기화하는 것을 확인할 수 있다. 해당 Factory 클래스는 DefaultS3ClientFactory 였고, createS3Client 메서드를 확인해보면 아래와 같다.

1
2
3
4
5
6
7
8
9
@Override
public AmazonS3 createS3Client(URI name,
    final String bucket,
    final AWSCredentialsProvider credentials) throws IOException {
        Configuration conf = getConf();
        final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket);
        return configureAmazonS3Client(
            newAmazonS3Client(credentials, awsConf), conf);
}

여기서 주의깊게 봐야 할 메서드는 configureAmazonS3Client 였는데, 해당 메서드의 내용은 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static AmazonS3 configureAmazonS3Client(AmazonS3 s3,
      Configuration conf)
      throws IllegalArgumentException {
    String endPoint = conf.getTrimmed(ENDPOINT, "");
    if (!endPoint.isEmpty()) {
      try {
        s3.setEndpoint(endPoint);
      } catch (IllegalArgumentException e) {
        String msg = "Incorrect endpoint: "  + e.getMessage();
        LOG.error(msg);
        throw new IllegalArgumentException(msg, e);
      }
    }
    return applyS3ClientOptions(s3, conf);
}

우리가 원했던 Endpoint 설정을 해주는 것을 확인할 수 있다. 드디어 문제가 해결된 것일까?

Hadoop 3.2.0 버전의 이슈

Apache Spark은 보통 Hadoop 버전에 맞게 Pre built 되어 빌드된 tarball을 제공하고 있다. 팀 내에서 Apache Spark 3.1.2 + Hadoop 3.2 Prebuilt를 사용하고 있었고, 3.2.0 브랜치에서 Endpoint를 잘 넣어주는 것을 확인했기에 다시 프로그램을 돌려보았다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Exception in thread "init" java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1108)
    at org.apache.hadoop.fs.FileSystem.createNewFile(FileSystem.java:1413)
    at org.apache.accumulo.server.fs.VolumeManagerImpl.createNewFile(VolumeManagerImpl.java:184)
    at org.apache.accumulo.server.init.Initialize.initDirs(Initialize.java:479)
    at org.apache.accumulo.server.init.Initialize.initFileSystem(Initialize.java:487)
    at org.apache.accumulo.server.init.Initialize.initialize(Initialize.java:370)
    at org.apache.accumulo.server.init.Initialize.doInit(Initialize.java:348)
    at org.apache.accumulo.server.init.Initialize.execute(Initialize.java:967)
    at org.apache.accumulo.start.Main.lambda$execKeyword$0(Main.java:129)
    at java.lang.Thread.run(Thread.java:748)

흠… 뭔가 문제가 있는 모양이라 검색해보니 Hadoop Jira HADOOP-16080으로 해당 이슈가 리포팅되어 있었다.

Affects Versions가 3.2.0이고, Fix Versions가 3.2.2였다. Fetch 버전이 올라간거고 클라이언트 이슈이기 때문에 설치한 Spark Client Home의 jar 디렉토리의 Hadoop 관련 라이브러리 버전을 3.2.0 -> 3.2.2로 변경해주었다.(덤으로 Guava도 14 -> 27로 바꾸어주어야 하는데, 이 때가 가장 떨렸다. Guava는 가장 충돌이 많이 났던 라이브러리라서…)

해결 완료 및 회고

위의 사항들을 모두 적용한 뒤 정상적으로 Object Storage를 통한 Read와 Write가 수행되는 것을 확인하였다.

생각보다 Hadoop과 Object Storage 연동 관련 공식 문서가 부실했고, 버전에 따른 이슈들도 있어서 해결하는데에 하루 정도 걸렸던 것 같다. 좋은 경험이었지만 다시는 경험하고 싶지 않은 이슈였다.

This post is licensed under CC BY 4.0 by the author.