use std::{
    any::{type_name, TypeId},
    fmt::Debug,
    hash::{BuildHasher, Hasher},
    marker::PhantomData,
    sync::OnceLock,
};
use bones_utils::{default, HashMap};
use hashbrown::hash_map;
use parking_lot::RwLock;
use crate::{
    prelude::*,
    raw_fns::{RawClone, RawDefault, RawDrop},
};
/// A type-erased hash map whose keys and values are stored as [`SchemaBox`]es.
///
/// Each map is tied to one key schema and one value schema; the checked insert, lookup and
/// removal methods compare against these before touching the underlying table.
#[derive(Clone, Debug)]
pub struct SchemaMap {
    /// Underlying table of boxed keys and boxed values.
    map: HashMap<SchemaBox, SchemaBox>,
    /// Schema that keys are checked against; `new` asserts it has hash and eq functions.
    key_schema: &'static Schema,
    /// Schema that values are checked against.
    value_schema: &'static Schema,
}
impl SchemaMap {
    pub fn new(key_schema: &'static Schema, value_schema: &'static Schema) -> Self {
        assert!(
            key_schema.hash_fn.is_some() && key_schema.eq_fn.is_some(),
            "Key schema must implement hash and eq"
        );
        Self {
            map: HashMap::default(),
            key_schema,
            value_schema,
        }
    }
    #[inline]
    pub fn len(&self) -> usize {
        self.map.len()
    }
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }
    pub fn key_schema(&self) -> &'static Schema {
        self.key_schema
    }
    pub fn value_schema(&self) -> &'static Schema {
        self.value_schema
    }
    #[inline]
    #[track_caller]
    pub fn insert<K: HasSchema, V: HasSchema>(&mut self, key: K, value: V) -> Option<V> {
        self.try_insert(key, value).unwrap()
    }
    pub fn try_insert<K: HasSchema, V: HasSchema>(
        &mut self,
        key: K,
        value: V,
    ) -> Result<Option<V>, SchemaMismatchError> {
        let key = SchemaBox::new(key);
        let value = SchemaBox::new(value);
        self.try_insert_box(key, value)
            .map(|value| value.map(|x| unsafe { x.cast_into_unchecked() }))
    }
    #[track_caller]
    #[inline]
    pub fn insert_box(&mut self, key: SchemaBox, value: SchemaBox) -> Option<SchemaBox> {
        self.try_insert_box(key, value).unwrap()
    }
    pub fn try_insert_box(
        &mut self,
        key: SchemaBox,
        value: SchemaBox,
    ) -> Result<Option<SchemaBox>, SchemaMismatchError> {
        if key.schema() != self.key_schema || value.schema() != self.value_schema {
            Err(SchemaMismatchError)
        } else {
            let previous_value = unsafe { self.insert_box_unchecked(key, value) };
            Ok(previous_value)
        }
    }
    pub unsafe fn insert_box_unchecked(
        &mut self,
        key: SchemaBox,
        mut value: SchemaBox,
    ) -> Option<SchemaBox> {
        let hash = {
            let mut hasher = self.map.hasher().build_hasher();
            hasher.write_u64(key.hash());
            hasher.finish()
        };
        let entry = self
            .map
            .raw_entry_mut()
            .from_hash(hash, |other| other == &key);
        match entry {
            hash_map::RawEntryMut::Occupied(mut occupied) => {
                std::mem::swap(occupied.get_mut(), &mut value);
                Some(value)
            }
            hash_map::RawEntryMut::Vacant(vacant) => {
                vacant.insert(key, value);
                None
            }
        }
    }
    #[track_caller]
    #[inline]
    pub fn get<K: HasSchema, V: HasSchema>(&self, key: &K) -> Option<&V> {
        self.try_get(key).unwrap()
    }
    #[track_caller]
    pub fn try_get<K: HasSchema, V: HasSchema>(
        &self,
        key: &K,
    ) -> Result<Option<&V>, SchemaMismatchError> {
        if K::schema() != self.key_schema || V::schema() != self.value_schema {
            Err(SchemaMismatchError)
        } else {
            let value = unsafe { self.get_ref_unchecked(SchemaRef::new(key)) }
                .map(|x| unsafe { x.cast_into_unchecked() });
            Ok(value)
        }
    }
    #[inline]
    #[track_caller]
    pub fn get_ref(&self, key: SchemaRef) -> Option<SchemaRef> {
        self.try_get_ref(key).unwrap()
    }
    pub fn try_get_ref(&self, key: SchemaRef) -> Result<Option<SchemaRef>, SchemaMismatchError> {
        if key.schema() != self.key_schema {
            Err(SchemaMismatchError)
        } else {
            Ok(unsafe { self.get_ref_unchecked(key) })
        }
    }
    pub unsafe fn get_ref_unchecked(&self, key: SchemaRef) -> Option<SchemaRef> {
        let Some(hash_fn) = &self.key_schema.hash_fn else {
            panic!("Key schema doesn't implement hash");
        };
        let Some(eq_fn) = &self.key_schema.eq_fn else {
            panic!("Key schema doesn't implement eq");
        };
        let key_ptr = key.as_ptr();
        let raw_hash = unsafe { (hash_fn.get())(key_ptr) };
        let hash = {
            let mut hasher = self.map.hasher().build_hasher();
            hasher.write_u64(raw_hash);
            hasher.finish()
        };
        self.map
            .raw_entry()
            .from_hash(hash, |key| {
                let other_ptr = key.as_ref().as_ptr();
                unsafe { (eq_fn.get())(key_ptr, other_ptr) }
            })
            .map(|x| x.1.as_ref())
    }
    #[inline]
    #[track_caller]
    pub fn get_ref_mut(&mut self, key: SchemaRef) -> Option<SchemaRefMut> {
        self.try_get_ref_mut(key).unwrap()
    }
    pub fn try_get_ref_mut(
        &mut self,
        key: SchemaRef,
    ) -> Result<Option<SchemaRefMut>, SchemaMismatchError> {
        if key.schema() != self.key_schema {
            Err(SchemaMismatchError)
        } else {
            Ok(unsafe { self.get_ref_unchecked_mut(key) })
        }
    }
    pub unsafe fn get_ref_unchecked_mut(&mut self, key: SchemaRef) -> Option<SchemaRefMut> {
        let Some(hash_fn) = &self.key_schema.hash_fn else {
            panic!("Key schema doesn't implement hash");
        };
        let Some(eq_fn) = &self.key_schema.eq_fn else {
            panic!("Key schema doesn't implement eq");
        };
        let key_ptr = key.as_ptr();
        let raw_hash = unsafe { (hash_fn.get())(key_ptr) };
        let hash = {
            let mut hasher = self.map.hasher().build_hasher();
            hasher.write_u64(raw_hash);
            hasher.finish()
        };
        let entry = self.map.raw_entry_mut().from_hash(hash, |key| {
            let other_ptr = key.as_ref().as_ptr();
            unsafe { (eq_fn.get())(key_ptr, other_ptr) }
        });
        match entry {
            hash_map::RawEntryMut::Occupied(entry) => Some(entry.into_mut()),
            hash_map::RawEntryMut::Vacant(_) => None,
        }
        .map(|x| x.as_mut())
    }
    #[track_caller]
    #[inline]
    pub fn get_mut<K: HasSchema, V: HasSchema>(&mut self, key: &K) -> Option<&mut V> {
        self.try_get_mut(key).unwrap()
    }
    #[track_caller]
    pub fn try_get_mut<K: HasSchema, V: HasSchema>(
        &mut self,
        key: &K,
    ) -> Result<Option<&mut V>, SchemaMismatchError> {
        if K::schema() != self.key_schema || V::schema() != self.value_schema {
            Err(SchemaMismatchError)
        } else {
            let value = unsafe { self.get_ref_unchecked_mut(SchemaRef::new(key)) }
                .map(|x| unsafe { x.cast_into_mut_unchecked() });
            Ok(value)
        }
    }
    #[inline]
    #[track_caller]
    pub fn remove<K: HasSchema, V: HasSchema>(&mut self, key: &K) -> Option<V> {
        self.try_remove(key).unwrap()
    }
    pub fn try_remove<K: HasSchema, V: HasSchema>(
        &mut self,
        key: &K,
    ) -> Result<Option<V>, SchemaMismatchError> {
        if K::schema() != self.key_schema || V::schema() != self.value_schema {
            Err(SchemaMismatchError)
        } else {
            let value = unsafe { self.remove_unchecked(SchemaRef::new(key)) }
                .map(|x| unsafe { x.cast_into_unchecked() });
            Ok(value)
        }
    }
    #[inline]
    #[track_caller]
    pub fn remove_box(&mut self, key: SchemaRef) -> Option<SchemaBox> {
        self.try_remove_box(key).unwrap()
    }
    pub fn try_remove_box(
        &mut self,
        key: SchemaRef,
    ) -> Result<Option<SchemaBox>, SchemaMismatchError> {
        if key.schema() != self.key_schema {
            Err(SchemaMismatchError)
        } else {
            Ok(unsafe { self.remove_unchecked(key) })
        }
    }
    pub unsafe fn remove_unchecked(&mut self, key: SchemaRef) -> Option<SchemaBox> {
        let Some(hash_fn) = &self.key_schema.hash_fn else {
            panic!("Key schema doesn't implement hash");
        };
        let Some(eq_fn) = &self.key_schema.eq_fn else {
            panic!("Key schema doesn't implement eq");
        };
        let key_ptr = key.as_ptr();
        let hash = unsafe { (hash_fn.get())(key_ptr) };
        let hash = {
            let mut hasher = self.map.hasher().build_hasher();
            hasher.write_u64(hash);
            hasher.finish()
        };
        let entry = self.map.raw_entry_mut().from_hash(hash, |key| {
            let other_ptr = key.as_ref().as_ptr();
            unsafe { (eq_fn.get())(key_ptr, other_ptr) }
        });
        match entry {
            hash_map::RawEntryMut::Occupied(entry) => Some(entry.remove()),
            hash_map::RawEntryMut::Vacant(_) => None,
        }
    }
    #[inline]
    #[track_caller]
    pub fn into_smap<K: HasSchema, V: HasSchema>(self) -> SMap<K, V> {
        self.try_into_smap().unwrap()
    }
    pub fn try_into_smap<K: HasSchema, V: HasSchema>(
        self,
    ) -> Result<SMap<K, V>, SchemaMismatchError> {
        if K::schema() == self.key_schema && V::schema() == self.value_schema {
            Ok(SMap {
                map: self,
                _phantom: PhantomData,
            })
        } else {
            Err(SchemaMismatchError)
        }
    }
}
/// Builds an [`SMap`] from `(key, value)` pairs, inserting them in iteration order.
impl<K: HasSchema, V: HasSchema> FromIterator<(K, V)> for SMap<K, V> {
    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
        let mut collected = Self::default();
        iter.into_iter().for_each(|(key, value)| {
            // A later pair with an equal key replaces the earlier value; the old one is dropped.
            collected.insert(key, value);
        });
        collected
    }
}
impl<K: HasSchema, V: HasSchema> std::ops::Index<&K> for SMap<K, V> {
    type Output = V;
    fn index(&self, index: &K) -> &Self::Output {
        self.get(index).unwrap()
    }
}
impl<K: HasSchema, V: HasSchema> std::ops::IndexMut<&K> for SMap<K, V> {
    fn index_mut(&mut self, index: &K) -> &mut Self::Output {
        self.get_mut(index).unwrap()
    }
}
/// Iterator type returned by [`SchemaMap::iter`]: the inner table's iterator mapped through
/// a plain function pointer that turns each boxed key/value pair into a pair of [`SchemaRef`]s.
type SchemaMapIter<'iter> = std::iter::Map<
    hash_map::Iter<'iter, SchemaBox, SchemaBox>,
    for<'a> fn((&'a SchemaBox, &'a SchemaBox)) -> (SchemaRef<'a>, SchemaRef<'a>),
