Skip to content

Commit 606045c

Browse files
poorva1209craigpnnl
authored andcommitted
Update field_proxy_forwarder.py
1 parent a75faab commit 606045c

1 file changed

Lines changed: 48 additions & 10 deletions

File tree

gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/field_proxy_forwarder.py

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55
from gridappsd import GridAPPSD
66
from gridappsd import topics
77

8+
from cimgraph.databases import GridappsdConnection, BlazegraphConnection
9+
from cimgraph.models import BusBranchModel, FeederModel
10+
11+
import os
12+
import cimgraph.utils as utils
13+
import cimgraph.data_profile.cimhub_ufls as cim
14+
815
REQUEST_FIELD = ".".join((topics.PROCESS_PREFIX, "request.field"))
916

1017
class FieldListener:
@@ -29,13 +36,11 @@ def on_message(self, headers, message):
2936
request_type = request_data.get("request_type")
3037
if request_type == "get_context":
3138
response = self.ot_connection.get_response(headers["destination"],message)
32-
self.proxy_connection.send(headers["reply_to"],response)
39+
self.proxy_connection.send(headers["reply-to"],response)
40+
elif request_type == "start_publishing":
41+
response = self.ot_connection.get_response(headers["destination"],message)
42+
self.proxy_connection.send(headers["reply-to"],json.dumps(response))
3343

34-
elif 'goss.gridappsd.process.request' in headers["destination"]:
35-
response = self.ot_connection.get_response(headers["destination"],message)
36-
#print(response)
37-
self.proxy_connection.send(headers["reply-to"],json.dumps(response))
38-
3944
else:
4045
print(f"Unrecognized message received by Proxy: {message}")
4146

@@ -48,7 +53,7 @@ class FieldProxyForwarder:
4853
when direct connection is not possible.
4954
"""
5055

51-
def __init__(self, connection_url: str, username: str, password: str):
56+
def __init__(self, connection_url: str, username: str, password: str, mrid :str):
5257

5358
#Connect to OT
5459
self.ot_connection = GridAPPSD()
@@ -67,19 +72,50 @@ def __init__(self, connection_url: str, username: str, password: str):
6772

6873
#Subscribe to messages from field
6974
self.proxy_connection.subscribe(destination=topics.BASE_FIELD_TOPIC+'.*', id=1, ack="auto")
70-
self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.data.powergridmodel', id=2, ack="auto")
71-
75+
self.proxy_connection.subscribe(destination='goss.gridappsd.process.request.*', id=2, ack="auto")
76+
7277
#Subscribe to messages on OT bus
7378
self.ot_connection.subscribe(topics.field_input_topic(), self.on_message_from_ot)
7479

80+
81+
82+
os.environ['CIMG_CIM_PROFILE'] = 'cimhub_ufls'
83+
os.environ['CIMG_URL'] = 'http://localhost:8889/bigdata/namespace/kb/sparql'
84+
os.environ['CIMG_DATABASE'] = 'powergridmodel'
85+
os.environ['CIMG_NAMESPACE'] = 'http://iec.ch/TC57/CIM100#'
86+
os.environ['CIMG_IEC61970_301'] = '8'
87+
os.environ['CIMG_USE_UNITS'] = 'False'
88+
89+
self.database = BlazegraphConnection()
90+
distribution_area = cim.DistributionArea(mRID=mrid)
91+
self.network = BusBranchModel(
92+
connection=self.database,
93+
container=distribution_area,
94+
distributed=False)
95+
self.network.get_all_edges(cim.DistributionArea)
96+
self.network.get_all_edges(cim.Substation)
97+
98+
for substation in self.network.graph.get(cim.Substation,{}).values():
99+
print(f'Subscribing to Substation: /topic/goss.gridappsd.field.{substation.mRID}')
100+
self.ot_connection.subscribe('/topic/goss.gridappsd.field.'+substation.mRID, self.on_message_from_ot)
101+
102+
103+
104+
#self.ot_connection.subscribe(topics.BASE_FIELD_TOPIC, self.on_message_from_ot)
105+
106+
75107
def on_message_from_ot(self, headers, message):
108+
76109
"Receives messages coming from OT bus (GridAPPS-D) and forwards to Proxy bus"
77110
try:
78111
print(f"Received message from OT: {message}")
79112

80113
if headers["destination"] == topics.field_input_topic():
81114
self.proxy_connection.send(topics.field_input_topic(),json.dumps(message))
82115

116+
if 'goss.gridappsd.field' in headers["destination"]:
117+
118+
self.proxy_connection.send(headers["destination"],json.dumps(message))
83119
else:
84120
print(f"Unrecognized message received by OT: {message}")
85121

@@ -93,12 +129,14 @@ def on_message_from_ot(self, headers, message):
93129
parser.add_argument("username")
94130
parser.add_argument("passwd")
95131
parser.add_argument("connection_url")
132+
parser.add_argument("mrid")
96133
opts = parser.parse_args()
97134
proxy_connection_url = opts.connection_url
98135
proxy_username = opts.username
99136
proxy_password = opts.passwd
137+
mrid = opts.mrid
100138

101-
proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password)
139+
proxy_forwarder = FieldProxyForwarder(proxy_connection_url, proxy_username, proxy_password, mrid)
102140

103141
while True:
104142
time.sleep(0.1)

0 commit comments

Comments
 (0)