일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- OCPP
- flask
- homeassistant
- 충전기
- 에버온
- 전기차충전
- 서버리스
- 보안
- everon
- YMODEM
- lambda
- 플라스크
- Android
- esp8266
- 디자인패턴
- STM32
- 급속충전기
- 파이썬
- 홈어시스턴트
- 완속충전기
- 펌웨어
- 전기차충전기
- thread
- 전기차
- AWS
- raspberry
- dynamodb
- IOT Core
- 라즈베리파이
- 안드로이드
- Today
- Total
Louie NRT Story
[전기차 충전기] OCPP 1.6 Python 예제 본문
작성일: 22년 1월 11일
PS. 작년부터 계획만 잡아놓고 제품 출시와 유지보수 업무로 못하고 있다가
22년 1월부터 드디어 OCPP 개발에 들어감.
Index
1. Python OCPP
2. 예제코드
3. 동작 확인
1. Python OCPP
- python으로 개발된 OCPP 오픈소스가 있음
- 패키지를 다운로드 받아서 사용하면됨
2. 예제코드
- 해당 코드를 실행해보고자 함
1) central_system.py
- 코드에서는 BootNotification 정보만 응답하도록 되어 있음
- 문서에 따라 current_time, Interval, status 정보를 응답하도록 되어 있음
※ 여기서 정의한 status의 RegisterationStatus
Accepcted: 충전기가 서버에 연결됨을 알려줌
Pending: 서버가 충전기 정보를 저장하고 있거나 검색 후 메시지를 보낼 수 있다는 뜻인데 그냥 충전기에서 Interval(분) 이후에 다시 보내는게 맞을 것으로 보임
Rejected: 충전기의 정보가 맞지 않아 서버에서 거절함
- Security Profile 1으로 구현되어 있음
import asyncio
import logging
from datetime import datetime
try:
import websockets
except ModuleNotFoundError:
print("This example relies on the 'websockets' package.")
print("Please install it by running: ")
print()
print(" $ pip install websockets")
import sys
sys.exit(1)
from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result
logging.basicConfig(level=logging.INFO)
class ChargePoint(cp):
@on(Action.BootNotification)
def on_boot_notification(self, charge_point_vendor: str, charge_point_model: str, **kwargs):
return call_result.BootNotificationPayload(
current_time=datetime.utcnow().isoformat(),
interval=10,
status=RegistrationStatus.accepted
)
async def on_connect(websocket, path):
""" For every new charge point that connects, create a ChargePoint
instance and start listening for messages.
"""
try:
requested_protocols = websocket.request_headers[
'Sec-WebSocket-Protocol']
except KeyError:
logging.error(
"Client hasn't requested any Subprotocol. Closing Connection"
)
return await websocket.close()
if websocket.subprotocol:
logging.info("Protocols Matched: %s", websocket.subprotocol)
else:
# In the websockets lib if no subprotocols are supported by the
# client and the server, it proceeds without a subprotocol,
# so we have to manually close the connection.
logging.warning('Protocols Mismatched | Expected Subprotocols: %s,'
' but client supports %s | Closing connection',
websocket.available_subprotocols,
requested_protocols)
return await websocket.close()
charge_point_id = path.strip('/')
cp = ChargePoint(charge_point_id, websocket)
await cp.start()
async def main():
server = await websockets.serve(
on_connect,
'0.0.0.0',
9000,
subprotocols=['ocpp1.6']
)
logging.info("Server Started listening to new connections...")
await server.wait_closed()
if __name__ == '__main__':
try:
# asyncio.run() is used when running this example with Python 3.7 and
# higher.
asyncio.run(main())
except AttributeError:
# For Python 3.6 a bit more code is required to run the main() task on
# an event loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
2) charge_point.py
- 위와 마찬가지로 BootNotification 정보만 정의되어 있음
- 필수 정보인 model와 vendor 정보만 서버에 전송함
- Security Profile 1 기준으로 되어있음
- websocket Header 정보에 ocpp1.6 프로토콜임을 알려줌
import asyncio
import logging
try:
import websockets
except ModuleNotFoundError:
print("This example relies on the 'websockets' package.")
print("Please install it by running: ")
print()
print(" $ pip install websockets")
import sys
sys.exit(1)
from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import RegistrationStatus
logging.basicConfig(level=logging.INFO)
class ChargePoint(cp):
async def send_boot_notification(self):
request = call.BootNotificationPayload(
charge_point_model="Optimus",
charge_point_vendor="The Mobility House"
)
response = await self.call(request)
if response.status == RegistrationStatus.accepted:
print("Connected to central system.")
async def main():
async with websockets.connect(
'ws://localhost:9000/CP_1',
subprotocols=['ocpp1.6']
) as ws:
cp = ChargePoint('CP_1', ws)
await asyncio.gather(cp.start(), cp.send_boot_notification())
if __name__ == '__main__':
try:
# asyncio.run() is used when running this example with Python 3.7 and
# higher.
asyncio.run(main())
except AttributeError:
# For Python 3.6 a bit more code is required to run the main() task on
# an event loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
3. 동작 확인
1) charge_point.py
- Send Message: [2,"3ab74abd-75b0-43ca-b24f-72242bd9c110","BootNotification",{"chargePointModel":"Optimus","chargePointVendor":"The Mobility House"}]
- Receive Message: [3,"3ab74abd-75b0-43ca-b24f-72242bd9c110",{"currentTime":"2022-01-11T04:34:12.170865","interval":10,"status":"Accepted"}]
2) central_system.py
- Receive message: [2,"3ab74abd-75b0-43ca-b24f-72242bd9c110","BootNotification","chargePointModel":"Optimus","chargePointVendor":"The Mobility House"}]
- Send Message: [3,"3ab74abd-75b0-43ca-b24f-72242bd9c110",{"currentTime":"2022-01-11T04:34:12.170865","interval":10,"status":"Accepted"}]
Referece:
https://ocpp.readthedocs.io/en/latest/index.html
영상의 발표자 Auke Willem Oosterhoff 개발자가 만들었음
해당 코드 주소
https://github.com/mobilityhouse/ocpp
'전기차충전기' 카테고리의 다른 글
[전기차 충전기] Websocket TLS/SSL 통신 - Todo (0) | 2022.01.11 |
---|---|
[전기차 충전기] 서울시 보조금 충전기 OCPP 1.6 연동 - 01 (0) | 2022.01.11 |
[안드로이드] IntentService 만들기 (0) | 2021.12.12 |
[안드로이드] Service 만들기 (0) | 2021.12.12 |
[안드로이드] 잘못된 Thread 사용법 (0) | 2021.12.12 |