mirror of
https://github.com/wassname/PyCRS.git
synced 2026-06-27 16:10:29 +08:00
59007ef1fb
Next: - Comb through to smooth out kinks, like how to handle default values such as datum (not always needed) or unit when not specified (on parsing or on outputting). Or handling basic lat long crs without proj. Adding wkt axis parsing. Cleaning messy wkt parsing maybe. - And finally add esri wkt, copy paste mostly, but some small differences.
606 lines
20 KiB
Python
606 lines
20 KiB
Python
|
|
# parse from text strings
|
|
# possible use module: https://github.com/rockdoc/grabbag/wiki/CRS-WKT-Parser
|
|
# also note some paramter descriptions: http://www.geoapi.org/3.0/javadoc/org/opengis/referencing/doc-files/WKT.html
|
|
# and see gdal source code: http://gis.stackexchange.com/questions/129764/how-are-esri-wkt-projections-different-from-ogc-wkt-projections
|
|
# especially: http://fossies.org/windows/misc/saga_2.1.4_x64.zip/saga_2.1.4_x64/saga_prj.dic
|
|
|
|
from . import datums
|
|
from . import ellipsoids
|
|
from . import parameters
|
|
from . import units
|
|
from . import projections
|
|
from . import webscrape
|
|
|
|
def from_epsg_code(code):
|
|
# must go online (or look up local table) to get crs details
|
|
code = str(code)
|
|
proj4 = webscrape.crscode_to_string("epsg", code, "proj4")
|
|
crs = from_proj4(proj4)
|
|
return crs
|
|
|
|
def from_esri_code(code):
|
|
# must go online (or look up local table) to get crs details
|
|
code = str(code)
|
|
proj4 = webscrape.crscode_to_string("esri", code, "proj4")
|
|
crs = from_proj4(proj4)
|
|
return crs
|
|
|
|
def from_sr_code(code):
|
|
# must go online (or look up local table) to get crs details
|
|
code = str(code)
|
|
proj4 = webscrape.crscode_to_string("sr-org", code, "proj4")
|
|
crs = from_proj4(proj4)
|
|
return crs
|
|
|
|
##def from_esri_wkt(string):
|
|
## # parse arguments into components
|
|
## # use args to create crs
|
|
## pass
|
|
|
|
def from_ogc_wkt(string):
|
|
# remove newlines and multi spaces
|
|
string = " ".join(string.split())
|
|
|
|
# parse arguments into components
|
|
def _consume_bracket(chars, char):
|
|
"char must be the opening bracket"
|
|
consumed = ""
|
|
depth = 1
|
|
while char and depth > 0:
|
|
consumed += char
|
|
char = next(chars, None)
|
|
# update depth level
|
|
if char == "[":
|
|
depth += 1
|
|
elif char == "]":
|
|
depth -= 1
|
|
consumed += char # consume the last closing char too
|
|
return consumed
|
|
|
|
def _consume_quote(chars, char, quotechar):
|
|
"char and quotechar must be the opening quote char"
|
|
consumed = ""
|
|
# consume the first opening char
|
|
consumed += char
|
|
char = next(chars, None)
|
|
# consume inside
|
|
while char and char != quotechar:
|
|
consumed += char
|
|
char = next(chars, None)
|
|
# consume the last closing char too
|
|
consumed += char
|
|
return consumed
|
|
|
|
def _next_elem(chars, char):
|
|
"char must be the first char of the text that precedes brackets"
|
|
header = ""
|
|
# skip until next header
|
|
while not char.isalpha():
|
|
char = next(chars, None)
|
|
# first consume the element text header
|
|
while char.isalpha():
|
|
header += char
|
|
char = next(chars, None)
|
|
# skip until next brackets (in case of spaces)
|
|
while char != "[":
|
|
char = next(chars, None)
|
|
# then consume the element bracket contents
|
|
if char == "[":
|
|
content = _consume_bracket(chars, char)
|
|
char = next(chars, None)
|
|
# split content into args list
|
|
content = content[1:-1] # remove enclosing brackets
|
|
content = _split_except(content)
|
|
# recursively load all subelems
|
|
for i,item in enumerate(content):
|
|
if isinstance(item, str) and "[" in item:
|
|
chars = (char for char in item)
|
|
char = next(chars)
|
|
item = _next_elem(chars, char)
|
|
content[i] = item
|
|
return header, content
|
|
|
|
def _clean_value(string):
|
|
string = string.strip()
|
|
try: string = float(string)
|
|
except: pass
|
|
return string
|
|
|
|
def _split_except(string):
|
|
chars = (char for char in string)
|
|
char = next(chars)
|
|
items = []
|
|
consumed = ""
|
|
while char:
|
|
# dont split on quotes, just consume it
|
|
if char in ("'", '"'):
|
|
consumed += _consume_quote(chars, char, char)
|
|
# dont split inside brackets, just consume it
|
|
elif char == "[":
|
|
consumed += _consume_bracket(chars, char)
|
|
# make new splititem
|
|
elif char == ",":
|
|
consumed = _clean_value(consumed)
|
|
items.append(consumed)
|
|
consumed = ""
|
|
# consume normal char
|
|
elif char:
|
|
consumed += char
|
|
# next
|
|
char = next(chars, None)
|
|
# append last item too
|
|
consumed = _clean_value(consumed)
|
|
items.append(consumed)
|
|
return items
|
|
|
|
# load into nested tuples and arglists
|
|
crstuples = []
|
|
chars = (char for char in string)
|
|
char = next(chars)
|
|
while char:
|
|
header,content = _next_elem(chars, char)
|
|
crstuples.append((header, content))
|
|
char = next(chars, None)
|
|
|
|
# parse into actual crs objects
|
|
def _parse(header, content):
|
|
if header.upper() == "PROJCS":
|
|
# find name
|
|
name = content[0].strip('"')
|
|
# find geogcs elem (by running parse again)
|
|
subheader, subcontent = content[1]
|
|
geogcs = _parse(subheader, subcontent)
|
|
# find projection elem
|
|
subheader, subcontent = content[2]
|
|
for itemname in dir(projections):
|
|
item = getattr(projections, itemname)
|
|
try:
|
|
item = item()
|
|
if hasattr(item, "ogc_wkt") and subcontent[0].strip('"') == item.ogc_wkt:
|
|
projdef = item
|
|
break
|
|
except:
|
|
pass
|
|
else:
|
|
projdef = None
|
|
proj = parameters.Projection(projdef)
|
|
# find params
|
|
params = []
|
|
for part in content:
|
|
if isinstance(part, tuple):
|
|
subheader,subcontent = part
|
|
if subheader == "PARAMETER":
|
|
if subcontent[0] == '"Azimuth"':
|
|
item = parameters.Azimuth(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"scale_factor"':
|
|
item = parameters.ScalingFactor(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"latitude_of_origin"':
|
|
item = parameters.LatitudeOrigin(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"standard_parallel_1"':
|
|
item = parameters.LatitudeFirstStndParallel(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"standard_parallel_2"':
|
|
item = parameters.LatitudeSecondStndParallel(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"Standard_Parallel_1"':
|
|
item = parameters.LatitudeTrueScale(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"Central_Meridian"':
|
|
item = parameters.CentralMeridian(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"Longitude_Of_Center"':
|
|
item = parameters.LongitudeCenter(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"false_easting"':
|
|
item = parameters.FalseEasting(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"false_northing"':
|
|
item = parameters.FalseNorthing(subcontent[1])
|
|
params.append(item)
|
|
elif subcontent[0] == '"satellite_height"':
|
|
item = parameters.SatelliteHeight(subcontent[1])
|
|
params.append(item)
|
|
# find unit
|
|
for part in content:
|
|
if isinstance(part, tuple):
|
|
subheader,subcontent = part
|
|
if subheader == "UNIT":
|
|
break
|
|
if subcontent[0].strip('"') == "Meters":
|
|
unittype = parameters.UnitType(units.Meter())
|
|
elif subcontent[0].strip('"') == "degree":
|
|
unittype = parameters.UnitType(units.Degree())
|
|
metmult = parameters.MeterMultiplier(subcontent[1])
|
|
unit = parameters.Unit(unittype, metmult)
|
|
# find twin axis maybe
|
|
## if len(content) >= 6:
|
|
## twinax = (parameters.Axis(
|
|
## else:
|
|
## twinax = None
|
|
# put it all together
|
|
projcs = parameters.ProjCS("Unknown", geogcs, proj, params, unit) #, twinax)
|
|
return projcs
|
|
|
|
elif header.upper() == "GEOGCS":
|
|
# name
|
|
name = content[0]
|
|
# datum
|
|
subheader, subcontent = content[1]
|
|
## datum name
|
|
for itemname in dir(datums):
|
|
item = getattr(datums, itemname)
|
|
try:
|
|
item = item()
|
|
if hasattr(item, "ogc_wkt") and subcontent[0].strip('"') == item.ogc_wkt:
|
|
datumdef = item
|
|
break
|
|
except:
|
|
pass
|
|
else:
|
|
datumdef = datums.Unknown()
|
|
## datum ellipsoid
|
|
subsubheader, subsubcontent = subcontent[1]
|
|
for itemname in dir(ellipsoids):
|
|
item = getattr(ellipsoids, itemname)
|
|
try:
|
|
item = item()
|
|
if hasattr(item, "ogc_wkt") and subsubcontent[0].strip('"') == item.ogc_wkt:
|
|
ellipsdef = item
|
|
break
|
|
except:
|
|
pass
|
|
else:
|
|
ellipsdef = None
|
|
ellipsoid = parameters.Ellipsoid(ellipsdef, subsubcontent[1], subsubcontent[2])
|
|
## datum shift
|
|
if len(subcontent) >= 3:
|
|
subsubheader, subsubcontent = subcontent[2]
|
|
datumshift = parameters.DatumShift(subsubcontent)
|
|
else:
|
|
datumshift = None
|
|
## put it all togehter
|
|
datum = parameters.Datum(datumdef, ellipsoid, datumshift)
|
|
# prime mer
|
|
subheader, subcontent = content[2]
|
|
prime_mer = parameters.PrimeMeridian(subcontent[1])
|
|
# angunit
|
|
subheader, subcontent = content[3]
|
|
if subcontent[0].strip('"') == "Meters":
|
|
unittype = parameters.UnitType(units.Meter())
|
|
elif subcontent[0].strip('"') == "degree":
|
|
unittype = parameters.UnitType(units.Degree())
|
|
metmult = parameters.MeterMultiplier(subcontent[1])
|
|
angunit = parameters.AngularUnit(unittype, metmult)
|
|
# twin axis
|
|
# ...
|
|
# put it all together
|
|
geogcs = parameters.GeogCS(name, datum, prime_mer, angunit, twin_ax=None)
|
|
return geogcs
|
|
|
|
# toplevel collection
|
|
header, content in crstuples[0]
|
|
toplevel = _parse(header, content)
|
|
crs = parameters.CRS(toplevel)
|
|
|
|
# use args to create crs
|
|
return crs
|
|
|
|
##def from_unknown_wkt(string):
|
|
## # detect if ogc wkt or esri wkt
|
|
## # TIPS: esri wkt datums all use "D_" before the datum name
|
|
## # then load with appropriate function
|
|
## pass
|
|
|
|
def from_proj4(string):
|
|
# parse arguments into components
|
|
# use args to create crs
|
|
|
|
# SLIGTHLY MESSY STILL, CLEANUP..
|
|
|
|
params = []
|
|
|
|
partdict = dict([part.split("=") for part in string.split()
|
|
if not part.startswith("+no_defs")])
|
|
|
|
# INIT CODES...?
|
|
# eg, +init=epsg code...?
|
|
if "+init" in partdict:
|
|
|
|
codetype, code = partdict["+init"].split(":")
|
|
if codetype == "EPSG":
|
|
crs = from_epsg_code(code)
|
|
# need to get individual elements from crs
|
|
# maybe from web...
|
|
|
|
# DATUM
|
|
|
|
# datum param is required
|
|
if "+datum" in partdict:
|
|
|
|
# get predefined datum def
|
|
for itemname in dir(datums):
|
|
item = getattr(datums, itemname)
|
|
try:
|
|
item = item()
|
|
if hasattr(item, "proj4") and partdict["+datum"] == item.proj4:
|
|
datumdef = item
|
|
break
|
|
except:
|
|
pass
|
|
|
|
else:
|
|
datumdef = datums.Unknown()
|
|
|
|
else:
|
|
datumdef = datums.Unknown()
|
|
|
|
# ELLIPS
|
|
|
|
# ellipse param is required
|
|
if "+ellps" in partdict:
|
|
|
|
# get predefined ellips def
|
|
for itemname in dir(ellipsoids):
|
|
item = getattr(ellipsoids, itemname)
|
|
try:
|
|
item = item()
|
|
if hasattr(item, "proj4") and partdict["+ellps"] == item.proj4:
|
|
ellipsdef = item
|
|
break
|
|
except:
|
|
pass
|
|
|
|
else:
|
|
raise Exception("Could not find required +ellps element")
|
|
|
|
# TO WGS 84 COEFFS
|
|
if "+towgs84" in partdict:
|
|
coeffs = partdict["+towgs84"].split(",")
|
|
datumshift = parameters.DatumShift(coeffs)
|
|
|
|
# if no datum, use ellips + towgs84 params to create the correct datum
|
|
# ...??
|
|
|
|
# COMBINE DATUM AND ELLIPS
|
|
|
|
## create datum and ellips param objs
|
|
ellips = parameters.Ellipsoid(ellipsdef,
|
|
semimaj_ax=partdict.get("+a"),
|
|
inv_flat=partdict.get("+f"))
|
|
if "+datum" in partdict:
|
|
datum = parameters.Datum(datumdef, ellips)
|
|
|
|
elif "+towgs84" in partdict:
|
|
datum = parameters.Datum(datumdef, ellips, datumshift)
|
|
|
|
else:
|
|
datum = parameters.Datum(datumdef, ellips)
|
|
|
|
# PRIME MERIDIAN
|
|
|
|
# set default
|
|
prime_mer = parameters.PrimeMeridian(0)
|
|
|
|
# overwrite with user input
|
|
if "+pm" in partdict:
|
|
# for now only support longitude, later add name support from below:
|
|
## greenwich 0dE
|
|
## lisbon 9d07'54.862"W
|
|
## paris 2d20'14.025"E
|
|
## bogota 74d04'51.3"E
|
|
## madrid 3d41'16.48"W
|
|
## rome 12d27'8.4"E
|
|
## bern 7d26'22.5"E
|
|
## jakarta 106d48'27.79"E
|
|
## ferro 17d40'W
|
|
## brussels 4d22'4.71"E
|
|
## stockholm 18d3'29.8"E
|
|
## athens 23d42'58.815"E
|
|
## oslo 10d43'22.5"E
|
|
prime_mer = parameters.PrimeMeridian(partdict["+pm"])
|
|
|
|
# ANGULAR UNIT
|
|
|
|
## proj4 cannot set angular unit, so just set to default
|
|
metmulti = parameters.MeterMultiplier(0.017453292519943295)
|
|
unittype = parameters.UnitType(units.Degree())
|
|
angunit = parameters.AngularUnit(unittype, metmulti)
|
|
|
|
# GEOGCS (note, currently does not load axes)
|
|
|
|
geogcs = parameters.GeogCS("Unknown", datum, prime_mer, angunit) #, twin_ax)
|
|
|
|
# PROJECTION
|
|
|
|
if "+proj" in partdict:
|
|
|
|
# get predefined proj def
|
|
for itemname in dir(projections):
|
|
item = getattr(projections, itemname)
|
|
try:
|
|
item = item()
|
|
if hasattr(item, "proj4") and partdict["+proj"] == item.proj4:
|
|
projdef = item
|
|
break
|
|
except:
|
|
pass
|
|
|
|
else:
|
|
projdef = None
|
|
|
|
else:
|
|
raise Exception("Could not find required +proj element")
|
|
|
|
if projdef:
|
|
|
|
# create proj param obj
|
|
proj = parameters.Projection(projdef)
|
|
|
|
# CENTRAL MERIDIAN
|
|
|
|
if "+lon_0" in partdict:
|
|
val = partdict["+lon_0"]
|
|
obj = parameters.CentralMeridian(val)
|
|
params.append(obj)
|
|
|
|
# FALSE EASTING
|
|
|
|
if "+x_0" in partdict:
|
|
val = partdict["+x_0"]
|
|
obj = parameters.FalseEasting(val)
|
|
params.append(obj)
|
|
|
|
# FALSE NORTHING
|
|
|
|
if "+y_0" in partdict:
|
|
val = partdict["+y_0"]
|
|
obj = parameters.FalseNorthing(val)
|
|
params.append(obj)
|
|
|
|
# SCALING FACTOR
|
|
|
|
if "+k_0" in partdict or "+k" in partdict:
|
|
if "+k_0" in partdict: val = partdict["+k_0"]
|
|
elif "+k" in partdict: val = partdict["+k"]
|
|
obj = parameters.ScalingFactor(val)
|
|
params.append(obj)
|
|
|
|
# LATITUDE ORIGIN
|
|
|
|
if "+lat_0" in partdict:
|
|
val = partdict["+lat_0"]
|
|
obj = parameters.LatitudeOrigin(val)
|
|
params.append(obj)
|
|
|
|
# LATITUDE TRUE SCALE
|
|
|
|
if "+lat_ts" in partdict:
|
|
val = partdict["+lat_ts"]
|
|
obj = parameters.LatitudeTrueScale(val)
|
|
params.append(obj)
|
|
|
|
# LONGITUDE CENTER
|
|
|
|
if "+lonc" in partdict:
|
|
val = partdict["+lonc"]
|
|
obj = parameters.LongitudeCenter(val)
|
|
params.append(obj)
|
|
|
|
# AZIMUTH
|
|
|
|
if "+alpha" in partdict:
|
|
val = partdict["+alpha"]
|
|
obj = parameters.Azimuth(val)
|
|
params.append(obj)
|
|
|
|
# STD PARALLEL 1
|
|
|
|
if "+lat1" in partdict:
|
|
val = partdict["+lat1"]
|
|
obj = parameters.LatitudeFirstStndParallel(val)
|
|
params.append(obj)
|
|
|
|
# STD PARALLEL 2
|
|
|
|
if "+lat2" in partdict:
|
|
val = partdict["+lat2"]
|
|
obj = parameters.LatitudeSecondStndParallel(val)
|
|
params.append(obj)
|
|
|
|
# UNIT
|
|
|
|
## set default
|
|
metmulti = parameters.MeterMultiplier(1.0)
|
|
unittype = parameters.UnitType(units.Meter())
|
|
|
|
## override with user input
|
|
if "+to_meter" in partdict:
|
|
metmulti = parameters.MeterMultiplier(partdict["+to_meter"])
|
|
if "+units" in partdict:
|
|
if partdict["+units"] == "m":
|
|
unittype = parameters.UnitType(units.Meter())
|
|
|
|
## create unitobj
|
|
unit = parameters.Unit(unittype, metmulti)
|
|
|
|
# SATELLITE HEIGHT
|
|
if "+h" in partdict:
|
|
val = partdict["+h"]
|
|
obj = parameters.SatelliteHeight(val)
|
|
params.append(obj)
|
|
|
|
# TILT ANGLE
|
|
if "+tilt" in partdict:
|
|
val = partdict["+tilt"]
|
|
obj = parameters.TiltAngle(val)
|
|
params.append(obj)
|
|
|
|
# PROJCS
|
|
|
|
projcs = parameters.ProjCS("Unknown", geogcs, proj, params, unit)
|
|
|
|
# CRS
|
|
|
|
crs = parameters.CRS(projcs)
|
|
|
|
else:
|
|
crs = parameters.CRS(geogcs)
|
|
|
|
# FINISHED
|
|
|
|
return crs
|
|
|
|
##def from_ogc_urn(string):
|
|
## # hmmm, seems like ogc urn could be anything incl online link, epsg, etc...
|
|
## # if necessary, must go online (or lookup local table) to get details
|
|
## # maybe test which of these and run their function?
|
|
## # examples urn:ogc:def:crs:OGC:1.3:CRS1
|
|
## # or with EPSG instead of OGC
|
|
##
|
|
## # If OGC, 1.3 is pdf version, and after that is a name from list below
|
|
## # as found in pdf: "Definition identifier URNs in OGC namespace"
|
|
## # OGC crs definitions
|
|
## # URN | CRS name | Definition reference
|
|
## # urn:ogc:def:crs:OGC:1.3:CRS1 Map CS B.2 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:CRS84 WGS 84 longitude-latitude B.3 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:CRS83 NAD83 longitude-latitude B.4 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:CRS27 NAD27 longitude-latitude B.5 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:CRS88 NAVD 88 B.6 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:AUTO42001:99:8888 Auto universal transverse mercator B.7 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:AUTO42002:99:8888 Auto transverse mercator B.8 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:AUTO42003:99:8888 Auto orthographic B.9 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:AUTO42004:99:8888 Auto equirectangular B.10 in OGC 06-042
|
|
## # urn:ogc:def:crs:OGC:1.3:AUTO42005:99 Auto Mollweide B.11 in OGC 06-042
|
|
##
|
|
## pass
|
|
|
|
def from_unknown_text(text):
|
|
"""Detect type and load with appropriate function"""
|
|
|
|
if string.startswith("urn:"):
|
|
from_ogc_urn(string)
|
|
|
|
elif string.startswith("+proj="):
|
|
from_proj4(string)
|
|
|
|
elif string.startswith("PROJCS["):
|
|
from_unknown_wkt(string)
|
|
|
|
elif string.startswith("EPSG:"):
|
|
from_epsg_code(string.split(":")[1])
|
|
|
|
elif string.startswith("ESRI:"):
|
|
from_esri_code(string.split(":")[1])
|
|
|
|
elif string.startswith("SR-ORG:"):
|
|
from_sr_code(string.split(":")[1])
|
|
|
|
else: raise Exception("Could not detect which type of crs")
|
|
|
|
##def from_geotiff_parameters(**params):
|
|
## pass
|