>;
/// Iterator type returned by [`SchemaMap::iter_mut`]: the inner table's mutable iterator mapped
/// through a plain function pointer that yields a [`SchemaRef`] to the key and a
/// [`SchemaRefMut`] to the value.
type SchemaMapIterMut<'iter> = std::iter::Map<
    hash_map::IterMut<'iter, SchemaBox, SchemaBox>,
    for<'a> fn((&'a SchemaBox, &'a mut SchemaBox)) -> (SchemaRef<'a>, SchemaRefMut<'a>),
>;
impl SchemaMap {
    /// Iterate over every entry as a pair of borrowed, type-erased references.
    #[allow(clippy::type_complexity)]
    pub fn iter(&self) -> SchemaMapIter {
        // A named fn (rather than a closure) so it coerces to the fn pointer in the alias.
        fn borrow_entry<'a>(
            entry: (&'a SchemaBox, &'a SchemaBox),
        ) -> (SchemaRef<'a>, SchemaRef<'a>) {
            let (key_box, value_box) = entry;
            (key_box.as_ref(), value_box.as_ref())
        }
        self.map.iter().map(borrow_entry)
    }

    /// Iterate over every entry with a shared reference to the key and a mutable reference
    /// to the value.
    #[allow(clippy::type_complexity)]
    pub fn iter_mut(&mut self) -> SchemaMapIterMut {
        fn borrow_entry_mut<'a>(
            entry: (&'a SchemaBox, &'a mut SchemaBox),
        ) -> (SchemaRef<'a>, SchemaRefMut<'a>) {
            let (key_box, value_box) = entry;
            (key_box.as_ref(), value_box.as_mut())
        }
        self.map.iter_mut().map(borrow_entry_mut)
    }

    /// Iterate over the keys as borrowed, type-erased references.
    #[allow(clippy::type_complexity)]
    pub fn keys(
        &self,
    ) -> std::iter::Map<
        hash_map::Keys<SchemaBox, SchemaBox>,
        for<'a> fn(&'a SchemaBox) -> SchemaRef<'a>,
    > {
        fn borrow_key<'a>(key_box: &'a SchemaBox) -> SchemaRef<'a> {
            key_box.as_ref()
        }
        self.map.keys().map(borrow_key)
    }

    /// Iterate over the values as borrowed, type-erased references.
    #[allow(clippy::type_complexity)]
    pub fn values(
        &self,
    ) -> std::iter::Map<
        hash_map::Values<SchemaBox, SchemaBox>,
        for<'a> fn(&'a SchemaBox) -> SchemaRef<'a>,
    > {
        fn borrow_value<'a>(value_box: &'a SchemaBox) -> SchemaRef<'a> {
            value_box.as_ref()
        }
        self.map.values().map(borrow_value)
    }

    /// Iterate over the values as mutable, type-erased references.
    #[allow(clippy::type_complexity)]
    pub fn values_mut(
        &mut self,
    ) -> std::iter::Map<
        hash_map::ValuesMut<SchemaBox, SchemaBox>,
        for<'a> fn(&'a mut SchemaBox) -> SchemaRefMut<'a>,
    > {
        fn borrow_value_mut<'a>(value_box: &'a mut SchemaBox) -> SchemaRefMut<'a> {
            value_box.as_mut()
        }
        self.map.values_mut().map(borrow_value_mut)
    }
}
impl<'a> IntoIterator for &'a SchemaMap {
    type Item = (SchemaRef<'a>, SchemaRef<'a>);
    type IntoIter = SchemaMapIter<'a>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
impl<'a> IntoIterator for &'a mut SchemaMap {
    type Item = (SchemaRef<'a>, SchemaRefMut<'a>);
    type IntoIter = SchemaMapIterMut<'a>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter_mut()
    }
}
/// Typed wrapper around a [`SchemaMap`] whose keys are `K` and whose values are `V`.
///
/// In the code visible here it is only built from a map created with `K::schema()` /
/// `V::schema()`, or from a [`SchemaMap`] whose schemas were checked against them.
pub struct SMap<K: HasSchema, V: HasSchema> {
    /// The type-erased map that actually stores the entries.
    map: SchemaMap,
    /// Records the `K`/`V` type parameters without storing any values of those types.
    _phantom: PhantomData<(K, V)>,
}
/// Formats the map as `{key: value, ...}` using the typed entries.
impl<K: HasSchema + Debug, V: HasSchema + Debug> Debug for SMap<K, V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_map().entries(self.iter()).finish()
    }
}
impl<K: HasSchema, V: HasSchema> Clone for SMap<K, V> {
    fn clone(&self) -> Self {
        Self {
            map: self.map.clone(),
            _phantom: self._phantom,
        }
    }
}
impl<K: HasSchema, V: HasSchema> Default for SMap<K, V> {
    fn default() -> Self {
        Self {
            map: SchemaMap::new(K::schema(), V::schema()),
            _phantom: Default::default(),
        }
    }
}
// Schema implementation for `SMap`: the schema of each concrete `SMap<K, V>` is registered
// lazily on first request and cached afterwards.
unsafe impl<K: HasSchema, V: HasSchema> HasSchema for SMap<K, V> {
    /// Returns the schema for this concrete `SMap<K, V>`, registering it with
    /// `SCHEMA_REGISTRY` if it is not in the local cache yet.
    fn schema() -> &'static Schema {
        // A `static` inside a generic item is shared by every instantiation, so the cache is a
        // map keyed by the `TypeId` of the concrete `SMap<K, V>` rather than a single slot.
        static S: OnceLock<RwLock<HashMap<TypeId, &'static Schema>>> = OnceLock::new();
        // Fast path: look the schema up under the read lock. The guard is a temporary of this
        // statement, so the lock is released before the fallback below runs.
        let schema = {
            S.get_or_init(default)
                .read()
                .get(&TypeId::of::<Self>())
                .copied()
        };
        // Slow path: build and register the schema, then remember it in the cache.
        // NOTE(review): no lock is held between the failed lookup above and the insert below,
        // so two threads may both register; presumably `register` tolerates that — confirm.
        schema.unwrap_or_else(|| {
            let schema = SCHEMA_REGISTRY.register(SchemaData {
                name: type_name::<Self>().into(),
                full_name: format!("{}::{}", module_path!(), type_name::<Self>()).into(),
                kind: SchemaKind::Map {
                    key: K::schema(),
                    value: V::schema(),
                },
                type_id: Some(TypeId::of::<Self>()),
                clone_fn: Some(<Self as RawClone>::raw_clone_cb()),
                drop_fn: Some(<Self as RawDrop>::raw_drop_cb()),
                default_fn: Some(<Self as RawDefault>::raw_default_cb()),
                // NOTE(review): the hash and eq callbacks forward to `SchemaVec::raw_hash` /
                // `SchemaVec::raw_eq` even though this schema describes an `SMap`. This looks
                // copied from the `SchemaVec` schema; confirm those functions are valid when
                // handed a pointer to an `SMap`.
                hash_fn: Some(unsafe {
                    Unsafe::new(Box::leak(Box::new(|a| SchemaVec::raw_hash(a))))
                }),
                eq_fn: Some(unsafe {
                    Unsafe::new(Box::leak(Box::new(|a, b| SchemaVec::raw_eq(a, b))))
                }),
                type_data: Default::default(),
            });
            S.get_or_init(default)
                .write()
                .insert(TypeId::of::<Self>(), schema);
            schema
        })
    }
}
impl<K: HasSchema, V: HasSchema> SMap<K, V> {
    pub fn new() -> Self {
        Self::default()
    }
    pub fn insert(&mut self, k: K, v: V) -> Option<V> {
        unsafe {
            self.map
                .insert_box_unchecked(SchemaBox::new(k), SchemaBox::new(v))
                .map(|x| x.cast_into_unchecked())
        }
    }
    pub fn get(&self, key: &K) -> Option<&V> {
        unsafe {
            self.map
                .get_ref_unchecked(SchemaRef::new(key))
                .map(|x| x.cast_into_unchecked())
        }
    }
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        unsafe {
            self.map
                .get_ref_unchecked_mut(SchemaRef::new(key))
                .map(|x| x.cast_into_mut_unchecked())
        }
    }
    pub fn remove(&mut self, key: &K) -> Option<V> {
        unsafe {
            self.map
                .remove_unchecked(SchemaRef::new(key))
                .map(|x| x.cast_into_unchecked())
        }
    }
    pub fn into_schema_map(self) -> SchemaMap {
        self.map
    }
    pub fn contains_key(&self, key: &K) -> bool {
        self.map.get::<K, V>(key).is_some()
    }
    pub fn len(&self) -> usize {
        self.map.len()
    }
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }
}
/// Iterator type returned by [`SMap::iter`]: the inner table's iterator mapped through a plain
/// function pointer that turns each boxed key/value pair into `(&K, &V)`.
type SMapIter<'iter, K, V> = std::iter::Map<
    hash_map::Iter<'iter, SchemaBox, SchemaBox>,
    for<'a> fn((&'a SchemaBox, &'a SchemaBox)) -> (&'a K, &'a V),
