Louie NRT Story

[EV Charger] OCPP 1.6 Python Example

hyeok0724.kim@gmail.com 2022. 1. 11. 12:56

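Written: January 11, 2022

PS. I had been planning this since last year but kept putting it off because of product releases and maintenance work.

     In January 2022 I finally started on OCPP development.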

 

 

Index

1. Python OCPP

2. Example code

3. Verifying operation

 

 

1. Python OCPP

 - There is an open-source implementation of OCPP written in Python

 - Install the package (published on PyPI as ocpp) and import it
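
Before running the examples it can help to confirm that both packages they import, ocpp and websockets, are actually installed. A minimal sketch using only the standard library; the helper name missing_packages is hypothetical and not part of either package.

```python
import importlib.util


def missing_packages(names):
    """Return the names from `names` that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages(["ocpp", "websockets"])
    if missing:
        # Suggest the install command for whatever is absent.
        print("Missing packages, install with: pip install " + " ".join(missing))
    else:
        print("ocpp and websockets are both available.")
```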

 

 

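2. Example code

 - The goal is to try running the project's example code

 1) central_system.py

  - The code only responds to BootNotification requests

  - Following the documentation, the response carries current_time, interval, and status

  ※ The status field is a RegistrationStatus:

     Accepted: tells the charge point that the server has accepted (registered) it

     Pending: the server is still storing or looking up the charge point's information and may send it messages in the meantime; in practice the charge point should simply resend BootNotification after interval (seconds)

     Rejected: the server refused the charge point because its information did not match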

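  - Implemented along the lines of Security Profile 1 (plain ws:// without TLS); note that the example does not send the HTTP Basic Authentication credentials that profile also calls for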
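
How a charge point might act on each status can be sketched separately from the file listed below. This is a minimal sketch, not part of the ocpp package: BootResult, next_step, and fallback_retry are hypothetical names, and the zero-interval fallback reflects my reading of the OCPP 1.6 rule that a charge point chooses its own waiting time when a non-accepted response carries interval 0.

```python
from dataclasses import dataclass


@dataclass
class BootResult:
    """Hypothetical container for the fields of a BootNotification response."""
    status: str    # "Accepted", "Pending" or "Rejected"
    interval: int  # seconds


def next_step(result, fallback_retry=60):
    """Return (action, delay_seconds) for a BootNotification response.

    fallback_retry is an assumed default used when a non-accepted
    response carries interval 0.
    """
    if result.status == "Accepted":
        # Registered: interval is the heartbeat period.
        return ("start_heartbeat", result.interval)
    if result.status in ("Pending", "Rejected"):
        # Not registered: wait, then send BootNotification again.
        delay = result.interval if result.interval > 0 else fallback_retry
        return ("retry_boot_notification", delay)
    raise ValueError(f"unknown RegistrationStatus: {result.status!r}")


print(next_step(BootResult("Accepted", 10)))  # ('start_heartbeat', 10)
print(next_step(BootResult("Pending", 30)))   # ('retry_boot_notification', 30)
```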

 

import asyncio
import logging
from datetime import datetime

try:
    import websockets
except ModuleNotFoundError:
    print("This example relies on the 'websockets' package.")
    print("Please install it by running: ")
    print()
    print(" $ pip install websockets")
    import sys
    sys.exit(1)

from ocpp.routing import on
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import Action, RegistrationStatus
from ocpp.v16 import call_result

logging.basicConfig(level=logging.INFO)


class ChargePoint(cp):
    @on(Action.BootNotification)
    def on_boot_notification(self, charge_point_vendor: str, charge_point_model: str, **kwargs):
        return call_result.BootNotificationPayload(
            current_time=datetime.utcnow().isoformat(),
            interval=10,
            status=RegistrationStatus.accepted
        )


async def on_connect(websocket, path):
    """ For every new charge point that connects, create a ChargePoint
    instance and start listening for messages.
    """
    try:
        requested_protocols = websocket.request_headers[
            'Sec-WebSocket-Protocol']
    except KeyError:
        logging.error(
            "Client hasn't requested any Subprotocol. Closing Connection"
        )
        return await websocket.close()
    if websocket.subprotocol:
        logging.info("Protocols Matched: %s", websocket.subprotocol)
    else:
        # In the websockets lib if no subprotocols are supported by the
        # client and the server, it proceeds without a subprotocol,
        # so we have to manually close the connection.
        logging.warning('Protocols Mismatched | Expected Subprotocols: %s,'
                        ' but client supports  %s | Closing connection',
                        websocket.available_subprotocols,
                        requested_protocols)
        return await websocket.close()

    charge_point_id = path.strip('/')
    cp = ChargePoint(charge_point_id, websocket)

    await cp.start()


async def main():
    server = await websockets.serve(
        on_connect,
        '0.0.0.0',
        9000,
        subprotocols=['ocpp1.6']
    )

    logging.info("Server Started listening to new connections...")
    await server.wait_closed()


if __name__ == '__main__':
    try:
        # asyncio.run() is used when running this example with Python 3.7 and
        # higher.
        asyncio.run(main())
    except AttributeError:
        # For Python 3.6 a bit more code is required to run the main() task on
        # an event loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()

 

 2) charge_point.py

  - As above, only BootNotification is defined

  - Only the required fields, model and vendor, are sent to the server

  - Based on Security Profile 1 (plain ws:// transport)

  - The WebSocket handshake header announces the ocpp1.6 subprotocol
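
What the subprotocol header accomplishes can be illustrated with a small, library-independent sketch. The client lists the subprotocols it speaks in Sec-WebSocket-Protocol and the server picks one it also supports; central_system.py closes the connection when nothing matches. The function name select_subprotocol is hypothetical here, and the selection rule (first client offer that the server knows) is a simplifying assumption, not necessarily what the websockets package does internally.

```python
def select_subprotocol(header_value, supported):
    """Pick the first client-offered subprotocol that the server supports.

    header_value: raw Sec-WebSocket-Protocol value, e.g. "ocpp2.0.1, ocpp1.6"
    supported: subprotocol names the server accepts
    Returns the chosen name, or None when there is no overlap.
    """
    offered = [part.strip() for part in header_value.split(",") if part.strip()]
    for name in offered:
        if name in supported:
            return name
    return None


print(select_subprotocol("ocpp1.6", ["ocpp1.6"]))  # ocpp1.6
print(select_subprotocol("mqtt", ["ocpp1.6"]))     # None
```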

import asyncio
import logging

try:
    import websockets
except ModuleNotFoundError:
    print("This example relies on the 'websockets' package.")
    print("Please install it by running: ")
    print()
    print(" $ pip install websockets")
    import sys
    sys.exit(1)


from ocpp.v16 import call
from ocpp.v16 import ChargePoint as cp
from ocpp.v16.enums import RegistrationStatus

logging.basicConfig(level=logging.INFO)


class ChargePoint(cp):
    async def send_boot_notification(self):
        request = call.BootNotificationPayload(
            charge_point_model="Optimus",
            charge_point_vendor="The Mobility House"
        )

        response = await self.call(request)

        if response.status == RegistrationStatus.accepted:
            print("Connected to central system.")


async def main():
    async with websockets.connect(
        'ws://localhost:9000/CP_1',
        subprotocols=['ocpp1.6']
    ) as ws:

        cp = ChargePoint('CP_1', ws)

        await asyncio.gather(cp.start(), cp.send_boot_notification())


if __name__ == '__main__':
    try:
        # asyncio.run() is used when running this example with Python 3.7 and
        # higher.
        asyncio.run(main())
    except AttributeError:
        # For Python 3.6 a bit more code is required to run the main() task on
        # an event loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
        loop.close()

 

 

3. Verifying operation

 1) charge_point.py

  - Sent message: [2,"3ab74abd-75b0-43ca-b24f-72242bd9c110","BootNotification",{"chargePointModel":"Optimus","chargePointVendor":"The Mobility House"}]

  - Received message: [3,"3ab74abd-75b0-43ca-b24f-72242bd9c110",{"currentTime":"2022-01-11T04:34:12.170865","interval":10,"status":"Accepted"}]

 

 2) central_system.py

  - Received message: [2,"3ab74abd-75b0-43ca-b24f-72242bd9c110","BootNotification",{"chargePointModel":"Optimus","chargePointVendor":"The Mobility House"}]

  - Sent message: [3,"3ab74abd-75b0-43ca-b24f-72242bd9c110",{"currentTime":"2022-01-11T04:34:12.170865","interval":10,"status":"Accepted"}]
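
The logged frames follow the OCPP-J array layout: a message type number (2 for a request, 3 for a response, 4 for an error), a unique ID that ties a response to its request, and then the action name and/or payload. A minimal sketch that takes the charge point's two frames apart using only the standard json module; parse_frame is a hypothetical helper, not part of the ocpp package.

```python
import json

CALL, CALL_RESULT, CALL_ERROR = 2, 3, 4


def parse_frame(raw):
    """Split an OCPP-J text frame into a dict keyed by its parts."""
    frame = json.loads(raw)
    message_type, unique_id = frame[0], frame[1]
    if message_type == CALL:
        return {"type": "CALL", "id": unique_id,
                "action": frame[2], "payload": frame[3]}
    if message_type == CALL_RESULT:
        return {"type": "CALLRESULT", "id": unique_id, "payload": frame[2]}
    if message_type == CALL_ERROR:
        return {"type": "CALLERROR", "id": unique_id,
                "error_code": frame[2], "description": frame[3],
                "details": frame[4]}
    raise ValueError(f"unknown message type: {message_type!r}")


request = parse_frame(
    '[2,"3ab74abd-75b0-43ca-b24f-72242bd9c110","BootNotification",'
    '{"chargePointModel":"Optimus","chargePointVendor":"The Mobility House"}]'
)
response = parse_frame(
    '[3,"3ab74abd-75b0-43ca-b24f-72242bd9c110",'
    '{"currentTime":"2022-01-11T04:34:12.170865","interval":10,"status":"Accepted"}]'
)

# The response is matched to the request through the shared unique ID.
assert request["id"] == response["id"]
print(request["action"], response["payload"]["status"])  # BootNotification Accepted
```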

 

Reference:

https://ocpp.readthedocs.io/en/latest/index.html

 

The library was created by developer Auke Willem Oosterhoff, the presenter in this video:

https://youtu.be/q4F76gNpUJg

Source code repository:

https://github.com/mobilityhouse/ocpp

 

 

 
