-
nifi 로 API 수집 및 오브젝트 스토리지에 저장하기HOWTO 2020. 9. 19. 17:41
api 를 호출하여 데이터를 매일 수집하고 저장하는 파이썬 소스들을 인터넷에서 쉽게 찾을 수 있지만 nifi 의 invokehtttp processor 를 사용하면 flowfile 을 통해 어떻게 작업이 진행되고 있는지, 어디서 에러가 발생했는지 확인하기 쉽다.
(python 을 아주 잘 다루는 사람이라면 python 이 훨씬 쉬울 것이다.)
nifi 로 api 를 호출하고, xml 로 받은 결과값을 csv 로 변환하여 네이버 클라우드 플랫폼의 오브젝트 스토리지에 업로드하는 flowfile 을 만들어봤다.
각각의 processor 설정은 아래와 같이 설정했다.
1. Invokehttp
나는 오피넷에 api 사용 권한을 신청하여 매일 유종 별 평균 가격을 수집하도록 설정했다.
api 로 데이터를 조회하는 것이기 때문에 HTTP Method 는 Get 으로 입력하고 remoteURL api 주소를 입력한다.
Header에 내용을 추가해야 하는 경우 + 버튼을 눌러 Key/Value 형태로 입력해준다.
2. UpdateAttribute
xml 로 받은 api 결과를 csv 로 변환하기 위한 schema 속성값을 정의해준다.
3. ConvertRecord
xml 결과를 csv 로 변환하는 방법은 여러가지가 있지만 적용해 본 결과 ConvertRecord 를 사용하는게 가장 쉬웠다.
XML 결과를 읽어야하기 때문에 XMLReader 를, CSV 로 써야하기 때문에 CSVRecordSetWriter 로 선택하고 우측에 화살표를 클릭해서 각각 컨트롤러 별 필요한 설정을 진행한다.
XMLReader 에서는 Schema Registry 를 정의해 줘야 해서 AvroSchemaRegistry 를 선택해주고 오른쪽 화살표를 클릭하여 API 결과값의 레코드를 구조를 정의해준다.
XML 결과값이 single array 가 아닌 경우가 있는데 그런 경우에는 nifi docs 를 열심히 읽어보고 누가 나에게 설명 좀 해줬으면..
{ "type": "record", "name": "oil_avgAllPrice", "fields" : [ {"name": "TRADE_DT", "type": ["null", "string"]}, {"name": "PRODCD", "type": ["null", "string"]}, {"name": "PRODNM", "type": ["null", "string"]}, {"name": "PRICE", "type": ["null", "string"]}, {"name": "DIFF", "type": ["null", "string"]} ] }
4. PutS3Object
PutS3Object 는 프로세서 이름만 봐도 알겠지만 AWS S3 에 파일을 가져오거나 저장하기 위해 사용하는 프로세서이다. 네이버 클라우드 플랫폼의 오브젝트 스토리지의 경우 S3 API 와 호환되기 때문에 PutS3Object 프로세서를 사용할 수 있다. 차이가 있다면 프로세서 설정에 있는 Access Key, Secret Key 항목에 값을 넣어주는게 아니고 AWS Credentials Provider Service 의 AWSCredentialsProviderControllerService 컨트롤러를 통해 정의해 주면 된다.
위 이미지에서 AWSCredentialsProviderControllerService 컨트롤러 설정에서 입력한 Key 값은 sensitive value 로 보여지지는 않지만 클릭해서 보면 저 부분에 Access/Secret Key 값을 입력하면 된다. '
연결된 flowfile 을 실행 시키면 제일 마지막 PutS3Object 프로세서에서 Data Provenance 를 확인해보면 오브젝트 스토리지로 drop 한 것을 확인할 수 있다.
drop type 맨 앞에 느낌표를 선택하면 어떤 값이 업로드 되었는지 확인할 수 있다.
반응형'HOWTO' 카테고리의 다른 글
Installing CKAN with Docker Compose (ubuntu 16.04) (0) 2020.10.11 nifi 로 여러개 파일 내용 병합하기 (merge record) (0) 2020.09.22 OSError: Command /root/pyenv/bin/python2 - setuptools pkg_resources pip wheel failed with error code 1 (0) 2020.06.29 ckan.plugins.core.PluginNotFoundException: dataproxy (0) 2020.06.25 ImportError: No module named simplecrypt (0) 2020.06.25