>;
/// Iterator type returned by [`SMap::iter_mut`]: the inner table's mutable iterator mapped
/// through a plain function pointer that turns each boxed pair into `(&K, &mut V)`.
type SMapIterMut<'iter, K, V> = std::iter::Map<
    hash_map::IterMut<'iter, SchemaBox, SchemaBox>,
    for<'a> fn((&'a SchemaBox, &'a mut SchemaBox)) -> (&'a K, &'a mut V),
>;
impl<K: HasSchema, V: HasSchema> SMap<K, V> {
    /// Iterate over every entry as `(&K, &V)`.
    #[allow(clippy::type_complexity)]
    pub fn iter(&self) -> SMapIter<K, V> {
        // A named fn (rather than a closure) so it coerces to the fn pointer in the alias.
        fn typed_entry<'a, K: HasSchema, V: HasSchema>(
            entry: (&'a SchemaBox, &'a SchemaBox),
        ) -> (&'a K, &'a V) {
            let (key_box, value_box) = entry;
            // SAFETY: relies on the wrapped map only holding `K` keys and `V` values.
            let key: &'a K = unsafe { key_box.as_ref().cast_into_unchecked() };
            let value: &'a V = unsafe { value_box.as_ref().cast_into_unchecked() };
            (key, value)
        }
        self.map.map.iter().map(typed_entry)
    }

    /// Iterate over every entry as `(&K, &mut V)`.
    #[allow(clippy::type_complexity)]
    pub fn iter_mut(&mut self) -> SMapIterMut<K, V> {
        fn typed_entry_mut<'a, K: HasSchema, V: HasSchema>(
            entry: (&'a SchemaBox, &'a mut SchemaBox),
        ) -> (&'a K, &'a mut V) {
            let (key_box, value_box) = entry;
            // SAFETY: relies on the wrapped map only holding `K` keys and `V` values.
            let key: &'a K = unsafe { key_box.as_ref().cast_into_unchecked() };
            let value: &'a mut V = unsafe { value_box.as_mut().cast_into_mut_unchecked() };
            (key, value)
        }
        self.map.map.iter_mut().map(typed_entry_mut)
    }

    /// Iterate over the keys as `&K`.
    #[allow(clippy::type_complexity)]
    pub fn keys(
        &self,
    ) -> std::iter::Map<hash_map::Keys<SchemaBox, SchemaBox>, for<'a> fn(&'a SchemaBox) -> &'a K>
    {
        fn typed_key<K: HasSchema>(key_box: &SchemaBox) -> &K {
            let erased = key_box.as_ref();
            // SAFETY: relies on the wrapped map only holding `K` keys.
            unsafe { erased.cast_into_unchecked() }
        }
        self.map.map.keys().map(typed_key)
    }

    /// Iterate over the values as `&V`.
    #[allow(clippy::type_complexity)]
    pub fn values(
        &self,
    ) -> std::iter::Map<hash_map::Values<SchemaBox, SchemaBox>, for<'a> fn(&'a SchemaBox) -> &'a V>
    {
        fn typed_value<V: HasSchema>(value_box: &SchemaBox) -> &V {
            let erased = value_box.as_ref();
            // SAFETY: relies on the wrapped map only holding `V` values.
            unsafe { erased.cast_into_unchecked() }
        }
        self.map.map.values().map(typed_value)
    }

    /// Iterate over the values as `&mut V`.
    #[allow(clippy::type_complexity)]
    pub fn values_mut(
        &mut self,
    ) -> std::iter::Map<
        hash_map::ValuesMut<SchemaBox, SchemaBox>,
        for<'a> fn(&'a mut SchemaBox) -> &'a mut V,
    > {
        fn typed_value_mut<V>(value_box: &mut SchemaBox) -> &mut V {
            let erased = value_box.as_mut();
            // SAFETY: relies on the wrapped map only holding `V` values.
            unsafe { erased.cast_into_mut_unchecked() }
        }
        self.map.map.values_mut().map(typed_value_mut)
    }
}
impl<'a, K: HasSchema, V: HasSchema> IntoIterator for &'a SMap<K, V> {
    type Item = (&'a K, &'a V);
    type IntoIter = SMapIter<'a, K, V>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
impl<'a, K: HasSchema, V: HasSchema> IntoIterator for &'a mut SMap<K, V> {
    type Item = (&'a K, &'a mut V);
    type IntoIter = SMapIterMut<'a, K, V>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter_mut()
    }
}